From 3630399bdcad971b23c56e447ef814408430968c Mon Sep 17 00:00:00 2001
From: ibelyaev <Ivan.Belyaev@cern.ch>
Date: Wed, 18 Jan 2017 11:08:04 +0100
Subject: [PATCH] Analysis/Ostap: imrpove parallelization functionality !
 2017-01-18 - Vanya Belyaev  - Kisa.py:    - improve parallelization    - add
 parallelization methods for projection of looooong trees

---
 Analysis/Ostap/doc/release.notes    |   5 +
 Analysis/Ostap/python/Ostap/Kisa.py | 207 ++++++++++++++++++++++------
 2 files changed, 171 insertions(+), 41 deletions(-)

diff --git a/Analysis/Ostap/doc/release.notes b/Analysis/Ostap/doc/release.notes
index 70d0a0dc8..666fed27c 100644
--- a/Analysis/Ostap/doc/release.notes
+++ b/Analysis/Ostap/doc/release.notes
@@ -8,6 +8,11 @@
 !  by                $Author$
 ! -----------------------------------------------------------------------------
 
+! 2017-01-18 - Vanya Belyaev
+ - Kisa.py: 
+   - improve parallelization 
+   - add parallelization methods for projection of looooong trees 
+
 ! 2017-01-16 - Vanya Belyaev
  - GraphDeco: 
    1. couple of minor fixes 
diff --git a/Analysis/Ostap/python/Ostap/Kisa.py b/Analysis/Ostap/python/Ostap/Kisa.py
index 000e1c50b..b6180c8b2 100644
--- a/Analysis/Ostap/python/Ostap/Kisa.py
+++ b/Analysis/Ostap/python/Ostap/Kisa.py
@@ -12,10 +12,6 @@
 #  @author Vanya BELYAEV Ivan.Belyaev@itep.ru
 #  @date   2014-09-23
 #
-#  
-#                    $Revision$
-#  Last modification $Date$
-#  by                $Author$
 # =============================================================================
 """ Multiprocessing functionality for Ostap
 Currently it is not loaded on default, and requires manual activation
@@ -25,9 +21,10 @@ __version__ = "$Revision$"
 __author__  = "Vanya BELYAEV Ivan.Belyaev@itep.ru"
 __date__    = "2011-06-07"
 __all__     = (
-    'ProjectTask' , 
-    'FillTask'    , 
-    'project'     , 
+    'ProjectTask' , ## "project task" for chain/trees
+    'FillTask'     , 
+    'cproject'     , ## project method for loooong TChains 
+    'tproject'     , ## project method for loooong TTree  
     'fillDataSet'
     ) 
 # =============================================================================
@@ -41,30 +38,30 @@ else                       : logger = getLogger ( __name__     )
 # =============================================================================
 logger.debug ( 'Multiprocessing functionality for Ostap')
 # =============================================================================
-
+n_large = 2**63 - 1  ## ROOT.TVirtualTreePlayer.kMaxEntries
+## n_large = ROOT.TVirtualTreePlayer.kMaxEntries
 # =============================================================================
-## The simple task object for more efficient projection of loooong TChains
+## The simple task object for more efficient projection of loooong chains/trees 
 #  into histogarms
 #  @see GaudiMP.Parallel
 #  @see GaudiMP.Parallel.Task
 #  @author Vanya BELYAEV Ivan.Belyaev@itep.ru
 #  @date   2014-09-23
 class ProjectTask(Parallel.Task) :
-    """ The simple task  object for efficient projection of loooong TChains
-    into histogarms  
+    """ The simple task  object for efficient parallel
+    projection of looooooong TChains/TTrees into histograms  
     """
     ## constructor: chain name, historgam , variable , cuts 
     def __init__ ( self , tree , histo , what , cuts = '' ) :
-        """
-        Constructor: chain name, historgam , variable , cuts
+        """Constructor: chain/tree name, historgam , variable , cuts
         
         >>> histo = ...
         >>> task  = ProjectTask ( 'MyTuple' , histo , 'mass' , 'pt>10' ) 
-        """
-        
+        """        
         self.histo = histo
         self.histo.Reset()
         
+        import ROOT
         if   isinstance ( tree , ROOT.TTree ) : self.tree = tree.GetName()
         elif isinstance ( tree , str        ) : self.tree = tree 
         
@@ -73,82 +70,211 @@ class ProjectTask(Parallel.Task) :
 
     ## local initialization (executed once in parent process)
     def initializeLocal   ( self ) :
-        """
-        Local initialization (executed once in parent process)
+        """Local initialization (executed once in parent process)
         """
         import ROOT,Ostap.PyRoUts
-        self.output = self.histo.Clone() 
+        self.output = 0, self.histo.Clone() 
         
     ## remote initialization (executed for each sub-processs)
     def initializeRemote  ( self ) : pass 
-
-    ## the actual processing 
+    
+    ## the actual processing
+    #   ``params'' is assumed to be a tuple :
+    #  - the file name
+    #  - the first entry in tree to process
+    #  - number of entries to process
     def process ( self , params ) :
+        """The actual processing
+        ``params'' is assumed to be a tuple-like entity:
+        0 - the file name
+        1 - the first entry in tree to process 
+        2 - number of entries to process        
+        """
 
         import ROOT
         import Ostap.PyRoUts 
+
+        if   isinstance ( params , str ) : params = ( param , 0 , n_large  )
+        elif isinstance ( params , ROOT.TChainElement ) :
+            params = ( params.GetTitle()  , 0 , n_large  )
+
+        fname    = params[0]
+        first    = params[1] if 1 < len(params) else 0 
+        nentries = params[2] if 2 < len(params) else n_large 
         
-        if   isinstance ( params , str                ) : pars = [ params            ]
-        elif isinstance ( params , ROOT.TChainElement ) : pars = [ params.GetTitle() ] 
+        if isinstance ( fname , ROOT.TChainElement ) : fname = fname.GetTitle() 
         
         chain = ROOT.TChain ( self.tree )
-        for p in pars : chain.Add ( p )
-
+        chain.Add ( fname )
+        
         ## Create the output histogram   NB! (why here???) 
-        self.output = self.histo.Clone()
+        self.output = 0 , self.histo.Clone()
         
         ## use the regular projection  
         from Ostap.TreeDeco import _tt_project_ 
-        _tt_project_ ( chain ,  self.output  , self.what , self.cuts )
-
+        self.output = _tt_project_ ( chain      , self.output[1] ,
+                                     self.what  , self.cuts      ,
+                                     ''         ,
+                                     nentries   , first          )
         del chain 
 
-    ## finalization (execuetd at the end at parent process)
+    ## finalization (executed at the end at parent process)
     def finalize ( self ) : pass 
 
     ## merge results 
     def _mergeResults(self, result) :
         #
-        self.output.Add ( result )
-        result.Delete () 
-
+        filtered    = self.output[0] + result[0] 
+        self.output[1].Add ( result[1] )
+        self.output = filtered, self.output[1]
+        result[1].Delete () 
+   
 # =============================================================================  
-## make a projection of the loooong chain into histogram using
+## make a projection of the loooooooong chain into histogram using
 #  multiprocessing functionality for per-file parallelisation
 #  @code
 #
 #  >>> chain = ... ## large chain
 #  >>> histo = ... ## histogram template 
-#  >>> project ( chain , histo , 'mass' , 'pt>10' )
+#  >>> project        ( chain , histo , 'mass' , 'pt>10' )
+#  >>> chain.pproject ( histo , 'mass' , 'pt>0' ) ## ditto 
 #
 #  @endcode
 #
 #  For 12-core machine, clear speedup factor of about 8 is achieved 
 #  @author Vanya BELYAEV Ivan.Belyaev@itep.ru
 #  @date   2014-09-23
-def  project ( chain , histo , what , cuts ) :
+def  cproject ( chain , histo , what , cuts ) :
     """Make a projection of the loooong chain into histogram
     >>> chain = ... ## large chain
     >>> histo = ... ## histogram template 
