From 938d3b96a2861f0bf566fccd015e2439dd20866b Mon Sep 17 00:00:00 2001
From: ibelyaev <Ivan.Belyaev@cern.ch>
Date: Tue, 24 Jan 2017 15:40:42 +0100
Subject: [PATCH] Analysis/Ostap: improve paralellisation ! 2017-01-24 - Vanya
 Belyaev  - Kisa.py, TreeDeco.py    - improve tree/chain -> histogram
 projections    - more functionality, better parallelisation

---
 Analysis/Ostap/doc/release.notes        |   5 +
 Analysis/Ostap/python/Ostap/Kisa.py     | 133 ++++++++++++++++--------
 Analysis/Ostap/python/Ostap/TreeDeco.py |  92 ++++++++++++++--
 Analysis/Ostap/tests/TestKisa.py        |   4 +-
 4 files changed, 177 insertions(+), 57 deletions(-)

diff --git a/Analysis/Ostap/doc/release.notes b/Analysis/Ostap/doc/release.notes
index 666fed27c..205f824fc 100644
--- a/Analysis/Ostap/doc/release.notes
+++ b/Analysis/Ostap/doc/release.notes
@@ -8,6 +8,11 @@
 !  by                $Author$
 ! -----------------------------------------------------------------------------
 
+! 2017-01-24 - Vanya Belyaev
+ - Kisa.py, TreeDeco.py
+   - improve tree/chain -> histogram projections
+   - more functionality, better parallelisation 
+
 ! 2017-01-18 - Vanya Belyaev
  - Kisa.py: 
    - improve parallelization 
diff --git a/Analysis/Ostap/python/Ostap/Kisa.py b/Analysis/Ostap/python/Ostap/Kisa.py
index b1b924c1a..7a69e241f 100644
--- a/Analysis/Ostap/python/Ostap/Kisa.py
+++ b/Analysis/Ostap/python/Ostap/Kisa.py
@@ -51,23 +51,16 @@ class ProjectTask(Parallel.Task) :
     """ The simple task  object for efficient parallel
     projection of looooooong TChains/TTrees into histograms  
     """
-    ## constructor: chain name, historgam , variable , cuts 
-    def __init__ ( self , tree , histo , what , cuts = '' ) :
-        """Constructor: chain/tree name, historgam , variable , cuts
+    ## constructor: historgam , variable , cuts 
+    def __init__ ( self , histo ) :
+        """Constructor: historgam 
         
         >>> histo = ...
-        >>> task  = ProjectTask ( 'MyTuple' , histo , 'mass' , 'pt>10' ) 
+        >>> task  = ProjectTask ( histo ) 
         """        
         self.histo = histo
         self.histo.Reset()
         
-        import ROOT
-        if   isinstance ( tree , ROOT.TTree ) : self.tree = tree.GetName()
-        elif isinstance ( tree , str        ) : self.tree = tree 
-        
-        self.what = what
-        self.cuts = cuts
-
     ## local initialization (executed once in parent process)
     def initializeLocal   ( self ) :
         """Local initialization (executed once in parent process)
@@ -98,13 +91,16 @@ class ProjectTask(Parallel.Task) :
         elif isinstance ( params , ROOT.TChainElement ) :
             params = ( params.GetTitle()  , 0 , n_large  )
 
-        fname    = params[0]
-        first    = params[1] if 1 < len(params) else 0 
-        nentries = params[2] if 2 < len(params) else n_large 
+        fname    = params[0] ## file name 
+        tname    = params[1] ## tree name 
+        what     = params[2] ## variable/expression to project 
+        cuts     = params[3] if 3 < len ( params ) else ''       ## cuts    
+        first    = params[4] if 4 < len ( params ) else 0        ## the first event
+        nentries = params[5] if 5 < len ( params ) else n_large  ## number of events 
         
         if isinstance ( fname , ROOT.TChainElement ) : fname = fname.GetTitle() 
         
-        chain = ROOT.TChain ( self.tree )
+        chain = ROOT.TChain ( tname )
         chain.Add ( fname )
         
         ## Create the output histogram   NB! (why here???) 
@@ -113,7 +109,7 @@ class ProjectTask(Parallel.Task) :
         ## use the regular projection  
         from Ostap.TreeDeco import _tt_project_ 
         self.output = _tt_project_ ( chain      , self.output[1] ,
-                                     self.what  , self.cuts      ,
+                                     what       , cuts           ,
                                      ''         ,
                                      nentries   , first          )
         del chain 
@@ -154,27 +150,36 @@ def  cproject ( chain , histo , what , cuts ) :
     For 12-core machine, clear speedup factor of about 8 is achieved     
     """
     #
-    
-    if not tree  :
+    if not chain :
         return 0 , histo
     if not histo :
         logger.error ('cproject: invalid histogram')
         return 0 , histo
     
-    import ROOT 
+    import ROOT
     histo.Reset()    
 
-    if not isinstance ( ROOT , TChain ) :
+    if not isinstance ( chain , ROOT.TChain ) :
         logger.warning ('cproject method is TChain-specific, skip parallelization') 
         from Ostap.TreeDeco import _tt_project_
         return _tt_project_ ( chain , histo , what , cuts ) 
 
+    if isinstance ( cuts , ROOT.TCut ) : cuts = str( cuts )
+    ##
+    if isinstance ( what  , str ) : what = what.split(',')
+    if isinstance ( what  , str ) : what = what.split(';')
+    if isinstance ( what  , str ) : what = [ what ] 
+
     import Ostap.TreeDeco 
     files = chain.files()
+
+    cname = chain.GetName() 
     
-    task  = ProjectTask          ( chain , histo , what , cuts )
+    params = [ ( f , cname , str(w) , cuts ) for f in files for w in what ] 
+
+    task  = ProjectTask          ( histo )
     wmgr  = Parallel.WorkManager ()
-    wmgr.process( task, files )
+    wmgr.process( task, params )
 
     filtered   = task.output[0] 
     histo     += task.output[1]
@@ -197,9 +202,22 @@ ROOT.TChain.pproject = cproject
 #  @endcode
 #  - significant gain can be achieved for very large ttrees with complicated expressions and cuts
 #  - <code>maxentries</code> parameter should be rather large
+#  @param tree       the tree
+#  @param histo      the histogram
+#  @param what       variable/expression/varlist to be projected
+#  @param cuts       selection/weighting criteria 
+#  @param nentries   number of entries to process  (>0: all entries in th tree)
+#  @param first      the first entry to process
+#  @param maxentries chunk size for parallel processing 
 #  @author Vanya BELYAEV Ivan.Belyaev@itep.ru
 #  @date   2014-09-23
-def  tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentries = 1000000 ) :
+def  tproject ( tree                 ,   ## the tree 
+                histo                ,   ## histogram 
+                what                 ,   ## variable/expression/list to be projected 
+                cuts       = ''      ,   ## selection/weighting criteria 
+                nentries   = -1      ,   ## number of entries 
+                first      =  0      ,   ## the first entry 
+                maxentries = 1000000 ) : ## chunk size 
     """Make a projection of the loooong tree into histogram
     >>> tree  = ... ## large chain
     >>> histo = ... ## histogram template 
@@ -207,6 +225,14 @@ def  tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentr
     >>> tree.pproject ( histo , 'mass' , 'pt>10' )    ## ditto 
     - significant gain can be achieved for very large ttrees with complicated expressions and cuts
     - maxentries parameter should be rather large
+    Arguments:
+    - tree       the tree
+    - histo      the histogram
+    - what       variable/expression/varlist to be projected
+    - cuts       selection/weighting criteria 
+    - nentries   number of entries to process  (>0: all entries in th tree)
+    - first      the first entry to process
+    - maxentries chunk size for parallel processing 
     """
     #
     if not tree  :
@@ -221,13 +247,13 @@ def  tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentr
     num = len( tree )
     if num <= first :
         return 0, histo
-    
-    if 0 >  nentries : nentries   = n_large 
+
+    if 0 >  nentries   : nentries   = n_large 
 
     maxentries = long(maxentries)
     if 0 >= maxentries : maxentries = n_large 
     
-    if 0 > first    : first     = 0
+    if 0 > first       : first      = 0
     
     ## use the regular projection  
     from Ostap.TreeDeco import _tt_project_ 
@@ -235,36 +261,55 @@ def  tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentr
     if isinstance ( tree , ROOT.TChain ) :
         logger.warning ('``tproject'' method is TTree-specific, skip parallelization')
         return _tt_project_ ( tree , histo , what , cuts , '' , total , first ) 
-        
-    ## total number of events to process :
-    total = min ( num - first , nentries ) 
 
-    ## the event range is rather short, no need  in parallel processing
-    if total < maxentries : 
-        return _tt_project_ ( tree ,  histo , what , cuts , '', total , first  )
-    
+    ##
     ## check if tree is file-resident:
     tdir = tree.GetDirectory()
     if tdir and isinstance ( tdir , ROOT.TFile ) : pass
     else :
+        logger.debug ('TTree is not file resident, skip parallelization') 
         return _tt_project_ ( tree ,  histo , what , cuts , '', total , first  )
+    # 
+    # 
+    if isinstance ( cuts , ROOT.TCut ) : cuts = str( cuts )
+    if isinstance ( what , ROOT.TCut ) : what = str( what )
+    ##
+    if isinstance ( what , str ) : what = what.split(',')
+    if isinstance ( what , str ) : what = what.split(',')
+    if isinstance ( what , str ) : what = what.split(';')
+    if isinstance ( what , str ) : what = [ what ] 
+
+    ## nothing to project 
+    if not what :
+        return 0 , histo
+        
+    ## total number of events to process :
+    total = min ( num - first , nentries ) 
 
-    fname = tdir.GetName()
+    ## the event range is rather short, no real need  in parallel processing
+    if total * len ( what ) < maxentries and len ( what ) < 4 : 
+        return _tt_project_ ( tree ,  histo , what , cuts , '', total , first  )
     
-    ## number of jobs & reminder 
-    njobs, rest = divmod ( total , maxentries )
-    csize       = int ( total / njobs ) ## chunk size 
+    fname = tdir.GetName()
+    tname = tree.GetName()
+
+    ## number of chunks & reminder 
+    nchunks , rest = divmod ( total , maxentries )
+    csize          = int ( total / nchunks ) ## chunk size 
 
-    ## final list of parameters [ (file_name, first_event , num_events ) , ... ] 
+    ## final list of parameters [ (file_name, what , cuts , first_event , num_events ) , ... ] 
     params = []
-    for i in range(njobs) : 
-        params.append ( ( fname , first +     i * csize , csize ) )
-        
+
+    for i in range(nchunks) :
+        for w in what : 
+            params.append ( ( fname , tname , str(w) , cuts , first +       i * csize , csize ) )
+            
     if rest :
-        params.append ( ( fname , first + njobs * csize , rest  ) )
-        njobs +=  1
+        nchunks +=  1
+        for w in what : 
+            params.append ( ( fname , tname , str(w) , cuts , first + nchunks * csize , rest  ) )
 
-    task  = ProjectTask          ( tree , histo , what , cuts )
+    task  = ProjectTask          ( histo )
     wmgr  = Parallel.WorkManager ()
     wmgr.process( task, params )
 
diff --git a/Analysis/Ostap/python/Ostap/TreeDeco.py b/Analysis/Ostap/python/Ostap/TreeDeco.py
index 9459babaa..8eae890f7 100755
--- a/Analysis/Ostap/python/Ostap/TreeDeco.py
+++ b/Analysis/Ostap/python/Ostap/TreeDeco.py
@@ -192,12 +192,26 @@ ROOT.TChain.__call__  = _tc_call_
 #    
 #    >>> h1   = ROOT.TH1D(... )
 #    >>> tree.project ( h1           , 'm', 'chi2<10' ) ## use histo
+# 
+#    ## make invididual projections of 'm1' and 'm2' and make a sum of distributions
+#    >>> h1   = ROOT.TH1D(... )
+#    >>> tree.project ( h1           , ['m1','m2'] , 'chi2<10' ) ## use histo
+#
+#    ## make invididual projections of 'm1' and 'm2' and make a sum of distributions
+#    >>> h1   = ROOT.TH1D(... )
+#    >>> tree.project ( h1           , "m1,m2"     , 'chi2<10' )
+#    >>> tree.project ( h1           , "m1;m2"     , 'chi2<10' )
 #  @endcode
 #
+#  @param tree   the tree
+#  @param histo  the histogram or histogram name 
+#  @param what variable/expression to be projected.
+#              It could be a list/tuple of variables/expressions or just a comma-separated expression
+#  @param cuts expression for cuts/weights
 #  @see TTree::Project
 #  @author Vanya BELYAEV Ivan.Belyaev@itep.ru
 #  @date   2013-07-06
-def _tt_project_ ( tree , histo , what , *args ) :
+def _tt_project_ ( tree , histo , what , cuts = '' , *args ) :
     """Helper project method
     
     >>> tree = ...
@@ -209,29 +223,85 @@ def _tt_project_ ( tree , histo , what , *args ) :
     >>> tree.project ( h1.GetName() , 'm', 'chi2<10' ) ## ditto 
     
     >>> h1   = ROOT.TH1D(... )
-    >>> tree.project ( h1           , 'm', 'chi2<10' ) ## use histo 
-    
+    >>> tree.project ( h1           ,  'm', 'chi2<10' ) ## use histo
+
+    ## make invididual projections of m1 and m2 and make a sum of distributions
+    >>> h1   = ROOT.TH1D(... )
+    >>> tree.project ( h1           , ('m1','m2') , 'chi2<10' ) ## two variables 
+    >>> tree.project ( h1           , 'm1,m2'     , 'chi2<10' ) ## ditto
+    >>> tree.project ( h1           , 'm1;m2'     , 'chi2<10' ) ## ditto
+    
+    - tree  : the tree
+    - histo : the histogram (or histogram name)
+    - what  : variable/expression to project. It can be expression or list/tuple of expression or comma (or semicolumn) separated expression
+    - cuts  : selection criteria/weights 
     """
     #
     hname = histo 
-    if hasattr (  histo , 'GetName' ) : hname = histo.GetName()
+    if   hasattr    ( histo , 'GetName' ) : hname = histo.GetName()
+    ## elif isinstance ( histo , str       ) : 
+    ##    h = ROOT.gROOT.FindObject ( hname )
+    ##    if h : histo = h
+
+    ## reset it!
+    if histo and isinstance ( histo , ROOT.TH1  ) : histo.Reset()
+    #
+    if isinstance ( cuts  , ROOT.TCut ) : cuts = str(cuts) 
+    if not what : return 0, histo
     #
-    if args and isinstance ( args[0] , ROOT.TCut ) :
-        _args     = list  ( args     )
-        _args [0] = str   ( _args[0] )  
-        args      = tuple ( _args    )
+    ## trivial 1-item list
+    if hasattr ( what , '__len__' ) and 1 == len ( what ) and not isinstance ( what , (str, ROOT.TCut) ): 
+        what = what[0]
+
+    ## check for comma-separated list of expressions:
+    if isinstance ( what , str ) :
+        what = what.split(',')
+        if 1 == len(what) : what = what[0]
+
+    ## check for semicolumn-separated list of expressions:
+    if isinstance ( what , str ) :
+        what = what.split(';')
+        if 1 == len(what) : what = what[0]
+
     #
+    if   isinstance ( what  , str       ) : what =     what 
+    elif isinstance ( what  , ROOT.TCut ) : what = str(what)  
+    elif isinstance ( histo , ROOT.TH1  ) : 
+        rr = 0 
+        hh = histo.clone()
+        for v in what :
+            r , h  = _tt_project_ ( tree , hh , v , cuts , *args )
+            rr    += r
+            histo += h
+        hh.Delete()
+        del hh 
+        return rr , histo
+    elif isinstance ( histo , str ) :
+        ## process the head of the list: the first call creates the histo... 
+        rr, hh =  _tt_project_ ( tree , histo , what[0] , cuts , *args )
+        histo  = hh
+        if 1 == len ( what )   : return rr , histo
+        # normal processing of the tail of the list using created historgams 
+        hh      = histo.clone()
+        r1 , h1 = _tt_project_ ( tree , hh , what[1:] , cuts , *args )
+        rr     += r1
+        histo  += h1
+        hh.Delete()
+        del hh, h1 
+        return rr , histo
+
+    ## the basic case 
     from Ostap.TFileDeco import ROOTCWD
     with ROOTCWD() :
-        ROOT.gROOT.cd () 
+        ROOT.gROOT.cd ()
         ## make projection 
-        result = tree.Project ( hname , what , *args )
+        result = tree.Project ( hname , what , cuts , *args )
         if   isinstance ( histo , ROOT.TH1 ) : return result, histo
         elif isinstance ( histo , str      ) :
             h = ROOT.gROOT.FindObject ( hname )
             if h : return result, h
             
-    return result, hname
+    return result, histo
 
 ROOT.TTree .project = _tt_project_
 ROOT.TChain.project = _tt_project_
diff --git a/Analysis/Ostap/tests/TestKisa.py b/Analysis/Ostap/tests/TestKisa.py
index b442856de..93652e032 100755
--- a/Analysis/Ostap/tests/TestKisa.py
+++ b/Analysis/Ostap/tests/TestKisa.py
@@ -63,14 +63,14 @@ chain = data.chain
 
 
 with timing('SEQUENTIAL(%s):' % len(chain) , logger ) :
-    chain.project ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' )
+    chain. project ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' )
 
 logger.info ( h1.dump(100,30) ) 
 
 import Ostap.Kisa
 
 with timing('PARALLEL(%s):' % len(chain) , logger ) :
-    chain._project ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' )
+    chain.pproject ( h2 , 'mass' , '50<=mass && mass<=120 && 0<=c2dtf && c2dtf<5' )
 
 logger.info ( h2.dump(100,30) ) 
 
-- 
GitLab