diff --git a/setup.cfg b/setup.cfg
index acc5c5e8e7379c49547e972065768111dd471dda..5c3ed95f957824a101b7f3cd41e821c87e8cdbab 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -12,7 +12,7 @@ select = B,C,E,F,W,T4
 darglint-ignore-regex = *
 
 [darglint]
-ignore=DAR203
+ignore=DAR203, DAR103
 docstring_style=numpy
 strictness=full
 log_level=INFO
diff --git a/umami/preprocessing_tools/Resampling.py b/umami/preprocessing_tools/Resampling.py
index 6fa0402b4ae2266b60039e4499530072673e96d2..8191b1a92f506ff440253dd0e6cbca708c3c6533 100644
--- a/umami/preprocessing_tools/Resampling.py
+++ b/umami/preprocessing_tools/Resampling.py
@@ -23,13 +23,13 @@ from umami.preprocessing_tools.utils import ResamplingPlots, generate_process_ta
 
 def SamplingGenerator(
     file: str,
-    indices,
+    indices: np.ndarray,
     label: int,
     label_classes: list,
     use_tracks: bool = False,
     tracks_names: list = None,
     chunk_size: int = 10_000,
-    seed=42,
+    seed: int = 42,
     duplicate: bool = False,
 ):
     """
@@ -238,18 +238,34 @@ def CorrectFractions(
     target_fractions: list,
     class_names: list = None,
     verbose: bool = True,
-):
+) -> np.ndarray:
     """
     Corrects the fractions of N classes
 
     Parameters
     ----------
-    N_jets: list actual number of available jets per class
-    target_fractions: list of the target fraction per class
+    N_jets : list
+        List with the actual number of available jets per class
+    target_fractions : list
+        List of the target fraction per class
+    class_names : list, optional
+        List with the class names, by default None
+    verbose : bool, optional
+        Decide if more detailed output is logged, by default True
+
     Returns
     -------
-    target_N_jets: list of N_jets to keep per class
+    nJets_to_keep : np.ndarray
+        Array of N_jets to keep per class
+
+    Raises
+    ------
+    ValueError
+        If not all N_jets entries are bigger than 0.
+    ValueError
+        If the 'target_fractions' don't add up to one.
     """
+
     if not np.all(N_jets):
         raise ValueError("Each N_jets entry needs to be >0.")
     assert len(N_jets) == len(target_fractions)
@@ -328,11 +344,23 @@ def CorrectFractions(
     return df["target_N_jets"].astype(int).values
 
 
-def quick_check_duplicates(X):
+def quick_check_duplicates(X: list) -> bool:
     """
     This performs a quick duplicate check in list X.
     If a duplicate is found, returns directly True.
+
+    Parameters
+    ----------
+    X : list
+        List with entries.
+
+    Returns
+    -------
+    duplicate_is_there : bool
+        Return True if an element appears more than once in the list,
+        False if not.
     """
+
     set_X = set()
     for item in X:
         if item in set_X:
@@ -341,15 +369,21 @@ def quick_check_duplicates(X):
     return False
 
 
-def CalculateBinning(bins: list):
+def CalculateBinning(bins: list) -> np.ndarray:
     """
+    Calculate and return the bin edges for the provided
+    bins.
+
     Parameters
     ----------
-    bins: is either a list containing the np.linspace arguments,
-        or a list of them
+    bins : list
+        Is either a list containing the np.linspace arguments,
+        or a list of them
+
     Returns
     -------
-    bin edges
+    bin_edges : np.ndarray
+        Array with the bin edges
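+
+    Examples
+    --------
+    Each entry is passed to ``np.linspace`` and the results are
+    concatenated, e.g. for two bin ranges (a short illustration):
+
+    >>> CalculateBinning([[0.0, 1.0, 3], [2.0, 4.0, 3]])
+    array([0. , 0.5, 1. , 2. , 3. , 4. ])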
     """
     if any(isinstance(i, list) for i in bins):
         return np.concatenate([np.linspace(*elem) for elem in bins])
@@ -361,15 +395,18 @@ class Resampling:
     Base class for all resampling methods in umami.
     """
 
-    def __init__(self, config) -> None:
+    def __init__(self, config: object) -> None:
         """
+        Initialise the Resampling class and all needed configs.
+
         Parameters
         ----------
-        config: umami.preprocessing_tools.Configuration object which stores
-            the config for the preprocessing
-        Returns
-        -------
+        config : object
+            umami.preprocessing_tools.Configuration object which stores
+            the config for the preprocessing
         """
+
+        # Get options as attributes of self
         self.config = config
         self.options = config.sampling.get("options")
         self.preparation_samples = config.preparation.get("samples")
@@ -384,6 +421,7 @@ class Resampling:
         )
         self.tracks_names = self.config.sampling["options"]["tracks_names"]
 
+        # Get path attributes
         self.outfile_name = self.config.GetFileName(option="resampled")
         self.outfile_path = self.config.config["parameters"]["sample_path"]
         self.resampled_path = self.config.config["parameters"]["file_path"]
@@ -420,25 +458,38 @@ class Resampling:
     def _GetBinning(self):
         """
         Retrieves the binning and the corresponding variables which are used
-        for the resampling.
-        Parameters
-        ----------
-        Returns
-        -------
-        saves the bins and variables to class variables
+        for the resampling. Saves the bins and variables to class variables.
+
+        Raises
+        ------
+        ValueError
+            If more than two resampling variables are given.
         """
+
+        # Get the sampling variables
         sampling_variables = self.options.get("sampling_variables")
+
+        # Check that not more than two variables are given
         if len(sampling_variables) != 2:
             raise ValueError("Resampling is so far only supporting 2 variables.")
+
+        # Get the variable names as list
         variables = [list(elem.keys())[0] for elem in sampling_variables]
+
+        # Get the two variables from the list
         self.var_x = variables[0]
         self.var_y = variables[1]
+
+        # Calculate the binning of the variables with the provided info about
+        # the binning
         logger.info(f"Using {variables[0]} and {variables[1]} for resampling.")
         self.bins_x = CalculateBinning(sampling_variables[0][self.var_x]["bins"])
         self.bins_y = CalculateBinning(sampling_variables[1][self.var_y]["bins"])
+
+        # Get number of bins
         self.nbins = np.array([len(self.bins_x), len(self.bins_y)])
 
@@ -446,17 +497,25 @@ class Resampling:
-    def GetBins(self, x, y):
+    def GetBins(self, x: np.ndarray, y: np.ndarray):
         """
         Calculates the bin statistics for a 2D histogram. This post might be
         helpful to understand the flattened bin numbering:
 
         Parameters
         ----------
-        x: numpy array with values from variable x
-        y: numpy array with values from variable y and same length as x
+        x : np.ndarray
+            Array with values from variable x.
+        y : np.ndarray
+            Array with values from variable y and same length as x
+
         Returns
         -------
-        binnumber: numpy array with bin number of each jet with same length
-            as x and y
-        bins_indices_flat: numpy array with flat bin numbers mapped from 2D
-            with length nBins
-        statistic: numpy array with counts per bin, legth nBins
+        binnumber : np.ndarray
+            Array with bin number of each jet with same length as x and y
+        bins_indices_flat : np.ndarray
+            Array with flat bin numbers mapped from 2D with length nBins
+        statistic : np.ndarray
+            Array with counts per bin, length nBins
         """
+
+        # Assert same shape of x and y
         assert len(x) == len(y)
+
+        # Get the statistic and binnumbers for the provided binning
         statistic, _, _, binnumber = binned_statistic_2d(
             x=x,
             y=y,
@@ -465,10 +524,15 @@ class Resampling:
             bins=[self.bins_x, self.bins_y],
         )
 
+        # Get the flat bin indices for 2d
         bins_indices_flat_2d = np.indices(self.nbins - 1) + 1
+
+        # Get the flat bin indices for 1d
         bins_indices_flat = np.ravel_multi_index(
             bins_indices_flat_2d, self.nbins + 1
         ).flatten()
+
+        # Return the binnumber, the flat bin indices and the flattened statistic
         return binnumber, bins_indices_flat, statistic.flatten()
 
     def ResamplingGenerator(
@@ -515,28 +579,44 @@ class Resampling:
         numpy.ndarray
             binarised labels
         """
+
+        # Get the tracks name. If not provided, use default
         tracks_names = tracks_names or ["tracks"]
+
+        # Open the h5 file
         with h5py.File(file, "r") as f:
+
+            # Set the start and end index for the given chunk
             start_ind = 0
             end_ind = int(start_ind + chunk_size)
 
             # create indices and then shuffle those to be used in the loop below
             tupled_indices = []
             while end_ind <= len(indices) or start_ind == 0:
                 tupled_indices.append((start_ind, end_ind))
                 start_ind = end_ind
                 end_ind = int(start_ind + chunk_size)
+
+            # Get a random generator with specified seed
             rng = np.random.default_rng(seed=seed)
+
+            # Mix the chunks
             tupled_indices = rng.choice(
                 tupled_indices, len(tupled_indices), replace=False
             )
 
+            # Iterate over the indices
            for index_tuple in tupled_indices:
                 loading_indices = indices[index_tuple[0] : index_tuple[1]]
+
+                # One hot encode the labels
                 labels = label_binarize(
                     (np.ones(index_tuple[1] - index_tuple[0]) * label),
                     classes=label_classes,
                 )
 
+                # Yield the jets and labels
+                # If tracks are used, also yield the tracks
                 if use_tracks:
                     tracks = [
                         f[tracks_name].fields(variables[tracks_name])[loading_indices]
                         for tracks_name in tracks_names
                     ]
                     yield f["jets"].fields(variables["jets"])[loading_indices], labels, tracks
                 else:
                     yield f["jets"].fields(variables["jets"])[loading_indices], labels
 
@@ -549,22 +629,26 @@ class Resampling:
     def WriteFile(self, indices: dict, chunk_size: int = 10_000):
-        """Takes the indices as input calculated in the GetIndices function and
+        """
+        Takes as input the indices calculated in the GetIndices function and
         reads them in and writes them to disk.
         Writes the selected jets from the samples to disk
 
         Parameters
         ----------
         indices : dict
-            dict of indices as returned by the GetIndices function
+            Dict of indices as returned by the GetIndices function.
         chunk_size : int, optional
-            size of loaded chunks, by default 10_000
+            Size of loaded chunks, by default 10_000
 
         Raises
         ------
         TypeError
-            in case concatenated samples have different shape
+            If concatenated samples have different shape.
+        TypeError
+            If the used samples don't have the same content of variables.
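+
+        Examples
+        --------
+        A minimal sketch of the intended call pattern (assuming
+        ``resampler`` is an initialised instance of a Resampling
+        subclass whose GetIndices method provides the indices):
+
+        >>> indices = resampler.GetIndices()  # doctest: +SKIP
+        >>> resampler.WriteFile(indices, chunk_size=10_000)  # doctest: +SKIP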
""" + # reading chunks of each sample in here # adding already here a column with labels sample_lengths = [len(indices[sample]) for sample in indices] @@ -711,17 +795,17 @@ class Resampling: class ResamplingTools(Resampling): """Helper class for resampling.""" - def InitialiseSamples(self): + def InitialiseSamples(self) -> None: """ - Initialising input files. - Parameters - ---------- - - Returns - ------- At this point the arrays of the 2 variables are loaded which are used for the sampling and saved into class variables. + + Raises + ------ + KeyError + If the samples are not correctly specified. """ + self.samples = {} try: samples = self.options["samples"] @@ -796,9 +880,29 @@ class ResamplingTools(Resampling): preparation_sample.get("category"): sample, } - def GetValidClassCategories(self, samples): - """helper function to check sample categories requested in resampling were - also defined in the sample preparation step. Returns sample classes.""" + def GetValidClassCategories(self, samples: dict): + """ + Helper function to check sample categories requested in resampling were + also defined in the sample preparation step. Returns sample classes. + + Parameters + ---------- + samples : dict + Dict wih the samples + + Returns + ------- + check_consistency : dict + Dict with the consistency check results. + + Raises + ------ + KeyError + If the sample requested is not in the preparation block. + RuntimeError + If your specified samples in the sampling block don't have + same samples in each category. + """ # ttbar or zprime are the categories, samples are bjets, cjets, etc. categories = samples.keys() check_consistency = {} @@ -1007,164 +1111,388 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods """ return self.inter_func_dict - def Load_Samples_Generator(self, sample_category, sample_id, chunk_size): - """Generator to load samples""" + def Load_Samples_Generator( + self, + sample_category: str, + sample_id: int, + chunk_size: int, + ): + """ + Generator for the loading of the samples. + + Parameters + ---------- + sample_category : str + Sample category that is loaded. + sample_id : int + Index of the sample. + chunk_size : int + Chunk size of the jets that are loaded and yielded. + + Yields + ------- + sample : str + Name of the sample. "training_ttbar_bjets" for example. + samples : dict + Dict with the loaded jet info needed for resampling. + Next_chunk : bool + True if more chunks can be loaded, False if this was + the last chunk. + Njets_initial : int + Number of jets available. + start_index : int + Start index of the chunk. + """ + + # Get the sample name and the preparation configs (cuts etc.) sample, preparation_sample = self.sample_file_map[sample_category][sample_id] + + # Get path of the input file in_file = GetPreparationSamplePath(preparation_sample) - samples = {} + + # Open input file with h5py.File(in_file, "r") as f: + + # Get the number of jets inside the file Njets_initial = len(f["jets"]) + + # Check if custom inital njets are given for this sample if ( "custom_njets_initial" in self.options and self.options["custom_njets_initial"] is not None and sample in list(self.options["custom_njets_initial"]) ): + + # Get the number of jets that are asked for in the config Njets_asked = int(self.options["custom_njets_initial"][sample]) + + # Check that enough jets are there. 
If not, changed asked njets if Njets_initial <= Njets_asked: logger.warning( f"For sample {sample}, demanding more initial jets" f" ({Njets_asked}) than available ({Njets_initial})." " Forcing to available." ) + else: Njets_initial = Njets_asked + # Set start and end index start_ind = 0 end_ind = int(start_ind + chunk_size) + + # Get a list for the tupled index paris tupled_indices = [] + + # Iterate until the number of asked jets is reached while end_ind <= Njets_initial or start_ind == 0: + + # Check that not an end index is chosen, which is not available + # 100 jets available, chunk size: 30, end index: 80 + # Adding this up would cause an error if end_ind + chunk_size > Njets_initial: # Missing less then a chunk, joining to last chunk end_ind = Njets_initial + + # Append the start and end index pairs to list tupled_indices.append((start_ind, end_ind)) + + # Set the old end index as new start index start_ind = end_ind + + # Get new end index end_ind = int(start_ind + chunk_size) + + # Loop over the index paris for index_tuple in tupled_indices: + + # Get the chunk of jets that is to be loaded to_load = f["jets"][index_tuple[0] : index_tuple[1]] + + # Load the two resampling variables from the jets jets_x = np.asarray(to_load[self.var_x]) jets_y = np.asarray(to_load[self.var_y]) + + # Stack the jet variables sample_vector = np.column_stack((jets_x, jets_y)) + + # Get a dict with the file, the stacked jets and the category samples = { "file": in_file, "sample_vector": sample_vector, "category": preparation_sample.get("category"), } + + # Yield the sample name, the dict with the info, + # a bool value if this was the last chunk (False) if not (True) + # and the available jets and the start index of the chunk yield sample, samples, index_tuple[ 1 ] != Njets_initial, Njets_initial, index_tuple[0] - def Load_Index_Generator(self, in_file, chunk_size): - """Load index generator.""" + def Load_Index_Generator(self, in_file: str, chunk_size: int): + """ + Generator that yields the indicies of the jets that are + to be loaded. + + Parameters + ---------- + in_file : str + Filepath of the input file. + chunk_size : int + Chunk size of the jets that are loaded and yielded. + + Yields + ------- + indices : np.ndarray + Indicies of the jets which are to be loaded. + index_tuple : int + End index of the chunk. + """ + + # Open input file with h5py.File(in_file, "r") as f: + + # Get the number of available jets Nindices = len(f["jets"]) + + # Set start and end index start_ind = 0 end_ind = int(start_ind + chunk_size) + + # Init list for the index pairs (start, end) tupled_indices = [] + + # Get the start and end index pairs for the given + # chunk size while end_ind <= Nindices or start_ind == 0: if end_ind + chunk_size > Nindices: # Missing less then a chunk, joining to last chunk end_ind = Nindices + + # Create the tuple and append it tupled_indices.append((start_ind, end_ind)) + + # Set new start and end index values start_ind = end_ind end_ind = int(start_ind + chunk_size) + # Iterate over the index pairs for index_tuple in tupled_indices: + + # Get the indicies which are to be loaded loading_indices = np.arange(index_tuple[0], index_tuple[1]) + + # Get the jets based on their indicies indices = np.asarray(f["jets"][loading_indices]) + + # Yield the indicies yield indices, index_tuple[1] != Nindices - def Load_Samples(self, sample_category, sample_id): - """Load samples.""" + def Load_Samples(self, sample_category: str, sample_id: int): + """ + Load the input file of the specified category and id. 
+ + Parameters + ---------- + sample_category : str + Sample category that is loaded. + sample_id : int + Index of the sample. + + Returns + ------- + sample : str + Name of the sample which is loaded. + samples : dict + Dict with the info retrieved for resampling. + """ + + # Get the sample name and the preparation configs (cuts etc.) sample, preparation_sample = self.sample_file_map[sample_category][sample_id] + + # Get path of the input file in_file = GetPreparationSamplePath(preparation_sample) - samples = {} + + # Open input file with h5py.File(in_file, "r") as f: + # Get the number of jets inside the file Njets_initial = len(f["jets"]) + + # Check if custom inital njets are given for this sample if ( "custom_njets_initial" in self.options and self.options["custom_njets_initial"] is not None and sample in list(self.options["custom_njets_initial"]) ): + + # Get the number of jets that are asked for in the config Njets_asked = int(self.options["custom_njets_initial"][sample]) + + # Check that enough jets are there. If not, changed asked njets if Njets_initial <= Njets_asked: logger.warning( f"For sample {sample}, demanding more initial jets" f" ({Njets_asked}) than available ({Njets_initial})." " Forcing to available." ) + else: Njets_initial = Njets_asked + + # Get the jets which are to be loaded to_load = f["jets"][:Njets_initial] + + # Retrieve the resampling variables from the jets jets_x = np.asarray(to_load[self.var_x]) jets_y = np.asarray(to_load[self.var_y]) logger.info( f"Loaded {len(jets_x)} {preparation_sample.get('category')}" f" jets from {sample}." ) + + # Stack the jets sample_vector = np.column_stack((jets_x, jets_y)) + + # Create dict with the info for resampling samples = { "file": in_file, "sample_vector": sample_vector, "category": preparation_sample.get("category"), } + + # Return sample name and the dict return sample, samples def File_to_histogram( self, - sample_category, - category_ind, - sample_id, - iterator=True, - chunk_size=1e4, - bins=None, - hist_range=None, - ): - """Convert file to histogram""" + sample_category: str, + category_ind: int, + sample_id: int, + iterator: bool = True, + chunk_size: int = 1e4, + bins: list = None, + hist_range: list = None, + ) -> dict: + """ + Convert the provided sample into a 2d histogram + which is used to calculate the PDF functions. + + Parameters + ---------- + sample_category : str + Sample category that is loaded. + category_ind : int + Index of the category which is used. + sample_id : int + Index of the sample that is used. + iterator : bool, optional + Decide, if the iterative approach is used (True) or + the in memory approach (False), by default True. + chunk_size : int, optional + Chunk size for loading the jets in the iterative approach, + by default 1e4. + bins : list, optional + List with the bins to use for the 2d histogram, + by default None. + hist_range : list, optional + List with histogram ranges for the 2d histogram function, + by default None. + + Returns + ------- + Results_dict : dict + Dict with the 2d histogram info. 
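+
+        Examples
+        --------
+        A hedged sketch of how the returned dict can be inspected
+        (argument values are illustrative):
+
+        >>> hist_dict = self.File_to_histogram(  # doctest: +SKIP
+        ...     sample_category="ttbar",
+        ...     category_ind=0,
+        ...     sample_id=0,
+        ... )
+        >>> hist_dict["available_numbers"]  # doctest: +SKIP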
+ """ + if bins is None: bins = self.limit["bins"][category_ind] + if hist_range is None: hist_range = self.limit["ranges"][category_ind] + + # Init the available numbers available_numbers = 0 + + # Check if the iterative method is used if iterator: + # Get the generator which loads the sample generator = self.Load_Samples_Generator( sample_category=sample_category, sample_id=sample_id, chunk_size=chunk_size, ) + # Set load in chunks and chunk counter load_chunk = True chunk_counter = 0 + + # Iterate over until all chunks are loaded while load_chunk: + + # Get the dict with the resampling info, the info if more + # chunks can be loaded and the number of total jets available try: _, target_dist, load_more, total, _ = next(generator) + + # If the no more chunks can be loaded, break except StopIteration: break - if chunk_counter == 0: - pbar = tqdm(total=total) + + # Get the info if more chunks can be loaded or not load_chunk = load_more + + # Check if this is the first chunk if chunk_counter == 0: + # If this is the first chunk, init the progress bar + pbar = tqdm(total=total) + + # Get the 2d histogram of the two resampling variables h_target, x_bin_edges, y_bin_edges = np.histogram2d( target_dist["sample_vector"][:, 0], target_dist["sample_vector"][:, 1], bins=bins, range=hist_range, ) + else: + + # Get the 2d histogram of the two resampling variables new_hist, _, _ = np.histogram2d( target_dist["sample_vector"][:, 0], target_dist["sample_vector"][:, 1], bins=bins, range=hist_range, ) + + # Add the new loaded one to the old h_target += new_hist + + # Get the number of jets that were loaded in this step njets_added = len(target_dist["sample_vector"]) + + # Update the progres bar pbar.update(njets_added) + + # Add the number of loaded jets to the availabe number available_numbers += njets_added + + # Set the chunk counter up chunk_counter += 1 + + # Close progress bar after finishing the loop pbar.close() + + # Or the in memory approach else: + + # Get the target dist from the sample _, target_dist = self.Load_Samples(sample_category, sample_id) + + # Get the number of available jets available_numbers = len(target_dist["sample_vector"]) + + # Create the 2d histogram h_target, x_bin_edges, y_bin_edges = np.histogram2d( target_dist["sample_vector"][:, 0], target_dist["sample_vector"][:, 1], @@ -1172,7 +1500,10 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods range=hist_range, ) + # Get the preparation info (cuts, etc.) for the chosen sample _, preparation_sample = self.sample_file_map[sample_category][sample_id] + + # Add the results to a dict return_dict = { "hist": h_target, "xbins": x_bin_edges, @@ -1180,11 +1511,15 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods "available_numbers": available_numbers, "category": preparation_sample.get("category"), } + + # If the in memory approach is chosen, set the target distribution in dict if not iterator: return_dict["target_dist"] = target_dist + + # Return the dict with the 2d histogram info return return_dict - def Initialise_Flavour_Samples(self): + def Initialise_Flavour_Samples(self) -> None: """ Initialising input files: this one just creates the map. (based on UnderSampling one). 
At this point the arrays of @@ -1243,73 +1578,125 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods num_available, ) - def CheckSampleConsistency(self, samples): + def CheckSampleConsistency(self, samples: dict) -> None: """ Helper function to check if each sample category has the same amount of samples with same category (e.g. Z' and ttbar both have b, c & light) + + Parameters + ---------- + samples : dict + Dict with the categories (ttbar, zpext) and their corresponding + sample names. + + Raises + ------ + KeyError + If the sample which is requested is not in the preparation stage. + RuntimeError + Your specified samples in the sampling/samples block need to have + the same category in each sample category. """ + + # Init a dict with the sample categories check_consistency = {elem: [] for elem in self.sample_categories} + + # Iterate over the categories for category in self.sample_categories: + + # Iterate over the samples in the category for sample in samples[category]: + + # Get the preparation configs (cuts etc.) for the sample preparation_sample = self.preparation_samples.get(sample) + + # Check that the samples are also defined in the preparation stage if preparation_sample is None: raise KeyError( f"'{sample}' was requested in sampling/samples block," "however, it is not defined in preparation/samples in" "the preprocessing config file" ) + + # Add the sample to the check dict check_consistency[category].append(preparation_sample["category"]) + + # Get the combinations of the dict keys combs = list(itertools.combinations(check_consistency.keys(), 2)) + + # Create a list with the checks of the combinations combs_check = [ sorted(check_consistency[elem[0]]) == sorted(check_consistency[elem[1]]) for elem in combs ] + + # Check that all combinations are valied if not all(combs_check): raise RuntimeError( "Your specified samples in the sampling/samples " "block need to have the same category in each sample category." ) + + # Get the class categories self.class_categories = check_consistency[next(iter(self.sample_categories))] def CalculatePDF( self, - store_key, - x_y_original=None, - x_y_target=None, - target_hist=None, - original_hist=None, - target_bins=None, - bins=None, - limits=None, - ): + store_key: str, + x_y_original: tuple = None, + x_y_target: tuple = None, + target_hist: np.ndarray = None, + original_hist: np.ndarray = None, + target_bins: tuple = None, + bins: list = None, + limits: list = None, + ) -> None: """ Calculates the histograms of the input data and uses them to calculate the PDF Ratio. Works either on dataframe or pre-made histograms CalculatePDFRatio is invoked here. + Provides the PDF interpolation function which is used for sampling + (entry in a dict). It is a property of the class. - Inputs: - x_y_target: A 2D tuple of the target datapoints of x and y. - x_y_original: A 2D tuple of the to resample datapoints of x and y. - - Or, x_y_target and/or x_y_original can be replaced by: - target_hist: hist for the target - original_hist: hist for the original flavour - If using target_hist, need to define target_bins, a tuple with (binx, biny). - - store_key: key of the interpolation function to be added - to self.inter_func_dict (and self._ratio_dict) - bins: This can be all possible binning inputs as for numpy - histogram2d. Not used if hist are passed instead of arrays. - limits: limits for the binning. Not used if hist are passed instead of arrays. 
+ Parameters + ---------- + store_key : str + Key of the interpolation function to be added + to self.inter_func_dict (and self._ratio_dict) + x_y_original : tuple, optional + A 2D tuple of the to resample datapoints of x and y, by default None. + x_y_target : tuple, optional + A 2D tuple of the target datapoints of x and y, by default None. + target_hist : np.ndarray, optional + Histogram for the target, by default None + original_hist : np.ndarray, optional + Histogram for the original flavour, by default None + target_bins : tuple, optional + If using target_hist, need to define target_bins, a tuple with + (binx, biny), by default None. + bins : list, optional + This can be all possible binning inputs as for numpy + histogram2d. Not used if hist are passed instead of arrays, + by default None. + limits : list, optional + Limits for the binning. Not used if hist are passed instead of arrays, + by default None. - Output: - Provides the PDF interpolation function which is used for sampling - (entry in a dict). - It is a property of the class. + Raises + ------ + ValueError + If feeding a histogram but not the bins in PDF calculation. + ValueError + If improper target input for PDF calculation of the store_key. + ValueError + If improper original flavour input for PDF calculation of store_key """ + + # Check if bins are defined and set value if bins is None: bins = [100, 9] - # Calculate the corresponding histograms + + # Calculate the corresponding histograms if target histogram is defined if target_hist is not None: if target_bins is not None: h_target, self._x_bin_edges, self._y_bin_edges = ( @@ -1321,23 +1708,30 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods raise ValueError( "Feeding a histogram but not the bins in PDF calculation." ) + + # Calculate the corresponding histograms if the xy target is defined elif x_y_target is not None: h_target, self._x_bin_edges, self._y_bin_edges = np.histogram2d( x_y_target[:, 0], x_y_target[:, 1], bins, range=limits ) + else: raise ValueError( f"Improper target input for PDF calculation of {store_key}." ) + # Check if the original histogram is provided if original_hist is not None: h_original = original_hist + + # Check if the original x y is provided elif x_y_original is not None: h_original, _, _ = np.histogram2d( x_y_original[:, 0], x_y_original[:, 1], bins=[self._x_bin_edges, self._y_bin_edges], ) + else: raise ValueError( f"Improper original flavour input for PDF calculation of {store_key}." @@ -1345,31 +1739,40 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods # Calculate the PDF Ratio self.CalculatePDFRatio( - store_key, - h_target, - h_original, - self._x_bin_edges, - self._y_bin_edges, + store_key=store_key, + h_target=h_target, + h_original=h_original, + x_bin_edges=self._x_bin_edges, + y_bin_edges=self._y_bin_edges, ) def CalculatePDFRatio( - self, store_key, h_target, h_original, x_bin_edges, y_bin_edges - ): + self, + store_key: str, + h_target: np.ndarray, + h_original: np.ndarray, + x_bin_edges: np.ndarray, + y_bin_edges: np.ndarray, + ) -> None: """ Receives the histograms of the target and original data, the bins - and a max ratio value. Latter is optional. - - Inputs: - h_target: Output of numpy histogram2D for the target datapoints - h_original: Output of numpy histogram2D for the original datapoints - store_key: key of the interpolation function to be added - to self.inter_func_dict (and self._ratio_dict) - bins: The bin edges of the binning used for the numpy histogram2D. 
- This is also returned from numpy histgram2D - - Output: - Provides the PDF interpolation function which is used for sampling. + and a max ratio value. Latter is optional. Provides the PDF + interpolation function which is used for sampling. This can be returned with Inter_Func_Dict. It is a property of the class. + + Parameters + ---------- + store_key : str + Key of the interpolation function to be added + to self.inter_func_dict (and self._ratio_dict) + h_target : np.ndarray + Output of numpy histogram2D for the target datapoints. + h_original : np.ndarray + Output of numpy histogram2D for the original datapoints. + x_bin_edges : np.ndarray + Array with the x axis bin edges. + y_bin_edges : np.ndarray + Array with the y axis bin edges. """ # Transform bin edges to bin centres @@ -1388,12 +1791,15 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods where=(h_original != 0), ) + # Store the ratio unter the store key as a class attribute self._ratio_dict[store_key] = ratio self._bin_edges_dict[store_key] = ( x_bin_edges[1:-1], y_bin_edges[1:-1], ) + # Get the interpolation function and store it in the inter func dict + # which is an attribute of the class self.inter_func_dict[store_key] = RectBivariateSpline( self.x_bins, self.y_bins, @@ -1407,23 +1813,40 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods y_bin_edges[-1], ], ) + + # Get the path where the interpolation function will be saved save_name = os.path.join( self.resampled_path, "PDF_sampling", f"inter_func_{store_key}", ) + + # Save the interpolation function to file self.save(self.inter_func_dict[store_key], save_name) - def save(self, inter_func, file_name: str, overwrite: bool = True): + def save( + self, + inter_func: RectBivariateSpline, + file_name: str, + overwrite: bool = True, + ) -> None: """ - Save the interpolation function to file + Save the interpolation function to file. - Input: - inter_func: Interpolation function to save - file_name: Path where the pickle file is saved. + Parameters + ---------- + inter_func : RectBivariateSpline + Interpolation function. + file_name : str + Path where the pickle file is saved. + overwrite : bool, optional + Decide if the file is overwritten if it exists already, + by default True - Output: - Pickle file of the PDF interpolation function. + Raises + ------ + ValueError + If no interpolation function is given. """ if inter_func is not None: @@ -1452,38 +1875,68 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods else: raise ValueError("Interpolation function not calculated/given!") - def load(self, file_name: str): + def load(self, file_name: str) -> RectBivariateSpline: """ Load the interpolation function from file. - Input: - file_name: Path where the pickle file is saved. + Parameters + ---------- + file_name : str + Path where the pickle file is saved. - Output: - Returns the PDF interpolation function of the pickle file. + Returns + ------- + RectBivariateSpline + The loaded interpolation function. """ with open(file_name, "rb") as file: inter_func = pickle.load(file) + return inter_func - def inMemoryResample(self, x_values, y_values, size, store_key, replacement=True): + def inMemoryResample( + self, + x_values: np.ndarray, + y_values: np.ndarray, + size: int, + store_key: str, + replacement: bool = True, + ) -> np.ndarray: """ Resample all of the datapoints at once. Requirement for that is that all datapoints fit in the RAM. 
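+
+        Examples
+        --------
+        Round trip together with ``save`` (the file path is
+        illustrative only):
+
+        >>> self.save(inter_func, "PDF_sampling/inter_func_ttbar_bjets")  # doctest: +SKIP
+        >>> inter_func = self.load("PDF_sampling/inter_func_ttbar_bjets")  # doctest: +SKIP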
         """
 
         with open(file_name, "rb") as file:
             inter_func = pickle.load(file)
+
         return inter_func
 
-    def inMemoryResample(self, x_values, y_values, size, store_key, replacement=True):
+    def inMemoryResample(
+        self,
+        x_values: np.ndarray,
+        y_values: np.ndarray,
+        size: int,
+        store_key: str,
+        replacement: bool = True,
+    ) -> np.ndarray:
         """
         Resample all of the datapoints at once. Requirement for that
         is that all datapoints fit in the RAM.
 
-        Input:
-        x_values: x values of the datapoints which are to be resampled from (i.e pT)
-        y_values: y values of the datapoints which are to be resampled from (i.e eta)
-        size: Number of jets which are resampled.
-        store_key: key of the interpolation function to be added
-        to self.inter_func_dict (and self._ratio_dict)
+        Parameters
+        ----------
+        x_values : np.ndarray
+            x values of the datapoints which are to be resampled from (i.e. pT)
+        y_values : np.ndarray
+            y values of the datapoints which are to be resampled from (i.e. eta)
+        size : int
+            Number of jets which are resampled.
+        store_key : str
+            Key of the interpolation function to be added to
+            self.inter_func_dict (and self._ratio_dict)
+        replacement : bool, optional
+            Decide if replacement is used in the resampling, by default True.
 
-        Output:
-        Resampled jets
+        Returns
+        -------
+        sampled_indices : np.ndarray
+            The indices of the sampled jets.
+
+        Raises
+        ------
+        ValueError
+            If x_values and y_values have different shapes.
         """
+
+        # Check for replacement and log an info
         if replacement is False:
             logger.info("PDF sampling without replacement for given set!")
+
+        # Check that the given values are np.ndarrays
         if isinstance(x_values, (float, int)):
             x_values = np.asarray([x_values])
 
@@ -1499,14 +1952,19 @@ class PDFSampling(Resampling):  # pylint: disable=too-many-public-methods
 
         # Normalise the datapoints for sampling
         r_resamp = r_resamp / np.sum(r_resamp)
+
         if logger.level <= 10:
             # When debugging, this snippet plots the
             # weights and interpolated values vs pT.
 
+            # Import plt for plotting
             import matplotlib.pyplot as plt  # pylint: disable=import-outside-toplevel
 
+            # Histogram the x values with and without weights
             n, _ = np.histogram(x_values, bins=self._x_bin_edges)
             sy, _ = np.histogram(x_values, bins=self._x_bin_edges, weights=r_resamp)
+
+            # Get mean
             mean = np.divide(
                 sy,
                 n,
                 out=np.zeros(
                 ),
                 where=(n != 0),
             )
+
+            # Plot the Distributions with and without weights
             plt.figure()
             plt.plot(self.x_bins, mean, label=f"effective weights {store_key}")
             plt.legend(loc="best", ncol=1, fontsize=7)
@@ -1535,58 +1995,131 @@ class PDFSampling(Resampling):  # pylint: disable=too-many-public-methods
 
         # Resample the datapoints based on their PDF Ratio value
         sampled_indices = self.Resample_chunk(r_resamp, size, replacement)
 
+        # Return sampled jet indices
         return sampled_indices
 
-    def Return_unnormalised_PDF_weights(self, x_values, y_values, store_key):
-        """Get unnormalised PDF weight."""
-        # Get the inter_func
+    def Return_unnormalised_PDF_weights(
+        self,
+        x_values: np.ndarray,
+        y_values: np.ndarray,
+        store_key: str,
+    ) -> np.ndarray:
+        """
+        Calculate the unnormalised PDF weights and return them.
+
+        Parameters
+        ----------
+        x_values : np.ndarray
+            x values of the datapoints which are to be resampled from (i.e. pT)
+        y_values : np.ndarray
+            y values of the datapoints which are to be resampled from (i.e. eta)
+        store_key : str
+            Key of the interpolation function to be added to
+            self.inter_func_dict (and self._ratio_dict)
+
+        Returns
+        -------
+        r_resamp : np.ndarray
+            Array with the PDF weights.
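+
+        Examples
+        --------
+        Illustrative only; the store_key must match a previously
+        calculated interpolation function and ``jets_pt``/``jets_eta``
+        stand in for the two resampling variables:
+
+        >>> weights = self.Return_unnormalised_PDF_weights(  # doctest: +SKIP
+        ...     x_values=jets_pt,
+        ...     y_values=jets_eta,
+        ...     store_key="ttbar_bjets",
+        ... )
+        >>> weights = weights / np.sum(weights)  # normalise before sampling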
+ """ + + # Get the filepath of the interpolation function load_name = os.path.join( self.resampled_path, "PDF_sampling", f"inter_func_{store_key}", ) + + # Load the interpolation function inter_func = self.load(load_name) + # Evaluate the given values with the interpolation function r_resamp = inter_func.ev(x_values, y_values) # Neutralise all datapoints where the ratio is less than 0 indices = np.where(r_resamp < 0)[0] r_resamp[indices] = 0 + + # Return the evaluated result return r_resamp - def Resample_chunk(self, r_resamp, size, replacement=True): # pylint: disable=R0201 - """Resampling of chunk.""" + def Resample_chunk( + self, + r_resamp: np.ndarray, + size: int, + replacement: bool = True, + ) -> np.ndarray: + """ + Get sampled indicies from the PDF weights. + + Parameters + ---------- + r_resamp : np.ndarray + PDF weights. + size : int + Number of jets to sample + replacement : bool, optional + Decide, if replacement is used, by default True. + + Returns + ------- + sampled_indices : np.ndarray + Indicies of the resampled jets which are to use. + """ + sampled_indices = np.random.default_rng().choice( len(r_resamp), p=r_resamp, size=size, replace=replacement ) + return sampled_indices def Resample_Iterator( self, - sample_category, - sample_id, - save_name, - sample_name, - chunk_size=1e6, - ): + sample_category: str, + sample_id: int, + save_name: str, + sample_name: str, + chunk_size: int = 1e6, + ) -> None: """ Resample with the data not completely stored in memory. Will load the jets in chunks, computing first the sum of PDF weights and then sampling with replacement based on the normalised weights. + + Parameters + ---------- + sample_category : str + Sample category to resample. + sample_id : int + Index of the sample which to be resampled. + save_name : str + Filepath + Filename and ending of the file where to save the + resampled jets to. + sample_name : str + Name of the sample to use. + chunk_size : int, optional + Chunk size which is loaded per step, by default 1e6. """ + # Get the preparation options (cuts etc.) 
for the given sample _, preparation_sample = self.sample_file_map[sample_category][sample_id] + + # Create a store_key for the sample store_key = sample_category + "_" + preparation_sample.get("category") - # Load number to sample + # Get filepath where the info of the target data are saved load_name = os.path.join( self.resampled_path, "PDF_sampling", "target_data.json", ) + + # Load the target info from file with open(load_name, "r") as load_file: target_data = json.load(load_file) + + # Load number to sample number_to_sample = target_data["number_to_sample"][sample_name] # First pass over data to get sum of weights @@ -1613,44 +2146,81 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods # sum_of_weights += np.sum(weights) # second pass over data, normalising weights and sampling + + # Init chunk load bool load_chunk = True + + # Get the generator which loads the samples generator = self.Load_Samples_Generator( sample_category=sample_category, sample_id=sample_id, chunk_size=chunk_size, ) + + # Set bool for creation of file and init number of sampled jets create_file = True sampled_jets = 0 + + # Init a progress bar pbar = tqdm(total=number_to_sample) + + # Loop over the chunks until now chunks can be loaded anymore while load_chunk: + + # Try to get chunk from generator try: _, target_dist, load_more, total_number, start_ind = next(generator) + + # Break if no chunk can be loaded anymore except StopIteration: break + + # Check if another chunk can be loaded or if this was the last load_chunk = load_more + + # Get the unnormalised PDF weights of the chunk weights = self.Return_unnormalised_PDF_weights( target_dist["sample_vector"][:, 0], target_dist["sample_vector"][:, 1], store_key=store_key, ) - # weight of the chunk + + # Statistical weight of the chunk chunk_weights = len(weights) / total_number + # Sample a fraction of jets proportional to the chunk weight to_sample = number_to_sample * chunk_weights + + # If this is the last chunk, get the number of jets that still needs to be + # sampled if not load_chunk: # last chunk to_sample = number_to_sample - sampled_jets + # Calculate normalised weights weights = weights / np.sum(weights) + + # Get the selected indicies based on the normalised weights selected_ind = self.Resample_chunk(weights, size=round(to_sample)) + + # Sort the selected indicies selected_indices = np.sort(selected_ind).astype(int) + # Need to turn chunk indices to full list indices selected_indices += start_ind sampled_jets += len(selected_indices) + + # Update progress bar pbar.update(selected_indices.size) + + # If this is the first chunk, create a new file if create_file: + + # Set the creation to false create_file = False with h5py.File(save_name, "w") as f: + + # Create new dataset with the indicies inside f.create_dataset( "jets", data=selected_indices, @@ -1658,39 +2228,67 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods chunks=True, maxshape=(None,), ) + + # If this is not the first chunk, extend existing file else: + with h5py.File(save_name, "a") as f: + # Append indicies to existing ones f["jets"].resize( (f["jets"].shape[0] + selected_indices.shape[0]), axis=0, ) f["jets"][-selected_indices.shape[0] :] = selected_indices + + # Close progress bar pbar.close() def Save_partial_iterator( self, - sample_category, - sample_id, - selected_indices, + sample_category: str, + sample_id: int, + selected_indices: np.ndarray, chunk_size: int = 1e6, - ): + ) -> None: """ Save the selected data to an output file with an 
iterative approach (generator) for writing only, writing in chunk of size chunk_size. The file is read in one go. + + Parameters + ---------- + sample_category : str + Sample category to save + sample_id : int + Sample index which is to be saved + selected_indices : np.ndarray + Array with the selected indicies + chunk_size : int, optional + Chunk size which is loaded per step, by default 1e6. """ + # Get the preparation options of the given sample _, preparation_sample = self.sample_file_map[sample_category][sample_id] + + # Get the filepath of the input file in_file = GetPreparationSamplePath(preparation_sample) + # Get the number of indicies sample_lengths = len(selected_indices) + + # Return max sample length max_sample = np.amax(sample_lengths) + + # Calculate number of chunks based on max_sample and chunksize n_chunks = round(max_sample / chunk_size + 0.5) + + # Get rounded chunk sizes chunk_sizes = sample_lengths / n_chunks + # Get the sampling generator generators = SamplingGenerator( - in_file, - selected_indices, + file=in_file, + indices=selected_indices, chunk_size=chunk_sizes, label=self.class_labels_map[preparation_sample["category"]], label_classes=list(range(len(self.class_labels_map))), @@ -1700,8 +2298,10 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods duplicate=True, ) - create_file = True + # Init a chunk counter chunk_counter = 0 + + # Get the path where to save the jets save_name = os.path.join( self.resampled_path, "PDF_sampling", @@ -1710,8 +2310,13 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods logger.info(f"Writing to file {save_name}.") + # Init a new progress bar pbar = tqdm(total=np.sum(sample_lengths)) + + # Iterate over the chunks while chunk_counter < n_chunks + 1: + + # Get jets, tracks and labels of the chunk try: if self.save_tracks: jets, tracks, labels = next(generators) @@ -1719,6 +2324,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods jets, labels = next(generators) except StopIteration: break + + # Update progress bar pbar.update(jets.size) # Init a index list @@ -1734,9 +2341,10 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods if self.save_tracks: tracks = [trk[rng_index] for trk in tracks] - if create_file: - create_file = False - # write to file by creating dataset + # Create a new file if this is the first chunk + if chunk_counter == 0: + + # Write to file by creating dataset with h5py.File(save_name, "w") as out_file: out_file.create_dataset( "jets", @@ -1761,6 +2369,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods chunks=True, maxshape=(None, tracks[i].shape[1]), ) + + # If not the first chunk, extend datasets else: # appending to existing dataset with h5py.File(save_name, "a") as out_file: @@ -1781,60 +2391,117 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods axis=0, ) out_file[tracks_name][-tracks[i].shape[0] :] = tracks[i] + + # Add up chunk counter chunk_counter += 1 + + # Close progress bar pbar.close() - def Save_complete_iterator(self, sample_category, sample_id, chunk_size: int = 1e5): + def Save_complete_iterator( + self, + sample_category: str, + sample_id: int, + chunk_size: int = 1e5, + ) -> None: """ Save the selected data to an output file with an iterative approach (generator) for both writing and reading, in chunk of size chunk_size. 
+ + Parameters + ---------- + sample_category : str + Sample category to save + sample_id : int + Sample index which is to be saved + chunk_size : int, optional + Chunk size which is loaded per step, by default 1e5. """ + # Get the sample name of the config sample_name = self.options["samples"][sample_category][sample_id] + + # Get the preparation options of the sample _, preparation_sample = self.sample_file_map[sample_category][sample_id] + + # Get the filepath of the input sample in_file = GetPreparationSamplePath(preparation_sample) - # Load number to sample + # Get filepath where the info of the target data are saved load_name = os.path.join( self.resampled_path, "PDF_sampling", "target_data.json", ) + + # Load the target info from file with open(load_name, "r") as load_file: target_data = json.load(load_file) + + # Load number to sample number_to_sample = target_data["number_to_sample"][sample_name] + # Get the path of the index file index_file = os.path.join( self.resampled_path, "PDF_sampling", self.options["samples"][sample_category][sample_id] + "_indices.h5", ) - index_generator = self.Load_Index_Generator(index_file, chunk_size) + + # Get the generator which loads the indicies from file + index_generator = self.Load_Index_Generator( + in_file=index_file, + chunk_size=chunk_size, + ) + + # Get the labels label = self.class_labels_map[preparation_sample["category"]] + + # Get the label classes label_classes = list(range(len(self.class_labels_map))) + + # Check if tracks are used use_tracks = self.save_tracks + + # Set duplicate to True for resampling duplicate = True + # Get path where to save the resampled jets save_name = os.path.join( self.resampled_path, "PDF_sampling", self.options["samples"][sample_category][sample_id] + "_selected.h5", ) + logger.info(f"Writing to file {save_name}.") + + # Create bool for loop and file creation load_chunk = True create_file = True + + # Init new progress bar pbar = tqdm(total=number_to_sample) + + # Iterate over chunks while load_chunk: + + # Load the chunk with the generator try: indices, load_more = next(index_generator) except StopIteration: break + + # Check if another chunk can be loaded load_chunk = load_more + # One hot encode the loaded labels labels = label_binarize( (np.ones(len(indices)) * label), classes=label_classes, ) + + # Open the input file and read the jets and tracks + # in a fancy way which allows double index loading with h5py.File(in_file, "r") as file_df: if use_tracks: jets, tracks = read_dataframe_repetition( @@ -1851,6 +2518,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods duplicate=duplicate, use_tracks=use_tracks, ) + + # Update the progress bar pbar.update(jets.size) # Init a index list @@ -1866,8 +2535,12 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods if self.save_tracks: tracks = [trk[rng_index] for trk in tracks] + # Check if this is the first chunk if create_file: + + # Set file creation to false create_file = False + # write to file by creating dataset with h5py.File(save_name, "w") as out_file: out_file.create_dataset( @@ -1893,6 +2566,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods chunks=True, maxshape=(None, tracks[i].shape[1]), ) + + # If this is not the first chunk, open file and extend the datasets else: # appending to existing dataset with h5py.File(save_name, "a") as out_file: @@ -1913,21 +2588,21 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods axis=0, ) 
out_file[tracks_name][-tracks[i].shape[0] :] = tracks[i] + + # Close the progress bar pbar.close() - def Generate_Target_PDF(self, iterator=True): + def Generate_Target_PDF(self, iterator: bool = True) -> None: """ This method creates the target distribution (seperated) and store the associated histogram in memory (use for sampling) as well as the target numbers. + Save to memory the target histogram, binning info, and target numbers. Parameters ---------- - iterator: bool, whether to use the iterator approach or - load the whole sample in memory. - - Returns - ------- - Save to memory the target histogram, binning info, and target numbers. + iterator : bool, optional + Whether to use the iterator approach or load the whole sample in memory, + by default True. """ logger.info("Generating target PDF.") @@ -1935,19 +2610,26 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods available_numbers = [] self.target_histo = {} + # Iterate over the samples for cat_ind, sample_category in enumerate(self.options["samples"]): logger.info(f"Loading target in category {sample_category}.") + + # Get the histogram from selected file reading_dict = self.File_to_histogram( sample_category=sample_category, category_ind=cat_ind, sample_id=0, iterator=iterator, ) + + # Create target histogram dict for the category self.target_histo[sample_category] = { "hist": reading_dict["hist"], "xbins": reading_dict["xbins"], "ybins": reading_dict["ybins"], } + + # Add the available number and the fraction of jets available_numbers.append(reading_dict["available_numbers"]) self.target_fractions.append(self.options["fractions"][sample_category]) @@ -1958,10 +2640,15 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods target_fractions=self.target_fractions, verbose=False, ) + + # Check for how many jets are requested if njets_asked == -1: logger.info("Maximising number of jets to target distribution.") + else: logger.info(f"Requesting {njets_asked} in total from target.") + + # Correct the fractions total_corr = sum(target_numbers_corr) if total_corr < njets_asked: logger.info("Requesting more jets from target than available.") @@ -1971,10 +2658,14 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods elif total_corr > njets_asked: ratio = float(njets_asked / total_corr) target_numbers_corr = [int(num * ratio) for num in target_numbers_corr] + + # Add the target number self.target_number = { list(self.options["samples"].keys())[ind]: target for ind, target in enumerate(target_numbers_corr) } + + # Iterate over the samples and log the info for cat_ind, sample_category in enumerate(self.options["samples"]): logger.info( f"target - category {sample_category}: selected" @@ -1990,6 +2681,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods "target_number": self.target_number, "target_fraction": self.target_fractions, } + + # Get path of file where to save the target info save_name = os.path.join( self.resampled_path, "PDF_sampling", @@ -1999,40 +2692,62 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods # Ensure the output path exists os.makedirs(os.path.join(self.resampled_path, "PDF_sampling"), exist_ok=True) + # Dump the target info to json with open(save_name, "w") as write_file: json.dump(save_data, write_file, cls=JsonNumpyEncoder) - def Generate_Number_Sample(self, sample_id): + def Generate_Number_Sample(self, sample_id: int) -> None: """ For a given sample, sets the target numbers, respecting flavour 
ratio and upsampling max ratio (if given). Parameters ---------- - sample_id: int, position of the flavour in the sample list. + sample_id : int + Position of the flavour in the sample list. """ + # Get the path of the target data file load_name = os.path.join( self.resampled_path, "PDF_sampling", "target_data.json", ) + + # Load target data with open(load_name, "r") as load_file: target_data = json.load(load_file) + # Init list for the flavour names flavour_names = [] + + # Iterate over the samples for cat_ind, sample_category in enumerate(self.options["samples"]): + + # Get the flavour name and append it to list flavour_name = self.sample_file_map[sample_category][sample_id][0] flavour_names.append(flavour_name) + + # If the flavour name is in the max upsampling section if any(flavour_name in self.max_upsampling for flavour_name in flavour_names): + + # Init list for asked number of jets asked_num = [] + + # Iterate over samples for cat_ind, flavour_name in enumerate(flavour_names): + + # Get the number of asked jets num_asked = target_data["target_number"][ list(self.options["samples"])[cat_ind] ] + + # Check if the flavour is in max upsampling if flavour_name in self.max_upsampling: upsampling, num_av = self.max_upsampling[flavour_name] upsampling_asked = float(num_asked) / num_av + + # Check if the max upsampling ratio is reached if upsampling < upsampling_asked: num_corrected = int(num_av * upsampling) asked_num.append(num_corrected) @@ -2056,6 +2771,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods f"No maximum upsampling ratio demanded to {flavour_name}." ) asked_num.append(num_asked) + + # Correct the number of asked jets asked_num_corr = CorrectFractions( N_jets=asked_num, target_fractions=target_data["target_fraction"], @@ -2087,7 +2804,7 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods category_id: int, sample_id: int, iterator: bool = True, - ): + ) -> dict: """ This method: - create the flavour distribution (also seperated), @@ -2095,30 +2812,37 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods Parameters ---------- - sample_category: str, the name of the category study. - category_id: int, the location of the category in the list. - sample_id: int, the location of the sample flavour in the category dict. - iterator: bool, whether to use the iterator approach or - load the whole sample in memory. + sample_category : str + The name of the category study. + category_id : int + The location of the category in the list. + sample_id : int + The location of the sample flavour in the category dict. + iterator : bool, optional + Whether to use the iterator approach or load the whole + sample in memory, by default True. Returns ------- - Add a dictionary object to the class pointing to the interpolation functions - (also saves them). - Returns None or dataframe of flavour (if iterator or not) and the histogram of - the flavour. - + reading_dict : dict + Add a dictionary object to the class pointing to the + interpolation functions (also saves them). Returns + None or dataframe of flavour (if iterator or not) + and the histogram of the flavour. 
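+
+        Examples
+        --------
+        A hedged usage sketch (index values are illustrative):
+
+        >>> flavour_dist = self.Generate_Flavour_PDF(  # doctest: +SKIP
+        ...     sample_category="ttbar",
+        ...     category_id=0,
+        ...     sample_id=1,
+        ...     iterator=False,
+        ... )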
""" - # Load the target data + # Get filepath of the target data load_name = os.path.join( self.resampled_path, "PDF_sampling", "target_data.json", ) + + # Load the target data with open(load_name, "r") as load_file: target_data = json.load(load_file) + # Create the histogram dict with the target info reading_dict = self.File_to_histogram( sample_category=sample_category, category_ind=category_id, @@ -2128,6 +2852,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods logger.info( f"Computing PDF in {sample_category} for the {reading_dict['category']}." ) + + # Calculate the PDF self.CalculatePDF( store_key=f"{sample_category}_{reading_dict['category']}", target_hist=np.asarray( @@ -2144,6 +2870,7 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods if iterator: return None + return reading_dict["target_dist"] def Sample_Flavour( @@ -2151,46 +2878,59 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods sample_category: str, sample_id: int, iterator: bool = True, - flavour_distribution=None, - ): + flavour_distribution: np.ndarray = None, + ) -> np.ndarray: """ This method: - samples the required amount based on PDF and fractions - storing the indices selected to memory. + Parameters ---------- - sample_category: str, the name of the category study. - sample_id: int, the location of the sample flavour in the category dict. - iterator: bool, whether to use the iterator approach or load the whole sample - in memory. - flavour_distribution: None or numpy array, the loaded data (for the flavour). - If it is None, an iterator method is used. + sample_category : str + The name of the category study. + sample_id : int + The location of the sample flavour in the category dict. + iterator : bool, optional + Whether to use the iterator approach or load the whole sample + in memory, by default True. + flavour_distribution : np.ndarray, optional + None or numpy array, the loaded data (for the flavour). + If it is None, an iterator method is used, by default None. Returns ------- - Returns (and stores to memory, if iterator is false) the selected indices - for the flavour studied. + selected_indices : np.ndarray + Returns (and stores to memory, if iterator is false) the selected indices + for the flavour studied. If iterator is True, a None will be returned. 
""" + # Get the sample name from config sample_name = self.options["samples"][sample_category][sample_id] + + # Get filepath where to save the indicies save_name = os.path.join( self.resampled_path, "PDF_sampling", self.options["samples"][sample_category][sample_id] + "_indices.h5", ) - # Load number to sample + # Get filepath of the target data load_name = os.path.join( self.resampled_path, "PDF_sampling", "target_data.json", ) + + # Load number to sample with open(load_name, "r") as load_file: target_data = json.load(load_file) number_to_sample = target_data["number_to_sample"][sample_name] logger.info(f"Selecting indices for {sample_name}.") logger.info(f"Saving indices to: {save_name}") + + # Use the resample iterator if iterator is True if flavour_distribution is None or iterator: logger.info("Using iterating approach.") self.Resample_Iterator( @@ -2202,20 +2942,29 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods return None logger.info("Using in-memory approach.") + + # Resample in Memory selected_ind = self.inMemoryResample( - flavour_distribution["sample_vector"][:, 0], - flavour_distribution["sample_vector"][:, 1], + x_values=flavour_distribution["sample_vector"][:, 0], + y_values=flavour_distribution["sample_vector"][:, 1], size=number_to_sample, store_key=sample_category + "_" + flavour_distribution["category"], ) + + # Open file where the indicies will be saved with h5py.File(save_name, "w") as f: + + # Sort the indicies selected_indices = np.sort(selected_ind).astype(int) + + # Create new dataset with the indicies f.create_dataset( "jets", data=selected_indices, compression="gzip", ) + # Return the selected indicies return selected_indices def Save_Flavour( @@ -2448,7 +3197,8 @@ class PDFSampling(Resampling): # pylint: disable=too-many-public-methods chunk_size: int = 1e4, iterator: bool = True, ): - """Produce plots of the variables used in resampling + """ + Produce plots of the variables used in resampling (before and after preprocessing). Parameters