-    >>> project ( chain , histo , 'mass' , 'pt>10' )
+    >>> cproject        ( chain , histo , 'mass' , 'pt>10' )
+    >>> chain.ppropject ( histo , 'mass' , 'pt>0' ) ## ditto 
     
     For 12-core machine, clear speedup factor of about 8 is achieved     
     """
     #
+    
+    if not tree  :
+        return 0 , histo
+    if not histo :
+        logger.error ('cproject: invalid histogram')
+        return 0 , histo
+    
+    import ROOT 
     histo.Reset()    
-    files = chain.files() 
 
+    if not isinstance ( ROOT , TChain ) :
+        logger.warning ('cproject method is TChain-specific, skip parallelization') 
+        from Ostap.TreeDeco import _tt_project_
+        return _tt_project_ ( chain , histo , what , cuts ) 
+
+    import Ostap.TreeDeco 
+    files = chain.files()
+    
     task  = ProjectTask          ( chain , histo , what , cuts )
     wmgr  = Parallel.WorkManager ()
     wmgr.process( task, files )
 
-    histo += task.output  
+    filtered   = task.output[0] 
+    histo     += task.output[1]
+    
+    return filtered , histo 
+
+ROOT.TChain.cproject = cproject
+ROOT.TChain.pproject = cproject
+
+# =============================================================================  
+## make a projection of the loooooooong tree into histogram using
+#  multiprocessing functionality for per-file parallelisation
+#  @code
+#
+#  >>> tree  = ... ## large tree 
+#  >>> histo = ... ## histogram template 
+#  >>> tproject ( tree , histo , 'mass' , 'pt>10' , maxentries = 1000000 )
+#  >>> tree.pproject ( histo , 'mass' , 'pt>10' ) ## ditto 
+#  @endcode
+#  - significant gain can be achieved for very large ttrees with complicated expressions and cuts
+#  - <code>maxentries</code> parameter should be rather large
+#  @author Vanya BELYAEV Ivan.Belyaev@itep.ru
+#  @date   2014-09-23
+def  tproject ( tree , histo , what , cuts , nentries = -1 , first = 0 , maxentries = 1000000 ) :
+    """Make a projection of the loooong tree into histogram
+    >>> tree  = ... ## large chain
+    >>> histo = ... ## histogram template 
+    >>> tproject ( tree , histo , 'mass' , 'pt>10' )    
+    >>> tree.pproject ( histo , 'mass' , 'pt>10' )    ## ditto 
+    - significant gain can be achieved for very large ttrees with complicated expressions and cuts
+    - maxentries parameter should be rather large
+    """
+    #
+    if not tree  :
+        return 0 , histo
+    if not histo :
+        logger.error ('tproject: invalid histogram')
+        return 0 , histo
+    
+    import ROOT 
+    histo.Reset()
+    
+    num = len( tree )
+    if num <= first :
+        return 0, histo
+    
+    if 0 >  nentries : nentries   = n_large 
+
+    maxentries = long(maxentries)
+    if 0 >= maxentries : maxentries = n_large 
+    
+    if 0 > first    : first     = 0
+    
+    ## use the regular projection  
+    from Ostap.TreeDeco import _tt_project_ 
 
-    return histo
+    if isinstance ( tree , ROOT.TChain ) :
+        logger.warning ('``tproject'' method is TTree-specific, skip parallelization')
+        return _tt_project_ ( tree , histo , what , cuts , '' , total , first ) 
+        
+    ## total number of events to process :
+    total = min ( num - first , nentries ) 
 
+    ## the event range is rather short, no need  in parallel processing
+    if total < maxentries : 
+        return _tt_project_ ( tree ,  histo , what , cuts , '', total , first  )
+    
+    ## check if tree is file-resident:
+    tdir = tree.GetDirectory()
+    if tdir and isinstance ( tdir , ROOT.TFile ) : pass
+    else :
+        return _tt_project_ ( tree ,  histo , what , cuts , '', total , first  )
+
+    fname = tdir.GetName()
+    
+    ## number of jobs & reminder 
+    njobs, rest = divmod ( total , maxentries )
+    csize       = int ( total / njobs ) ## chunk size 
+
+    ## final list of parameters [ (file_name, first_event , num_events ) , ... ] 
+    params = []
+    for i in range(njobs) : 
+        params.append ( ( fname , first +     i * csize , csize ) )
+        
+    if rest :
+        params.append ( ( fname , first + njobs * csize , rest  ) )
+        njobs +=  1
+
+    task  = ProjectTask          ( tree , histo , what , cuts )
+    wmgr  = Parallel.WorkManager ()
+    wmgr.process( task, params )
+
+    filtered   = task.output[0] 
+    histo     += task.output[1]
+    
+    return filtered , histo 
 
 import ROOT 
-ROOT.TChain._project = project
+ROOT.TTree.tproject = tproject
+ROOT.TTree.pproject = tproject
 
 # =============================================================================
 ## The simple task object for more efficient fill of RooDataSet from TChain 
@@ -159,8 +285,7 @@ ROOT.TChain._project = project
 #  @author Vanya BELYAEV Ivan.Belyaev@itep.ru
 #  @date   2014-09-23 
 class  FillTask(Parallel.Task) :
-    """
-    The single task object for more efficient fill of RooDataSet from TChain 
+    """The single task object for more efficient fill of RooDataSet from TChain 
     - for 12-core machine, clear speed-up factor of about 8 is achieved 
     """
     ## 
-- 
GitLab