From 938d3b96a2861f0bf566fccd015e2439dd20866b Mon Sep 17 00:00:00 2001 From: ibelyaev <Ivan.Belyaev@cern.ch> Date: Tue, 24 Jan 2017 15:40:42 +0100 Subject: [PATCH] Analysis/Ostap: improve paralellisation ! 2017-01-24 - Vanya Belyaev - Kisa.py, TreeDeco.py - improve tree/chain -> histogram projections - more functionality, better parallelisation --- Analysis/Ostap/doc/release.notes | 5 + Analysis/Ostap/python/Ostap/Kisa.py | 133 ++++++++++++++++-------- Analysis/Ostap/python/Ostap/TreeDeco.py | 92 ++++++++++++++-- Analysis/Ostap/tests/TestKisa.py | 4 +- 4 files changed, 177 insertions(+), 57 deletions(-) diff --git a/Analysis/Ostap/doc/release.notes b/Analysis/Ostap/doc/release.notes index 666fed27c..205f824fc 100644 --- a/Analysis/Ostap/doc/release.notes +++ b/Analysis/Ostap/doc/release.notes @@ -8,6 +8,11 @@ ! by $Author$ ! ----------------------------------------------------------------------------- +! 2017-01-24 - Vanya Belyaev + - Kisa.py, TreeDeco.py + - improve tree/chain -> histogram projections + - more functionality, better parallelisation + ! 2017-01-18 - Vanya Belyaev - Kisa.py: - improve parallelization diff --git a/Analysis/Ostap/python/Ostap/Kisa.py b/Analysis/Ostap/python/Ostap/Kisa.py index b1b924c1a..7a69e241f 100644 --- a/Analysis/Ostap/python/Ostap/Kisa.py +++ b/Analysis/Ostap/python/Ostap/Kisa.py @@ -51,23 +51,16 @@ class ProjectTask(Parallel.Task) : """ The simple task object for efficient parallel projection of looooooong TChains/TTrees into histograms """ - ## constructor: chain name, historgam , variable , cuts - def __init__ ( self , tree , histo , what , cuts = '' ) : - """Constructor: chain/tree name, historgam , variable , cuts + ## constructor: historgam , variable , cuts + def __init__ ( self , histo ) : + """Constructor: historgam >>> histo = ... - >>> task = ProjectTask ( 'MyTuple' , histo , 'mass' , 'pt>10' ) + >>> task = ProjectTask ( histo ) """ self.histo = histo self.histo.Reset() - import ROOT - if isinstance ( tree , ROOT.TTree ) : self.tree = tree.GetName() - elif isinstance ( tree , str ) : self.tree = tree - - self.what = what - self.cuts = cuts - ## local initialization (executed once in parent process) def initializeLocal ( self ) : """Local initialization (executed once in parent process) @@ -98,13 +91,16 @@ class ProjectTask(Parallel.Task) : elif isinstance ( params , ROOT.TChainElement ) : params = ( params.GetTitle() , 0 , n_large ) - fname = params[0] - first = params[1] if 1 < len(params) else 0 - nentries = params[2] if 2 < len(params) else n_large + fname = params[0] ## file name + tname = params[1] ## tree name + what = params[2] ## variable/expression to project + cuts = params[3] if 3 < len ( params ) else '' ## cuts + first = params[4] if 4 < len ( params ) else 0 ## the first event + nentries = params[5] if 5 < len ( params ) else n_large ## number of events if isinstance ( fname , ROOT.TChainElement ) : fname = fname.GetTitle() - chain = ROOT.TChain ( self.tree ) + chain = ROOT.TChain ( tname ) chain.Add ( fname ) ## Create the output histogram NB! (why here???) @@ -113,7 +109,7 @@ class ProjectTask(Parallel.Task) : ## use the regular projection from Ostap.TreeDeco import _tt_project_ self.output = _tt_project_ ( chain , self.output[1] , - self.what , self.cuts , + what , cuts , '' , nentries , first ) del chain @@ -154,27 +150,36 @@ def cproject ( chain , histo , what , cuts ) : For 12-core machine, clear speedup factor of about 8 is achieved """ # - - if not tree : + if not chain : return 0 , histo if not histo : logger.error ('cproject: invalid histogram') return 0 , histo - import ROOT + import ROOT histo.Reset() - if not isinstance ( ROOT , TChain ) : + if not isinstance ( chain , ROOT.TChain ) : logger.warning ('cproject method is TChain-specific, skip parallelization') from Ostap.TreeDeco import _tt_project_ return _tt_project_ ( chain , histo , what , cuts ) + if isinstance ( cuts , ROOT.TCut ) : cuts = str( cuts ) + ## + if isinstance ( what , str ) : what = what.split(',') + if isinstance ( what , str ) : what = what.split(';') + if isinstance ( what , str ) : what = [ what ] + import Ostap.TreeDeco files = chain.files() + + cname = chain.GetName() - task = ProjectTask ( chain , histo , what , cuts ) + params = [ ( f , cname , str(w) , cuts ) for f in files for w in what ] + + task = ProjectTask ( histo ) wmgr = Parallel.WorkManager () - wmgr.process( task, files ) + wmgr.process( task, params ) filtered = task.output[0] histo += task.output[1] @@ -197,9 +202,22 @@ ROOT.TChain.pproject = cproject # @endcode # - significant gain can be achieved for very large ttrees with complicated expressions and cuts # - <code>maxentries</code> parameter should be rather large +# @param tree the tree +# @param histo the histogram +# @param what variable/expression/varlist to be projected +# @param cuts selection/weighting criteria +# @param nentries number of entries to process (>0: all entries in th tree) +# @param first the first entry to process +# @param maxentries chunk size for parallel processing # @author Vanya BELYAEV Ivan.Belyaev@itep.ru # @date 2014-09-23 -def tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentries = 1000000 ) : +def tproject ( tree , ## the tree + histo , ## histogram + what , ## variable/expression/list to be projected + cuts = '' , ## selection/weighting criteria + nentries = -1 , ## number of entries + first = 0 , ## the first entry + maxentries = 1000000 ) : ## chunk size """Make a projection of the loooong tree into histogram >>> tree = ... ## large chain >>> histo = ... ## histogram template @@ -207,6 +225,14 @@ def tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentr >>> tree.pproject ( histo , 'mass' , 'pt>10' ) ## ditto - significant gain can be achieved for very large ttrees with complicated expressions and cuts - maxentries parameter should be rather large + Arguments: + - tree the tree + - histo the histogram + - what variable/expression/varlist to be projected + - cuts selection/weighting criteria + - nentries number of entries to process (>0: all entries in th tree) + - first the first entry to process + - maxentries chunk size for parallel processing """ # if not tree : @@ -221,13 +247,13 @@ def tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentr num = len( tree ) if num <= first : return 0, histo - - if 0 > nentries : nentries = n_large + + if 0 > nentries : nentries = n_large maxentries = long(maxentries) if 0 >= maxentries : maxentries = n_large - if 0 > first : first = 0 + if 0 > first : first = 0 ## use the regular projection from Ostap.TreeDeco import _tt_project_ @@ -235,36 +261,55 @@ def tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentr if isinstance ( tree , ROOT.TChain ) : logger.warning ('``tproject'' method is TTree-specific, skip parallelization') return _tt_project_ ( tree , histo , what , cuts , '' , total , first ) - - ## total number of events to process : - total = min ( num - first , nentries ) - ## the event range is rather short, no need in parallel processing - if total < maxentries : - return _tt_project_ ( tree , histo , what , cuts , '', total , first ) - + ## ## check if tree is file-resident: tdir = tree.GetDirectory() if tdir and isinstance ( tdir , ROOT.TFile ) : pass else : + logger.debug ('TTree is not file resident, skip parallelization') return _tt_project_ ( tree , histo , what , cuts , '', total , first ) + # + # + if isinstance ( cuts , ROOT.TCut ) : cuts = str( cuts ) + if isinstance ( what , ROOT.TCut ) : what = str( what ) + ## + if isinstance ( what , str ) : what = what.split(',') + if isinstance ( what , str ) : what = what.split(',') + if isinstance ( what , str ) : what = what.split(';') + if isinstance ( what , str ) : what = [ what ] + + ## nothing to project + if not what : + return 0 , histo + + ## total number of events to process : + total = min ( num - first , nentries ) - fname = tdir.GetName() + ## the event range is rather short, no real need in parallel processing + if total * len ( what ) < maxentries and len ( what ) < 4 : + return _tt_project_ ( tree , histo , what , cuts , '', total , first ) - ## number of jobs & reminder - njobs, rest = divmod ( total , maxentries ) - csize = int ( total / njobs ) ## chunk size + fname = tdir.GetName() + tname = tree.GetName() + + ## number of chunks & reminder + nchunks , rest = divmod ( total , maxentries ) + csize = int ( total / nchunks ) ## chunk size - ## final list of parameters [ (file_name, first_event , num_events ) , ... ] + ## final list of parameters [ (file_name, what , cuts , first_event , num_events ) , ... ] params = [] - for i in range(njobs) : - params.append ( ( fname , first + i * csize , csize ) ) - + + for i in range(nchunks) : + for w in what : + params.append ( ( fname , tname , str(w) , cuts , first + i * csize , csize ) ) + if rest : - params.append ( ( fname , first + njobs * csize , rest ) ) - njobs += 1 + nchunks += 1 + for w in what : + params.append ( ( fname , tname , str(w) , cuts , first + nchunks * csize , rest ) ) - task = ProjectTask ( tree , histo , what , cuts ) + task = ProjectTask ( histo ) wmgr = Parallel.WorkManager () wmgr.process( task, params ) diff --git a/Analysis/Ostap/python/Ostap/TreeDeco.py b/Analysis/Ostap/python/Ostap/TreeDeco.py index 9459babaa..8eae890f7 100755 --- a/Analysis/Ostap/python/Ostap/TreeDeco.py +++ b/Analysis/Ostap/python/Ostap/TreeDeco.py @@ -192,12 +192,26 @@ ROOT.TChain.__call__ = _tc_call_ # # >>> h1 = ROOT.TH1D(... ) # >>> tree.project ( h1 , 'm', 'chi2<10' ) ## use histo +# +# ## make invididual projections of 'm1' and 'm2' and make a sum of distributions +# >>> h1 = ROOT.TH1D(... ) +# >>> tree.project ( h1 , ['m1','m2'] , 'chi2<10' ) ## use histo +# +# ## make invididual projections of 'm1' and 'm2' and make a sum of distributions +# >>> h1 = ROOT.TH1D(... ) +# >>> tree.project ( h1 , "m1,m2" , 'chi2<10' ) +# >>> tree.project ( h1 , "m1;m2" , 'chi2<10' ) # @endcode # +# @param tree the tree +# @param histo the histogram or histogram name +# @param what variable/expression to be projected. +# It could be a list/tuple of variables/expressions or just a comma-separated expression +# @param cuts expression for cuts/weights # @see TTree::Project # @author Vanya BELYAEV Ivan.Belyaev@itep.ru # @date 2013-07-06 -def _tt_project_ ( tree , histo , what , *args ) : +def _tt_project_ ( tree , histo , what , cuts = '' , *args ) : """Helper project method >>> tree = ... @@ -209,29 +223,85 @@ def _tt_project_ ( tree , histo , what , *args ) : >>> tree.project ( h1.GetName() , 'm', 'chi2<10' ) ## ditto >>> h1 = ROOT.TH1D(... ) - >>> tree.project ( h1 , 'm', 'chi2<10' ) ## use histo - + >>> tree.project ( h1 , 'm', 'chi2<10' ) ## use histo + + ## make invididual projections of m1 and m2 and make a sum of distributions + >>> h1 = ROOT.TH1D(... ) + >>> tree.project ( h1 , ('m1','m2') , 'chi2<10' ) ## two variables + >>> tree.project ( h1 , 'm1,m2' , 'chi2<10' ) ## ditto + >>> tree.project ( h1 , 'm1;m2' , 'chi2<10' ) ## ditto + + - tree : the tree + - histo : the histogram (or histogram name) + - what : variable/expression to project. It can be expression or list/tuple of expression or comma (or semicolumn) separated expression + - cuts : selection criteria/weights """ # hname = histo - if hasattr ( histo , 'GetName' ) : hname = histo.GetName() + if hasattr ( histo , 'GetName' ) : hname = histo.GetName() + ## elif isinstance ( histo , str ) : + ## h = ROOT.gROOT.FindObject ( hname ) + ## if h : histo = h + + ## reset it! + if histo and isinstance ( histo , ROOT.TH1 ) : histo.Reset() + # + if isinstance ( cuts , ROOT.TCut ) : cuts = str(cuts) + if not what : return 0, histo # - if args and isinstance ( args[0] , ROOT.TCut ) : - _args = list ( args ) - _args [0] = str ( _args[0] ) - args = tuple ( _args ) + ## trivial 1-item list + if hasattr ( what , '__len__' ) and 1 == len ( what ) and not isinstance ( what , (str, ROOT.TCut) ): + what = what[0] + + ## check for comma-separated list of expressions: + if isinstance ( what , str ) : + what = what.split(',') + if 1 == len(what) : what = what[0] + + ## check for semicolumn-separated list of expressions: + if isinstance ( what , str ) : + what = what.split(';') + if 1 == len(what) : what = what[0] + # + if isinstance ( what , str ) : what = what + elif isinstance ( what , ROOT.TCut ) : what = str(what) + elif isinstance ( histo , ROOT.TH1 ) : + rr = 0 + hh = histo.clone() + for v in what : + r , h = _tt_project_ ( tree , hh , v , cuts , *args ) + rr += r + histo += h + hh.Delete() + del hh + return rr , histo + elif isinstance ( histo , str ) : + ## process the head of the list: the first call creates the histo... + rr, hh = _tt_project_ ( tree , histo , what[0] , cuts , *args ) + histo = hh + if 1 == len ( what ) : return rr , histo + # normal processing of the tail of the list using created historgams + hh = histo.clone() + r1 , h1 = _tt_project_ ( tree , hh , what[1:] , cuts , *args ) + rr += r1 + histo += h1 + hh.Delete() + del hh, h1 + return rr , histo + + ## the basic case from Ostap.TFileDeco import ROOTCWD with ROOTCWD() : - ROOT.gROOT.cd () + ROOT.gROOT.cd () ## make projection - result = tree.Project ( hname , what , *args ) + result = tree.Project ( hname , what , cuts , *args ) if isinstance ( histo , ROOT.TH1 ) : return result, histo elif isinstance ( histo , str ) : h = ROOT.gROOT.FindObject ( hname ) if h : return result, h - return result, hname + return result, histo ROOT.TTree .project = _tt_project_ ROOT.TChain.project = _tt_project_ diff --git a/Analysis/Ostap/tests/TestKisa.py b/Analysis/Ostap/tests/TestKisa.py index b442856de..93652e032 100755 --- a/Analysis/Ostap/tests/TestKisa.py +++ b/Analysis/Ostap/tests/TestKisa.py @@ -63,14 +63,14 @@ chain = data.chain with timing('SEQUENTIAL(%s):' % len(chain) , logger ) : - chain.project ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' ) + chain. project ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' ) logger.info ( h1.dump(100,30) ) import Ostap.Kisa with timing('PARALLEL(%s):' % len(chain) , logger ) : - chain._project ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' ) + chain.pproject ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' ) logger.info ( h2.dump(100,30) ) -- GitLab