From 175f313deae9f2e2948c244208e212ad3bf2f8f0 Mon Sep 17 00:00:00 2001 From: Tomasz Bold <tomasz.bold@gmail.com> Date: Wed, 22 May 2019 08:24:59 +0000 Subject: [PATCH] Fixes sequences merging in ComponentAccumulator --- .../python/ComponentAccumulator.py | 50 +++++++++++++++---- .../python/ComponentAccumulatorTest.py | 47 ++++++++++++++++- .../python/MainServicesConfig.py | 4 +- 3 files changed, 87 insertions(+), 14 deletions(-) diff --git a/Control/AthenaConfiguration/python/ComponentAccumulator.py b/Control/AthenaConfiguration/python/ComponentAccumulator.py index 177c63d1ab6..e33ee50e983 100644 --- a/Control/AthenaConfiguration/python/ComponentAccumulator.py +++ b/Control/AthenaConfiguration/python/ComponentAccumulator.py @@ -14,6 +14,7 @@ from AthenaConfiguration.Deduplication import deduplicate, deduplicateComponent, import ast import collections import six +import copy from AthenaConfiguration.UnifyProperties import unifySet @@ -26,13 +27,14 @@ class ComponentAccumulator(object): def __init__(self,sequenceName='AthAlgSeq'): self._msg=logging.getLogger('ComponentAccumulator') - if not Configurable.configurableRun3Behavior: msg = "discoverd Configurable.configurableRun3Behavior=False while working with ComponentAccumulator" self._msg.error(msg) raise ConfigurationError(msg) - self._sequence=AthSequencer(sequenceName,Sequential=True) #(Nested) sequence of event processing algorithms per sequence + their private tools + self._sequence=AthSequencer(sequenceName,Sequential=True) #(Nested) default sequence of event processing algorithms per sequence + their private tools + self._allSequences = [ self._sequence ] + self._algorithms = {} #Flat algorithms list, useful for merging self._conditionsAlgs=[] #Unordered list of conditions algorithms + their private tools self._services=[] #List of service, not yet sure if the order matters here in the MT age self._eventInputs=set() #List of items (as strings) to be read from the input (required at least for BS-reading). @@ -49,7 +51,7 @@ class ComponentAccumulator(object): self._wasMerged=False self._isMergable=True - self._algorithms = {} + def empty(self): return len(self._sequence)+len(self._conditionsAlgs)+len(self._services)+\ @@ -112,7 +114,10 @@ class ComponentAccumulator(object): self._msg.info( " "*nestLevel +"\\__ "+ c.name() +" (alg)" ) if summariseProps: printProperties(c, nestLevel) - printSeqAndAlgs(self._sequence) + + for n,s in enumerate(self._allSequences): + self._msg.info( "Top sequence {}".format(n) ) + printSeqAndAlgs(s) self._msg.info( "Condition Algorithms" ) self._msg.info( [ a.getName() for a in self._conditionsAlgs ] ) @@ -425,19 +430,39 @@ class ComponentAccumulator(object): existingAlgInDest = findAlgorithm( dest, c.name(), depth=1 ) if not existingAlgInDest: - self._msg.debug("Adding algorithm %s to a sequence %s", c.name(), dest.name() ) + self._msg.debug(" Adding algorithm %s to a sequence %s", c.name(), dest.name() ) dest += c checkSequenceConsistency(self._sequence) - #Merge sequences: - #if (self._sequence.getName()==other._sequence.getName()): - if sequenceName: - destSeq = self.getSequence( sequenceName ) + # Merge sequences: + # mergeSequences(destSeq, other._sequence) + # if sequenceName is provided it means we should be ignoring the actual main seq name there and use the sequenceName + # that means the first search in the destination seqence needs to be cheated + # the sequenceName argument is ambigous when the other CA has more than one sequence and this is checked + + if sequenceName is None: + for otherSeq in other._allSequences: + found=False + for ourSeq in self._allSequences: + ourSeq = findSubSequence(ourSeq, otherSeq.name()) # try to add sequence to the main structure first, to each seq in parent? + if ourSeq: + mergeSequences(ourSeq, otherSeq) + found=True + self._msg.debug(" Succeeded to merge sequence %s to %s", otherSeq.name(), ourSeq.name() ) + else: + self._msg.debug(" Failed to merge sequence %s to any existing one, destination CA will have several top/dangling sequences", otherSeq.name() ) + if not found: # just copy the sequence as a dangling one + self._allSequences.append( copy.copy(otherSeq) ) + mergeSequences( self._allSequences[-1], otherSeq ) else: - destSeq=findSubSequence(self._sequence,other._sequence.name()) or self._sequence - mergeSequences(destSeq,other._sequence) + if len(other._allSequences) > 1: + raise ConfigurationError('Merging of the accumulator that has mutiple top level sequences and changing the destination sequence is not supported') + destSeq = self.getSequence(sequenceName) if sequenceName else self._sequence + mergeSequences(destSeq, other._sequence) + + # Additional checking and updating other accumulator's algorithms list for name, alg in six.iteritems(other._algorithms): if name not in self._algorithms: @@ -576,6 +601,9 @@ class ComponentAccumulator(object): basecfg = MainServicesSerialCfg() basecfg.merge(self) self = basecfg + if len(self._allSequences) != 1: + raise ConfigurationError('It is not allowed for the storable CA to have more than one top sequence, now it has: {}'\ + .format(','.join([ s.name() for s in self._allSequences]))) self._jocat={} self._jocfg={} self._pycomps={} diff --git a/Control/AthenaConfiguration/python/ComponentAccumulatorTest.py b/Control/AthenaConfiguration/python/ComponentAccumulatorTest.py index 6ed69c87906..f03b9686c01 100644 --- a/Control/AthenaConfiguration/python/ComponentAccumulatorTest.py +++ b/Control/AthenaConfiguration/python/ComponentAccumulatorTest.py @@ -395,7 +395,9 @@ class TestDeduplication( unittest.TestCase ): with self.assertRaises(DeduplicationFailed): result3.addService(dummyService(AString="blaOther")) - result3.wasMerged() + + [ ca.wasMerged() for ca in [result1, result2, result3]] + class TestMergeComponentsFromDifferentBranches( unittest.TestCase ): @@ -477,5 +479,48 @@ class TestMergeComponentsFromDifferentBranches( unittest.TestCase ): ca.wasMerged() +class TestSequencesMerging( unittest.TestCase ): + def setUp(self): + Configurable.configurableRun3Behavior=1 + + def test_sequences_merging(self): + from AthenaConfiguration.AllConfigFlags import ConfigFlags + ConfigFlags.lock() + from AthenaCommon.Logging import logging + logging.getLogger('ComponentAccumulator').setLevel(DEBUG) + + + print("ca1") + ca1 = ComponentAccumulator() + ca1.addEventAlgo(ConfigurablePyAlgorithm("alg1")) + ca1.printConfig() + + print("ca2") + from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg + ca2 = OutputStreamCfg(ConfigFlags, "RDO", ItemList = [ + "SCT_RDO_Container#SCT_RDOs", + "InDetSimDataCollection#SCT_SDO_Map" + ]) + ca2.printConfig() + + print("after merge") + ca1.merge(ca2) + ca1.printConfig() + + self.assertEqual( len(ca1._allSequences), 2, "Dangling sequences not maintained" ) + + print("Instantiating top CA") + from MainServicesConfig import MainServicesThreadedCfg + topca = MainServicesThreadedCfg( ConfigFlags ) + topca.printConfig() + + + + + print("Merging to the top level CA") + topca.merge( ca1 ) + topca.printConfig() + topca.wasMerged() + if __name__ == "__main__": unittest.main() diff --git a/Control/AthenaConfiguration/python/MainServicesConfig.py b/Control/AthenaConfiguration/python/MainServicesConfig.py index 609d298997e..a14865399a5 100644 --- a/Control/AthenaConfiguration/python/MainServicesConfig.py +++ b/Control/AthenaConfiguration/python/MainServicesConfig.py @@ -39,7 +39,6 @@ def MainServicesThreadedCfg(cfgFlags): ######################################################################## # Core components needed for serial and threaded jobs cfg=ComponentAccumulator("AthMasterSeq") - cfg.merge( MainServicesMiniCfg(LoopMgr) ) cfg.setAppProperty('TopAlg',['AthSequencer/AthMasterSeq'],overwrite=True) cfg.setAppProperty('OutStreamType', 'AthenaOutputStream') @@ -64,7 +63,8 @@ def MainServicesThreadedCfg(cfgFlags): cfg.addSequence(AthSequencer('AthCondSeq',StopOverride=True),parentName='AthAllAlgSeq') cfg.addSequence(AthSequencer('AthEndSeq',Sequential=True),parentName='AthAlgEvtSeq') - + cfg.merge( MainServicesMiniCfg(LoopMgr) ) + #Set up incident firing: from AthenaServices.AthenaServicesConf import AthIncFirerAlg from GaudiCoreSvc.GaudiCoreSvcConf import IncidentProcAlg -- GitLab