Skip to content
Snippets Groups Projects
Commit 34e0515e authored by Alkaid Cheng's avatar Alkaid Cheng
Browse files

Merge branch 'dev' into 'master'

Aggregated updates for the next release (7.0.0)

See merge request !143
parents 9143d459 1f44ed6a
Branches
Tags 0.7.0.0
1 merge request!143Aggregated updates for the next release (7.0.0)
Pipeline #7112720 passed
Showing
with 170 additions and 87 deletions
......@@ -19,3 +19,4 @@ dev/
quickstats/macros/CMSSWCore*/
quickstats/macros/ATLASSWCore/
quickstats/resources/workspace_extensions.json
workspace_extensions_bak.json
__version__ = "0.6.9.9"
__version__ = "0.7.0.0"
......@@ -60,12 +60,15 @@ class NTupleProcessTool(ConfigurableObject):
def attribute_df(self):
return self._attribute_df
def __init__(self, sample_config:Union[Dict, str], outdir:str='output',
processor_config:Optional[str]=None,
processor_flags:Optional[List[str]]=None,
def __init__(self, sample_config:Union[Dict, str],
outdir:str='output',
process_config:Optional[Union["RooProcessConfig", str]]=None,
process_flags:Optional[List[str]]=None,
cache:bool=True,
use_template:bool=False,
multithread:bool=True,
backend:Optional[str]=None,
backend_options:Optional[Dict]=None,
disable_config_message:bool=False,
verbosity:Optional[Union[int, str]]="INFO",
**kwargs):
......@@ -73,23 +76,23 @@ class NTupleProcessTool(ConfigurableObject):
super().__init__(disable_config_message=disable_config_message,
verbosity=verbosity)
self.outdir = outdir
self.path_manager = AnalysisPathManager(outdir)
self.path_manager = AnalysisPathManager(base_path=outdir)
self.path_manager.set_directory("ntuple", "ntuples")
self.path_manager.set_directory("cutflow", "cutflow")
self.load_sample_config(sample_config)
self.processor = None
if processor_config is not None:
self.load_processor_config(processor_config,
cache=cache,
use_template=use_template,
multithread=multithread)
if process_config is not None:
self.load_process_config(process_config,
cache=cache,
use_template=use_template,
multithread=multithread)
if processor_flags is not None:
self.processor_flags = list(processor_flags)
if process_flags is not None:
self.process_flags = list(process_flags)
else:
self.processor_flags = []
self.process_flags = []
self.cutflow_report = None
......@@ -115,8 +118,8 @@ class NTupleProcessTool(ConfigurableObject):
sample_list.extend(list(self.sample_config['systematic_samples'][syst_theme]))
self._syst_theme_list.append('Nominal')
import pandas as pd
self._sample_list = list(pd.unique(sample_list))
merge_sample_map = combine_dict(self.sample_config['merge_samples'])
self._sample_list = pd.unique(np.array(sample_list))
merge_sample_map = combine_dict(self.sample_config.get('merge_samples', {}))
if not merge_sample_map:
merge_sample_map = {sample: [sample] for sample in self.sample_list}
else:
......@@ -132,7 +135,7 @@ class NTupleProcessTool(ConfigurableObject):
syst_name_list = []
for syst_names in self.sample_config['systematics'].values():
syst_name_list.extend(syst_names)
self._syst_name_list = pd.unique(syst_name_list)
self._syst_name_list = pd.unique(np.array(syst_name_list))
def get_sample_df(self):
sample_data = []
......@@ -183,17 +186,22 @@ class NTupleProcessTool(ConfigurableObject):
attribute_df = pd.DataFrame(attribute_data).set_index(index_list)
return attribute_df
def load_processor_config(self, config_path:str,
cache:bool=True,
multithread:bool=True,
use_template:bool=False):
from quickstats.components.processors import RooProcessor
self.processor = RooProcessor(config_path,
def load_process_config(self, config_source:Union["RooProcessConfig", str],
cache:bool=True,
multithread:bool=True,
use_template:bool=False,
backend:Optional[str]=None,
backend_options:Optional[Dict]=None):
from quickstats.components import RooProcessor
self.processor = RooProcessor(config_source,
cache=cache,
use_template=use_template,
multithread=multithread,
backend=backend,
backend_options=backend_options,
verbosity=self.stdout.verbosity)
self.path_manager.set_file("processor_config", os.path.abspath(config_path))
if isinstance(config_source, str):
self.path_manager.set_file("process_config", os.path.abspath(config_source))
def get_validated_syst_themes(self, syst_themes:Optional[List[str]]=None):
if syst_themes is None:
......@@ -354,7 +362,7 @@ class NTupleProcessTool(ConfigurableObject):
samples:Optional[List[str]]=None,
sample_types:Optional[List[str]]=None):
if self.processor is None:
raise RuntimeError("processor not initialized (probably missing a processor config)")
raise RuntimeError("processor not initialized (probably missing a process config)")
paths = self.get_selected_paths(syst_themes=syst_themes,
samples=samples,
sample_types=sample_types)
......@@ -362,7 +370,7 @@ class NTupleProcessTool(ConfigurableObject):
syst_names = self.get_validated_syst_names(syst_names)
for syst_theme in paths:
for sample in paths[syst_theme]:
self.processor.set_flags(self.processor_flags)
self.processor.set_flags(self.process_flags)
for sample_type in paths[syst_theme][sample]:
sample_paths = paths[syst_theme][sample][sample_type]
sample_config = {
......@@ -383,10 +391,14 @@ class NTupleProcessTool(ConfigurableObject):
self.processor.run(expended_paths)
self.processor.clear_global_variables()
def process_samples(self, samples:Optional[List[str]]=None,
sample_types:Optional[List[str]]=None):
def process_samples(self, samples:Optional[Union[List[str], str]]=None,
sample_types:Optional[Union[List[str], str]]=None):
if isinstance(samples, str):
samples = [samples]
if isinstance(sample_types, str):
sample_types = [sample_types]
if self.processor is None:
raise RuntimeError("processor not initialized (probably missing a processor config)")
raise RuntimeError("processor not initialized (probably missing a process config)")
paths = self.get_selected_paths(syst_themes=['Nominal'],
samples=samples,
sample_types=sample_types)
......@@ -395,7 +407,7 @@ class NTupleProcessTool(ConfigurableObject):
paths = paths['Nominal']
outdir = self.path_manager.base_path
for sample in paths:
self.processor.set_flags(self.processor_flags)
self.processor.set_flags(self.process_flags)
for sample_type in paths[sample]:
sample_paths = paths[sample][sample_type]
sample_config = {
......
......@@ -17,7 +17,7 @@ from .nuisance_parameter_pull import NuisanceParameterPull
from .nuisance_parameter_ranking import NuisanceParameterRanking
from .nuisance_parameter_harmonizer import NuisanceParameterHarmonizer
from .caching_nll_wrapper import CachingNLLWrapper
from .processors import RooProcessor
from .processors import RooProcessor, RooProcessConfig
#from .signal_modelling import SignalModelling
import ROOT
......
from quickstats.components.processors.actions import *
from quickstats.components.processors.roo_config_parser import RooConfigParser
from quickstats.components.processors.roo_processor import RooProcessor
\ No newline at end of file
from .actions import *
from .roo_process_config import RooProcessConfig
from .roo_processor import RooProcessor
\ No newline at end of file
from .auxiliary import *
from .rooproc_base_action import RooProcBaseAction
from .rooproc_rdf_action import RooProcRDFAction
from .rooproc_helper_action import RooProcHelperAction
......@@ -27,34 +28,4 @@ from .rooproc_if_not_defined import RooProcIfNotDefined
from .rooproc_load_macro import RooProcLoadMacro
from .rooproc_as_parquet import RooProcAsParquet
from .rooproc_as_hdf import RooProcAsHDF
from .auxiliary import *
ACTION_MAP = {
"TREENAME": RooProcTreeName,
"DECLARE": RooProcDeclare,
"GLOBAL": RooProcGlobalVariables,
"ALIAS": RooProcAlias,
"SAFEALIAS": RooProcSafeAlias,
"DEFINE": RooProcDefine,
"SAFEDEFINE": RooProcSafeDefine,
"REDEFINE": RooProcRedefine,
"FILTER": RooProcFilter,
"GETSUM": RooProcSum,
"GETMAX": RooProcMax,
"GETMIN": RooProcMin,
"GETMEAN": RooProcMean,
"SAVE": RooProcSave,
"REPORT": RooProcReport,
"EXPORT": RooProcExport,
"SAVE_FRAME": RooProcSaveFrame,
"LOAD_FRAME": RooProcLoadFrame,
"AS_NUMPY": RooProcAsNumpy,
"IFDEF": RooProcIfDefined,
"IFNDEF": RooProcIfNotDefined,
"LOAD_MACRO": RooProcLoadMacro,
"AS_PARQUET": RooProcAsParquet,
"AS_HDF": RooProcAsHDF
}
def get_action(action_name:str):
return ACTION_MAP.get(action_name, None)
\ No newline at end of file
from .rooproc_progressbar import RooProcProgressBar
\ No newline at end of file
......@@ -2,4 +2,15 @@ from quickstats import GeneralEnum
class RooProcReturnCode(GeneralEnum):
NORMAL = 0
SKIP_CHILD = 1
\ No newline at end of file
SKIP_CHILD = 1
ACTION_MAP = dict()
def get_action(action_name:str):
return ACTION_MAP.get(action_name, None)
def register_action(action_cls):
name = action_cls.NAME
if (name is not None) and (name not in ACTION_MAP):
ACTION_MAP[name] = action_cls
return action_cls
\ No newline at end of file
import re
from quickstats.utils.string_utils import split_str
ListRegex = re.compile(r"\[([^\[\]]+)\]")
def ListFormatter(text:str):
match = ListRegex.match(text)
if not match:
return [text]
return split_str(match.group(1), sep=',', strip=True, remove_empty=True)
\ No newline at end of file
......@@ -2,9 +2,13 @@ from typing import Optional
import re
from .rooproc_rdf_action import RooProcRDFAction
from .auxiliary import register_action
@register_action
class RooProcAlias(RooProcRDFAction):
NAME = "ALIAS"
def __init__(self, alias:str, column_name:str):
super().__init__(alias=alias, column_name=column_name)
......
......@@ -3,16 +3,20 @@ from typing import Optional, List
import numpy as np
from .rooproc_output_action import RooProcOutputAction
from .auxiliary import register_action
from quickstats import module_exist
from quickstats.utils.common_utils import is_valid_file
from quickstats.utils.data_conversion import ConversionMode
from quickstats.interface.root import RDataFrameBackend
@register_action
class RooProcAsHDF(RooProcOutputAction):
NAME = "AS_HDF"
def __init__(self, filename:str, key:str,
columns:Optional[List[str]]=None):
columns:Optional[List[str]]):
super().__init__(filename=filename,
columns=columns,
key=key)
......@@ -21,20 +25,21 @@ class RooProcAsHDF(RooProcOutputAction):
filename = params['filename']
key = params['key']
if processor.cache and is_valid_file(filename):
processor.stdout.info(f"Cached output `{filename}`.")
processor.stdout.info(f'Cached output from "{filename}".')
return rdf, processor
processor.stdout.info(f'Saving output "{filename}".')
processor.stdout.info(f'Writing output to "{filename}".')
import awkward as ak
import pandas as pd
columns = params.get('columns', None)
columns = self.get_valid_columns(rdf, processor, columns=columns,
mode=ConversionMode.REMOVE_NON_STANDARD_TYPE)
array = None
# NB: RDF Dask/Spark does not support GetColumnType yet
if (module_exist('awkward') and \
(processor.backend not in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK])):
if module_exist('awkward'):
try:
import awkward as ak
# NB: RDF Dask/Spark does not support GetColumnType yet
if processor.backend in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]:
rdf.GetColumnType = rdf._headnode._localdf.GetColumnType
array = ak.from_rdataframe(rdf, columns=columns)
array = ak.to_numpy(array)
except:
......@@ -44,5 +49,6 @@ class RooProcAsHDF(RooProcOutputAction):
if array is None:
array = rdf.AsNumpy(columns)
df = pd.DataFrame(array)
self.makedirs(filename)
df.to_hdf(filename, key=key)
return rdf, processor
\ No newline at end of file
......@@ -3,29 +3,34 @@ from typing import Optional, List
import numpy as np
from .rooproc_output_action import RooProcOutputAction
from .auxiliary import register_action
from quickstats import module_exist
from quickstats.utils.common_utils import is_valid_file
from quickstats.utils.data_conversion import ConversionMode
from quickstats.interface.root import RDataFrameBackend
@register_action
class RooProcAsNumpy(RooProcOutputAction):
NAME = "AS_NUMPY"
def _execute(self, rdf:"ROOT.RDataFrame", processor:"quickstats.RooProcessor", **params):
filename = params['filename']
if processor.cache and is_valid_file(filename):
processor.stdout.info(f"INFO: Cached output `{filename}`.")
processor.stdout.info(f'Cached output from "{filename}".')
return rdf, processor
processor.stdout.info(f'Saving output "{filename}".')
processor.stdout.info(f'Writing output to "{filename}".')
columns = params.get('columns', None)
columns = self.get_valid_columns(rdf, processor, columns=columns,
mode=ConversionMode.REMOVE_NON_STANDARD_TYPE)
array = None
# NB: RDF Dask/Spark does not support GetColumnType yet
if (module_exist('awkward') and \
(processor.backend not in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK])):
if module_exist('awkward'):
try:
import awkward as ak
# NB: RDF Dask/Spark does not support GetColumnType yet
if processor.backend in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]:
rdf.GetColumnType = rdf._headnode._localdf.GetColumnType
array = ak.from_rdataframe(rdf, columns=columns)
array = ak.to_numpy(array)
except:
......@@ -34,5 +39,6 @@ class RooProcAsNumpy(RooProcOutputAction):
"Falling back to use ROOT instead")
if array is None:
array = rdf.AsNumpy(columns)
self.makedirs(filename)
np.save(filename, array)
return rdf, processor
\ No newline at end of file
......@@ -3,27 +3,34 @@ from typing import Optional, List
import numpy as np
from .rooproc_output_action import RooProcOutputAction
from .auxiliary import register_action
from quickstats.utils.common_utils import is_valid_file
from quickstats.utils.data_conversion import ConversionMode
from quickstats.interface.root import RDataFrameBackend
@register_action
class RooProcAsParquet(RooProcOutputAction):
NAME = "AS_PARQUET"
def _execute(self, rdf:"ROOT.RDataFrame", processor:"quickstats.RooProcessor", **params):
filename = params['filename']
if processor.cache and is_valid_file(filename):
processor.stdout.info(f"Cached output `{filename}`.")
processor.stdout.info(f'Cached output from "{filename}".')
return rdf, processor
processor.stdout.info(f'Saving output "{filename}".')
processor.stdout.info(f'Writing output to "{filename}".')
columns = params.get('columns', None)
columns = self.get_valid_columns(rdf, processor, columns=columns,
mode=ConversionMode.REMOVE_NON_STANDARD_TYPE)
import awkward as ak
# NB: RDF Dask/Spark does not support GetColumnType yet
if processor.backend not in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]:
try:
# NB: RDF Dask/Spark does not support GetColumnType yet
if processor.backend in [RDataFrameBackend.DASK, RDataFrameBackend.SPARK]:
rdf.GetColumnType = rdf._headnode._localdf.GetColumnType
array = ak.from_rdataframe(rdf, columns=columns)
else:
except:
array = ak.Array(rdf.AsNumpy(columns))
self.makedirs(filename)
ak.to_parquet(array, filename)
return rdf, processor
\ No newline at end of file
......@@ -2,8 +2,13 @@ from typing import Optional, List, Dict
import os
import re
from quickstats.utils.py_utils import get_required_args
class RooProcBaseAction(object):
NAME = None
PARAM_FORMATS = {}
def __init__(self, **params):
self._params = params
self.executed = False
......@@ -12,6 +17,10 @@ class RooProcBaseAction(object):
@staticmethod
def allow_multiline():
return False
@staticmethod
def has_global_var(text:str):
return re.search(r"\${(\w+)}", text) is not None
def get_formatted_parameters(self, global_vars:Optional[Dict]=None):
if global_vars is None:
......@@ -40,6 +49,10 @@ class RooProcBaseAction(object):
if is_list:
v = v.split("__SEPARATOR__")
formatted_parameters[k] = v
for key, value in formatted_parameters.items():
if key in self.PARAM_FORMATS:
formatter = self.PARAM_FORMATS[key]
formatted_parameters[key] = formatter(value)
return formatted_parameters
def makedirs(self, filename:str):
......@@ -69,8 +82,22 @@ class RooProcBaseAction(object):
attributes = re.findall(r"(\w+)=([^,]+)", text)
for attribute in attributes:
kwargs[attribute[0]] = attribute[1]
for key, value in kwargs.items():
if (not cls.has_global_var(value)) and (key in cls.PARAM_FORMATS):
formatter = cls.PARAM_FORMATS[key]
kwargs[key] = formatter(value)
return kwargs
@classmethod
def parse(cls, main_text:str, block_text:Optional[str]=None):
raise NotImplementedError
\ No newline at end of file
return cls()
@classmethod
def _try_create(cls, **kwargs):
try:
return cls(**kwargs)
except Exception:
argnames = get_required_args(cls)
missing_argnames = list(set(argnames) - set(kwargs))
raise ValueError(f'missing keyword argument(s) for the action "{cls.NAME}": '
f'{", ".join(missing_argnames)}')
\ No newline at end of file
from typing import Optional
from .rooproc_helper_action import RooProcHelperAction
from .auxiliary import register_action
from quickstats.utils.root_utils import declare_expression
@register_action
class RooProcDeclare(RooProcHelperAction):
NAME = "DECLARE"
def __init__(self, expression:str, name:Optional[str]=None):
super().__init__(expression=expression,
name=name)
......
......@@ -2,9 +2,13 @@ from typing import Optional
import re
from .rooproc_rdf_action import RooProcRDFAction
from .auxiliary import register_action
@register_action
class RooProcDefine(RooProcRDFAction):
NAME = "DEFINE"
def __init__(self, name:str, expression:str):
super().__init__(name=name, expression=expression)
......
......@@ -3,10 +3,15 @@ import os
import json
from .rooproc_helper_action import RooProcHelperAction
from .auxiliary import register_action
from quickstats.utils.common_utils import is_valid_file
@register_action
class RooProcExport(RooProcHelperAction):
NAME = "EXPORT"
def __init__(self, filename:str):
super().__init__(filename=filename)
......@@ -18,13 +23,13 @@ class RooProcExport(RooProcHelperAction):
def _execute(self, processor:"quickstats.RooProcessor", **params):
filename = params['filename']
if processor.cache and is_valid_file(filename):
processor.stdout.info(f"INFO: Cached output `{filename}`.")
processor.stdout.info(f'Cached output "{filename}".')
return processor
data = {k:v.GetValue() for k,v in processor.external_variables.items()}
dirname = os.path.dirname(filename)
if dirname and (not os.path.exists(dirname)):
os.makedirs(dirname)
with open(filename, 'w') as outfile:
processor.stdout.info(f'INFO: Writing auxiliary data to "{filename}".')
processor.stdout.info(f'Writing auxiliary data to "{filename}".')
json.dump(data, outfile, indent=2)
return processor
\ No newline at end of file
......@@ -2,9 +2,13 @@ from typing import Optional
import re
from .rooproc_rdf_action import RooProcRDFAction
from .auxiliary import register_action
@register_action
class RooProcFilter(RooProcRDFAction):
NAME = "FILTER"
def __init__(self, expression:str, name:Optional[str]=None):
super().__init__(expression=expression,
name=name)
......
......@@ -2,9 +2,13 @@ from typing import Optional, List, Dict
import re
from .rooproc_helper_action import RooProcHelperAction
from .auxiliary import register_action
@register_action
class RooProcGlobalVariables(RooProcHelperAction):
NAME = "GLOBAL"
def __init__(self, **kwargs):
super().__init__(**kwargs)
......
from typing import List
from .rooproc_nested_action import RooProcBaseAction, RooProcNestedAction
from .auxiliary import RooProcReturnCode
from .auxiliary import RooProcReturnCode, register_action
@register_action
class RooProcIfDefined(RooProcNestedAction):
NAME = "IFDEF"
def __init__(self, flag:str):
super().__init__(flag=flag)
......
from typing import List
from .rooproc_nested_action import RooProcBaseAction, RooProcNestedAction
from .auxiliary import RooProcReturnCode
from .auxiliary import RooProcReturnCode, register_action
@register_action
class RooProcIfNotDefined(RooProcNestedAction):
NAME = "IFNDEF"
def __init__(self, flag:str):
super().__init__(flag=flag)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment