Commit e5b1fa19 authored by Manuel Guth

Merge branch 'alfroch-adding-DIPS-attention' into 'preprocessing-remake'

Adding new tf_tools and new conditional deep sets model

See merge request atlas-flavor-tagging-tools/algorithms/umami!171
parents c7e52d60 eb04b914
File mode changed from 100755 to 100644
@@ -10,6 +10,7 @@ from tensorflow.keras.models import load_model
 from tensorflow.keras.utils import CustomObjectScope
 import umami.evaluation_tools as uet
+import umami.tf_tools as utf
 import umami.train_tools as utt
 from umami.preprocessing_tools import Configuration
@@ -127,7 +128,7 @@ def EvaluateModel(
     )
     # Load the model for evaluation. Note: The Sum is needed here!
-    with CustomObjectScope({"Sum": utt.Sum}):
+    with CustomObjectScope({"Sum": utf.Sum}):
         model = load_model(model_file)
     # Predict the output of the model on the test jets
@@ -279,7 +280,7 @@ def EvaluateModelDips(
     )
     # Load the model for evaluation. Note: The Sum is needed here!
-    with CustomObjectScope({"Sum": utt.Sum}):
+    with CustomObjectScope({"Sum": utf.Sum}):
         model = load_model(model_file)
     # Get predictions from trained model
import numpy as np
import tensorflow as tf
from umami.tf_tools import Attention, DenseNet
class test_DenseNet(tf.test.TestCase):
def setUp(self):
"""
Setting up the DenseNet
"""
self.nodes = [3, 3, 3]
self.output_nodes = 3
self.activation = "relu"
self.batch_norm = True
super(test_DenseNet, self).setUp()
self.my_dense = DenseNet(
nodes=self.nodes,
output_nodes=self.output_nodes,
activation=self.activation,
batch_norm=self.batch_norm,
)
def test_get_config(self):
# Get configs from Dense Net
configs = self.my_dense.get_config()
# Test configs
self.assertEqual(self.output_nodes, configs["output_nodes"])
self.assertEqual(self.activation, configs["activation"])
self.assertEqual(self.batch_norm, configs["batch_norm"])
self.assertEqual(self.nodes, configs["nodes"])
def test_call(self):
inputs = np.array([[0, 1, 1], [1, 1, 0]])
expected_output = np.array(
[
[0.46534657, 0.23961703, 0.2950364],
[0.35827565, 0.31311923, 0.32860515],
]
)
# Get net output
out = self.my_dense(inputs=inputs)
# Test output
np.testing.assert_almost_equal(expected_output, out)
class test_Attention(tf.test.TestCase):
def setUp(self):
"""
Setting up the Attention network
"""
self.nodes = [3, 3, 3]
self.activation = "relu"
self.mask_zero = True
self.apply_softmax = True
super(test_Attention, self).setUp()
self.my_attention = Attention(
nodes=self.nodes,
activation=self.activation,
mask_zero=self.mask_zero,
apply_softmax=self.apply_softmax,
)
def test_get_config(self):
# Get configs from the Attention network
configs = self.my_attention.get_config()
# Test configs
self.assertEqual(self.nodes, configs["nodes"])
self.assertEqual(self.activation, configs["activation"])
self.assertEqual(self.mask_zero, configs["mask_zero"])
self.assertEqual(self.apply_softmax, configs["apply_softmax"])
def test_call(self):
inputs = np.array(
[
[[0, 1, 1], [1, 1, 0], [1, 1, 1]],
[[0, 1, 1], [1, 1, 0], [1, 1, 1]],
]
)
expected_output = np.array(
[
[0.3267786, 0.3260552, 0.3471662],
[0.3267786, 0.3260552, 0.3471662],
]
)
# Get net output
out = self.my_attention(inputs=inputs, mask=None)
# Test output
np.testing.assert_almost_equal(expected_output, out)
def test_call_no_softmax(self):
attention = Attention(
nodes=self.nodes,
activation=self.activation,
mask_zero=self.mask_zero,
apply_softmax=False,
)
inputs = np.array(
[
[[0, 1, 1], [1, 1, 0], [1, 1, 1]],
[[0, 1, 1], [1, 1, 0], [1, 1, 1]],
]
)
expected_output = np.array(
[
[0.5015888, 0.4993725, 0.5621095],
[0.5015888, 0.4993725, 0.5621095],
]
)
# Get net output
out = attention(inputs=inputs, mask=None)
# Test output
np.testing.assert_almost_equal(expected_output, out)
def test_call_with_mask(self):
attention = Attention(
nodes=self.nodes,
activation=self.activation,
mask_zero=self.mask_zero,
apply_softmax=self.apply_softmax,
)
inputs = np.array(
[
[[0, 1, 2], [1, 2, 0], [1, 2, 1]],
[[0, 1, 2], [1, 2, 0], [1, 2, 1]],
]
)
expected_output = np.array(
[
[0.3121045, 0.3522645, 0.335631],
[0.3121045, 0.3522645, 0.335631],
]
)
# Get net output
out = attention(inputs=inputs, mask=2)
# Test output
np.testing.assert_almost_equal(expected_output, out)
def test_call_AssertionError(self):
inputs = np.array([[0, 1, 1], [1, 1, 0]])
# Get net output
with self.assertRaises(AssertionError):
_ = self.my_attention(inputs=inputs)
def test_compute_mask(self):
inputs = np.array(
[
[[0, 0, 0], [1, 2, 0], [1, 2, 1]],
[[0, 0, 0], [1, 2, 0], [1, 2, 1]],
]
)
expected_output = np.array(
[[True, False, False], [True, False, False]]
)
mask = self.my_attention.compute_mask(
inputs=inputs,
mask=None,
)
self.assertAllEqual(mask, expected_output)
def test_compute_mask_Errors(self):
inputs = np.array(
[
[[0, 0, 0], [1, 2, 0], [1, 2, 1]],
[[0, 0, 0], [1, 2, 0], [1, 2, 1]],
]
)
attention_mask_zero = Attention(
nodes=self.nodes,
activation=self.activation,
mask_zero=True,
apply_softmax=self.apply_softmax,
)
with self.assertRaises(AssertionError):
_ = attention_mask_zero.compute_mask(
inputs=inputs,
mask=2,
)
attention_no_mask_zero = Attention(
nodes=self.nodes,
activation=self.activation,
mask_zero=False,
apply_softmax=self.apply_softmax,
)
with self.assertRaises(AssertionError):
_ = attention_no_mask_zero.compute_mask(
inputs=inputs,
mask=None,
)
# flake8: noqa
from umami.tf_tools.generators import (
dips_generator,
dl1_generator,
umami_generator,
)
from umami.tf_tools.layers import (
Attention,
AttentionPooling,
ConditionalAttention,
ConditionalDeepSet,
DeepSet,
DenseNet,
MaskedAverage1DPooling,
MaskedSoftmax,
Sum,
)
from umami.tf_tools.models import (
Deepsets_model,
Dips_model,
DL1_model,
Umami_model,
)
from umami.configuration import logger # isort:skip
import h5py
import numpy as np
class Model_Generator(object):
def __init__(
self,
train_file_path: str,
Y_Name: str,
n_jets: int,
batch_size: int,
X_Name: str = None,
X_trk_Name: str = None,
excluded_var: list = None,
):
self.train_file_path = train_file_path
self.X_Name = X_Name
self.X_trk_Name = X_trk_Name
self.Y_Name = Y_Name
self.batch_size = batch_size
self.excluded_var = excluded_var
self.chunk_size = 1e6
# Get the number of jets from the training file if it is not explicitly given
if n_jets is None:
    with h5py.File(self.train_file_path, "r") as f:
        self.n_jets = len(f[self.Y_Name])
else:
    self.n_jets = int(n_jets)
self.length = int(self.n_jets / self.batch_size)
self.step_size = self.batch_size * int(
    self.chunk_size / self.batch_size
)
def load_in_memory(
self, load_jets: bool, load_tracks: bool, part: int = 0
):
"""
Load the jets or tracks or both step by step in memory.
Input:
- load_jets, bool: Define, if jets are loaded or not
- load_tracks, bool: Define, if tracks are loaded or not
- part, int: Part of the data, which is to be loaded.
Output:
- Loads the part of data to memory
"""
# Check that the correct X_Name and X_trk_Name are given
if load_jets is True and self.X_Name is None:
raise ValueError(
"X_Name needs to be given when jet features are to be loaded!"
)
elif load_tracks is True and self.X_trk_Name is None:
raise ValueError(
"X_trk_Name needs to be given when track features are to be loaded!"
)
logger.info(
f"\nloading in memory {part + 1}/{1 + self.n_jets // self.step_size}"
)
# Open train file
with h5py.File(self.train_file_path, "r") as f:
# Load jets if wanted
if load_jets:
self.x_in_mem = f[self.X_Name][
self.step_size * part : self.step_size * (part + 1)
]
# Exclude variables if needed
self.x_in_mem = (
np.delete(self.x_in_mem, self.excluded_var, 1)
if self.excluded_var is not None
else self.x_in_mem
)
# Load tracks if wanted
if load_tracks:
self.x_trk_in_mem = f[self.X_trk_Name][
self.step_size * part : self.step_size * (part + 1)
]
# Load truth labels
self.y_in_mem = f[self.Y_Name][
self.step_size * part : self.step_size * (part + 1)
]
class dips_generator(Model_Generator):
def __call__(self):
self.load_in_memory(part=0, load_jets=False, load_tracks=True)
n = 1
small_step = 0
for idx in range(self.length):
if (idx + 1) * self.batch_size > self.step_size * n:
self.load_in_memory(part=n, load_jets=False, load_tracks=True)
n += 1
small_step = 0
batch_x_trk = self.x_trk_in_mem[
small_step
* self.batch_size : (1 + small_step)
* self.batch_size
]
batch_y = self.y_in_mem[
small_step
* self.batch_size : (1 + small_step)
* self.batch_size
]
small_step += 1
yield (batch_x_trk, batch_y)
class dl1_generator(Model_Generator):
def __call__(self):
self.load_in_memory(part=0, load_jets=True, load_tracks=False)
n = 1
small_step = 0
for idx in range(self.length):
if (idx + 1) * self.batch_size > self.step_size * n:
self.load_in_memory(part=n, load_jets=True, load_tracks=False)
n += 1
small_step = 0
batch_x = self.x_in_mem[
small_step
* self.batch_size : (1 + small_step)
* self.batch_size
]
batch_y = self.y_in_mem[
small_step
* self.batch_size : (1 + small_step)
* self.batch_size
]
small_step += 1
yield (batch_x, batch_y)
class umami_generator(Model_Generator):
def __call__(self):
self.load_in_memory(part=0, load_jets=True, load_tracks=True)
n = 1
small_step = 0
for idx in range(self.length):
if (idx + 1) * self.batch_size > self.step_size * n:
self.load_in_memory(part=n, load_jets=True, load_tracks=True)
n += 1
small_step = 0
batch_x = self.x_in_mem[
small_step
* self.batch_size : (1 + small_step)
* self.batch_size
]
batch_x_trk = self.x_trk_in_mem[
small_step
* self.batch_size : (1 + small_step)
* self.batch_size
]
batch_y = self.y_in_mem[
small_step
* self.batch_size : (1 + small_step)
* self.batch_size
]
small_step += 1
yield {"input_1": batch_x_trk, "input_2": batch_x}, batch_y
"""
Implementations by Johnny Raine
"""
from tensorflow.keras import backend as K
from tensorflow.keras.layers import BatchNormalization, Dense, Layer
class DenseNet(Layer):
"""
Define a DenseNet as a layer to easier access it.
"""
def __init__(
self,
nodes,
output_nodes=1,
activation="relu",
batch_norm=False,
**kwargs,
):
# Define the attributes
self.nodes = nodes
self.output_nodes = output_nodes
self.activation = activation
self.batch_norm = batch_norm
# Define the layer structure
self.layers = []
# Iterate over given layers
for node in nodes[:-1]:
# Append dense layer with activation
self.layers.append(Dense(units=node, activation=activation))
# Apply batch normalisation if wanted
if batch_norm is True:
self.layers.append(BatchNormalization())
# Add final dense layer unit
self.layers.append(Dense(units=nodes[-1], activation=activation))
# Define output activation function based on output node size
output_activation = "sigmoid" if output_nodes == 1 else "softmax"
self.layers.append(
Dense(units=output_nodes, activation=output_activation)
)
# Assert that there are nodes defined
assert len(nodes), "No layers in DenseNet"
super().__init__(**kwargs)
def call(self, inputs):
out = self.layers[0](inputs)
for layer in self.layers[1:]:
out = layer(out)
return out
def get_config(self):
# Get configuration of the network
config = {
"nodes": self.nodes,
"output_nodes": self.output_nodes,
"activation": self.activation,
"batch_norm": self.batch_norm,
}
base_config = super(DenseNet, self).get_config()
# Return a dict with the configurations
return dict(list(base_config.items()) + list(config.items()))
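Instantiated on its own, DenseNet behaves like any Keras layer; the output activation is picked from output_nodes (sigmoid for a single node, softmax otherwise). A minimal sketch with arbitrary inputs:
import numpy as np

net = DenseNet(nodes=[3, 3, 3], output_nodes=3, batch_norm=True)
out = net(np.array([[0.0, 1.0, 1.0]]))  # softmax output of shape (1, 3)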
class DeepSet(Layer):
"""
Define a deep set layer for easier usage.
"""
def __init__(
self,
nodes,
activation="relu",
batch_norm=False,
mask_zero=True,
**kwargs,
):
# Define attributes
self.nodes = nodes
self.activation = activation
self.batch_norm = batch_norm
self.mask_zero = mask_zero
self.supports_masking = True
# Define the layer structure
self.layers = []
# Iterate over the given nodes
for node in nodes[:-1]:
# Append Dense node with activation
self.layers.append(Dense(units=node, activation=activation))
# Apply batch normalisation if active
if batch_norm is True:
self.layers.append(BatchNormalization())
# Append final dense layer with activation
self.layers.append(Dense(units=nodes[-1], activation=activation))
# Check that nodes are in layers
assert len(self.layers), "No layers in DeepSet"
super().__init__(**kwargs)
def call(self, inputs, mask=None):
# Assert that the tensor shape is at least rank 3
assert (
    len(inputs.shape) == 3
), "DeepSet layer requires a tensor of rank 3; received shape {}".format(
    inputs.shape
)
# Check if mask is None and the standard zero mask is used
if mask is None and self.mask_zero:
# Compute zero mask
mask = self.compute_mask(inputs, mask)
# Define out
out = self.layers[0](inputs)
for layer in self.layers[1:]:
out = layer(out)
# if mask is not None:
# out *= (1-K.cast(mask,dtype="float32"))
return out
def compute_mask(self, inputs, mask=None):
# Check if mask zero is true
if not self.mask_zero:
return None
else:
# Return correct masking
return K.equal(K.sum(inputs ** 2, axis=-1), 0)
def get_config(self):
# Get configuration of the network
config = {
"nodes": self.nodes,
"activation": self.activation,
"batch_norm": self.batch_norm,
"mask_zero": self.mask_zero,
}
base_config = super(DeepSet, self).get_config()
# Return a dict with the configurations
return dict(list(base_config.items()) + list(config.items()))
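The zero-padding mask flags objects whose entire feature vector is zero, which is how padded tracks can be excluded downstream. An illustrative check:
import numpy as np

deepset = DeepSet(nodes=[4, 4])
# One jet with one all-zero (padded) track and one real track
dummy = np.array([[[0.0, 0.0, 0.0], [1.0, 2.0, 0.0]]])
print(deepset.compute_mask(dummy))  # [[ True False]]: the padded track is masked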
class MaskedSoftmax(Layer):
def __init__(self, axis=-1, **kwargs):
# Get attributes
self.axis = axis
self.supports_masking = True
super().__init__(**kwargs)
def call(self, inputs, mask=None):
# Check for masking
if mask is None:
# Compute masking for not existing inputs
mask = self.compute_mask(inputs)
# Calculate Softmax
inputs = K.exp(inputs) * (1 - K.cast(mask, dtype="float32"))
# Normalise so that the unmasked entries sum to one along the given axis
return inputs / K.sum(inputs, axis=self.axis, keepdims=True)
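With the normalisation in place, masked entries receive zero probability and the remaining entries renormalise to one. A quick numeric check (values rounded):
import numpy as np

logits = np.array([[1.0, 2.0, 3.0]], dtype="float32")
mask = np.array([[False, False, True]])
print(MaskedSoftmax()(logits, mask=mask))  # approximately [[0.269, 0.731, 0.0]]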