diff --git a/.gitignore b/.gitignore index 8ec3de10bb729b2752221ececa456a9d89cf7bbe..e06991f887f96acf4c28b6f89da54889b37a9d3c 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ dev/ quickstats/macros/CMSSWCore*/ quickstats/macros/ATLASSWCore/ quickstats/resources/workspace_extensions.json +workspace_extensions_bak.json diff --git a/quickstats/_version.py b/quickstats/_version.py index 8bf580750acff1d7f2993152e79dc80bca5ec9c2..f58bfc6a7da3511a5610999df62199383aafe0fe 100644 --- a/quickstats/_version.py +++ b/quickstats/_version.py @@ -1 +1 @@ -__version__ = "0.6.9.9" +__version__ = "0.7.0.0" diff --git a/quickstats/analysis/ntuple_process_tool.py b/quickstats/analysis/ntuple_process_tool.py index 455ccac530143b051e0a7cbfcb812d907e765c32..a679c3c5945d855fed74e08149ddbfdbc73cd972 100644 --- a/quickstats/analysis/ntuple_process_tool.py +++ b/quickstats/analysis/ntuple_process_tool.py @@ -60,12 +60,15 @@ class NTupleProcessTool(ConfigurableObject): def attribute_df(self): return self._attribute_df - def __init__(self, sample_config:Union[Dict, str], outdir:str='output', - processor_config:Optional[str]=None, - processor_flags:Optional[List[str]]=None, + def __init__(self, sample_config:Union[Dict, str], + outdir:str='output', + process_config:Optional[Union["RooProcessConfig", str]]=None, + process_flags:Optional[List[str]]=None, cache:bool=True, use_template:bool=False, multithread:bool=True, + backend:Optional[str]=None, + backend_options:Optional[Dict]=None, disable_config_message:bool=False, verbosity:Optional[Union[int, str]]="INFO", **kwargs): @@ -73,23 +76,23 @@ class NTupleProcessTool(ConfigurableObject): super().__init__(disable_config_message=disable_config_message, verbosity=verbosity) self.outdir = outdir - self.path_manager = AnalysisPathManager(outdir) + self.path_manager = AnalysisPathManager(base_path=outdir) self.path_manager.set_directory("ntuple", "ntuples") self.path_manager.set_directory("cutflow", "cutflow") self.load_sample_config(sample_config) self.processor = None - if processor_config is not None: - self.load_processor_config(processor_config, - cache=cache, - use_template=use_template, - multithread=multithread) + if process_config is not None: + self.load_process_config(process_config, + cache=cache, + use_template=use_template, + multithread=multithread) - if processor_flags is not None: - self.processor_flags = list(processor_flags) + if process_flags is not None: + self.process_flags = list(process_flags) else: - self.processor_flags = [] + self.process_flags = [] self.cutflow_report = None @@ -115,8 +118,8 @@ class NTupleProcessTool(ConfigurableObject): sample_list.extend(list(self.sample_config['systematic_samples'][syst_theme])) self._syst_theme_list.append('Nominal') import pandas as pd - self._sample_list = list(pd.unique(sample_list)) - merge_sample_map = combine_dict(self.sample_config['merge_samples']) + self._sample_list = pd.unique(np.array(sample_list)) + merge_sample_map = combine_dict(self.sample_config.get('merge_samples', {})) if not merge_sample_map: merge_sample_map = {sample: [sample] for sample in self.sample_list} else: @@ -132,7 +135,7 @@ class NTupleProcessTool(ConfigurableObject): syst_name_list = [] for syst_names in self.sample_config['systematics'].values(): syst_name_list.extend(syst_names) - self._syst_name_list = pd.unique(syst_name_list) + self._syst_name_list = pd.unique(np.array(syst_name_list)) def get_sample_df(self): sample_data = [] @@ -183,17 +186,22 @@ class NTupleProcessTool(ConfigurableObject): attribute_df = pd.DataFrame(attribute_data).set_index(index_list) return attribute_df - def load_processor_config(self, config_path:str, - cache:bool=True, - multithread:bool=True, - use_template:bool=False): - from quickstats.components.processors import RooProcessor - self.processor = RooProcessor(config_path, + def load_process_config(self, config_source:Union["RooProcessConfig", str], + cache:bool=True, + multithread:bool=True, + use_template:bool=False, + backend:Optional[str]=None, + backend_options:Optional[Dict]=None): + from quickstats.components import RooProcessor + self.processor = RooProcessor(config_source, cache=cache, use_template=use_template, multithread=multithread, + backend=backend, + backend_options=backend_options, verbosity=self.stdout.verbosity) - self.path_manager.set_file("processor_config", os.path.abspath(config_path)) + if isinstance(config_source, str): + self.path_manager.set_file("process_config", os.path.abspath(config_source)) def get_validated_syst_themes(self, syst_themes:Optional[List[str]]=None): if syst_themes is None: @@ -354,7 +362,7 @@ class NTupleProcessTool(ConfigurableObject): samples:Optional[List[str]]=None, sample_types:Optional[List[str]]=None): if self.processor is None: - raise RuntimeError("processor not initialized (probably missing a processor config)") + raise RuntimeError("processor not initialized (probably missing a process config)") paths = self.get_selected_paths(syst_themes=syst_themes, samples=samples, sample_types=sample_types) @@ -362,7 +370,7 @@ class NTupleProcessTool(ConfigurableObject): syst_names = self.get_validated_syst_names(syst_names) for syst_theme in paths: for sample in paths[syst_theme]: - self.processor.set_flags(self.processor_flags) + self.processor.set_flags(self.process_flags) for sample_type in paths[syst_theme][sample]: sample_paths = paths[syst_theme][sample][sample_type] sample_config = { @@ -383,10 +391,14 @@ class NTupleProcessTool(ConfigurableObject): self.processor.run(expended_paths) self.processor.clear_global_variables() - def process_samples(self, samples:Optional[List[str]]=None, - sample_types:Optional[List[str]]=None): + def process_samples(self, samples:Optional[Union[List[str], str]]=None, + sample_types:Optional[Union[List[str], str]]=None): + if isinstance(samples, str): + samples = [samples] + if isinstance(sample_types, str): + sample_types = [sample_types] if self.processor is None: - raise RuntimeError("processor not initialized (probably missing a processor config)") + raise RuntimeError("processor not initialized (probably missing a process config)") paths = self.get_selected_paths(syst_themes=['Nominal'], samples=samples, sample_types=sample_types) @@ -395,7 +407,7 @@ class NTupleProcessTool(ConfigurableObject): paths = paths['Nominal'] outdir = self.path_manager.base_path for sample in paths: - self.processor.set_flags(self.processor_flags) + self.processor.set_flags(self.process_flags) for sample_type in paths[sample]: sample_paths = paths[sample][sample_type] sample_config = { diff --git a/quickstats/components/__init__.py b/quickstats/components/__init__.py index acece3cdb025e0c745087e97680afd90b2b0606b..f2b2513a95ab93512a3a45f1208dd7e5339b6c09 100644 --- a/quickstats/components/__init__.py +++ b/quickstats/components/__init__.py @@ -17,7 +17,7 @@ from .nuisance_parameter_pull import NuisanceParameterPull from .nuisance_parameter_ranking import NuisanceParameterRanking from .nuisance_parameter_harmonizer import NuisanceParameterHarmonizer from .caching_nll_wrapper import CachingNLLWrapper -from .processors import RooProcessor +from .processors import RooProcessor, RooProcessConfig #from .signal_modelling import SignalModelling import ROOT diff --git a/quickstats/components/processors/__init__.py b/quickstats/components/processors/__init__.py index 486d836528f623a0809c7bbde152cc36ef6f0a9e..21829eed9c646a8c9a2a1ec7d2daaa1e0b307cff 100644 --- a/quickstats/components/processors/__init__.py +++ b/quickstats/components/processors/__init__.py @@ -1,3 +1,3 @@ -from quickstats.components.processors.actions import * -from quickstats.components.processors.roo_config_parser import RooConfigParser -from quickstats.components.processors.roo_processor import RooProcessor \ No newline at end of file +from .actions import * +from .roo_process_config import RooProcessConfig +from .roo_processor import RooProcessor \ No newline at end of file diff --git a/quickstats/components/processors/actions/__init__.py b/quickstats/components/processors/actions/__init__.py index 0e24c232dba0cba25efca4adb2a42c6b227d2719..2db142f8072d158aab7d03f06b67eb754066d682 100644 --- a/quickstats/components/processors/actions/__init__.py +++ b/quickstats/components/processors/actions/__init__.py @@ -1,3 +1,4 @@ +from .auxiliary import * from .rooproc_base_action import RooProcBaseAction from .rooproc_rdf_action import RooProcRDFAction from .rooproc_helper_action import RooProcHelperAction @@ -27,34 +28,4 @@ from .rooproc_if_not_defined import RooProcIfNotDefined from .rooproc_load_macro import RooProcLoadMacro from .rooproc_as_parquet import RooProcAsParquet from .rooproc_as_hdf import RooProcAsHDF -from .auxiliary import * - -ACTION_MAP = { - "TREENAME": RooProcTreeName, - "DECLARE": RooProcDeclare, - "GLOBAL": RooProcGlobalVariables, - "ALIAS": RooProcAlias, - "SAFEALIAS": RooProcSafeAlias, - "DEFINE": RooProcDefine, - "SAFEDEFINE": RooProcSafeDefine, - "REDEFINE": RooProcRedefine, - "FILTER": RooProcFilter, - "GETSUM": RooProcSum, - "GETMAX": RooProcMax, - "GETMIN": RooProcMin, - "GETMEAN": RooProcMean, - "SAVE": RooProcSave, - "REPORT": RooProcReport, - "EXPORT": RooProcExport, - "SAVE_FRAME": RooProcSaveFrame, - "LOAD_FRAME": RooProcLoadFrame, - "AS_NUMPY": RooProcAsNumpy, - "IFDEF": RooProcIfDefined, - "IFNDEF": RooProcIfNotDefined, - "LOAD_MACRO": RooProcLoadMacro, - "AS_PARQUET": RooProcAsParquet, - "AS_HDF": RooProcAsHDF -} - -def get_action(action_name:str): - return ACTION_MAP.get(action_name, None) \ No newline at end of file +from .rooproc_progressbar import RooProcProgressBar \ No newline at end of file diff --git a/quickstats/components/processors/actions/auxiliary.py b/quickstats/components/processors/actions/auxiliary.py index 914ccd74c5f59f3e4bd5be4885f25b7f518a946f..727b0fd1ba0b50693b2f85b8479afe0f911343b9 100644 --- a/quickstats/components/processors/actions/auxiliary.py +++ b/quickstats/components/processors/actions/auxiliary.py @@ -2,4 +2,15 @@ from quickstats import GeneralEnum class RooProcReturnCode(GeneralEnum): NORMAL = 0 - SKIP_CHILD = 1 \ No newline at end of file + SKIP_CHILD = 1 + +ACTION_MAP = dict() + +def get_action(action_name:str): + return ACTION_MAP.get(action_name, None) + +def register_action(action_cls): + name = action_cls.NAME + if (name is not None) and (name not in ACTION_MAP): + ACTION_MAP[name] = action_cls + return action_cls \ No newline at end of file diff --git a/quickstats/components/processors/actions/formatter.py b/quickstats/components/processors/actions/formatter.py new file mode 100644 index 0000000000000000000000000000000000000000..eed7656ced878c4e6b1cfda4b1ff1cc9cfad055b --- /dev/null +++ b/quickstats/components/processors/actions/formatter.py @@ -0,0 +1,11 @@ +import re + +from quickstats.utils.string_utils import split_str + +ListRegex = re.compile(r"\[([^\[\]]+)\]") + +def ListFormatter(text:str): + match = ListRegex.match(text) + if not match: + return [text] + return split_str(match.group(1), sep=',', strip=True, remove_empty=True) \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_alias.py b/quickstats/components/processors/actions/rooproc_alias.py index 66a8e94d0d14c3672fe67295a2bcd9788db8db1a..38dc36230f99f6fbb7955059f4b2e4ab66778b31 100644 --- a/quickstats/components/processors/actions/rooproc_alias.py +++ b/quickstats/components/processors/actions/rooproc_alias.py @@ -2,9 +2,13 @@ from typing import Optional import re from .rooproc_rdf_action import RooProcRDFAction +from .auxiliary import register_action +@register_action class RooProcAlias(RooProcRDFAction): + NAME = "ALIAS" + def __init__(self, alias:str, column_name:str): super().__init__(alias=alias, column_name=column_name) diff --git a/quickstats/components/processors/actions/rooproc_as_hdf.py b/quickstats/components/processors/actions/rooproc_as_hdf.py index c296f5b620f74de17ecfeb6dc4d52070e565105e..4161af18bcd6d44f77000822617f9093ce39276d 100644 --- a/quickstats/components/processors/actions/rooproc_as_hdf.py +++ b/quickstats/components/processors/actions/rooproc_as_hdf.py @@ -3,16 +3,20 @@ from typing import Optional, List import numpy as np from .rooproc_output_action import RooProcOutputAction +from .auxiliary import register_action from quickstats import module_exist from quickstats.utils.common_utils import is_valid_file from quickstats.utils.data_conversion import ConversionMode from quickstats.interface.root import RDataFrameBackend +@register_action class RooProcAsHDF(RooProcOutputAction): + NAME = "AS_HDF" + def __init__(self, filename:str, key:str, - columns:Optional[List[str]]=None): + columns:Optional[List[str]]): super().__init__(filename=filename, columns=columns, key=key) @@ -21,20 +25,21 @@ class RooProcAsHDF(RooProcOutputAction): filename = params['filename'] key = params['key'] if processor.cache and is_valid_file(filename): - processor.stdout.info(f"Cached output `{filename}`.") + processor.stdout.info(f'Cached output from "{filename}".') return rdf, processor - processor.stdout.info(f'Saving output "{filename}".') + processor.stdout.info(f'Writing output to "{filename}".') import awkward as ak import pandas as pd columns = params.get('columns', None) columns = self.get_valid_columns(rdf, processor, columns=columns, mode=ConversionMode.REMOVE_NON_STANDARD_TYPE) array = None - # NB: RDF Dask/Spark does not support GetColumnType yet - if (module_exist('awkward') and \ - (processor.backend not in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK])): + if module_exist('awkward'): try: import awkward as ak + # NB: RDF Dask/Spark does not support GetColumnType yet + if processor.backend in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]: + rdf.GetColumnType = rdf._headnode._localdf.GetColumnType array = ak.from_rdataframe(rdf, columns=columns) array = ak.to_numpy(array) except: @@ -44,5 +49,6 @@ class RooProcAsHDF(RooProcOutputAction): if array is None: array = rdf.AsNumpy(columns) df = pd.DataFrame(array) + self.makedirs(filename) df.to_hdf(filename, key=key) return rdf, processor \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_as_numpy.py b/quickstats/components/processors/actions/rooproc_as_numpy.py index 64343474df28ce500f18dd0107265fe0dec1efaf..d65620ba5f7aa493ca621c61583cca92952ae56c 100644 --- a/quickstats/components/processors/actions/rooproc_as_numpy.py +++ b/quickstats/components/processors/actions/rooproc_as_numpy.py @@ -3,29 +3,34 @@ from typing import Optional, List import numpy as np from .rooproc_output_action import RooProcOutputAction +from .auxiliary import register_action from quickstats import module_exist from quickstats.utils.common_utils import is_valid_file from quickstats.utils.data_conversion import ConversionMode from quickstats.interface.root import RDataFrameBackend +@register_action class RooProcAsNumpy(RooProcOutputAction): + NAME = "AS_NUMPY" + def _execute(self, rdf:"ROOT.RDataFrame", processor:"quickstats.RooProcessor", **params): filename = params['filename'] if processor.cache and is_valid_file(filename): - processor.stdout.info(f"INFO: Cached output `{filename}`.") + processor.stdout.info(f'Cached output from "{filename}".') return rdf, processor - processor.stdout.info(f'Saving output "{filename}".') + processor.stdout.info(f'Writing output to "{filename}".') columns = params.get('columns', None) columns = self.get_valid_columns(rdf, processor, columns=columns, mode=ConversionMode.REMOVE_NON_STANDARD_TYPE) array = None - # NB: RDF Dask/Spark does not support GetColumnType yet - if (module_exist('awkward') and \ - (processor.backend not in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK])): + if module_exist('awkward'): try: import awkward as ak + # NB: RDF Dask/Spark does not support GetColumnType yet + if processor.backend in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]: + rdf.GetColumnType = rdf._headnode._localdf.GetColumnType array = ak.from_rdataframe(rdf, columns=columns) array = ak.to_numpy(array) except: @@ -34,5 +39,6 @@ class RooProcAsNumpy(RooProcOutputAction): "Falling back to use ROOT instead") if array is None: array = rdf.AsNumpy(columns) + self.makedirs(filename) np.save(filename, array) return rdf, processor \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_as_parquet.py b/quickstats/components/processors/actions/rooproc_as_parquet.py index 7beb87122bd22a65e00d672bf95c31e108ad4c88..b627370ee3ebbca35229f954e351901f61c8754f 100644 --- a/quickstats/components/processors/actions/rooproc_as_parquet.py +++ b/quickstats/components/processors/actions/rooproc_as_parquet.py @@ -3,27 +3,34 @@ from typing import Optional, List import numpy as np from .rooproc_output_action import RooProcOutputAction +from .auxiliary import register_action from quickstats.utils.common_utils import is_valid_file from quickstats.utils.data_conversion import ConversionMode from quickstats.interface.root import RDataFrameBackend +@register_action class RooProcAsParquet(RooProcOutputAction): + + NAME = "AS_PARQUET" def _execute(self, rdf:"ROOT.RDataFrame", processor:"quickstats.RooProcessor", **params): filename = params['filename'] if processor.cache and is_valid_file(filename): - processor.stdout.info(f"Cached output `{filename}`.") + processor.stdout.info(f'Cached output from "{filename}".') return rdf, processor - processor.stdout.info(f'Saving output "{filename}".') + processor.stdout.info(f'Writing output to "{filename}".') columns = params.get('columns', None) columns = self.get_valid_columns(rdf, processor, columns=columns, mode=ConversionMode.REMOVE_NON_STANDARD_TYPE) import awkward as ak - # NB: RDF Dask/Spark does not support GetColumnType yet - if processor.backend not in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]: + try: + # NB: RDF Dask/Spark does not support GetColumnType yet + if processor.backend in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]: + rdf.GetColumnType = rdf._headnode._localdf.GetColumnType array = ak.from_rdataframe(rdf, columns=columns) - else: + except: array = ak.Array(rdf.AsNumpy(columns)) + self.makedirs(filename) ak.to_parquet(array, filename) return rdf, processor \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_base_action.py b/quickstats/components/processors/actions/rooproc_base_action.py index 8b95d205bdd0d874c8b752211d1f978b38bdd93c..8b7aa2063c167d7c0c812e6982d129d9d12fe133 100644 --- a/quickstats/components/processors/actions/rooproc_base_action.py +++ b/quickstats/components/processors/actions/rooproc_base_action.py @@ -2,8 +2,13 @@ from typing import Optional, List, Dict import os import re +from quickstats.utils.py_utils import get_required_args + class RooProcBaseAction(object): - + + NAME = None + PARAM_FORMATS = {} + def __init__(self, **params): self._params = params self.executed = False @@ -12,6 +17,10 @@ class RooProcBaseAction(object): @staticmethod def allow_multiline(): return False + + @staticmethod + def has_global_var(text:str): + return re.search(r"\${(\w+)}", text) is not None def get_formatted_parameters(self, global_vars:Optional[Dict]=None): if global_vars is None: @@ -40,6 +49,10 @@ class RooProcBaseAction(object): if is_list: v = v.split("__SEPARATOR__") formatted_parameters[k] = v + for key, value in formatted_parameters.items(): + if key in self.PARAM_FORMATS: + formatter = self.PARAM_FORMATS[key] + formatted_parameters[key] = formatter(value) return formatted_parameters def makedirs(self, filename:str): @@ -69,8 +82,22 @@ class RooProcBaseAction(object): attributes = re.findall(r"(\w+)=([^,]+)", text) for attribute in attributes: kwargs[attribute[0]] = attribute[1] + for key, value in kwargs.items(): + if (not cls.has_global_var(value)) and (key in cls.PARAM_FORMATS): + formatter = cls.PARAM_FORMATS[key] + kwargs[key] = formatter(value) return kwargs @classmethod def parse(cls, main_text:str, block_text:Optional[str]=None): - raise NotImplementedError \ No newline at end of file + return cls() + + @classmethod + def _try_create(cls, **kwargs): + try: + return cls(**kwargs) + except Exception: + argnames = get_required_args(cls) + missing_argnames = list(set(argnames) - set(kwargs)) + raise ValueError(f'missing keyword argument(s) for the action "{cls.NAME}": ' + f'{", ".join(missing_argnames)}') \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_declare.py b/quickstats/components/processors/actions/rooproc_declare.py index fa81a5a4c4f3db1f2864412a165dedfa366b0dfa..821d6d740ffd018e8a1a4623b21a476644c766cc 100644 --- a/quickstats/components/processors/actions/rooproc_declare.py +++ b/quickstats/components/processors/actions/rooproc_declare.py @@ -1,11 +1,15 @@ from typing import Optional from .rooproc_helper_action import RooProcHelperAction +from .auxiliary import register_action from quickstats.utils.root_utils import declare_expression +@register_action class RooProcDeclare(RooProcHelperAction): + NAME = "DECLARE" + def __init__(self, expression:str, name:Optional[str]=None): super().__init__(expression=expression, name=name) diff --git a/quickstats/components/processors/actions/rooproc_define.py b/quickstats/components/processors/actions/rooproc_define.py index a9e46023c3ee36e1a4fcfa843f760e3a622097d0..15ff70f12a2de3f7816bc92c0c2610231b56ae3b 100644 --- a/quickstats/components/processors/actions/rooproc_define.py +++ b/quickstats/components/processors/actions/rooproc_define.py @@ -2,9 +2,13 @@ from typing import Optional import re from .rooproc_rdf_action import RooProcRDFAction +from .auxiliary import register_action +@register_action class RooProcDefine(RooProcRDFAction): + NAME = "DEFINE" + def __init__(self, name:str, expression:str): super().__init__(name=name, expression=expression) diff --git a/quickstats/components/processors/actions/rooproc_export.py b/quickstats/components/processors/actions/rooproc_export.py index c8a1565ac4394e4106eaf0a441f50c2d30a5e644..9586b7aa7a1c010cb0ee1d1e8d01568c83c3358e 100644 --- a/quickstats/components/processors/actions/rooproc_export.py +++ b/quickstats/components/processors/actions/rooproc_export.py @@ -3,10 +3,15 @@ import os import json from .rooproc_helper_action import RooProcHelperAction +from .auxiliary import register_action from quickstats.utils.common_utils import is_valid_file +@register_action class RooProcExport(RooProcHelperAction): + + NAME = "EXPORT" + def __init__(self, filename:str): super().__init__(filename=filename) @@ -18,13 +23,13 @@ class RooProcExport(RooProcHelperAction): def _execute(self, processor:"quickstats.RooProcessor", **params): filename = params['filename'] if processor.cache and is_valid_file(filename): - processor.stdout.info(f"INFO: Cached output `{filename}`.") + processor.stdout.info(f'Cached output "{filename}".') return processor data = {k:v.GetValue() for k,v in processor.external_variables.items()} dirname = os.path.dirname(filename) if dirname and (not os.path.exists(dirname)): os.makedirs(dirname) with open(filename, 'w') as outfile: - processor.stdout.info(f'INFO: Writing auxiliary data to "{filename}".') + processor.stdout.info(f'Writing auxiliary data to "{filename}".') json.dump(data, outfile, indent=2) return processor \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_filter.py b/quickstats/components/processors/actions/rooproc_filter.py index 06bfcd275826eacceb839bb065e7a57b3e8b5aaf..8a0ef4e775452a65d2e8bad93df2b8a9a133df58 100644 --- a/quickstats/components/processors/actions/rooproc_filter.py +++ b/quickstats/components/processors/actions/rooproc_filter.py @@ -2,9 +2,13 @@ from typing import Optional import re from .rooproc_rdf_action import RooProcRDFAction +from .auxiliary import register_action +@register_action class RooProcFilter(RooProcRDFAction): + NAME = "FILTER" + def __init__(self, expression:str, name:Optional[str]=None): super().__init__(expression=expression, name=name) diff --git a/quickstats/components/processors/actions/rooproc_global_variables.py b/quickstats/components/processors/actions/rooproc_global_variables.py index 4b1c7abbb03784ba1539779f266406cd23f7f90c..0d11b7757e630b487ef1c41f5c7bf64374df5e5e 100644 --- a/quickstats/components/processors/actions/rooproc_global_variables.py +++ b/quickstats/components/processors/actions/rooproc_global_variables.py @@ -2,9 +2,13 @@ from typing import Optional, List, Dict import re from .rooproc_helper_action import RooProcHelperAction +from .auxiliary import register_action +@register_action class RooProcGlobalVariables(RooProcHelperAction): + NAME = "GLOBAL" + def __init__(self, **kwargs): super().__init__(**kwargs) diff --git a/quickstats/components/processors/actions/rooproc_if_defined.py b/quickstats/components/processors/actions/rooproc_if_defined.py index f9dc105ccd9161f7258c35cb107e76fa81b6402d..9d0e8772c30d033c9c2c28d47377fc754b898f25 100644 --- a/quickstats/components/processors/actions/rooproc_if_defined.py +++ b/quickstats/components/processors/actions/rooproc_if_defined.py @@ -1,10 +1,13 @@ from typing import List from .rooproc_nested_action import RooProcBaseAction, RooProcNestedAction -from .auxiliary import RooProcReturnCode +from .auxiliary import RooProcReturnCode, register_action +@register_action class RooProcIfDefined(RooProcNestedAction): + NAME = "IFDEF" + def __init__(self, flag:str): super().__init__(flag=flag) diff --git a/quickstats/components/processors/actions/rooproc_if_not_defined.py b/quickstats/components/processors/actions/rooproc_if_not_defined.py index 98e6d0d1c4bb6ebdf7ea121bd355790bd1e75059..eb4a9a1488041bc10dfb148a86ad6bf3915cedb6 100644 --- a/quickstats/components/processors/actions/rooproc_if_not_defined.py +++ b/quickstats/components/processors/actions/rooproc_if_not_defined.py @@ -1,10 +1,13 @@ from typing import List from .rooproc_nested_action import RooProcBaseAction, RooProcNestedAction -from .auxiliary import RooProcReturnCode +from .auxiliary import RooProcReturnCode, register_action +@register_action class RooProcIfNotDefined(RooProcNestedAction): + NAME = "IFNDEF" + def __init__(self, flag:str): super().__init__(flag=flag) diff --git a/quickstats/components/processors/actions/rooproc_load_frame.py b/quickstats/components/processors/actions/rooproc_load_frame.py index 58d6bd62ba3262705b8e94b9c0467773a4f613eb..a667706a65ef93d368d9ae2ba7f146746dec73aa 100644 --- a/quickstats/components/processors/actions/rooproc_load_frame.py +++ b/quickstats/components/processors/actions/rooproc_load_frame.py @@ -1,8 +1,13 @@ from typing import Optional from .rooproc_helper_action import RooProcHelperAction +from .auxiliary import register_action +@register_action class RooProcLoadFrame(RooProcHelperAction): + + NAME = "LOAD_FRAME" + def __init__(self, name:str): super().__init__(name=name) diff --git a/quickstats/components/processors/actions/rooproc_load_macro.py b/quickstats/components/processors/actions/rooproc_load_macro.py index 75749bb70768538b8760848bd0e4f365a0203cae..0a96faa1ac08ef7d0c4cba8c07370b245fc61984 100644 --- a/quickstats/components/processors/actions/rooproc_load_macro.py +++ b/quickstats/components/processors/actions/rooproc_load_macro.py @@ -1,8 +1,13 @@ from typing import Optional from .rooproc_helper_action import RooProcHelperAction +from .auxiliary import register_action +@register_action class RooProcLoadMacro(RooProcHelperAction): + + NAME = "LOAD_MACRO" + def __init__(self, name:str): super().__init__(name=name) diff --git a/quickstats/components/processors/actions/rooproc_max.py b/quickstats/components/processors/actions/rooproc_max.py index 21b2953a1670d2b359803b990b8b212181d038a4..62fee19318ac85dcf5561c68c91a9599260c14ad 100644 --- a/quickstats/components/processors/actions/rooproc_max.py +++ b/quickstats/components/processors/actions/rooproc_max.py @@ -1,5 +1,10 @@ from .rooproc_stat import RooProcStat +from .auxiliary import register_action +@register_action class RooProcMax(RooProcStat): + + NAME = "GETMAX" + def _get_func(self, rdf:"ROOT.RDataFrame"): return rdf.Max \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_mean.py b/quickstats/components/processors/actions/rooproc_mean.py index e5e109f58f9b5b3909e4b0c464d586704406a743..4a8e885fdca6a95e256e8b5f1e01bd7c58d64971 100644 --- a/quickstats/components/processors/actions/rooproc_mean.py +++ b/quickstats/components/processors/actions/rooproc_mean.py @@ -1,5 +1,10 @@ from .rooproc_stat import RooProcStat +from .auxiliary import register_action +@register_action class RooProcMean(RooProcStat): + + NAME = "GETMEAN" + def _get_func(self, rdf:"ROOT.RDataFrame"): return rdf.Mean \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_min.py b/quickstats/components/processors/actions/rooproc_min.py index 50c64ac2ecb3395318b6d638315d832c1dd3a593..53f3f011cb6f6ae1a45522b416ab6b5d37d880b1 100644 --- a/quickstats/components/processors/actions/rooproc_min.py +++ b/quickstats/components/processors/actions/rooproc_min.py @@ -1,5 +1,10 @@ from .rooproc_stat import RooProcStat +from .auxiliary import register_action +@register_action class RooProcMin(RooProcStat): + + NAME = "GETMIN" + def _get_func(self, rdf:"ROOT.RDataFrame"): return rdf.Min \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_output_action.py b/quickstats/components/processors/actions/rooproc_output_action.py index 23e1e1c27a8a26d6c9f068aa14918af2d7385cab..611531125d02357bc3d00c2647ad834bfe76b647 100644 --- a/quickstats/components/processors/actions/rooproc_output_action.py +++ b/quickstats/components/processors/actions/rooproc_output_action.py @@ -4,6 +4,7 @@ import fnmatch import numpy as np from .rooproc_hybrid_action import RooProcHybridAction +from .formatter import ListFormatter from quickstats.interface.root import RDataFrameBackend from quickstats.utils.common_utils import is_valid_file @@ -11,8 +12,12 @@ from quickstats.utils.data_conversion import root_datatypes, get_rdf_column_type class RooProcOutputAction(RooProcHybridAction): + PARAM_FORMATS = { + 'columns': ListFormatter + } + def __init__(self, filename:str, - columns:Optional[List[str]]=None, + columns:Optional[List[str]], **kwargs): super().__init__(filename=filename, columns=columns, @@ -21,12 +26,10 @@ class RooProcOutputAction(RooProcHybridAction): @classmethod def parse(cls, main_text:str, block_text:Optional[str]=None): kwargs = cls.parse_as_kwargs(main_text) - return cls(**kwargs) + return cls._try_create(**kwargs) def get_valid_columns(self, rdf, processor, columns:Optional[List[str]]=None, mode:ConversionMode=ConversionMode.REMOVE_NON_STANDARD_TYPE): - if processor.backend != RDataFrameBackend.DEFAULT: - return columns all_columns = list([str(col) for col in rdf.GetColumnNames()]) if columns is None: columns = all_columns @@ -38,7 +41,7 @@ class RooProcOutputAction(RooProcHybridAction): if not matched_columns: processor.stdout.warning(f'No columns matching the expression "{column}". ' 'It will be excluded from the output') - columns.extend(matched_columns) + columns_.extend(matched_columns) elif column not in all_columns: processor.stdout.warning(f'Column "{column}" does not exist. ' 'It will be excluded from the output') @@ -56,7 +59,7 @@ class RooProcOutputAction(RooProcHybridAction): removed_columns = np.setdiff1d(columns, new_columns) if len(removed_columns) > 0: col_str = ", ".join(removed_columns) - processor.stdout.warning("The following columns will be excluded from the output as they have " + processor.stdout.warning("The following column(s) will be excluded from the output as they have " f"data types incompatible with the output format: {col_str}") columns = new_columns return columns \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_progressbar.py b/quickstats/components/processors/actions/rooproc_progressbar.py new file mode 100644 index 0000000000000000000000000000000000000000..26034fbb24de748622792ede698400dc39627a24 --- /dev/null +++ b/quickstats/components/processors/actions/rooproc_progressbar.py @@ -0,0 +1,23 @@ +from typing import Optional +import re + +from quickstats.utils.common_utils import in_notebook +from .rooproc_hybrid_action import RooProcHybridAction +from .auxiliary import register_action + +@register_action +class RooProcProgressBar(RooProcHybridAction): + + NAME = "PROGRESSBAR" + + def _execute(self, rdf:"ROOT.RDataFrame", processor, **params): + import ROOT + if not isinstance(rdf, ROOT.RDataFrame): + rdf_next = ROOT.RDF.AsRNode(rdf) + else: + rdf_next = rdf + if in_notebook(): + processor.stdout.warning("ProgressBar does not work properly inside jupyter. Disabling for now.") + else: + ROOT.RDF.Experimental.AddProgressBar(rdf_next) + return rdf_next, processor \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_redefine.py b/quickstats/components/processors/actions/rooproc_redefine.py index 8724b958c9f7e4fa5c73f2237edc621fc84405ae..ea97eb1b759ed48dc0c5f02ad86f3dada9fb9a60 100644 --- a/quickstats/components/processors/actions/rooproc_redefine.py +++ b/quickstats/components/processors/actions/rooproc_redefine.py @@ -1,7 +1,11 @@ from .rooproc_define import RooProcDefine +from .auxiliary import register_action +@register_action class RooProcRedefine(RooProcDefine): + NAME = "REDEFINE" + def _execute(self, rdf, **params): name = params['name'] expression = params['expression'] diff --git a/quickstats/components/processors/actions/rooproc_report.py b/quickstats/components/processors/actions/rooproc_report.py index 2fe7abdd9b1eb67f8fa207e9c3bb55ddac46c7e6..d7a60d721e15d4e7c39b68bb9a1fcb72ff3b68c8 100644 --- a/quickstats/components/processors/actions/rooproc_report.py +++ b/quickstats/components/processors/actions/rooproc_report.py @@ -3,10 +3,15 @@ from typing import Optional import pandas as pd from .rooproc_hybrid_action import RooProcHybridAction +from .auxiliary import register_action from quickstats.utils.common_utils import is_valid_file +@register_action class RooProcReport(RooProcHybridAction): + + NAME = "REPORT" + def __init__(self, display:bool=False, filename:Optional[str]=None): super().__init__(display=display, filename=filename) @@ -19,7 +24,7 @@ class RooProcReport(RooProcHybridAction): display = params['display'] filename = params['filename'] if processor.cache and is_valid_file(filename): - processor.stdout.info(f"INFO: Cached output `{filename}`.") + processor.stdout.info(f'Cached output "{filename}".') return rdf, processor cut_report = rdf.Report() result = [] @@ -38,6 +43,6 @@ class RooProcReport(RooProcHybridAction): processor.stdout.info(f'Cutflow Table\n{df}') if filename is not None: self.makedirs(filename) - processor.stdout.info(f'INFO: Writing cutflow report to "{filename}".') + processor.stdout.info(f'Writing cutflow report to "{filename}".') df.to_csv(filename, index=False) return rdf, processor \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_safe_alias.py b/quickstats/components/processors/actions/rooproc_safe_alias.py index a9b891c86067dd11151c6f24861590d16b458b84..9c50e043330a4c5629b7f5834a38cf4d6e62031e 100644 --- a/quickstats/components/processors/actions/rooproc_safe_alias.py +++ b/quickstats/components/processors/actions/rooproc_safe_alias.py @@ -2,9 +2,13 @@ from typing import Optional import re from .rooproc_hybrid_action import RooProcHybridAction +from .auxiliary import register_action +@register_action class RooProcSafeAlias(RooProcHybridAction): + NAME = "SAFEALIAS" + def __init__(self, alias:str, column_name:str): super().__init__(alias=alias, column_name=column_name) diff --git a/quickstats/components/processors/actions/rooproc_safe_define.py b/quickstats/components/processors/actions/rooproc_safe_define.py index fa42571ab480a1eba195e74dee181a63f07225ee..305623efcbc6e46f00902e16645062de4e10a595 100644 --- a/quickstats/components/processors/actions/rooproc_safe_define.py +++ b/quickstats/components/processors/actions/rooproc_safe_define.py @@ -1,6 +1,10 @@ from .rooproc_define import RooProcDefine +from .auxiliary import register_action +@register_action class RooProcSafeDefine(RooProcDefine): + + NAME = "SAFEDEFINE" def _execute(self, rdf:"ROOT.RDataFrame", **params): name = params['name'] diff --git a/quickstats/components/processors/actions/rooproc_save.py b/quickstats/components/processors/actions/rooproc_save.py index b7164798611fbdaac3d1bcc23d738bf421b49e75..ffb8f19c8249f6ca827dde6b33a10f88707138eb 100644 --- a/quickstats/components/processors/actions/rooproc_save.py +++ b/quickstats/components/processors/actions/rooproc_save.py @@ -2,11 +2,15 @@ from typing import Optional, List import fnmatch from .rooproc_hybrid_action import RooProcHybridAction +from .auxiliary import register_action from quickstats.utils.common_utils import is_valid_file, filter_by_wildcards +@register_action class RooProcSave(RooProcHybridAction): + NAME = "SAVE" + def __init__(self, treename:str, filename:str, columns:Optional[List[str]]=None, exclude:Optional[List[str]]=None, @@ -41,10 +45,10 @@ class RooProcSave(RooProcHybridAction): save_columns = filter_by_wildcards(all_columns, columns) save_columns = filter_by_wildcards(save_columns, exclude, exclusion=True) save_columns = list(set(save_columns)) + processor.stdout.info(f'Writing output to "{filename}".') if processor.use_template: from quickstats.utils.root_utils import templated_rdf_snapshot rdf_next = templated_rdf_snapshot(rdf, save_columns)(treename, filename, save_columns) else: rdf_next = rdf.Snapshot(treename, filename, save_columns) - processor.stdout.info(f'Writing output to "{filename}".') return rdf_next, processor \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_save_frame.py b/quickstats/components/processors/actions/rooproc_save_frame.py index cb6dfb0c64c13bda0fe9a9c576216d0fbba2b4d1..b15ab00551dd61730d7525cab30ee6f893d844f7 100644 --- a/quickstats/components/processors/actions/rooproc_save_frame.py +++ b/quickstats/components/processors/actions/rooproc_save_frame.py @@ -1,8 +1,13 @@ from typing import Optional from .rooproc_helper_action import RooProcHelperAction +from .auxiliary import register_action +@register_action class RooProcSaveFrame(RooProcHelperAction): + + NAME = "SAVE_FRAME" + def __init__(self, name:str): super().__init__(name=name) diff --git a/quickstats/components/processors/actions/rooproc_sum.py b/quickstats/components/processors/actions/rooproc_sum.py index 3537359e5d0ba04ad0275c74bf6b603e9bfccd4a..a4a988fb22eaddceaac5d2c7f9a63a1aa0727af0 100644 --- a/quickstats/components/processors/actions/rooproc_sum.py +++ b/quickstats/components/processors/actions/rooproc_sum.py @@ -1,5 +1,10 @@ from .rooproc_stat import RooProcStat +from .auxiliary import register_action +@register_action class RooProcSum(RooProcStat): + + NAME = "GETSUM" + def _get_func(self, rdf:"ROOT.RDataFrame"): return rdf.Sum \ No newline at end of file diff --git a/quickstats/components/processors/actions/rooproc_treename.py b/quickstats/components/processors/actions/rooproc_treename.py index 62d7c0e7aac8e21872ca0abf99a18bf0ba5e0c62..659ba881ab5528d873ac8a16550659c9e919dc26 100644 --- a/quickstats/components/processors/actions/rooproc_treename.py +++ b/quickstats/components/processors/actions/rooproc_treename.py @@ -1,9 +1,13 @@ from typing import Optional from .rooproc_helper_action import RooProcHelperAction +from .auxiliary import register_action +@register_action class RooProcTreeName(RooProcHelperAction): + NAME = "TREENAME" + def __init__(self, treename:str): super().__init__(treename=treename) diff --git a/quickstats/components/processors/roo_config_parser.py b/quickstats/components/processors/roo_process_config.py similarity index 64% rename from quickstats/components/processors/roo_config_parser.py rename to quickstats/components/processors/roo_process_config.py index 15ccae86e0e8e00f1d3e5eeb2116ea17ce084ea1..51c44c4a4d02e05416b165e62771de600946d8bf 100644 --- a/quickstats/components/processors/roo_config_parser.py +++ b/quickstats/components/processors/roo_process_config.py @@ -1,4 +1,5 @@ from typing import List, Optional +import os import re from quickstats import semistaticmethod, TVirtualNode, TVirtualTree, stdout @@ -25,10 +26,22 @@ class ActionNode(TVirtualNode): parent=parent, **data) self.action = None + def get_context(self): + source = self.try_get_data("source", None) + line_number = self.try_get_data("start_line_number", None) + if source and line_number: + context = f" (Line {line_number}, Source {source})" + else: + context = "" + return context + def construct_action(self, rdf_backend:Optional[str]=None): action_cls = _get_action(self.name, rdf_backend=rdf_backend) if action_cls is None: - raise RuntimeError(f'unknown action "{self.name}"') + context = self.get_context() + raise RuntimeError(f'Unknown action "{self.name}"{context}. ' + f'Make sure to use start and end tags <Action> </Action> ' + 'to enclose a multiline block.') main_text = self.get_data("main_text") block_text = self.get_data("block_text") action = action_cls.parse(main_text=main_text, block_text=block_text) @@ -42,11 +55,16 @@ class ActionTree(TVirtualTree): self.reset() node = self.get_next() while node is not None: - node.construct_action(rdf_backend=rdf_backend) + try: + node.construct_action(rdf_backend=rdf_backend) + except Exception as e: + context = node.get_context() + raise RuntimeError("failed to construct action for the instruction " + f"{node.name}{context}. Error message: {e}") from e node = self.get_next() self.reset() - -class RooConfigLine: + +class RooConfigLine(object): def __init__(self, text:str, line_number:int): self.text = text @@ -71,13 +89,39 @@ class RooConfigLine: return result.group(1) return result -class RooConfigParser(object): +class RooProcessConfig(object): - def __init__(self): - pass + def __init__(self, content:Optional[str]=None, file:Optional[str]=None): + self.initialize(content=content, file=file) + + @classmethod + def open(cls, filename:str): + return cls(file=filename) + + def initialize(self, content:Optional[str]=None, file:Optional[str]=None): + if (content is not None) and (file is not None): + raise ValueError('either "content" or "file" should be specified') + if file is not None: + with open(file, "r") as f: + content = f.read() + source = file + else: + source = "text" + self.content = content + self.source = source + + @staticmethod + def _get_iterlines(text:str): + numbered_lines = split_lines(text, comment_string="#", remove_blank=True, with_line_number=True) + clines = [RooConfigLine(line, line_number) for line, line_number in numbered_lines] + clines_iter = iter(clines) + return clines_iter + + def get_iterlines(self): + return self._get_iterlines(self.content) @semistaticmethod - def _get_action_tree(self, clines_iter, action_tree:Optional[ActionTree]=None): + def _get_action_tree(self, clines_iter, action_tree:Optional[ActionTree]=None, source:Optional[str]=None): if action_tree is None: action_tree = ActionTree() cline = next(clines_iter, None) @@ -107,7 +151,8 @@ class RooConfigParser(object): start_line_number=cline.line_number, end_line_number=-1, block_text=block_text, - main_text="") + main_text="", + source=source) action_tree.current_node = child_node elif cline.end_tag: current_node = action_tree.current_node @@ -117,7 +162,9 @@ class RooConfigParser(object): if current_node.name != cline.end_tag: raise RuntimeError(f'Line {cline.line_number}: close tag ' f'"{cline.text}" does not match the start tag ' - f'"{current_node.data["raw_text"]}" (Line {current_node.data["start_line_number"]})') + f'"{current_node.data["raw_text"]}" ' + f'(Line {current_node.data["start_line_number"]}, ' + f'Source {current_node.data["source"]})') current_node.data["end_line_number"] = cline.line_number action_tree.current_node = current_node.parent else: @@ -137,25 +184,22 @@ class RooConfigParser(object): main_text = " ".join(tokens[1:]) else: main_text = None - child_node = action_tree.add_child(action_name, - raw_text=cline.text, - start_line_number=cline.line_number, - end_line_number=cline.line_number, - main_text=main_text, - block_text=None) - return self._get_action_tree(clines_iter, action_tree) - - @semistaticmethod - def parse_file(self, path:str): - with open(path, "r") as f: - text = f.read() - return self.parse_text(text) - - @semistaticmethod - def parse_text(self, text:str): - numbered_lines = split_lines(text, comment_string="#", remove_blank=True, with_line_number=True) - clines = [RooConfigLine(line, line_number) for line, line_number in numbered_lines] - clines_iter = iter(clines) - action_tree = self._get_action_tree(clines_iter) - return action_tree - \ No newline at end of file + if action_name.lower() != "include": + child_node = action_tree.add_child(action_name, + raw_text=cline.text, + start_line_number=cline.line_number, + end_line_number=cline.line_number, + main_text=main_text, + block_text=None, + source=source) + else: + basedir = os.path.dirname(source) + path = os.path.join(basedir, main_text.strip()) + subtree = RooProcessConfig.open(path).get_action_tree() + action_tree.merge(subtree) + return self._get_action_tree(clines_iter, action_tree, source=source) + + def get_action_tree(self): + iterlines = self.get_iterlines() + action_tree = self._get_action_tree(iterlines, source=self.source) + return action_tree \ No newline at end of file diff --git a/quickstats/components/processors/roo_processor.py b/quickstats/components/processors/roo_processor.py index 47265ca118418d5ab5ba8240d6b43dab4963a11a..3c38693dc39de93c31c437a19be77952119ad252 100644 --- a/quickstats/components/processors/roo_processor.py +++ b/quickstats/components/processors/roo_processor.py @@ -7,14 +7,19 @@ import ROOT from .builtin_methods import BUILTIN_METHODS from .actions import * -from .roo_config_parser import RooConfigParser +from .roo_process_config import RooProcessConfig from quickstats import Timer, AbstractObject, PathManager from quickstats.interface.root import TFile, RDataFrame, RDataFrameBackend from quickstats.utils.root_utils import declare_expression, close_all_root_files, set_cachedir class RooProcessor(AbstractObject): - def __init__(self, config_path:Optional[str]=None, + + @property + def distributed(self): + return self.backend != RDataFrameBackend.DEFAULT + + def __init__(self, config_source:Optional[Union[RooProcessConfig, str]]=None, config_text:Optional[str]=None, flags:Optional[List[str]]=None, backend:Optional[str]=None, @@ -37,7 +42,10 @@ class RooProcessor(AbstractObject): self.default_treename = None self.use_template = use_template self.multithread = multithread - self.backend = RDataFrameBackend.parse(backend) + if backend is None: + self.backend = RDataFrameBackend.DEFAULT + else: + self.backend = RDataFrameBackend.parse(backend) self.backend_options = backend_options self.load_buildin_functions() @@ -45,10 +53,8 @@ class RooProcessor(AbstractObject): if multithread: ROOT.EnableImplicitMT() - if config_path is not None: - self.load_config(config_path=config_path) - elif config_text is not None: - self.load_config(config_text=config_text) + if config_source is not None: + self.load_config(config_source) def set_cache(self, cache:bool=True): self.cache = cache @@ -63,20 +69,20 @@ class RooProcessor(AbstractObject): Internal = ROOT.Internal except: Internal = None + distributed = self.distributed for name, definition in BUILTIN_METHODS.items(): - declare_expression(definition, name) + declare_expression(definition, name, distributed=distributed) if Internal is not None: if Internal != ROOT.Internal: ROOT.Internal = Internal - def load_config(self, config_path:Optional[str]=None, - config_text:Optional[str]=None): - if config_path is not None: - action_tree = RooConfigParser.parse_file(config_path) - elif config_text is not None: - action_tree = RooConfigParser.parse_text(config_text) + def load_config(self, config_source:Union[RooProcessConfig, str]): + if isinstance(config_source, RooProcessConfig): + config = config_source else: - raise RuntimeError('missing config input') + config = RooProcessConfig.open(config_source) + self.config = config + action_tree = config.get_action_tree() action_tree.construct_actions(rdf_backend=self.backend) if not action_tree.root_node.has_child: raise RuntimeError("no actions found in the process card") diff --git a/quickstats/core/virtual_trees.py b/quickstats/core/virtual_trees.py index 241c2b3785b7f8017a36c7b0218d3c956b6ba622..e50c89755aa4001eb005553531022c6517bcecf8 100644 --- a/quickstats/core/virtual_trees.py +++ b/quickstats/core/virtual_trees.py @@ -23,6 +23,9 @@ class TVirtualNode: raise RuntimeError(f'missing data attribute "{key}"') return self.data[key] + def try_get_data(self, key:str, default=None): + return self.data.get(key, default) + @property def has_child(self): return self.get_number_of_children() > 0 @@ -91,4 +94,25 @@ class TVirtualTree: node = None break self.current_node = node - return node \ No newline at end of file + return node + + def _add_level_offset(self, node, offset:int): + node.level += offset + for child in node.children: + self._add_level_offset(child, offset) + + def merge(self, tree): + if not tree.root_node.has_child: + return None + other_children = tree.root_node.children + for child in other_children: + child.parent = self.current_node + self._add_level_offset(child, self.current_node.level) + if not self.current_node.has_child: + self.current_node.child = other_children + return None + self_children = self.current_node.children + # connect the childrens + self_children[-1].next_sibling = other_children[0] + other_children[0].prev_sibling = self_children[-1] + self_children.extend(other_children) \ No newline at end of file diff --git a/quickstats/interface/root/RDataFrame.py b/quickstats/interface/root/RDataFrame.py index 9110aa6c7356f2f0d5544f6313faac2e71c34765..f263997488ba2a6cee12c1ba40579d6f33aef30f 100644 --- a/quickstats/interface/root/RDataFrame.py +++ b/quickstats/interface/root/RDataFrame.py @@ -19,9 +19,15 @@ class RDataFrame(TObject): def rdf(self): return self.obj - def __init__(self, rdf=None, **kwargs): - super().__init__(**kwargs) - self.obj = rdf + def __init__(self, *args, verbosity:Optional[Union[int, str]]="INFO", **kwargs): + super().__init__(verbosity=verbosity, **kwargs) + import ROOT + if not args: + self.obj = None + elif (len(args) == 1) and isinstance(args[0], ROOT.RDataFrame): + self.obj = args[0] + else: + self.obj = ROOT.RDataFrame(*args) @semistaticmethod def create_spec(self, source:Union[Dict, List[str], str], @@ -139,18 +145,18 @@ class RDataFrame(TObject): return ds_spec @semistaticmethod - def _awkward_array(self, rdf, columns:Optional[List[str]]=None): + def _awkward_array(self, rdf, columns:Optional[List[str]]=None, **kwargs): if columns is None: columns = list(rdf.GetColumnNames()) import awkward as ak - array = ak.from_rdataframe(rdf, columns) + array = ak.from_rdataframe(rdf, columns=columns, **kwargs) return array - def awkward_array(self, columns:Optional[List[str]]=None): + def awkward_array(self, columns:Optional[List[str]]=None, **kwargs): rdf = self.rdf if rdf is None: RuntimeError('RDataFrame instance not initialized') - return self._awkward_array(rdf, columns=columns) + return self._awkward_array(rdf, columns=columns, **kwargs) @semistaticmethod def from_files(self, filenames:Union[List[str], str], diff --git a/quickstats/interface/root/TObject.py b/quickstats/interface/root/TObject.py index 811c3bb97c7f560340d69e63791b98a28eddc6a5..975363c19b130f684041dd0eee64fb0c560bca92 100644 --- a/quickstats/interface/root/TObject.py +++ b/quickstats/interface/root/TObject.py @@ -10,7 +10,7 @@ class TObject(AbstractObject): self.obj = None self.initialize(**kwargs) - def initialize(**kwargs): + def initialize(self, **kwargs): pass def get(self): diff --git a/quickstats/macros/ResponseFunction/ResponseFunction.cxx b/quickstats/macros/ResponseFunction/ResponseFunction.cxx index 607f762197120b87fa806bb606b60addbda88364..84bb3fe46a3a6f165b6f74504c1e69764f579d89 100644 --- a/quickstats/macros/ResponseFunction/ResponseFunction.cxx +++ b/quickstats/macros/ResponseFunction/ResponseFunction.cxx @@ -215,7 +215,7 @@ Double_t ResponseFunction::evaluate() const RooAbsReal *param; int i = 0; - for (size_t i=0; i < _paramList.getSize(); i++) { + for (int i=0; i < _paramList.getSize(); i++) { const auto& param = static_cast<RooAbsReal&>(_paramList[i]); const auto& low = static_cast<RooAbsReal&>(_lowList[i]); const auto& high = static_cast<RooAbsReal&>(_highList[i]); @@ -281,7 +281,7 @@ void ResponseFunction::printMultiline(std::ostream &os, Int_t contents, void ResponseFunction::printResponseFunctions(std::ostream &os) const { - for (size_t i = 0; i < _paramList.getSize(); i++) + for (int i = 0; i < _paramList.getSize(); i++) { const auto& param = static_cast<RooAbsReal&>(_paramList[i]); const auto& low = static_cast<RooAbsReal&>(_lowList[i]); diff --git a/quickstats/plots/general_2D_plot.py b/quickstats/plots/general_2D_plot.py index 9c8d1a06a4c3e1c942c08f368208bcb3fbe9d123..ea6799c31f247a36bce4e6ee8cd2efd7853de3c9 100644 --- a/quickstats/plots/general_2D_plot.py +++ b/quickstats/plots/general_2D_plot.py @@ -22,6 +22,8 @@ class General2DPlot(AbstractPlot): 'linestyles': 'solid', 'linewidths': 3 }, + 'contourf': { + }, 'scatter': { 'c': 'hh:darkpink', 'marker': 'o', @@ -63,7 +65,8 @@ class General2DPlot(AbstractPlot): zmin:Optional[float]=None, zmax:Optional[float]=None, logx:bool=False, logy:bool=False, norm:Optional=None, draw_colormesh:bool=True, draw_contour:bool=False, - draw_scatter:bool=False, draw_clabel:bool=True, + draw_contourf:bool=False, draw_scatter:bool=False, + draw_clabel:bool=True, contour_levels:Optional[Union[float, List[float]]]=None, transform:Optional[Callable]=None, ax=None): @@ -96,8 +99,9 @@ class General2DPlot(AbstractPlot): handle = ax.contour(X, Y, Z, levels=contour_levels, **self.styles['contour']) if draw_clabel: - ax.clabel(handle, **self.styles['clabel']) - + ax.clabel(handle, **self.styles['clabel']) + if draw_contourf: + handle = ax.contourf(X, Y, Z, levels=contour_levels, **self.styles['contourf']) if draw_scatter: handle = ax.scatter(x, y, **self.styles['scatter']) ax.legend(**self.styles['legend']) @@ -106,4 +110,4 @@ class General2DPlot(AbstractPlot): title=title) self.set_axis_range(ax, xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax) - return ax + return ax \ No newline at end of file diff --git a/quickstats/utils/data_conversion.py b/quickstats/utils/data_conversion.py index b5fcc47c004a676d42c3f57313e91330bb1b0423..36d7883f0e288b08b4d0b4ec2f3e398bb1b41e44 100644 --- a/quickstats/utils/data_conversion.py +++ b/quickstats/utils/data_conversion.py @@ -121,8 +121,13 @@ def uproot_get_standard_columns(uproot_tree): return np.array(columns)[np.where(np.isin(column_types, uproot_datatypes))] def get_rdf_column_type(rdf, column_name:str): + if hasattr(rdf, "GetColumnType"): + GetColumnType = rdf.GetColumnType + # case distributed dataframe + else: + GetColumnType = rdf._headnode._localdf.GetColumnType try: - column_type = rdf.GetColumnType(column_name) + column_type = GetColumnType(column_name) except Exception: column_type = "" return column_type diff --git a/quickstats/utils/py_utils.py b/quickstats/utils/py_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..6b90094dc3561b437127485ded177a9fb594716f --- /dev/null +++ b/quickstats/utils/py_utils.py @@ -0,0 +1,15 @@ +import inspect + +def get_required_args(obj): + sig = inspect.signature(obj) + required_args = [ + name for name, param in sig.parameters.items() + if param.default is param.empty and + param.kind in [param.POSITIONAL_OR_KEYWORD, param.POSITIONAL_ONLY] + ] + + # remove self in case of a class object + if isinstance(obj, type): + required_args = required_args[1:] + + return required_args \ No newline at end of file diff --git a/quickstats/utils/root_utils.py b/quickstats/utils/root_utils.py index 0d9d629edde23f5e217092887c758f753574f222..058842c9152aeae23e1adbe3376d4116052c98a3 100644 --- a/quickstats/utils/root_utils.py +++ b/quickstats/utils/root_utils.py @@ -243,9 +243,17 @@ def create_declaration(expression:str, name:Optional[str]=None): guarded_declaration += f"\n{expression}\n\n#endif\n" return guarded_declaration -def declare_expression(expression:str, name:Optional[str]=None): +def declare_expression(expression:str, name:Optional[str]=None, + distributed:bool=False): declaration = create_declaration(expression, name) status = ROOT.gInterpreter.Declare(declaration) + if distributed: + def wrapper(): + ROOT.gInterpreter.Declare(declaration) + try: + ROOT.RDF.Experimental.Distributed.initialize(wrapper) + except: + quickstats.stdout.warning("Failed to initialize distributed code declarations. Ignored.") return status def get_tree_names(f:ROOT.TFile): diff --git a/setup.py b/setup.py index 2e1751dc2c3292ca976fddd844c3ef9bfbf21942..dd0f3319526159b777fe93448152368acb7248b6 100644 --- a/setup.py +++ b/setup.py @@ -31,10 +31,10 @@ setup( long_description=long_description, long_description_content_type="text/markdown", packages=setuptools.find_packages(), - package_data={'quickstats':['macros/*/*.cxx', 'macros/*/*.h', 'stylesheets/*', 'resources/*']}, + package_data={'quickstats':['macros/*/*.cxx', 'macros/*/*.h', 'resources/mpl_stylesheets/*', 'resources/*']}, exclude_package_data={'quickstats':['macros/CMSSWCore/*', 'macros/CMSSWCore_HHComb/*', 'macros/ATLASSWCore/*', - 'resources/workspace_extensions.json']}, + 'resources/workspace_extensions*.json']}, classifiers=[ "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8",