diff --git a/Analysis/Ostap/doc/release.notes b/Analysis/Ostap/doc/release.notes index 70d0a0dc8eaa18165e644f603e8b027338e8e61c..666fed27c9e785a529e22d51ea80770ccd23b453 100644 --- a/Analysis/Ostap/doc/release.notes +++ b/Analysis/Ostap/doc/release.notes @@ -8,6 +8,11 @@ ! by $Author$ ! ----------------------------------------------------------------------------- +! 2017-01-18 - Vanya Belyaev + - Kisa.py: + - improve parallelization + - add parallelization methods for projection of looooong trees + ! 2017-01-16 - Vanya Belyaev - GraphDeco: 1. couple of minor fixes diff --git a/Analysis/Ostap/python/Ostap/Kisa.py b/Analysis/Ostap/python/Ostap/Kisa.py index 000e1c50bfbe2e36911777062a11fe5dc5aad5ef..b6180c8b2e4dfa59f4353b4cac9271e6683f04f4 100644 --- a/Analysis/Ostap/python/Ostap/Kisa.py +++ b/Analysis/Ostap/python/Ostap/Kisa.py @@ -12,10 +12,6 @@ # @author Vanya BELYAEV Ivan.Belyaev@itep.ru # @date 2014-09-23 # -# -# $Revision$ -# Last modification $Date$ -# by $Author$ # ============================================================================= """ Multiprocessing functionality for Ostap Currently it is not loaded on default, and requires manual activation @@ -25,9 +21,10 @@ __version__ = "$Revision$" __author__ = "Vanya BELYAEV Ivan.Belyaev@itep.ru" __date__ = "2011-06-07" __all__ = ( - 'ProjectTask' , - 'FillTask' , - 'project' , + 'ProjectTask' , ## "project task" for chain/trees + 'FillTask' , + 'cproject' , ## project method for loooong TChains + 'tproject' , ## project method for loooong TTree 'fillDataSet' ) # ============================================================================= @@ -41,30 +38,30 @@ else : logger = getLogger ( __name__ ) # ============================================================================= logger.debug ( 'Multiprocessing functionality for Ostap') # ============================================================================= - +n_large = 2**63 - 1 ## ROOT.TVirtualTreePlayer.kMaxEntries +## n_large = ROOT.TVirtualTreePlayer.kMaxEntries # ============================================================================= -## The simple task object for more efficient projection of loooong TChains +## The simple task object for more efficient projection of loooong chains/trees # into histogarms # @see GaudiMP.Parallel # @see GaudiMP.Parallel.Task # @author Vanya BELYAEV Ivan.Belyaev@itep.ru # @date 2014-09-23 class ProjectTask(Parallel.Task) : - """ The simple task object for efficient projection of loooong TChains - into histogarms + """ The simple task object for efficient parallel + projection of looooooong TChains/TTrees into histograms """ ## constructor: chain name, historgam , variable , cuts def __init__ ( self , tree , histo , what , cuts = '' ) : - """ - Constructor: chain name, historgam , variable , cuts + """Constructor: chain/tree name, historgam , variable , cuts >>> histo = ... >>> task = ProjectTask ( 'MyTuple' , histo , 'mass' , 'pt>10' ) - """ - + """ self.histo = histo self.histo.Reset() + import ROOT if isinstance ( tree , ROOT.TTree ) : self.tree = tree.GetName() elif isinstance ( tree , str ) : self.tree = tree @@ -73,82 +70,211 @@ class ProjectTask(Parallel.Task) : ## local initialization (executed once in parent process) def initializeLocal ( self ) : - """ - Local initialization (executed once in parent process) + """Local initialization (executed once in parent process) """ import ROOT,Ostap.PyRoUts - self.output = self.histo.Clone() + self.output = 0, self.histo.Clone() ## remote initialization (executed for each sub-processs) def initializeRemote ( self ) : pass - - ## the actual processing + + ## the actual processing + # ``params'' is assumed to be a tuple : + # - the file name + # - the first entry in tree to process + # - number of entries to process def process ( self , params ) : + """The actual processing + ``params'' is assumed to be a tuple-like entity: + 0 - the file name + 1 - the first entry in tree to process + 2 - number of entries to process + """ import ROOT import Ostap.PyRoUts + + if isinstance ( params , str ) : params = ( param , 0 , n_large ) + elif isinstance ( params , ROOT.TChainElement ) : + params = ( params.GetTitle() , 0 , n_large ) + + fname = params[0] + first = params[1] if 1 < len(params) else 0 + nentries = params[2] if 2 < len(params) else n_large - if isinstance ( params , str ) : pars = [ params ] - elif isinstance ( params , ROOT.TChainElement ) : pars = [ params.GetTitle() ] + if isinstance ( fname , ROOT.TChainElement ) : fname = fname.GetTitle() chain = ROOT.TChain ( self.tree ) - for p in pars : chain.Add ( p ) - + chain.Add ( fname ) + ## Create the output histogram NB! (why here???) - self.output = self.histo.Clone() + self.output = 0 , self.histo.Clone() ## use the regular projection from Ostap.TreeDeco import _tt_project_ - _tt_project_ ( chain , self.output , self.what , self.cuts ) - + self.output = _tt_project_ ( chain , self.output[1] , + self.what , self.cuts , + '' , + nentries , first ) del chain - ## finalization (execuetd at the end at parent process) + ## finalization (executed at the end at parent process) def finalize ( self ) : pass ## merge results def _mergeResults(self, result) : # - self.output.Add ( result ) - result.Delete () - + filtered = self.output[0] + result[0] + self.output[1].Add ( result[1] ) + self.output = filtered, self.output[1] + result[1].Delete () + # ============================================================================= -## make a projection of the loooong chain into histogram using +## make a projection of the loooooooong chain into histogram using # multiprocessing functionality for per-file parallelisation # @code # # >>> chain = ... ## large chain # >>> histo = ... ## histogram template -# >>> project ( chain , histo , 'mass' , 'pt>10' ) +# >>> project ( chain , histo , 'mass' , 'pt>10' ) +# >>> chain.pproject ( histo , 'mass' , 'pt>0' ) ## ditto # # @endcode # # For 12-core machine, clear speedup factor of about 8 is achieved # @author Vanya BELYAEV Ivan.Belyaev@itep.ru # @date 2014-09-23 -def project ( chain , histo , what , cuts ) : +def cproject ( chain , histo , what , cuts ) : """Make a projection of the loooong chain into histogram >>> chain = ... ## large chain >>> histo = ... ## histogram template - >>> project ( chain , histo , 'mass' , 'pt>10' ) + >>> cproject ( chain , histo , 'mass' , 'pt>10' ) + >>> chain.ppropject ( histo , 'mass' , 'pt>0' ) ## ditto For 12-core machine, clear speedup factor of about 8 is achieved """ # + + if not tree : + return 0 , histo + if not histo : + logger.error ('cproject: invalid histogram') + return 0 , histo + + import ROOT histo.Reset() - files = chain.files() + if not isinstance ( ROOT , TChain ) : + logger.warning ('cproject method is TChain-specific, skip parallelization') + from Ostap.TreeDeco import _tt_project_ + return _tt_project_ ( chain , histo , what , cuts ) + + import Ostap.TreeDeco + files = chain.files() + task = ProjectTask ( chain , histo , what , cuts ) wmgr = Parallel.WorkManager () wmgr.process( task, files ) - histo += task.output + filtered = task.output[0] + histo += task.output[1] + + return filtered , histo + +ROOT.TChain.cproject = cproject +ROOT.TChain.pproject = cproject + +# ============================================================================= +## make a projection of the loooooooong tree into histogram using +# multiprocessing functionality for per-file parallelisation +# @code +# +# >>> tree = ... ## large tree +# >>> histo = ... ## histogram template +# >>> tproject ( tree , histo , 'mass' , 'pt>10' , maxentries = 1000000 ) +# >>> tree.pproject ( histo , 'mass' , 'pt>10' ) ## ditto +# @endcode +# - significant gain can be achieved for very large ttrees with complicated expressions and cuts +# - <code>maxentries</code> parameter should be rather large +# @author Vanya BELYAEV Ivan.Belyaev@itep.ru +# @date 2014-09-23 +def tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentries = 1000000 ) : + """Make a projection of the loooong tree into histogram + >>> tree = ... ## large chain + >>> histo = ... ## histogram template + >>> tproject ( tree , histo , 'mass' , 'pt>10' ) + >>> tree.pproject ( histo , 'mass' , 'pt>10' ) ## ditto + - significant gain can be achieved for very large ttrees with complicated expressions and cuts + - maxentries parameter should be rather large + """ + # + if not tree : + return 0 , histo + if not histo : + logger.error ('tproject: invalid histogram') + return 0 , histo + + import ROOT + histo.Reset() + + num = len( tree ) + if num <= first : + return 0, histo + + if 0 > nentries : nentries = n_large + + maxentries = long(maxentries) + if 0 >= maxentries : maxentries = n_large + + if 0 > first : first = 0 + + ## use the regular projection + from Ostap.TreeDeco import _tt_project_ - return histo + if isinstance ( tree , ROOT.TChain ) : + logger.warning ('``tproject'' method is TTree-specific, skip parallelization') + return _tt_project_ ( tree , histo , what , cuts , '' , total , first ) + + ## total number of events to process : + total = min ( num - first , nentries ) + ## the event range is rather short, no need in parallel processing + if total < maxentries : + return _tt_project_ ( tree , histo , what , cuts , '', total , first ) + + ## check if tree is file-resident: + tdir = tree.GetDirectory() + if tdir and isinstance ( tdir , ROOT.TFile ) : pass + else : + return _tt_project_ ( tree , histo , what , cuts , '', total , first ) + + fname = tdir.GetName() + + ## number of jobs & reminder + njobs, rest = divmod ( total , maxentries ) + csize = int ( total / njobs ) ## chunk size + + ## final list of parameters [ (file_name, first_event , num_events ) , ... ] + params = [] + for i in range(njobs) : + params.append ( ( fname , first + i * csize , csize ) ) + + if rest : + params.append ( ( fname , first + njobs * csize , rest ) ) + njobs += 1 + + task = ProjectTask ( tree , histo , what , cuts ) + wmgr = Parallel.WorkManager () + wmgr.process( task, params ) + + filtered = task.output[0] + histo += task.output[1] + + return filtered , histo import ROOT -ROOT.TChain._project = project +ROOT.TTree.tproject = tproject +ROOT.TTree.pproject = tproject # ============================================================================= ## The simple task object for more efficient fill of RooDataSet from TChain @@ -159,8 +285,7 @@ ROOT.TChain._project = project # @author Vanya BELYAEV Ivan.Belyaev@itep.ru # @date 2014-09-23 class FillTask(Parallel.Task) : - """ - The single task object for more efficient fill of RooDataSet from TChain + """The single task object for more efficient fill of RooDataSet from TChain - for 12-core machine, clear speed-up factor of about 8 is achieved """ ##