Commit a8c461d4 authored by Philipp Gadow's avatar Philipp Gadow
Browse files

Merge branch 'convert_h5_to_tf' into 'master'

Convert h5 to tf

See merge request atlas-flavor-tagging-tools/algorithms/umami!235
parents ab5d7627 3d822d82
......@@ -170,6 +170,9 @@ preparation:
class_labels: [ujets, cjets, bjets]
convert:
chunk_size: 5000
samples:
training_ttbar_bjets:
type: ttbar
......@@ -285,6 +288,13 @@ In the `Preparation`, the size of the batches which are be loaded from the ntupl
Another important part is the `class_labels` option, which defines the flavours used in the preprocessing. The names of the available flavours can be found [here](https://gitlab.cern.ch/atlas-flavor-tagging-tools/algorithms/umami/-/blob/master/umami/configs/global_config.yaml). Add the names of the flavours you want to this list to include them in the preprocessing. **PLEASE KEEP THE ORDERING CONSTANT! THIS IS VERY IMPORTANT**. This list must be the same as the one in the train config!
If you want to save the samples as TFRecord files, you can specify the `chunk_size` under `convert`, i.e. the number of samples that are loaded and saved per file.
??? info "TF records"
TF records are TensorFlow's own file format to store datasets. Especially when working with large datasets, this format can be useful. In TF records, the data is saved as a sequence of binary strings. This has the advantage that reading the data is significantly faster than from a .h5 file. In addition, the data can be saved in multiple files instead of one big file containing all the data. This way the reading procedure can be parallelised, which speeds up the whole training.
Besides this, since TF records are TensorFlow's own file format, they are optimised for use with TensorFlow. For example, the dataset is not stored completely in memory but automatically loaded in batches as soon as it is needed.
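For illustration, a minimal sketch of how such a directory of TF Record shards could be read back with `tf.data` is shown below. The path, feature shapes and batch size are placeholders; in Umami this is handled by the `TFRecordReader` class added in this merge request, which reads the real shapes from the `metadata.json` file written during conversion.

```python
import tensorflow as tf

# Placeholder path to a directory of .tfrecord shards produced by the conversion
shards = tf.io.gfile.glob("preprocessing/train_sample/*.tfrecord")

# Read several shard files in parallel and batch the serialised examples
dataset = tf.data.Dataset.from_tensor_slices(shards).interleave(
    tf.data.TFRecordDataset, cycle_length=5
)

def decode(record_bytes):
    # Feature shapes here are illustrative; the real values are stored in metadata.json
    example = tf.io.parse_example(
        record_bytes,
        {
            "X_jets": tf.io.FixedLenFeature(shape=[41], dtype=tf.float32),
            "Y": tf.io.FixedLenFeature(shape=[3], dtype=tf.int64),
        },
    )
    return example["X_jets"], example["Y"]

dataset = dataset.batch(256).map(decode).prefetch(3)
```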
The last part is the exact splitting of the flavours. In `samples`, you define, for each combination of ttbar/zprime and training/validation/testing, the flavours you want to use. You need to give a type (ttbar/zprime), a category (flavour or `inclusive`) and the number of jets you want for this specific flavour. You also need to apply the template cuts defined earlier. The `f_output` defines where the output file is saved: `path` defines the folder, `file` defines the name.
In the example above, we specify the paths for `ttbar` and `zprime` ntuples. Since we define them there, we can then use these ntuples in the `samples` section. So if you want to use e.g. Z+jets ntuples for bb-jets, define the corresponding `zjets` entry in the ntuples section before using it in the `samples` section.
......@@ -419,6 +429,12 @@ preprocessing.py --config <path to config file> --apply_scales
preprocessing.py --config <path to config file> --write
```
If you are saving the tracks with the extra flag `--tracks`, it might be useful to save your samples as a directory of TF Records. This can be done with the `--to_records` flag:
```bash
preprocessing.py --config <path to config file> --to_records --tracks
```
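The conversion writes a directory named after the `.h5` output file (with the extension stripped), containing numbered `.tfrecord` shards and a `metadata.json` file with the dataset dimensions. A quick sanity check of the result could look like the following sketch (the directory path is a placeholder):

```python
import json
import os

# Placeholder: directory produced by `preprocessing.py --to_records --tracks`
record_dir = "preprocessing/PFlow-hybrid-resampled_scaled_shuffled"

shards = sorted(f for f in os.listdir(record_dir) if f.endswith(".tfrecord"))
print(f"Found {len(shards)} TF Record shards")

# The metadata stores nJets, njet_features, nTrks, nFeatures and nDim
with open(os.path.join(record_dir, "metadata.json")) as metadata_file:
    print(json.load(metadata_file))
```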
## Full example
There are several training and validation/test samples to produce. See the following link for a list of all the necessary ones in a complete configuration file: [`examples/PFlow-Preprocessing.yaml`](https://gitlab.cern.ch/atlas-flavor-tagging-tools/algorithms/umami/-/blob/master/examples/PFlow-Preprocessing.yaml)
......
......@@ -77,6 +77,9 @@ preparation:
class_labels: [ujets, cjets, bjets]
convert:
chunk_size: 5000
samples:
training_ttbar_bjets:
type: ttbar
......
......@@ -31,6 +31,9 @@ var_dict: /nfs/dust/atlas/user/ahnenjan/phd/umami/umami-git/umami/configs/Umami_
exclude: []
# number of files to be loaded in parallel when using TF Records as input files
nfiles: 5
NN_structure:
# Decide, which tagger is used
tagger: "umami"
......
......@@ -147,7 +147,7 @@ test_train_umami:
dependencies:
- test_preprocessing_umami
script:
- pytest --cov=./ --cov-report= ./umami/tests/integration/test_train_umami.py -v -s --junitxml=report.xml -k "test_train_umami"
- cp .coverage ./coverage_files/.coverage.test_train_umami
artifacts:
<<: *artifact_template
......@@ -155,6 +155,21 @@ test_train_umami:
- test_umami_model/
- coverage_files/
test_train_umami_tfrecords:
<<: *test_template
stage: integration_test_tagger
needs: ["test_preprocessing_umami"]
dependencies:
- test_preprocessing_umami
script:
- pytest --cov=./ --cov-report= ./umami/tests/integration/test_train_umami.py -v -s --junitxml=report.xml -k "test_tfrecords_train_umami"
- cp .coverage ./coverage_files/.coverage.test_train_umami_tfrecords
artifacts:
<<: *artifact_template
paths:
- test_umami_model/
- coverage_files/
test_plot_input_vars:
<<: *test_template
stage: integration_test_plotting
......
#!/usr/bin/env python
from umami.configuration import logger # isort:skip
import json
import os
import h5py
import tensorflow as tf
......@@ -177,13 +178,50 @@ def Umami(args, train_config, preprocess_config):
variable_config["train_variables"], exclude
)
if ".h5" in train_config.train_file:
with h5py.File(train_config.train_file, "r") as f:
nJets, nTrks, nFeatures = f["X_trk_train"].shape
nJets, nDim = f["Y_train"].shape
nJets, njet_features = f["X_train"].shape
elif os.path.isdir(train_config.train_file):
train_file_names = os.listdir(train_config.train_file)
for train_file_name in train_file_names:
if not (".tfrecord" in train_file_name) and not (
train_file_name == "metadata.json"
):
raise ValueError(
f"input file {train_config.train_file} is neither a .h5 file nor a directory with TF Record Files. You should check this."
)
if "metadata.json" not in train_file_names:
raise KeyError("No metadata file in directory.")
try:
nfiles = train_config.config["nfiles"]
except KeyError:
logger.warning(
"no number of files to be loaded in parallel defined. Set to 5"
)
nfiles = 5
tfrecord_reader = utf.TFRecordReader(
train_config.train_file, NN_structure["batch_size"], nfiles
)
train_dataset = tfrecord_reader.load_Dataset()
metadata_name = (train_config.train_file + "/metadata.json").replace(
"//", "/"
)
with open(metadata_name, "r") as metadata_file:
metadata = json.load(metadata_file)
nJets = metadata["nJets"]
nTrks = metadata["nTrks"]
nFeatures = metadata["nFeatures"]
njet_features = metadata["njet_features"]
nDim = metadata["nDim"]
else:
raise ValueError(
f"input file {train_config.train_file} is neither a .h5 file nor a directory with TF Record Files. You should check this."
)
if NN_structure["nJets_train"] is not None:
nJets = int(NN_structure["nJets_train"])
logger.info(f"nJets: {nJets}, nTrks: {nTrks}")
logger.info(f"nFeatures: {nFeatures}, njet_features: {njet_features}")
......@@ -194,32 +232,33 @@ def Umami(args, train_config, preprocess_config):
njet_features=njet_features,
)
if ".h5" in train_config.train_file:
train_dataset = (
tf.data.Dataset.from_generator(
utf.umami_generator(
train_file_path=train_config.train_file,
X_Name="X_train",
X_trk_Name="X_trk_train",
Y_Name="Y_train",
n_jets=nJets,
batch_size=NN_structure["batch_size"],
excluded_var=excluded_var,
),
output_types=(
{"input_1": tf.float32, "input_2": tf.float32},
tf.float32,
),
output_shapes=(
{
"input_1": tf.TensorShape([None, nTrks, nFeatures]),
"input_2": tf.TensorShape([None, njet_features]),
},
tf.TensorShape([None, nDim]),
),
)
.repeat()
.prefetch(3)
)
# Check if epochs is set via argparser or not
if args.epochs is None:
......
......@@ -35,6 +35,7 @@ def GetParser():
default=int(1e6),
help="Set the chunk size of the generators.",
)
# possible job options for the different preprocessing steps
action = parser.add_mutually_exclusive_group(required=True)
action.add_argument(
......@@ -74,6 +75,13 @@ def GetParser():
training labels to disk""",
)
action.add_argument(
"-r",
"--to_records",
action="store_true",
help="convert h5 file into tf records",
)
parser.add_argument(
"--flavour",
nargs="+",
......@@ -149,6 +157,10 @@ if __name__ == "__main__":
)
Writer.WriteTrainSample()
elif args.to_records:
Converter = upt.h5toTFRecordConverter(config)
Converter.write_tfrecord()
# Give error when nothing is used
else:
raise ValueError(
......
from umami.configuration import logger # isort:skip
import json
import os
import h5py
import tensorflow as tf
import tqdm
class h5toTFRecordConverter:
def __init__(self, config):
self.config = config
self.path_h5 = self.config.GetFileName(
option="resampled_scaled_shuffled"
)
try:
self.chunk_size = config.preparation["convert"]["chunk_size"]
logger.info(f"Save {self.chunk_size} entries in one file")
except KeyError:
logger.warning(
"Chunk size for conversion into tf records not set in config file. Set to 5000"
)
self.chunk_size = 5_000
def load_h5File_Train(self):
"""
        Load the jets, tracks, labels and weights from the training file in chunks of chunk_size entries.
"""
with h5py.File(self.path_h5, "r") as hFile:
length_dataset = len(hFile["X_train"])
logger.info(
f"Total length of the dataset is {length_dataset}. Load {self.chunk_size} samples at a time"
)
total_loads = int(length_dataset / self.chunk_size)
if length_dataset % self.chunk_size != 0:
total_loads += 1
logger.info(f"Total number of loading steps is {total_loads}")
for i in tqdm.tqdm(range(total_loads)):
start = i * self.chunk_size
end = (i + 1) * self.chunk_size
X_jets = hFile["X_train"][start:end]
X_trks = hFile["X_trk_train"][start:end]
Y = hFile["Y_train"][start:end]
Weights = hFile["weight"][start:end]
yield X_jets, X_trks, Y, Weights
def save_parameters(self, record_dir):
"""
:param record_dir: directory where metadata should be saved
write metadata into metadata.json and save it with tf record files
"""
with h5py.File(self.path_h5) as h5file:
nJets = len(h5file["X_train"])
njet_feature = len(h5file["X_train"][0])
nTrks = len(h5file["X_trk_train"][0])
nFeatures = len(h5file["X_trk_train"][0][0])
nDim = len(h5file["Y_train"][0])
data = {
"nJets": nJets,
"njet_features": njet_feature,
"nTrks": nTrks,
"nFeatures": nFeatures,
"nDim": nDim,
}
metadata_filename = record_dir + "/metadata.json"
with open(metadata_filename, "w") as metadata:
logger.info(f"Writing metadata to {metadata_filename}")
json.dump(data, metadata)
def write_tfrecord(self):
"""
write inputs and labels of train file into a TFRecord
"""
record_dir = self.path_h5.replace(".h5", "")
os.makedirs(record_dir, exist_ok=True)
tf_filename_start = record_dir.split("/")[-1]
n = 0
for X_jets, X_trks, Y, Weights in self.load_h5File_Train():
n += 1
filename = (
record_dir
+ "/"
+ tf_filename_start
+ "_"
+ str(n).zfill(4)
+ ".tfrecord"
)
with tf.io.TFRecordWriter(filename) as file_writer:
for (x_jets, x_trks, y, weight) in zip(
X_jets, X_trks, Y, Weights
):
record_bytes = tf.train.Example()
record_bytes.features.feature[
"X_jets"
].float_list.value.extend(x_jets.reshape(-1))
record_bytes.features.feature[
"X_trks"
].float_list.value.extend(x_trks.reshape(-1))
record_bytes.features.feature["Y"].int64_list.value.extend(
y
)
record_bytes.features.feature[
"Weights"
].float_list.value.extend(weight.reshape(-1))
file_writer.write(record_bytes.SerializeToString())
logger.info(f"Data written in {filename}")
self.save_parameters(record_dir=record_dir)
# flake8: noqa
from umami.preprocessing_tools.Configuration import Configuration
from umami.preprocessing_tools.Convert_to_Record import h5toTFRecordConverter
from umami.preprocessing_tools.Cuts import GetCategoryCuts, GetSampleCuts
from umami.preprocessing_tools.Merging import (
add_data,
......
......@@ -143,6 +143,29 @@ def runPreprocessing(config, tagger):
if isSuccess is True:
run_write
logger.info(
"Test: shuffling the samples, writing the samples to disk and convert them to tf record files..."
)
if tagger == "umami":
run_record = run(
[
"preprocessing.py",
"-c",
f"{config}",
"--to_records",
]
)
try:
run_record.check_returncode()
except CalledProcessError:
logger.info("Test failed: preprocessing.py --to_records.")
isSuccess = False
if isSuccess is True:
run_record
if isSuccess is True:
tagger_path = f"./preprocessing_{tagger}/"
if not os.path.isdir(tagger_path):
run(["mkdir", tagger_path])
......
......@@ -125,6 +125,10 @@ class TestUmamiTraining(unittest.TestCase):
"./preprocessing_umami/preprocessing/",
"PFlow-hybrid_70-test-resampled_scaled_shuffled.h5",
)
self.train_tfrecords_files = os.path.join(
"./preprocessing_umami/preprocessing/",
"PFlow-hybrid_70-test-resampled_scaled_shuffled",
)
self.test_file_ttbar = os.path.join(
test_dir, "MC16d_hybrid_odd_100_PFlow-no_pTcuts-file_0.h5"
)
......@@ -207,6 +211,31 @@ class TestUmamiTraining(unittest.TestCase):
with open(self.config, "w") as config:
yaml.dump(self.config_file, config, default_flow_style=False)
        # Copy the config file, but change train_file to the directory with the
        # TF Record files instead of the h5 file, and change the model name so
        # the model is saved in a new folder
self.tfrecords_config = (
self.config[:].replace(".yaml", "") + "_tfrecords.yaml"
)
with open(self.config, "r") as tfrecords_config:
self.config_tfrecords_file = yaml.load(
tfrecords_config, Loader=yaml_loader
)
self.config_tfrecords_file[
"train_file"
] = f"{self.train_tfrecords_files}"
self.config_tfrecords_file["model_name"] = (
self.data["test_umami"]["model_name"] + "_tfrecords"
)
with open(self.tfrecords_config, "w") as tfrecords_config:
yaml.dump(
self.config_tfrecords_file,
tfrecords_config,
default_flow_style=False,
)
logger.info("Downloading test data...")
for file in self.data["test_umami"]["files"]:
path = os.path.join(
......@@ -220,3 +249,7 @@ class TestUmamiTraining(unittest.TestCase):
def test_train_umami(self):
"""Integration test of train.py for Umami script."""
self.assertTrue(runTrainingUmami(self.config))
def test_tfrecords_train_umami(self):
"""Integration test of train_umami.py script with tf records as input."""
self.assertTrue(runTrainingUmami(self.tfrecords_config))
import json
import os
import tempfile
import unittest
import h5py
import numpy as np
from umami.preprocessing_tools import Configuration, Convert_to_Record
class ConvertTest(unittest.TestCase):
"""
    Unit test the functions inside the h5toTFRecordConverter class.
"""
def setUp(self):
self.config_file = os.path.join(
os.path.dirname(__file__), "test_preprocess_config.yaml"
)
self.config = Configuration(self.config_file)
# create dummy data
x_train = np.ones(shape=(3, 41))
x_trks_train = np.ones(shape=(3, 40, 5))
y_train = np.ones(shape=(3, 3))
# save dummy data to temporary file
self.tfh5 = tempfile.NamedTemporaryFile(
suffix="-resampled_scaled_shuffled.h5"
)
with h5py.File(self.tfh5, "w") as out_file:
out_file.create_dataset("X_train", data=x_train)
out_file.create_dataset("X_trk_train", data=x_trks_train)
out_file.create_dataset("Y_train", data=y_train)
self.config.outfile_name = self.tfh5.name.replace(
"-resampled_scaled_shuffled.h5", ".h5"
)
def test_save_parameters(self):
cv = Convert_to_Record.h5toTFRecordConverter(self.config)
# create temporary directory where data should be saved
record_dir = tempfile.TemporaryDirectory()
cv.save_parameters(record_dir.name)
parameters = {
"nJets": 3,
"njet_features": 41,
"nTrks": 40,
"nFeatures": 5,
"nDim": 3,
}
metadata_file = os.path.join(record_dir.name, "metadata.json")
with open(metadata_file, "r") as metadata:
parameters_saved = json.load(metadata)
self.assertEqual(parameters, parameters_saved)
......@@ -16,5 +16,6 @@ from umami.tf_tools.layers import (
MaskedSoftmax,
Sum,
)
from umami.tf_tools.load_tfrecord import TFRecordReader
from umami.tf_tools.models import Deepsets_model
from umami.tf_tools.tools import GetLRReducer
import json
import tensorflow as tf
class TFRecordReader:
def __init__(self, path, batch_size, nfiles):
"""
:param path: path where TFRecord is saved
batch_size: size of batches for the training
nfiles: number of tf record files loaded in parallel
"""
self.path = path
self.batch_size = batch_size
self.nfiles = nfiles
def load_Dataset(self):
"""
Load TFRecord and create Dataset for training
"""
data_files = tf.io.gfile.glob(
(self.path + "/*.tfrecord").replace("//", "/")
)
Dataset_shards = tf.data.Dataset.from_tensor_slices([data_files])
        # Reassign: Dataset.shuffle returns a new, shuffled dataset
        Dataset_shards = Dataset_shards.shuffle(
            tf.cast(tf.shape(data_files)[0], tf.int64)
        )
tf_Dataset = Dataset_shards.interleave(
tf.data.TFRecordDataset,
num_parallel_calls=tf.data.AUTOTUNE,
cycle_length=self.nfiles,
)
tf_Dataset = (
tf_Dataset.shuffle(self.batch_size * 10)
.batch(self.batch_size)
.map(
self.decode_fn,
num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
.repeat()
.prefetch(3)
)
return tf_Dataset
def decode_fn(self, record_bytes):
"""
:param record_bytes: serialized Dataset
Convert serialized Dataset to dictionary and return inputs and labels
"""
metadata_name = (self.path + "/metadata.json").replace("//", "/")
with open(metadata_name, "r") as metadata_file:
metadata = json.load(metadata_file)
shapes = {
"shape_Xjets": [metadata["njet_features"]],
"shape_Xtrks": [metadata["nTrks"], metadata["nFeatures"]],
"shape_Y": [metadata["nDim"]],
}
parse_ex = tf.io.parse_example(
record_bytes,
{
"X_jets": tf.io.FixedLenFeature(
shape=shapes["shape_Xjets"], dtype=tf.float32
),
"X_trks": tf.io.FixedLenFeature(
shape=shapes["shape_Xtrks"], dtype=tf.float32
),
"Y": tf.io.FixedLenFeature(
shape=shapes["shape_Y"], dtype=tf.int64
),
"Weights": tf.io.FixedLenFeature(shape=[1], dtype=tf.float32),
},
)
# return the jet inputs and labels
return {
"input_1": parse_ex["X_trks"],
"input_2": parse_ex["X_jets"],
}, parse_ex["Y"]
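# --- Illustrative usage sketch (not part of this module) ---
# This mirrors how the training script consumes a TF Record directory in this
# merge request; the path, batch size and nfiles below are placeholder values.
if __name__ == "__main__":
    reader = TFRecordReader(
        path="preprocessing/train_sample_tfrecords",
        batch_size=256,
        nfiles=5,
    )
    train_dataset = reader.load_Dataset()
    # Take one batch: inputs are the track and jet arrays, labels the one-hot flavours
    for inputs, labels in train_dataset.take(1):
        print(inputs["input_1"].shape, inputs["input_2"].shape, labels.shape)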