Skip to content
Snippets Groups Projects
Commit 6cf6ca2c authored by Charles Leggett's avatar Charles Leggett
Browse files

Resurrect the AthenaScenario test in Gaudi Hive

The test was broken (produced a stall). This merge request introduces the migration of the test to the graph-based scheduler, which returns the test back to life.

See [GAUDI-1239](https://its.cern.ch/jira/browse/GAUDI-1239) for the work/test log.

See merge request !190
parents e3d4b07b b899983c
No related branches found
No related tags found
1 merge request!190Resurrect the AthenaScenario test in Gaudi Hive
Pipeline #
import json
from Gaudi.Configuration import *
# ============================================================================
from Configurables import GaudiExamplesCommonConf, CPUCruncher,HiveEventLoopMgr, HiveWhiteBoard
from Configurables import (GaudiExamplesCommonConf,
CPUCruncher,
HiveWhiteBoard,
ForwardSchedulerSvc,
HiveSlimEventLoopMgr)
#GaudiExamplesCommonConf()
# ============================================================================
# ============================================================================
#-------------------------------------------------------------------------------
# Metaconfig
NUMBEROFEVENTS = 1
NUMBEROFEVENTSINFLIGHT = 1
NUMBEROFALGOSINFLIGHT = 1000
NUMBEROFTHREADS = 1
NUMBEROFALGOSINFLIGHT = 1
NUMBEROFTHREADS = NUMBEROFALGOSINFLIGHT
CLONEALGOS = False
DUMPQUEUES = False
VERBOSITY = 3
......@@ -45,19 +49,19 @@ def load_athena_scenario(filename):
cleaned_outputs = [output for output in algo["outputs"] if (output not in all_outputs)]
new_algo = CPUCruncher(algo["name"],
avgRuntime=float(algo["runtimes"][0]/1000000.),
DataInputs = cleaned_inputs,
DataOutputs = cleaned_outputs
inpKeys = cleaned_inputs,
outKeys = cleaned_outputs
)
cpu_cruncher_algos.append(new_algo)
all_outputs.update(algo["outputs"])
all_inputs.update(algo["inputs"])
cpu_cruncher_algos_inputs.append(algo["inputs"])
#look for the objects that haven't been provided within the job. Assume this needs to come via input
new_algo = CPUCruncher("input",
avgRuntime=1,
DataInputs=[],
DataOutputs=[item for item in all_inputs.difference(all_outputs)]
inpKeys=[],
outKeys=[item for item in all_inputs.difference(all_outputs)]
)
cpu_cruncher_algos.append(new_algo)
cpu_cruncher_algos_inputs.append([])
......@@ -67,30 +71,30 @@ def load_athena_scenario(filename):
print len(all_outputs)
print len(cpu_cruncher_algos)
return cpu_cruncher_algos,cpu_cruncher_algos_inputs
# Set output level threshold 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL )
ms = MessageSvc()
ms = MessageSvc()
ms.OutputLevel = Verbosity
crunchers,inputs = load_athena_scenario("Athena_loopfixed.json")
whiteboard = HiveWhiteBoard("EventDataSvc", EventSlots = NumberOfEventsInFlight)
# Setup the Event Loop Manager
evtloop = HiveEventLoopMgr()
evtloop.MaxAlgosParallel = NumberOfAlgosInFlight
evtloop.MaxEventsParallel = NumberOfEventsInFlight
evtloop.NumThreads = NumberOfThreads
evtloop.CloneAlgorithms = CloneAlgos
evtloop.DumpQueues = DumpQueues
evtloop.AlgosDependencies = inputs
slimeventloopmgr = HiveSlimEventLoopMgr(OutputLevel=DEBUG)
scheduler = ForwardSchedulerSvc(MaxAlgosInFlight = NumberOfAlgosInFlight,
ThreadPoolSize = NumberOfThreads,
useGraphFlowManagement = True,
OutputLevel=INFO)
# And the Application Manager
app = ApplicationMgr()
app.TopAlg = crunchers
app.EvtSel = "NONE" # do not use any event input
app.EvtMax = NumberOfEvents
app.EventLoop = evtloop
app.EventLoop = slimeventloopmgr
app.ExtSvc =[whiteboard]
app.MessageSvcType = "InertMessageSvc"
#app.MessageSvcType = "TBBMessageSvc"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment