diff --git a/HLT/Trigger/TrigControl/TrigServices/python/TrigServicesConfig.py b/HLT/Trigger/TrigControl/TrigServices/python/TrigServicesConfig.py index 6bc3c718c52f1e9800e32a415e7c8f18a0dffe9f..b3cd9ae1ac5876336a4149e73e49a6b71d5ee0f8 100644 --- a/HLT/Trigger/TrigControl/TrigServices/python/TrigServicesConfig.py +++ b/HLT/Trigger/TrigControl/TrigServices/python/TrigServicesConfig.py @@ -104,17 +104,11 @@ def getHltROBDataProviderSvc(flags, name='ROBDataProviderSvc'): def getHltEventLoopMgr(flags, name='HltEventLoopMgr'): '''online event loop manager''' - if flags.Trigger.enableAsyncIO: - svc = CompFactory.HltAsyncEventLoopMgr( - name, - setMagFieldFromPtree = flags.Trigger.Online.BFieldAutoConfig - ) - log.info("Running with HltAsyncEventLoopMgr") - else: - svc = CompFactory.HltEventLoopMgr( - name, - setMagFieldFromPtree = flags.Trigger.Online.BFieldAutoConfig - ) + + svc = CompFactory.HltEventLoopMgr( + name, + setMagFieldFromPtree = flags.Trigger.Online.BFieldAutoConfig + ) # Rewrite LVL1 result if L1 simulation and BS-writing is enabled if flags.Trigger.doLVL1 and flags.Trigger.writeBS: diff --git a/HLT/Trigger/TrigControl/TrigServices/python/TriggerUnixStandardSetup.py b/HLT/Trigger/TrigControl/TrigServices/python/TriggerUnixStandardSetup.py index f10e3e04124df80d8cfb209758c460a6fd22828c..3d7fec99101ded3881e78c5c99be2bb8d96c965d 100644 --- a/HLT/Trigger/TrigControl/TrigServices/python/TriggerUnixStandardSetup.py +++ b/HLT/Trigger/TrigControl/TrigServices/python/TriggerUnixStandardSetup.py @@ -41,7 +41,7 @@ def commonServicesCfg(flags): TopAlg=["AthSequencer/AthMasterSeq"]) ) from AthenaConfiguration.MainServicesConfig import AvalancheSchedulerSvcCfg - cfg.merge( AvalancheSchedulerSvcCfg(flags, maxParallelismExtra=1 if flags.Trigger.enableAsyncIO else 0) ) + cfg.merge( AvalancheSchedulerSvcCfg(flags, maxParallelismExtra=1) ) # SGCommitAuditor to sweep new DataObjects at end of Alg execute cfg.addAuditor( CompFactory.SGCommitAuditor() ) diff --git a/HLT/Trigger/TrigControl/TrigServices/src/HltAsyncEventLoopMgr.cxx b/HLT/Trigger/TrigControl/TrigServices/src/HltAsyncEventLoopMgr.cxx deleted file mode 100644 index 85ba8227823061bd3b5feef1c894dabb20269347..0000000000000000000000000000000000000000 --- a/HLT/Trigger/TrigControl/TrigServices/src/HltAsyncEventLoopMgr.cxx +++ /dev/null @@ -1,1690 +0,0 @@ -/* - Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration -*/ - -// Local includes -#include "HltAsyncEventLoopMgr.h" -#include "TrigCOOLUpdateHelper.h" -#include "TrigRDBManager.h" - -// Trigger includes -#include "TrigKernel/HltExceptions.h" -#include "TrigSteeringEvent/HLTResultMT.h" - -// Athena includes -#include "AthenaInterprocess/Incidents.h" -#include "AthenaKernel/AthStatusCode.h" -#include "ByteStreamData/ByteStreamMetadata.h" -#include "ByteStreamData/ByteStreamMetadataContainer.h" -#include "EventInfoUtils/EventInfoFromxAOD.h" -#include "StoreGate/StoreGateSvc.h" -#include "StoreGate/SGHiveMgrSvc.h" - -// Gaudi includes -#include "GaudiKernel/ConcurrencyFlags.h" -#include "GaudiKernel/IAlgManager.h" -#include "GaudiKernel/IAlgorithm.h" -#include "GaudiKernel/IEvtSelector.h" -#include "GaudiKernel/IProperty.h" -#include "GaudiKernel/IIoComponent.h" -#include "GaudiKernel/ThreadLocalContext.h" - -// TDAQ includes -#include "eformat/StreamTag.h" -#include "owl/time.h" - -// ROOT includes -#include "TROOT.h" -#include "TSystem.h" - -// System includes -#include <filesystem> -#include <sstream> -#include <string> - -// 
=============================================================================
-// Helper macros, typedefs and constants
-// =============================================================================
-namespace {
-  bool isTimedOut(const std::unordered_map<std::string_view,StatusCode>& algErrors) {
-    for (const auto& [key, sc] : algErrors) {
-      if (sc == Athena::Status::TIMEOUT) return true;
-    }
-    return false;
-  }
-  /// Workaround for classes which implement operator<< but don't have a method to convert to string
-  template <typename T> std::string toString(const T& x) {
-    std::ostringstream ss;
-    ss << x;
-    return ss.str();
-  }
-}
-using namespace boost::property_tree;
-
-// =============================================================================
-// Standard constructor
-// =============================================================================
-HltAsyncEventLoopMgr::HltAsyncEventLoopMgr(const std::string& name, ISvcLocator* svcLoc)
-: base_class(name, svcLoc) {}
-
-// =============================================================================
-// Standard destructor
-// =============================================================================
-HltAsyncEventLoopMgr::~HltAsyncEventLoopMgr() noexcept
-{
-  // tbb::task_group destructor throws if wait() was never called
-  m_parallelIOTaskGroup.wait();
-}
-
-// =============================================================================
-// Reimplementation of AthService::initialize (IStateful interface)
-// =============================================================================
-StatusCode HltAsyncEventLoopMgr::initialize()
-{
-  // Do not auto-retrieve tools (see Gaudi!1124)
-  m_autoRetrieveTools = false;
-  m_checkToolDeps = false;
-
-  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
-
-  ATH_MSG_INFO(" ---> HltAsyncEventLoopMgr = " << name() << " initialize");
-
-  //----------------------------------------------------------------------------
-  // Setup properties
-  //----------------------------------------------------------------------------
-
-  // Set the timeout value (cast float to int)
-  m_softTimeoutValue = std::chrono::milliseconds(static_cast<int>(m_hardTimeout.value() * m_softTimeoutFraction.value()));
-
-  // Read DataFlow configuration properties
-  updateDFProps();
-
-  // print properties
-  ATH_MSG_INFO(" ---> ApplicationName = " << m_applicationName);
-  ATH_MSG_INFO(" ---> HardTimeout = " << m_hardTimeout.value());
-  ATH_MSG_INFO(" ---> SoftTimeoutFraction = " << m_softTimeoutFraction.value());
-  ATH_MSG_INFO(" ---> SoftTimeoutValue = " << m_softTimeoutValue.count());
-  ATH_MSG_INFO(" ---> TimeoutThreadIntervalMs = " << m_timeoutThreadIntervalMs.value());
-  ATH_MSG_INFO(" ---> TraceOnTimeout = " << m_traceOnTimeout.value());
-  ATH_MSG_INFO(" ---> MaxFrameworkErrors = " << m_maxFrameworkErrors.value());
-  ATH_MSG_INFO(" ---> FwkErrorDebugStreamName = " << m_fwkErrorDebugStreamName.value());
-  ATH_MSG_INFO(" ---> AlgErrorDebugStreamName = " << m_algErrorDebugStreamName.value());
-  ATH_MSG_INFO(" ---> TimeoutDebugStreamName = " << m_timeoutDebugStreamName.value());
-  ATH_MSG_INFO(" ---> TruncationDebugStreamName = " << m_truncationDebugStreamName.value());
-  ATH_MSG_INFO(" ---> SORPath = " << m_sorPath.value());
-  ATH_MSG_INFO(" ---> setMagFieldFromPtree = " << m_setMagFieldFromPtree.value());
-  ATH_MSG_INFO(" ---> execAtStart = " << m_execAtStart.value());
-  ATH_MSG_INFO(" ---> forceRunNumber = " << m_forceRunNumber.value());
-  ATH_MSG_INFO(" ---> forceLumiblock = " << m_forceLumiblock.value());
-  ATH_MSG_INFO(" 
---> forceStartOfRunTime = " << m_forceSOR_ns.value());
-  ATH_MSG_INFO(" ---> RewriteLVL1 = " << m_rewriteLVL1.value());
-  ATH_MSG_INFO(" ---> EventContextWHKey = " << m_eventContextWHKey.key());
-  ATH_MSG_INFO(" ---> EventInfoRHKey = " << m_eventInfoRHKey.key());
-
-  ATH_CHECK( m_jobOptionsSvc.retrieve() );
-  const std::string& slots = m_jobOptionsSvc->get("EventDataSvc.NSlots");
-  if (!slots.empty())
-    ATH_MSG_INFO(" ---> NumConcurrentEvents = " << slots);
-  else
-    ATH_MSG_WARNING("Failed to retrieve the job property EventDataSvc.NSlots");
-  const std::string& threads = m_jobOptionsSvc->get("AvalancheSchedulerSvc.ThreadPoolSize");
-  if (!threads.empty())
-    ATH_MSG_INFO(" ---> NumThreads = " << threads);
-  else
-    ATH_MSG_WARNING("Failed to retrieve the job property AvalancheSchedulerSvc.ThreadPoolSize");
-
-  const std::string& procs = m_jobOptionsSvc->get("DataFlowConfig.DF_NumberOfWorkers");
-  if (!procs.empty()) {
-    ATH_MSG_INFO(" ---> NumProcs = " << procs);
-    try {
-      SG::HiveMgrSvc::setNumProcs(std::stoi(procs));
-    }
-    catch (const std::logic_error& ex) {
-      ATH_MSG_ERROR("Cannot convert " << procs << " to integer: " << ex.what());
-      return StatusCode::FAILURE;
-    }
-  }
-  else
-    ATH_MSG_WARNING("Failed to retrieve the job property DataFlowConfig.DF_NumberOfWorkers");
-
-  if (m_maxParallelIOTasks.value() <= 0) {
-    ATH_CHECK(m_maxParallelIOTasks.fromString(threads));
-  }
-  ATH_MSG_INFO(" ---> MaxParallelIOTasks = " << m_maxParallelIOTasks.value());
-  ATH_MSG_INFO(" ---> MaxIOWakeUpIntervalMs = " << m_maxIOWakeUpIntervalMs.value());
-
-  //----------------------------------------------------------------------------
-  // Setup all Hive services for multithreaded event processing with the exception of SchedulerSvc,
-  // which has to be initialised after forking because it opens new threads
-  //----------------------------------------------------------------------------
-  m_whiteboard = serviceLocator()->service(m_whiteboardName);
-  if( !m_whiteboard.isValid() ) {
-    ATH_MSG_FATAL("Error retrieving " << m_whiteboardName << " interface IHiveWhiteBoard");
-    return StatusCode::FAILURE;
-  }
-  ATH_MSG_DEBUG("Initialised " << m_whiteboardName << " interface IHiveWhiteBoard");
-
-  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
-  if( !m_algResourcePool.isValid() ) {
-    ATH_MSG_FATAL("Error retrieving AlgResourcePool");
-    return StatusCode::FAILURE;
-  }
-  ATH_MSG_DEBUG("Initialised AlgResourcePool");
-
-  m_aess = serviceLocator()->service("AlgExecStateSvc");
-  if( !m_aess.isValid() ) {
-    ATH_MSG_FATAL("Error retrieving AlgExecStateSvc");
-    return StatusCode::FAILURE;
-  }
-  ATH_MSG_DEBUG("Initialised AlgExecStateSvc");
-
-  //----------------------------------------------------------------------------
-  // Initialise services
-  //----------------------------------------------------------------------------
-  ATH_CHECK(m_incidentSvc.retrieve());
-  ATH_CHECK(m_evtStore.retrieve());
-  ATH_CHECK(m_detectorStore.retrieve());
-  ATH_CHECK(m_inputMetaDataStore.retrieve());
-  ATH_CHECK(m_evtSelector.retrieve());
-  ATH_CHECK(m_evtSelector->createContext(m_evtSelContext)); // create an EvtSelectorContext
-  ATH_CHECK(m_outputCnvSvc.retrieve());
-  ATH_CHECK(m_ioCompMgr.retrieve());
-  if (m_monitorScheduler) {
-    ATH_CHECK(m_schedulerMonSvc.retrieve());
-  }
-
-  //----------------------------------------------------------------------------
-  // Initialise tools
-  //----------------------------------------------------------------------------
-  // COOL helper
-  ATH_CHECK(m_coolHelper.retrieve());
-  // HLT 
result builder - ATH_CHECK(m_hltResultMaker.retrieve()); - // Monitoring tools - if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve()); - ATH_CHECK(m_errorMonTool.retrieve()); - - //---------------------------------------------------------------------------- - // Initialise data handle keys - //---------------------------------------------------------------------------- - // EventContext WriteHandle - ATH_CHECK(m_eventContextWHKey.initialize()); - // EventInfo ReadHandle - ATH_CHECK(m_eventInfoRHKey.initialize()); - // HLTResultMT ReadHandle (created dynamically from the result builder property) - m_hltResultRHKey = m_hltResultMaker->resultName(); - ATH_CHECK(m_hltResultRHKey.initialize()); - // L1TriggerResult and RoIBResult ReadHandles for RewriteLVL1 - ATH_CHECK(m_l1TriggerResultRHKey.initialize(SG::AllowEmpty)); - ATH_CHECK(m_roibResultRHKey.initialize(SG::AllowEmpty)); - - ATH_MSG_VERBOSE("end of " << __FUNCTION__); - return StatusCode::SUCCESS; -} - -// ============================================================================= -// Reimplementation of AthService::stop (IStateful interface) -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::stop() -{ - // Need to reinitialize IO in the mother process - if (m_workerID==0) { - ATH_CHECK(m_ioCompMgr->io_reinitialize()); - } - - return StatusCode::SUCCESS; -} - -// ============================================================================= -// Reimplementation of AthService::finalize (IStateful interface) -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::finalize() -{ - ATH_MSG_INFO(" ---> HltAsyncEventLoopMgr/" << name() << " finalize "); - // Usually (but not necessarily) corresponds to the number of processed events +1 - ATH_MSG_INFO("Total number of EventContext objects created " << m_localEventNumber); - - // Release all handles - auto releaseAndCheck = [&](auto& handle, std::string_view handleType) { - if (handle.release().isFailure()) - ATH_MSG_WARNING("finalize(): Failed to release " << handleType << " " << handle.typeAndName()); - }; - auto releaseService = [&](auto&&... args) { (releaseAndCheck(args,"service"), ...); }; - auto releaseTool = [&](auto&&... args) { (releaseAndCheck(args,"tool"), ...); }; - auto releaseSmartIF = [](auto&&... 
args) { (args.reset(), ...); };
-
-  releaseService(m_incidentSvc,
-                 m_evtStore,
-                 m_detectorStore,
-                 m_inputMetaDataStore,
-                 m_evtSelector,
-                 m_outputCnvSvc,
-                 m_schedulerMonSvc);
-
-  releaseTool(m_coolHelper,
-              m_hltResultMaker,
-              m_monTool);
-
-  releaseSmartIF(m_whiteboard,
-                 m_algResourcePool,
-                 m_aess,
-                 m_schedulerSvc);
-
-  return StatusCode::SUCCESS;
-}
-
-// =============================================================================
-// Implementation of ITrigEventLoopMgr::prepareForStart
-// =============================================================================
-StatusCode HltAsyncEventLoopMgr::prepareForStart(const ptree& pt)
-{
-  try {
-    const auto& rparams = pt.get_child("RunParams");
-    m_sorHelper = std::make_unique<TrigSORFromPtreeHelper>(msgSvc(), m_detectorStore, m_sorPath, rparams);
-  }
-  catch(ptree_bad_path& e) {
-    ATH_MSG_ERROR("Bad ptree path: \"" << e.path<ptree::path_type>().dump() << "\" - " << e.what());
-    return StatusCode::FAILURE;
-  }
-
-  // Override run/timestamp if needed
-  if (m_forceRunNumber > 0) {
-    m_sorHelper->setRunNumber(m_forceRunNumber);
-    ATH_MSG_WARNING("Run number overwrite: " << m_forceRunNumber);
-  }
-  if (m_forceSOR_ns > 0) {
-    m_sorHelper->setSORtime_ns(m_forceSOR_ns);
-    ATH_MSG_WARNING("SOR time overwrite: " << m_forceSOR_ns);
-  }
-
-  // Set our "run context"
-  m_currentRunCtx.setEventID( m_sorHelper->eventID() );
-  m_currentRunCtx.setExtension(Atlas::ExtendedEventContext(m_evtStore->hiveProxyDict(),
-                                                           m_currentRunCtx.eventID().run_number()));
-
-  // Some algorithms expect a valid context during start()
-  ATH_MSG_DEBUG("Setting context for start transition: " << m_currentRunCtx.eventID());
-  Gaudi::Hive::setCurrentContext(m_currentRunCtx);
-
-  try {
-    ATH_CHECK( clearTemporaryStores() );                // do the necessary resets
-    ATH_CHECK( m_sorHelper->fillSOR(m_currentRunCtx) ); // update SOR in det store
-
-    const auto& soral = getSorAttrList();
-    updateInternal(soral);      // update internally kept info
-    updateMetadataStore(soral); // update metadata store
-  }
-  catch(const std::exception& e) {
-    ATH_MSG_ERROR("Exception: " << e.what());
-  }
-
-  ATH_CHECK( updateMagField(pt) ); // update magnetic field
-
-  return StatusCode::SUCCESS;
-}
-
-
-// =============================================================================
-// Implementation of ITrigEventLoopMgr::prepareForRun
-// =============================================================================
-StatusCode HltAsyncEventLoopMgr::prepareForRun(const ptree& /*pt*/)
-{
-  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
-
-  try
-  {
-    // Reset the AlgExecStateSvc (important in case there was a stop/start)
-    m_aess->reset(m_currentRunCtx);
-
-    // Fire BeginRun incident
-    m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginRun, m_currentRunCtx));
-
-    // Initialize COOL helper (needs to be done after IOVDbSvc has loaded all folders)
-    ATH_CHECK(m_coolHelper->readFolderInfo());
-
-    // Run optional algs/sequences (e.g. CondAlgs ATR-26138)
-    ATH_CHECK(execAtStart(m_currentRunCtx));
-
-    // close any open files (e.g. THistSvc)
-    ATH_CHECK(m_ioCompMgr->io_finalize());
-
-    // Verify that there are no other open ROOT files (e.g. from dual-use tools). 
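// Aside: a minimal, self-contained sketch of the duplicate-counting check performed
// just below. Plain file-name strings stand in for gROOT's TFile list and for the
// THistSvc whitelist, which are framework specifics of the real code:
#include <cstddef>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  // Stand-in for *gROOT->GetListOfFiles(): names of files currently open
  const std::vector<std::string> openFiles{"hist.root", "debug.root", "hist.root"};
  // Stand-in for m_ioCompMgr->io_retrieve(histsvc): files allowed to stay open
  const std::vector<std::string> expected{"hist.root"};

  std::unordered_map<std::string, std::size_t> dups;
  for (const std::string& f : openFiles) ++dups[f];    // count instances per name
  for (const std::string& e : expected) dups.erase(e); // drop the allowed ones

  // Anything left is an unexpectedly open file, reported with its instance count
  for (const auto& [name, count] : dups)
    std::cout << name << " (x" << count << ") has not been closed yet\n";
}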
- if ( !gROOT->GetListOfFiles()->IsEmpty() ) { - std::unordered_map<std::string, size_t> dups; - for (const auto f : *gROOT->GetListOfFiles()) { - ++dups[f->GetName()]; - } - // Exception for THistSvc files as those will remain open - auto histsvc = serviceLocator()->service("THistSvc", false).as<IIoComponent>(); - for (const std::string& histfile : m_ioCompMgr->io_retrieve(histsvc.get())) { - dups.erase(histfile); - } - if (!dups.empty()) { - msg() << MSG::ERROR << "The following ROOT files (with #instances) have not been closed yet: "; - for (const auto& [n,c] : dups) msg() << n << "(x" << c << ") "; - msg() << endmsg; - } - } - - // close open DB connections - ATH_CHECK(TrigRDBManager::closeDBConnections(msg())); - - // Assert that scheduler has not been initialised before forking - SmartIF<IService> svc = serviceLocator()->service(m_schedulerName, /*createIf=*/ false); - if (svc.isValid()) { - ATH_MSG_FATAL("Misconfiguration - Scheduler was initialised before forking!"); - return StatusCode::FAILURE; - } - - ATH_MSG_VERBOSE("end of " << __FUNCTION__); - return StatusCode::SUCCESS; - } - catch(const std::runtime_error& e) - { - ATH_MSG_ERROR("Runtime error: " << e.what()); - } - - ATH_MSG_VERBOSE("end of " << __FUNCTION__); - return StatusCode::FAILURE; -} - - -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::execAtStart(const EventContext& ctx) const -{ - const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>(); - IAlgorithm* alg{nullptr}; - - StatusCode sc; - for (const std::string& name : m_execAtStart) { - if ( algMgr->getAlgorithm(name, alg) ) { - ATH_MSG_INFO("Executing " << alg->name() << "..."); - sc &= alg->sysExecute(ctx); - } - else ATH_MSG_WARNING("Cannot find algorithm or sequence " << name); - } - return sc; -} - - -// ============================================================================= -// Implementation of ITrigEventLoopMgr::hltUpdateAfterFork -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::hltUpdateAfterFork(const ptree& /*pt*/) -{ - ATH_MSG_VERBOSE("start of " << __FUNCTION__); - - updateDFProps(); - ATH_MSG_INFO("Post-fork initialization for " << m_applicationName); - - ATH_MSG_DEBUG("Initialising the scheduler after forking"); - m_schedulerSvc = serviceLocator()->service(m_schedulerName, /*createIf=*/ true); - if ( !m_schedulerSvc.isValid()){ - ATH_MSG_FATAL("Error retrieving " << m_schedulerName << " interface ISchedulerSvc"); - return StatusCode::FAILURE; - } - ATH_MSG_DEBUG("Initialised " << m_schedulerName << " interface ISchedulerSvc"); - - ATH_MSG_DEBUG("Trying a stop-start of CoreDumpSvc"); - SmartIF<IService> svc = serviceLocator()->service("CoreDumpSvc", /*createIf=*/ false); - if (svc.isValid()) { - StatusCode sc = svc->stop(); - sc &= svc->start(); - if (sc.isFailure()) { - ATH_MSG_WARNING("Could not perform stop/start for CoreDumpSvc"); - } - else { - ATH_MSG_DEBUG("Done a stop-start of CoreDumpSvc"); - } - } - else { - ATH_MSG_WARNING("Could not retrieve CoreDumpSvc"); - } - - // Make sure output files, i.e. histograms are written to their own directory. - // Nothing happens if the online TrigMonTHistSvc is used as there are no output files. 
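// Aside: a minimal sketch (not part of the original file) of the per-worker output
// directory preparation done below, using only std::filesystem. The "athenaHLT-NN"
// naming mirrors the code that follows; the worker ID value is invented:
#include <filesystem>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <system_error>

int main() {
  const int workerID = 3; // stand-in for m_workerID
  std::filesystem::path workerDir = std::filesystem::absolute("athenaHLT_workers");
  std::ostringstream oss;
  oss << "athenaHLT-" << std::setfill('0') << std::setw(2) << workerID;
  workerDir /= oss.str(); // e.g. .../athenaHLT_workers/athenaHLT-03

  // Start clean: remove leftovers from a previous run, then recreate the directory
  std::error_code ec;
  std::filesystem::remove_all(workerDir, ec); // ec also swallows "does not exist"
  if (!std::filesystem::create_directories(workerDir)) {
    std::cerr << "Cannot create worker directory " << workerDir << '\n';
    return 1;
  }
  std::cout << "Writing worker output files to " << workerDir << '\n';
}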
- SmartIF<IIoComponent> histsvc = serviceLocator()->service("THistSvc", /*createIf=*/ false).as<IIoComponent>(); - if ( !m_ioCompMgr->io_retrieve(histsvc.get()).empty() ) { - std::filesystem::path worker_dir = std::filesystem::absolute("athenaHLT_workers"); - std::ostringstream oss; - oss << "athenaHLT-" << std::setfill('0') << std::setw(2) << m_workerID; - worker_dir /= oss.str(); - // Delete worker directory if it exists already - if ( std::filesystem::exists(worker_dir) ) { - if ( std::filesystem::remove_all(worker_dir) == 0 ) { - ATH_MSG_FATAL("Cannot delete previous worker directory " << worker_dir); - return StatusCode::FAILURE; - } - } - if ( !std::filesystem::create_directories(worker_dir) ) { - ATH_MSG_FATAL("Cannot create worker directory " << worker_dir); - return StatusCode::FAILURE; - } - ATH_MSG_INFO("Writing worker output files to " << worker_dir); - ATH_CHECK(m_ioCompMgr->io_update_all(worker_dir.string())); - } - ATH_CHECK(m_ioCompMgr->io_reinitialize()); - - const size_t numSlots = m_whiteboard->getNumberOfStores(); - m_freeSlots = numSlots; - - // Initialise vector of time points for event timeout monitoring - m_eventTimerStartPoint.clear(); - m_eventTimerStartPoint.resize(numSlots, std::chrono::steady_clock::now()); - m_isSlotProcessing.resize(numSlots, false); - - // Initialise vector of time points for free slots monitoring - m_freeSlotStartPoint.clear(); - m_freeSlotStartPoint.resize(numSlots, std::chrono::steady_clock::now()); - - // Initialise the queues used in parallel I/O steering - m_parallelIOQueue.set_capacity(static_cast<decltype(m_parallelIOQueue)::size_type>(m_maxParallelIOTasks.value())); - m_finishedEventsQueue.set_capacity(static_cast<decltype(m_finishedEventsQueue)::size_type>(numSlots)); - - // Fire incident to update listeners after forking - m_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_workerID, m_workerPID, name(), m_currentRunCtx)); - - ATH_MSG_VERBOSE("end of " << __FUNCTION__); - return StatusCode::SUCCESS; -} - -// ============================================================================= -// Implementation of IEventProcessor::executeRun -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::executeRun(int maxevt) -{ - ATH_MSG_VERBOSE("start of " << __FUNCTION__); - - if (m_monitorScheduler) ATH_CHECK(m_schedulerMonSvc->startMonitoring()); - - StatusCode sc = StatusCode::SUCCESS; - try { - sc = nextEvent(maxevt); - if (sc.isFailure()) ATH_MSG_FATAL("Event loop failed"); - } - catch (const std::exception& e) { - ATH_MSG_FATAL("Event loop failed, std::exception caught: " << e.what()); - sc = StatusCode::FAILURE; - } - catch (...) 
{ - ATH_MSG_FATAL("Event loop failed, unknown exception caught"); - sc = StatusCode::FAILURE; - } - - if (m_monitorScheduler) ATH_CHECK(m_schedulerMonSvc->stopMonitoring()); - - ATH_MSG_VERBOSE("end of " << __FUNCTION__); - return sc; -} - -// ============================================================================= -// Implementation of IEventProcessor::nextEvent -// maxevt is not used - we always want to process all events delivered -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::nextEvent(int /*maxevt*/) -{ - ATH_MSG_VERBOSE("start of " << __FUNCTION__); - - // Start the event timer thread - ATH_MSG_DEBUG("Starting the timeout thread"); - m_timeoutThread = std::make_unique<HLT::LoopThread>([this]{return eventTimerCallback();}, m_timeoutThreadIntervalMs.value()); - m_timeoutThread->start(); - - // Start the event loop - ATH_MSG_INFO("Starting loop on events"); - std::unique_lock<std::mutex> lock{m_loopStatus.loopEndedMutex}; - m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();}, m_maxIOWakeUpIntervalMs.value()); - m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, m_maxIOWakeUpIntervalMs.value()); - m_outputThread->start(); - m_inputThread->start(); - - // Wait for event loop to end. The condition means the main input and output threads flagged they have - // nothing else to do and will exit asynchronously (later than the wait here ends) - ATH_MSG_DEBUG("Event loop started, the main thread is going to sleep until it finishes"); - m_loopStatus.loopEndedCond.wait(lock, [this](){return m_loopStatus.loopEnded.load();}); - ATH_MSG_INFO("All events processed, finalising the event loop"); - - // Wait for the I/O TBB tasks and main I/O threads to finish. Note the TBB tasks need to finish first - // because they may notify the condition variables in the main I/O threads. The lifetime of the condition - // variables must span beyond any I/O TBB task. - ATH_MSG_DEBUG("Waiting for all I/O tasks and threads to return"); - m_parallelIOTaskGroup.wait(); - m_inputThread.reset(); - m_outputThread.reset(); - - // Stop the event timer thread - ATH_MSG_DEBUG("All I/O threads and tasks finished. 
Stopping the timeout thread");
-  m_timeoutThread->stop();
-  m_timeoutThread.reset();
-  ATH_MSG_DEBUG("The timeout thread finished");
-
-  ATH_MSG_INFO("Finished loop on events");
-
-  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
-  return StatusCode::SUCCESS;
-}
-
-// =============================================================================
-// Implementation of IEventProcessor::stopRun (obsolete for online running)
-// =============================================================================
-StatusCode HltAsyncEventLoopMgr::stopRun() {
-  ATH_MSG_FATAL("Misconfiguration - the method HltAsyncEventLoopMgr::stopRun() cannot be used online");
-  return StatusCode::FAILURE;
-}
-
-// =============================================================================
-// Implementation of IEventProcessor::createEventContext
-// =============================================================================
-EventContext HltAsyncEventLoopMgr::createEventContext() {
-  size_t eventNumber = ++m_localEventNumber;
-  auto slot = m_whiteboard->allocateStore(eventNumber); // returns npos on failure
-  if (slot == std::string::npos) {
-    // return an invalid EventContext
-    return EventContext();
-  }
-  return EventContext{ eventNumber, slot };
-}
-
-// =============================================================================
-// Implementation of IEventProcessor::executeEvent
-// =============================================================================
-StatusCode HltAsyncEventLoopMgr::executeEvent(EventContext &&ctx)
-{
-  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
-
-  resetTimeout(Athena::Timeout::instance(ctx));
-
-  // Monitor slot idle time (between scheduler popFinishedEvent and pushNewEvent)
-  // Note this is the time a scheduler slot spends free, which is not the same as the time a whiteboard slot spends free
-  const auto slotIdleTime = std::chrono::steady_clock::now() - m_freeSlotStartPoint[ctx.slot()];
-  Monitored::Scalar<int64_t> monSlotIdleTime("SlotIdleTime", std::chrono::duration_cast<std::chrono::milliseconds>(slotIdleTime).count());
-  Monitored::Group(m_monTool, monSlotIdleTime);
-
-  // Now add event to the scheduler
-  ATH_MSG_DEBUG("Adding event " << ctx.evt() << ", slot " << ctx.slot() << " to the scheduler");
-  StatusCode addEventStatus = m_schedulerSvc->pushNewEvent( new EventContext{std::move(ctx)} );
-
-  // If this fails, we need to wait for something to complete
-  if (addEventStatus.isFailure()){
-    ATH_MSG_ERROR("Failed adding event to the scheduler");
-    return StatusCode::FAILURE;
-  }
-
-  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
-  return StatusCode::SUCCESS;
-}
-
-// =============================================================================
-void HltAsyncEventLoopMgr::updateDFProps()
-{
-  auto getDFProp = [&](const std::string& name, std::string& value, bool required = true) {
-    if (m_jobOptionsSvc->has("DataFlowConfig."+name)) {
-      value = m_jobOptionsSvc->get("DataFlowConfig."+name);
-      ATH_MSG_INFO(" ---> Read from DataFlow configuration: " << name << " = " << value);
-    } else {
-      msg() << (required ? 
MSG::WARNING : MSG::INFO)
-            << "Could not set Property " << name << " from DataFlow" << endmsg;
-    }
-  };
-
-  getDFProp( "DF_ApplicationName", m_applicationName );
-  std::string wid, wpid;
-  getDFProp( "DF_WorkerId", wid, false );
-  getDFProp( "DF_Pid", wpid, false );
-  if (!wid.empty()) m_workerID = std::stoi(wid);
-  if (!wpid.empty()) m_workerPID = std::stoi(wpid);
-}
-
-// =============================================================================
-void HltAsyncEventLoopMgr::updateInternal(const coral::AttributeList & sor_attrlist)
-{
-  auto detMaskFst = sor_attrlist["DetectorMaskFst"].data<unsigned long long>();
-  auto detMaskSnd = sor_attrlist["DetectorMaskSnd"].data<unsigned long long>();
-  updateDetMask({detMaskFst, detMaskSnd});
-
-  if(msgLevel() <= MSG::DEBUG)
-  {
-    // save current stream flags for later reset
-    // cast needed (flags() returns long, but the setter doesn't take long back)
-    auto previous_stream_flags = static_cast<std::ios::fmtflags>(msgStream().flags());
-    ATH_MSG_DEBUG("Full detector mask (128 bits) = 0x"
-                  << MSG::hex << std::setfill('0')
-                  << std::setw(8) << std::get<3>(m_detector_mask)
-                  << std::setw(8) << std::get<2>(m_detector_mask)
-                  << std::setw(8) << std::get<1>(m_detector_mask)
-                  << std::setw(8) << std::get<0>(m_detector_mask));
-    msgStream().flags(previous_stream_flags);
-  }
-}
-
-// =============================================================================
-void HltAsyncEventLoopMgr::updateMetadataStore(const coral::AttributeList & sor_attrlist) const
-{
-  // least significant part is "snd" in sor but "fst" for ByteStreamMetadata
-  auto bs_dm_fst = sor_attrlist["DetectorMaskSnd"].data<unsigned long long>();
-  // most significant part is "fst" in sor but "snd" for ByteStreamMetadata
-  auto bs_dm_snd = sor_attrlist["DetectorMaskFst"].data<unsigned long long>();
-
-  auto metadatacont = std::make_unique<ByteStreamMetadataContainer>();
-  metadatacont->push_back(std::make_unique<ByteStreamMetadata>(
-    sor_attrlist["RunNumber"].data<unsigned int>(),
-    0,
-    0,
-    sor_attrlist["RecordingEnabled"].data<bool>(),
-    0,
-    bs_dm_fst,
-    bs_dm_snd,
-    0,
-    0,
-    "",
-    "",
-    "",
-    0,
-    std::vector<std::string>()
-  ));
-  // Record ByteStreamMetadataContainer in MetaData Store
-  if(m_inputMetaDataStore->record(std::move(metadatacont),"ByteStreamMetadata").isFailure()) {
-    ATH_MSG_WARNING("Unable to record MetaData in InputMetaDataStore");
-  }
-  else {
-    ATH_MSG_DEBUG("Recorded MetaData in InputMetaDataStore");
-  }
-}
-
-//=========================================================================
-StatusCode HltAsyncEventLoopMgr::updateMagField(const ptree& pt) const
-{
-  if (m_setMagFieldFromPtree) {
-    try {
-      auto tor_cur = pt.get<float>("Magnets.ToroidsCurrent.value");
-      auto sol_cur = pt.get<float>("Magnets.SolenoidCurrent.value");
-
-      // Set current on conditions alg
-      const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
-      IAlgorithm* fieldAlg{nullptr};
-      algMgr->getAlgorithm("AtlasFieldMapCondAlg", fieldAlg).ignore();
-      if ( fieldAlg != nullptr ) {
-        ATH_MSG_INFO("Setting field currents on AtlasFieldMapCondAlg");
-        ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg, "MapSoleCurrent", sol_cur) );
-        ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg, "MapToroCurrent", tor_cur) );
-      }
-      else ATH_MSG_WARNING("Cannot retrieve AtlasFieldMapCondAlg");
-
-      ATH_MSG_INFO("*****************************************");
-      ATH_MSG_INFO(" Auto-configuration of magnetic field: ");
-      ATH_MSG_INFO(" solenoid current from IS = " << sol_cur);
-      ATH_MSG_INFO(" toroid current from IS = " << 
tor_cur); - ATH_MSG_INFO("*****************************************"); - } - catch(ptree_bad_path& e) { - ATH_MSG_ERROR( "Cannot read magnet currents from ptree: " << e.what() ); - return StatusCode::FAILURE; - } - } - return StatusCode::SUCCESS; -} - - -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::clearTemporaryStores() -{ - //---------------------------------------------------------------------------- - // Clear the event store, if used in the event loop - //---------------------------------------------------------------------------- - ATH_CHECK(m_evtStore->clearStore()); - ATH_MSG_DEBUG("Cleared the EventStore"); - - //---------------------------------------------------------------------------- - // Clear the InputMetaDataStore - //---------------------------------------------------------------------------- - ATH_CHECK(m_inputMetaDataStore->clearStore()); - ATH_MSG_DEBUG("Cleared the InputMetaDataStore"); - - return StatusCode::SUCCESS; -} - -// ============================================================================= -void HltAsyncEventLoopMgr::updateDetMask(const std::pair<uint64_t, uint64_t>& dm) -{ - m_detector_mask = std::make_tuple( - // least significant 4 bytes - static_cast<EventIDBase::number_type>(dm.second), - // next least significant 4 bytes - static_cast<EventIDBase::number_type>(dm.second >> 32), - // next least significant 4 bytes - static_cast<EventIDBase::number_type>(dm.first), - // most significant 4 bytes - static_cast<EventIDBase::number_type>(dm.first >> 32) - ); -} - -// ============================================================================= -const coral::AttributeList& HltAsyncEventLoopMgr::getSorAttrList() const -{ - auto sor = m_detectorStore->retrieve<const TrigSORFromPtreeHelper::SOR>(m_sorPath); - if (sor==nullptr) { - throw std::runtime_error("Cannot retrieve " + m_sorPath); - } - if(sor->size() != 1) - { - // This branch should never be entered (the CondAttrListCollection - // corresponding to the SOR should contain one single AttrList). 
Since
-    // that's required by code ahead but not checked at compile time, we
-    // explicitly guard against any potential future mistake with this check
-    throw std::runtime_error("SOR record should have one and only one attribute list, but it has " + std::to_string(sor->size()));
-  }
-
-  const auto & soral = sor->begin()->second;
-  printSORAttrList(soral);
-  return soral;
-}
-
-// =============================================================================
-void HltAsyncEventLoopMgr::printSORAttrList(const coral::AttributeList& atr) const
-{
-  unsigned long long sorTime_ns(atr["SORTime"].data<unsigned long long>());
-
-  // Human readable format of SOR time if available
-  time_t sorTime_sec = sorTime_ns / std::nano::den;
-  const auto sorTime_readable = OWLTime(sorTime_sec);
-
-  ATH_MSG_INFO("SOR parameters:");
-  ATH_MSG_INFO(" RunNumber = " << atr["RunNumber"].data<unsigned int>());
-  ATH_MSG_INFO(" SORTime [ns] = " << sorTime_ns << " (" << sorTime_readable << ") ");
-
-  // Use string stream for fixed-width hex detector mask formatting
-  auto dmfst = atr["DetectorMaskFst"].data<unsigned long long>();
-  auto dmsnd = atr["DetectorMaskSnd"].data<unsigned long long>();
-  std::ostringstream ss;
-  ss.setf(std::ios_base::hex,std::ios_base::basefield);
-  ss << std::setw(16) << std::setfill('0') << dmfst;
-  ATH_MSG_INFO(" DetectorMaskFst = 0x" << ss.str());
-  ss.str(""); // reset the string stream
-  ss << std::setw(16) << std::setfill('0') << dmsnd;
-  ATH_MSG_INFO(" DetectorMaskSnd = 0x" << ss.str());
-  ss.str(""); // reset the string stream
-  ss << std::setw(16) << std::setfill('0') << dmfst;
-  ss << std::setw(16) << std::setfill('0') << dmsnd;
-  ATH_MSG_INFO(" Complete DetectorMask = 0x" << ss.str());
-
-  ATH_MSG_INFO(" RunType = " << atr["RunType"].data<std::string>());
-  ATH_MSG_INFO(" RecordingEnabled = " << (atr["RecordingEnabled"].data<bool>() ? "true" : "false"));
-}
-
-// =============================================================================
-StatusCode HltAsyncEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const EventContext& eventContext)
-{
-  ATH_MSG_VERBOSE("start of " << __FUNCTION__ << " with errorCode = " << errorCode
-                  << ", context = " << eventContext << " eventID = " << eventContext.eventID());
-
-  // Used by MsgSvc (and possibly others but not relevant here)
-  Gaudi::Hive::setCurrentContext(eventContext);
-
-  auto returnFailureAndStopEventLoop = [this]() -> StatusCode {
-    ATH_MSG_INFO("Stopping event loop due to failure");
-    // Change the loop exit code to FAILURE
-    m_loopStatus.exitCode = StatusCode::FAILURE;
-
-    // Flag eventsAvailable=false, which will cause the I/O threads to finish all ongoing event processing
-    // and then stop. We cannot flag loopEnded=true here yet, because it would finish the I/O threads while
-    // there might still be events being processed, and they would crash when finished.
-    m_loopStatus.eventsAvailable = false;
-
-    // Inform the caller the failure could not be handled cleanly and the event loop will stop
-    return StatusCode::FAILURE;
-  };
-
-  //----------------------------------------------------------------------------
-  // Handle framework errors by printing an informative message and breaking the loop
-  //----------------------------------------------------------------------------
-  if (errorCode==HLT::OnlineErrorCode::BEFORE_NEXT_EVENT) {
-    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
-                  << " meaning there was a framework error before requesting a new event. 
No output will be produced for this event" - << " and the event loop will exit after all ongoing processing is finished."); - return returnFailureAndStopEventLoop(); - } - if (errorCode==HLT::OnlineErrorCode::CANNOT_RETRIEVE_EVENT) { - ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode - << " meaning a new event could not be correctly read. No output will be produced for this event." - << " The event loop will exit after all ongoing processing is finished."); - return returnFailureAndStopEventLoop(); - } - if (errorCode==HLT::OnlineErrorCode::AFTER_RESULT_SENT) { - ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode - << " meaning there was a framework error after HLT result was already sent out." - << " The event loop will exit after all ongoing processing is finished."); - return returnFailureAndStopEventLoop(); - } - if (errorCode==HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT) { - ATH_MSG_ERROR("Failed to access the slot for the processed event, cannot produce output. OnlineErrorCode=" - << errorCode << ". The event loop will exit after all ongoing processing is finished unless the failed event" - << " reaches a hard timeout sooner and this process is killed."); - return returnFailureAndStopEventLoop(); - } - if (errorCode==HLT::OnlineErrorCode::SCHEDULING_FAILURE) { - // Here we cannot be certain if the scheduler started processing the event or not. If yes, the output thread - // will finalise the event as normal. If not, the event will eventually reach a hard timeout and this process - // is killed, or we exit the process without ever producing output for this event (needs to be handled upstream). - ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode - << ". Cannot determine if the event processing started or not and whether a decision for this event will be" - << " produced. The event loop will exit after all ongoing processing is finished, which may include or" - << " not include the problematic event."); - return returnFailureAndStopEventLoop(); - } - if (errorCode==HLT::OnlineErrorCode::SCHEDULER_POP_FAILURE) { - ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode - << " meaning the Scheduler returned FAILURE when asked to give a finished event. Will keep trying to" - << " pop further events if there are any still in the scheduler, but this may keep repeating until" - << " this process is killed by hard timeout or other means. If all ongoing processing manages to finish" - << " then the event loop will exit."); - return returnFailureAndStopEventLoop(); - } - if (!eventContext.valid()) { - ATH_MSG_ERROR("Failure occurred with an invalid EventContext. Likely there was a framework error before" - << " requesting a new event or after sending the result of a finished event. OnlineErrorCode=" << errorCode - << ". 
The event loop will exit after all ongoing processing is finished.");
-    return returnFailureAndStopEventLoop();
-  }
-
-  //----------------------------------------------------------------------------
-  // Make sure we are using the right store
-  //----------------------------------------------------------------------------
-  if (m_whiteboard->selectStore(eventContext.slot()).isFailure()) {
-    return failedEvent(HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT,eventContext);
-  }
-
-  //----------------------------------------------------------------------------
-  // Define a debug stream tag for the HLT result
-  //----------------------------------------------------------------------------
-  std::string debugStreamName;
-  switch (errorCode) {
-    case HLT::OnlineErrorCode::PROCESSING_FAILURE:
-      debugStreamName = m_algErrorDebugStreamName.value();
-      break;
-    case HLT::OnlineErrorCode::TIMEOUT:
-      debugStreamName = m_timeoutDebugStreamName.value();
-      break;
-    case HLT::OnlineErrorCode::RESULT_TRUNCATION:
-      debugStreamName = m_truncationDebugStreamName.value();
-      break;
-    default:
-      debugStreamName = m_fwkErrorDebugStreamName.value();
-      break;
-  }
-  eformat::helper::StreamTag debugStreamTag{debugStreamName, eformat::DEBUG_TAG, true};
-
-  //----------------------------------------------------------------------------
-  // Create an HLT result for the failed event (copy one if it exists and contains serialised data)
-  //----------------------------------------------------------------------------
-  std::unique_ptr<HLT::HLTResultMT> hltResultPtr;
-  StatusCode buildResultCode{StatusCode::SUCCESS};
-  auto hltResultRH = SG::makeHandle(m_hltResultRHKey,eventContext);
-  if (hltResultRH.isValid() && !hltResultRH->getSerialisedData().empty()) {
-    // There is already an existing result, create a copy with the error code and stream tag
-    hltResultPtr = std::make_unique<HLT::HLTResultMT>(*hltResultRH);
-    hltResultPtr->addErrorCode(errorCode);
-    buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
-  } else {
-    // Create a result if not available, pre-fill with error code and stream tag, then try to fill event data
-    hltResultPtr = std::make_unique<HLT::HLTResultMT>();
-    hltResultPtr->addErrorCode(errorCode);
-    buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
-    // Fill the result unless we already failed doing this before
-    if (errorCode != HLT::OnlineErrorCode::NO_HLT_RESULT) {
-      buildResultCode &= m_hltResultMaker->fillResult(*hltResultPtr,eventContext);
-    }
-  }
-
-  // Try to record the result in the event store
-  SG::WriteHandleKey<HLT::HLTResultMT> hltResultWHK(m_hltResultRHKey.key()+"_FailedEvent");
-  buildResultCode &= hltResultWHK.initialize();
-  auto hltResultWH = SG::makeHandle(hltResultWHK,eventContext);
-  if (buildResultCode.isFailure() || hltResultWH.record(std::move(hltResultPtr)).isFailure()) {
-    if (errorCode == HLT::OnlineErrorCode::NO_HLT_RESULT) {
-      // Avoid infinite loop
-      ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. "
-                    << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
-                    << "The event loop will exit after all ongoing processing is finished.");
-      return returnFailureAndStopEventLoop();
-    }
-    ATH_MSG_ERROR("Failed to build or record the HLT Result in event store while handling a failed event. 
" - << "Trying again with skipped filling of the result contents (except debug stream tag)."); - return failedEvent(HLT::OnlineErrorCode::NO_HLT_RESULT,eventContext); - } - - //---------------------------------------------------------------------------- - // Monitor event processing time for the failed (force-accepted) event - //---------------------------------------------------------------------------- - auto eventTime = std::chrono::steady_clock::now() - m_eventTimerStartPoint[eventContext.slot()]; - int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count(); - auto monTimeAny = Monitored::Scalar<int64_t>("TotalTime", eventTimeMillisec); - auto monTimeAcc = Monitored::Scalar<int64_t>("TotalTimeAccepted", eventTimeMillisec); - Monitored::Group(m_monTool, monTimeAny, monTimeAcc); - - //---------------------------------------------------------------------------- - // Try to build and send the output - //---------------------------------------------------------------------------- - if (m_outputCnvSvc->connectOutput("").isFailure()) { - ATH_MSG_ERROR("The output conversion service failed in connectOutput() while handling a failed event. " - << "Cannot force-accept this event from HLT side, will rely on data collector to do this. " - << "The event loop will exit after all ongoing processing is finished."); - return returnFailureAndStopEventLoop(); - } - - DataObject* hltResultDO = m_evtStore->accessData(hltResultWH.clid(),hltResultWH.key()); - if (hltResultDO == nullptr) { - if (errorCode == HLT::OnlineErrorCode::NO_HLT_RESULT) { - // Avoid infinite loop - ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. " - << "Cannot force-accept this event from HLT side, will rely on data collector to do this. " - << "The event loop will exit after all ongoing processing is finished."); - return returnFailureAndStopEventLoop(); - } - ATH_MSG_ERROR("Failed to retrieve DataObject for the HLT result object while handling a failed event. " - << "Trying again with skipped filling of the result contents (except debug stream tag)."); - return failedEvent(HLT::OnlineErrorCode::NO_HLT_RESULT,eventContext); - } - - IOpaqueAddress* addr = nullptr; - if (m_outputCnvSvc->createRep(hltResultDO,addr).isFailure() || addr == nullptr) { - ATH_MSG_ERROR("Conversion of HLT result object to the output format failed while handling a failed event. " - << "Cannot force-accept this event from HLT side, will rely on data collector to do this. " - << "The event loop will exit after all ongoing processing is finished."); - delete addr; - return returnFailureAndStopEventLoop(); - } - - if (m_outputCnvSvc->commitOutput("",true).isFailure()) { - ATH_MSG_ERROR("The output conversion service failed in commitOutput() while handling a failed event. " - << "Cannot force-accept this event from HLT side, will rely on data collector to do this. 
" - << "The event loop will exit after all ongoing processing is finished."); - delete addr; - return returnFailureAndStopEventLoop(); - } - - // The output has been sent out, the ByteStreamAddress can be deleted - delete addr; - - //------------------------------------------------------------------------ - // Reset the timeout flag and the timer, and mark the slot as idle - //------------------------------------------------------------------------ - resetEventTimer(eventContext, /*processing=*/ false); - - //---------------------------------------------------------------------------- - // Clear the event data slot - //---------------------------------------------------------------------------- - // Need to copy the event context because it's managed by the event store and clearWBSlot deletes it - const EventContext eventContextCopy = eventContext; - if (clearWBSlot(eventContext.slot()).isFailure()) - return failedEvent(HLT::OnlineErrorCode::AFTER_RESULT_SENT,eventContextCopy); - - // Only now after store clearing we can allow the slot to be filled again, - // so we increment m_freeSlots and notify the input thread - ++m_freeSlots; - if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) { - m_inputThread->cond().notify_all(); - } - - //---------------------------------------------------------------------------- - // Finish handling the failed event - //---------------------------------------------------------------------------- - - // Unless this is an event data or algorithm processing failure, increment the number of framework failures - if (!HLT::isEventProcessingErrorCode(errorCode)) { - if ( m_maxFrameworkErrors.value()>=0 && ((++m_nFrameworkErrors)>m_maxFrameworkErrors.value()) ) { - ATH_MSG_ERROR("Failure with OnlineErrorCode=" << errorCode - << " was successfully handled, but the number of tolerable framework errors for this HltAsyncEventLoopMgr instance," - << " which is " << m_maxFrameworkErrors.value() << ", was exceeded. Current local event number is " - << eventContextCopy.evt() << ", slot " << eventContextCopy.slot() - << ". The event loop will exit after all ongoing processing is finished."); - return returnFailureAndStopEventLoop(); - } - } - - // Even if handling the failed event succeeded, print an error message with failed event details - ATH_MSG_ERROR("Failed event with OnlineErrorCode=" << errorCode - << " Current local event number is " << eventContextCopy.evt() << ", slot " << eventContextCopy.slot()); - - ATH_MSG_VERBOSE("end of " << __FUNCTION__); - return StatusCode::SUCCESS; // error handling succeeded, event loop may continue -} - -// ============================================================================= -void HltAsyncEventLoopMgr::eventTimerCallback() -{ - ATH_MSG_VERBOSE("start of " << __FUNCTION__); - auto now=std::chrono::steady_clock::now(); - for (size_t i=0; i<m_eventTimerStartPoint.size(); ++i) { - // iterate over all slots and check for timeout - if (!m_isSlotProcessing.at(i)) continue; - if (now > m_eventTimerStartPoint.at(i) + m_softTimeoutValue) { - EventContext ctx(0,i); // we only need the slot number for Athena::Timeout instance - // don't duplicate the actions if the timeout was already reached - if (!Athena::Timeout::instance(ctx).reached()) { - ATH_MSG_ERROR("Soft timeout in slot " << i << ". 
Processing time exceeded the limit of " << m_softTimeoutValue.count() << " ms"); - setTimeout(Athena::Timeout::instance(ctx)); - // Generate stack trace and scheduler dump only once, on the first timeout - if (m_traceOnTimeout.value() && !m_timeoutTraceGenerated) { - m_schedulerSvc->dumpState(); - ATH_MSG_INFO("Generating stack trace due to the soft timeout"); - m_timeoutTraceGenerated = true; - gSystem->StackTrace(); - } - } - } - } - ATH_MSG_VERBOSE("end of " << __FUNCTION__); -} - -// ============================================================================= -void HltAsyncEventLoopMgr::resetEventTimer(const EventContext& eventContext, bool processing) { - if (!eventContext.valid()) {return;} - { - std::unique_lock<std::mutex> lock(m_timeoutThread->mutex()); - m_eventTimerStartPoint[eventContext.slot()] = std::chrono::steady_clock::now(); - m_isSlotProcessing[eventContext.slot()] = processing; - resetTimeout(Athena::Timeout::instance(eventContext)); - } - m_timeoutThread->cond().notify_all(); -} - -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::clearWBSlot(size_t evtSlot) const -{ - ATH_MSG_VERBOSE("start of " << __FUNCTION__); - auto monTime = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_clearStore"); - StatusCode sc = m_whiteboard->clearStore(evtSlot); - Monitored::Group(m_monTool, monTime); - if( !sc.isSuccess() ) { - ATH_MSG_WARNING("Clear of event data store failed"); - } - ATH_MSG_VERBOSE("end of " << __FUNCTION__ << ", returning m_whiteboard->freeStore(evtSlot=" << evtSlot << ")"); - return m_whiteboard->freeStore(evtSlot); -} - -// ============================================================================= -void HltAsyncEventLoopMgr::inputThreadCallback() { - ATH_MSG_VERBOSE("start of " << __FUNCTION__); - if (m_loopStatus.loopEnded) { - ATH_MSG_VERBOSE("Event loop ended, stopping the input thread and returning from " << __FUNCTION__); - m_inputThread->stop(); - // Notify output thread which may be still waiting for events - m_outputThread->cond().notify_all(); - return; - } - - // Early exit conditions - if (!m_loopStatus.eventsAvailable) { - ATH_MSG_VERBOSE("No more events, flagging the event loop as finished, stopping the input thread" - << " and returning from " << __FUNCTION__); - m_inputThread->stop(); - // Notify output thread which may be still waiting for events - m_outputThread->cond().notify_all(); - return; - } - const size_t numSlotsToFill = m_freeSlots.load(); - if (numSlotsToFill==0) { - ATH_MSG_VERBOSE("No free slots, returning from " << __FUNCTION__); - return; - } - m_freeSlots -= numSlotsToFill; - - // Read in and start processing another event - ATH_MSG_DEBUG("Free slots = " << numSlotsToFill << ". Reading new event(s) to fill the slot(s)."); - - // Fill all free slots with new events - for (size_t i=0; i<numSlotsToFill; ++i) { - auto task = [mgr=this](){ - StatusCode sc = StatusCode::SUCCESS; - try { - sc = mgr->startNextEvent(); - } - catch (const std::exception& e) { - mgr->error() << "Exception caught in startNextEvent: " << e.what() << endmsg; - sc = StatusCode::FAILURE; - } - catch (...) 
{ - mgr->error() << "Exception caught in startNextEvent" << endmsg; - sc = StatusCode::FAILURE; - } - if (sc.isFailure()) { - mgr->error() << "startNextEvent failed, stopping the event loop" << endmsg; - mgr->m_loopStatus.exitCode = StatusCode::FAILURE; - mgr->m_loopStatus.eventsAvailable = false; - return; - } - // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item - // is popped, we only use the queue size to limit the number of tasks running in parallel - bool popIOQueue{false}; - mgr->m_parallelIOQueue.pop(popIOQueue); - }; - - // Push one item to the parallel I/O queue to increment its size - the value doesn't matter, - // we only use the queue size and benefit from the blocking push call here to limit the number - // of tasks running in parallel. Once we can push to the queue, we can schedule the task. - m_parallelIOQueue.push(true); - m_parallelIOTaskGroup.run(std::move(task)); - } - ATH_MSG_VERBOSE("end of " << __FUNCTION__); -} - -// ============================================================================= -void HltAsyncEventLoopMgr::outputThreadCallback() { - ATH_MSG_VERBOSE("start of " << __FUNCTION__); - const size_t nslots = m_isSlotProcessing.size(); // size is fixed in hltUpdateAfterFork after configuring scheduler - if (m_schedulerSvc->freeSlots() == nslots) { - if (m_loopStatus.eventsAvailable) { - ATH_MSG_DEBUG("There are currently no events being processed by the Scheduler, returning from " << __FUNCTION__); - } else { - ATH_MSG_DEBUG("No more events to process and scheduler is empty, stopping the event loop and output thread"); - if (!m_loopStatus.loopEnded && m_outputThread!=nullptr) { - m_outputThread->stop(); - } - // Notify input thread which may be still waiting for free slots - if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) { - m_inputThread->cond().notify_all(); - } - // Notify the main thread that the loop ended - this is the only place able to do this! 
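// Aside: a minimal sketch (not part of the original file) of the loop-ended
// handshake used here: a worker thread sets an atomic flag and notifies a
// condition variable on which the main thread sleeps with a predicate, so the
// wake-up cannot be lost even if the notification fires before the wait starts:
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

int main() {
  std::mutex loopEndedMutex;
  std::condition_variable loopEndedCond;
  std::atomic<bool> loopEnded{false};

  std::thread outputThread([&] {
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // "drain the scheduler"
    loopEnded = true;           // the only place allowed to flag the end of the loop
    loopEndedCond.notify_all(); // wake the sleeping main thread
  });

  std::unique_lock<std::mutex> lock{loopEndedMutex};
  loopEndedCond.wait(lock, [&] { return loopEnded.load(); }); // predicate guards against spurious wake-ups
  std::cout << "event loop ended, main thread resumes\n";
  outputThread.join();
}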
- m_loopStatus.loopEnded = true; - m_loopStatus.loopEndedCond.notify_all(); - } - return; - } - - //---------------------------------------------------------------------------- - // Pop events from the Scheduler - //---------------------------------------------------------------------------- - std::vector<EventContext*> finishedEvtContexts; - EventContext* finishedEvtContext(nullptr); - const auto popStartTime = std::chrono::steady_clock::now(); - - // Pop one event from the scheduler (blocking call) - ATH_MSG_DEBUG("Waiting for a finished event from the Scheduler"); - if (m_schedulerSvc->popFinishedEvent(finishedEvtContext).isFailure()) { - failedEvent(HLT::OnlineErrorCode::SCHEDULER_POP_FAILURE, EventContext()).ignore(); - delete finishedEvtContext; - return; - } - ATH_MSG_DEBUG("Scheduler returned a finished event: " << finishedEvtContext); - finishedEvtContexts.push_back(finishedEvtContext); - - // See if more events are available (non-blocking call) - while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){ - ATH_MSG_DEBUG("Scheduler returned a finished event: " << *finishedEvtContext); - finishedEvtContexts.push_back(finishedEvtContext); - } - const auto popSpentTime = std::chrono::steady_clock::now() - popStartTime; - const auto popSpentTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(popSpentTime).count(); - Monitored::Scalar<int64_t> monPopSchedulerTime{"PopSchedulerTime", popSpentTimeMs}; - Monitored::Scalar<size_t> monPopSchedulerNumEvt{"PopSchedulerNumEvt", finishedEvtContexts.size()}; - Monitored::Group{m_monTool, monPopSchedulerNumEvt, monPopSchedulerTime}; - - //---------------------------------------------------------------------------- - // Post-process the finished events - //---------------------------------------------------------------------------- - const size_t nFinishedEvents = finishedEvtContexts.size(); - ATH_MSG_DEBUG("Number of finished events to post-process: " << nFinishedEvents); - - // Push all post-processing tasks to TBB - for (EventContext* thisFinishedEvtContext : finishedEvtContexts) { - // Reset free slot timer for monitoring - if (thisFinishedEvtContext != nullptr) { - m_freeSlotStartPoint[thisFinishedEvtContext->slot()] = std::chrono::steady_clock::now(); - } - - // Create and enqueue the task - m_finishedEventsQueue.push(thisFinishedEvtContext); - auto task = [mgr=this](){ - StatusCode sc = StatusCode::SUCCESS; - try { - sc = mgr->processFinishedEvent(); - } - catch (const std::exception& e) { - mgr->error() << "Exception caught in processFinishedEvent: " << e.what() << endmsg; - sc = StatusCode::FAILURE; - } - catch (...) { - mgr->error() << "Exception caught in processFinishedEvent" << endmsg; - sc = StatusCode::FAILURE; - } - - if (sc.isFailure()) { - mgr->error() << "processFinishedEvent failed, stopping the event loop" << endmsg; - mgr->m_loopStatus.exitCode = StatusCode::FAILURE; - mgr->m_loopStatus.eventsAvailable = false; - } - - // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item - // is popped, we only use the queue size to limit the number of tasks running in parallel - bool popIOQueue{false}; - mgr->m_parallelIOQueue.pop(popIOQueue); - - // Wake up the output thread if it's sleeping - this prevents a deadlock after the last event - // when input thread already finished and is no longer waking up the output thread. Spurious wake-ups - // during the event loop from this notification should have negligible effect on CPU load. 
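// Aside: a minimal sketch (not part of the original file) of the throttling idiom
// used throughout this file: a tbb::concurrent_bounded_queue acts as a counting
// semaphore for tbb::task_group. push() blocks once 'capacity' tasks are in
// flight, and each task pops one token when done; the token values carry no meaning:
#include <tbb/concurrent_queue.h>
#include <tbb/task_group.h>
#include <cstdio>

int main() {
  tbb::concurrent_bounded_queue<bool> parallelIOQueue;
  parallelIOQueue.set_capacity(4); // at most 4 I/O tasks run concurrently
  tbb::task_group ioTaskGroup;

  for (int i = 0; i < 20; ++i) {
    parallelIOQueue.push(true); // blocks while 4 tasks are already running
    ioTaskGroup.run([&parallelIOQueue, i] {
      std::printf("I/O task %d\n", i); // stand-in for startNextEvent/processFinishedEvent
      bool token{false};
      parallelIOQueue.pop(token); // release one slot; which token is popped is irrelevant
    });
  }
  ioTaskGroup.wait(); // must run before the task_group is destroyed
}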
- mgr->m_outputThread->cond().notify_one(); - }; - - // Push one item to the parallel I/O queue to increment its size - the value doesn't matter, - // we only use the queue size and benefit from the blocking push call here to limit the number - // of tasks running in parallel. Once we can push to the queue, we can schedule the task. - m_parallelIOQueue.push(true); - m_parallelIOTaskGroup.run(std::move(task)); - } - - ATH_MSG_VERBOSE("end of " << __FUNCTION__); -} - -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::startNextEvent() -{ - StatusCode sc = StatusCode::SUCCESS; - auto check = [this, &sc](std::string&& errmsg, HLT::OnlineErrorCode errcode, const EventContext& eventContext) { - if (sc.isSuccess()) {return false;} - ATH_MSG_ERROR(errmsg); - sc = failedEvent(errcode, eventContext); - Gaudi::Hive::setCurrentContext(EventContext()); - return true; - }; - - //------------------------------------------------------------------------ - // Allocate event slot and create new EventContext - //------------------------------------------------------------------------ - - // Create an EventContext, allocating and selecting a whiteboard slot - std::unique_ptr<EventContext> eventContextPtr = std::make_unique<EventContext>(createEventContext()); - - sc = eventContextPtr->valid() ? StatusCode(StatusCode::SUCCESS) : StatusCode(StatusCode::FAILURE); - if (check("Failed to allocate slot for a new event", HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContextPtr)) { - return sc; - } - - sc = m_whiteboard->selectStore(eventContextPtr->slot()); - if (check("Failed to select event store slot number " + std::to_string(eventContextPtr->slot()), - HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContextPtr)) { - return sc; - } - - // We can completely avoid using ThreadLocalContext if we store the EventContext in the event store. Any - // service/tool method which does not allow to pass EventContext as argument, can const-retrieve it from the - // event store rather than using ThreadLocalContext. - - // We link the current store in the extension of the EventContext we just created. Only then we create - // a WriteHandle for the EventContext using the EventContext itself. The handle will use the linked hiveProxyDict - // to record the context in the current store. - eventContextPtr->setExtension(Atlas::ExtendedEventContext(m_evtStore->hiveProxyDict(), - m_currentRunCtx.eventID().run_number())); - auto eventContext = SG::makeHandle(m_eventContextWHKey,*eventContextPtr); - sc = eventContext.record(std::move(eventContextPtr)); - if (check("Failed to record new EventContext in the event store", - HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContext)) { - return sc; - } - - // Reset the AlgExecStateSvc - m_aess->reset(*eventContext); - - ATH_MSG_DEBUG("Created new EventContext with number: " << eventContext->evt() - << ", slot: " << eventContext->slot()); - - // This ThreadLocalContext call is a not-so-nice behind-the-scenes way to inform some services about the current - // context. If possible, services should use EventContext from the event store as recorded above. We have to set - // the ThreadLocalContext here because some services still use it. 
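As the comment above explains, a component that is not handed an EventContext can read the one recorded under m_eventContextWHKey instead of consulting ThreadLocalContext. A hedged sketch of such a consumer follows; the key name mirrors the default above and retrieve-by-key is standard StoreGate usage, but this helper itself is illustrative and not part of the patch:

#include "AthenaKernel/EventContextClid.h"  // supplies the CLID for EventContext
#include "GaudiKernel/EventContext.h"
#include "GaudiKernel/StatusCode.h"
#include "StoreGate/StoreGateSvc.h"

// Hypothetical helper: fetch the context recorded by the event loop manager.
StatusCode useRecordedContext(StoreGateSvc& evtStore) {
  const EventContext* ctx = nullptr;
  if (evtStore.retrieve(ctx, "EventContext").isFailure() || ctx == nullptr) {
    return StatusCode::FAILURE;
  }
  // ctx->slot(), ctx->evt() and ctx->eventID() are now available
  // without any thread-local lookup.
  return StatusCode::SUCCESS;
}

The setCurrentContext call that follows remains only for the services that still read the thread-local slot.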
- Gaudi::Hive::setCurrentContext(*eventContext); - - //------------------------------------------------------------------------ - // Create a new address for EventInfo to facilitate automatic conversion from input data - //------------------------------------------------------------------------ - IOpaqueAddress* addr = nullptr; - sc = m_evtSelector->createAddress(*m_evtSelContext, addr); - if (check("Event selector failed to create an IOpaqueAddress", - HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContext)) { - return sc; - } - - //------------------------------------------------------------------------ - // Get the next event - //------------------------------------------------------------------------ - try { - bool noEventsTemporarily{false}; - do { - try { - noEventsTemporarily = false; - sc = m_evtSelector->next(*m_evtSelContext); - } catch (const hltonl::Exception::NoEventsTemporarily& e) { - ATH_MSG_DEBUG("No new input events available temporarily, requesting again"); - noEventsTemporarily = true; - } - } while (noEventsTemporarily); - } - catch (const hltonl::Exception::NoMoreEvents& e) { - sc = StatusCode::SUCCESS; - m_loopStatus.eventsAvailable = false; - sc = clearWBSlot(eventContext->slot()); - if (sc.isFailure()) { - ATH_MSG_WARNING("Failed to clear the whiteboard slot " << eventContext->slot() - << " after NoMoreEvents detected"); - } - // Increment m_freeSlots after clearing the store and notify the input thread - ++m_freeSlots; - if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) { - m_inputThread->cond().notify_all(); - } - return StatusCode::SUCCESS; - } - catch (const hltonl::Exception::MissingCTPFragment& e) { - sc = StatusCode::FAILURE; - if (check(e.what(), HLT::OnlineErrorCode::MISSING_CTP_FRAGMENT, *eventContext)) { - return sc; - } - } - catch (const hltonl::Exception::BadCTPFragment& e) { - sc = StatusCode::FAILURE; - if (check(e.what(), HLT::OnlineErrorCode::BAD_CTP_FRAGMENT, *eventContext)) { - return sc; - } - } - catch (const std::exception& e) { - ATH_MSG_ERROR("Failed to get next event from the event source, std::exception caught: " << e.what()); - sc = StatusCode::FAILURE; - } - catch (...) { - ATH_MSG_ERROR("Failed to get next event from the event source, unknown exception caught"); - sc = StatusCode::FAILURE; - } - if (check("Failed to get the next event", - HLT::OnlineErrorCode::CANNOT_RETRIEVE_EVENT, *eventContext)) { - return sc; - } - - //------------------------------------------------------------------------ - // Reset the timeout flag and the timer, and mark the slot as busy - //------------------------------------------------------------------------ - resetEventTimer(*eventContext, /*processing=*/ true); - - //------------------------------------------------------------------------ - // Load event proxies and get event info - //------------------------------------------------------------------------ - sc = m_evtStore->loadEventProxies(); - if (check("Failed to load event proxies", HLT::OnlineErrorCode::NO_EVENT_INFO, *eventContext)) { - return sc; - } - - auto eventInfo = SG::makeHandle(m_eventInfoRHKey,*eventContext); - sc = eventInfo.isValid() ? 
StatusCode::SUCCESS : StatusCode::FAILURE;
-  if (check("Failed to retrieve EventInfo", HLT::OnlineErrorCode::NO_EVENT_INFO, *eventContext)) {
-    return sc;
-  }
-
-  ATH_MSG_DEBUG("Retrieved event info for the new event " << *eventInfo);
-
-  // Set EventID for the EventContext
-  EventID eid = eventIDFromxAOD(eventInfo.cptr());
-  // Override run/LB/timestamp if needed
-  if (m_forceRunNumber > 0) {
-    eid.set_run_number(m_forceRunNumber);
-  }
-  if (m_forceLumiblock > 0) {
-    eid.set_lumi_block(m_forceLumiblock);
-  }
-  if (m_forceSOR_ns > 0) {
-    eid.set_time_stamp(m_forceSOR_ns / std::nano::den);
-    eid.set_time_stamp_ns_offset(m_forceSOR_ns % std::nano::den);
-  }
-  eventContext->setEventID(eid);
-
-  // Update thread-local EventContext after setting EventID
-  Gaudi::Hive::setCurrentContext(*eventContext);
-
-  //-----------------------------------------------------------------------
-  // COOL updates for LB changes
-  //-----------------------------------------------------------------------
-
-  // Check if this is a new LB
-  EventIDBase::number_type oldMaxLB{0}, newMaxLB{0};
-  bool updatedLB{false};
-  do {
-    oldMaxLB = m_loopStatus.maxLB.load();
-    newMaxLB = std::max(oldMaxLB, eventContext->eventID().lumi_block());
-    updatedLB = newMaxLB > oldMaxLB;
-  } while (updatedLB && !m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB));
-
-  // Wait in case a COOL update is ongoing to avoid executeEvent
-  // reading conditions data while they are being updated.
-  {
-    std::unique_lock<std::mutex> lock(m_loopStatus.coolUpdateMutex);
-    m_loopStatus.coolUpdateCond.wait(lock, [&]{return !m_loopStatus.coolUpdateOngoing;});
-  }
-
-  // Do COOL updates (if needed) and notify other threads about it
-  if (updatedLB) {
-    {
-      std::lock_guard<std::mutex> lock(m_loopStatus.coolUpdateMutex);
-      m_loopStatus.coolUpdateOngoing = true;
-      sc = m_coolHelper->hltCoolUpdate(*eventContext);
-      if (check("Failure during COOL update", HLT::OnlineErrorCode::COOL_UPDATE, *eventContext)) {
-        m_loopStatus.coolUpdateOngoing = false;
-        return sc;
-      }
-      m_loopStatus.coolUpdateOngoing = false;
-    }
-    m_loopStatus.coolUpdateCond.notify_all();
-  }
-
-  //------------------------------------------------------------------------
-  // Process the event
-  //------------------------------------------------------------------------
-  // We need to make a copy of eventContext, as executeEvent uses move semantics and eventContext is already owned
-  // by the event store. The copy we create here is pushed to the scheduler, popped back in outputThreadCallback
-  // and eventually deleted in processFinishedEvent.
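The do/while above is the standard lock-free running-max idiom: on failure compare_exchange_strong reloads the expected value, so the loop retries until maxLB is at least this event's lumiblock, whichever thread gets there first. The same idiom in isolation (illustrative names only):

#include <atomic>

// Raise maxVal to at least candidate; true if this call performed the update.
bool updateMax(std::atomic<unsigned int>& maxVal, unsigned int candidate) {
  unsigned int expected = maxVal.load();
  while (candidate > expected) {
    // On failure, expected is reloaded with the current value and we retry.
    if (maxVal.compare_exchange_strong(expected, candidate)) {
      return true;
    }
  }
  return false;
}

Once this bookkeeping and any COOL update are done, the context copy described above is handed to executeEvent just below.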
- sc = executeEvent(EventContext(*eventContext)); - if (check("Failed to schedule event processing", - HLT::OnlineErrorCode::SCHEDULING_FAILURE, *eventContext)) { - return sc; - } - // Notify the output thread to start waiting for a finished event - m_outputThread->cond().notify_one(); - - //------------------------------------------------------------------------ - // Set ThreadLocalContext to an invalid context - //------------------------------------------------------------------------ - // We have passed the event to the scheduler and we are entering back a context-less environment - Gaudi::Hive::setCurrentContext( EventContext() ); - - return sc; -} - -// ============================================================================= -StatusCode HltAsyncEventLoopMgr::processFinishedEvent() -{ - EventContext* eventContext{nullptr}; - m_finishedEventsQueue.pop(eventContext); - - StatusCode sc = StatusCode::SUCCESS; - auto check = [this, &sc, &eventContext](std::string&& errmsg, HLT::OnlineErrorCode errcode) { - if (sc.isSuccess()) {return false;} - ATH_MSG_ERROR(errmsg); - const EventContext& eventContextRef = (eventContext==nullptr) ? EventContext() : *eventContext; - sc = failedEvent(errcode, eventContextRef); - Gaudi::Hive::setCurrentContext(EventContext()); - delete eventContext; - return true; - }; - - //-------------------------------------------------------------------------- - // Basic checks, select slot, retrieve event info - //-------------------------------------------------------------------------- - // Check if the EventContext object exists - if (eventContext == nullptr) { - sc = StatusCode::FAILURE; - if (check("Detected nullptr EventContext while finalising a processed event", - HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT)) { - return sc; - } - } - - // Set ThreadLocalContext to the currently processed finished context - Gaudi::Hive::setCurrentContext(eventContext); - - // Check the event processing status - if (m_aess->eventStatus(*eventContext) != EventStatus::Success) { - sc = StatusCode::FAILURE; - auto algErrors = m_errorMonTool->algExecErrors(*eventContext); - const HLT::OnlineErrorCode errCode = isTimedOut(algErrors) ? 
- HLT::OnlineErrorCode::TIMEOUT : HLT::OnlineErrorCode::PROCESSING_FAILURE; - if (check("Processing event with context " + toString(*eventContext) + \ - " failed with status " + toString(m_aess->eventStatus(*eventContext)), - errCode)) { - return sc; - } - } - - // Select the whiteboard slot - sc = m_whiteboard->selectStore(eventContext->slot()); - if (check("Failed to select event store slot " + std::to_string(eventContext->slot()), - HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT)) { - return sc; - } - - // Fire EndProcessing incident - some services may depend on this - m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *eventContext)); - - //-------------------------------------------------------------------------- - // HLT output handling - //-------------------------------------------------------------------------- - // Call the result builder to record HLTResultMT in SG - sc = m_hltResultMaker->makeResult(*eventContext); - if (check("Failed to create the HLT result object", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;} - - // Connect output (create the output container) - the argument is currently not used - sc = m_outputCnvSvc->connectOutput(""); - if (check("Conversion service failed to connectOutput", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;} - - // Retrieve the HLT result and the corresponding DataObject - auto hltResult = SG::makeHandle(m_hltResultRHKey,*eventContext); - if (!hltResult.isValid()) {sc = StatusCode::FAILURE;} - if (check("Failed to retrieve the HLT result", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;} - - DataObject* hltResultDO = m_evtStore->accessData(hltResult.clid(),hltResult.key()); - if (hltResultDO == nullptr) {sc = StatusCode::FAILURE;} - if (check("Failed to retrieve the HLTResult DataObject", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;} - - // Check for result truncation - if (!hltResult->getTruncatedModuleIds().empty() && hltResult->severeTruncation()) {sc = StatusCode::FAILURE;} - if (check("HLT result truncation", HLT::OnlineErrorCode::RESULT_TRUNCATION)) {return sc;} - - // Convert the HLT result to the output data format - IOpaqueAddress* addr = nullptr; - sc = m_outputCnvSvc->createRep(hltResultDO,addr); - if (sc.isFailure()) {delete addr;} - if (check("Conversion service failed to convert HLTResult", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;} - - // Retrieve and convert the L1 result to the output data format - IOpaqueAddress* l1addr = nullptr; - IOpaqueAddress* l1addrLegacy = nullptr; - if (m_rewriteLVL1) { - // Run-3 L1 simulation result - if (not m_l1TriggerResultRHKey.empty()) { - auto l1TriggerResult = SG::makeHandle(m_l1TriggerResultRHKey, *eventContext); - if (!l1TriggerResult.isValid()) {sc = StatusCode::FAILURE;} - if (check("Failed to retrieve the L1 Trigger Result for RewriteLVL1", - HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) { - return sc; - } - - DataObject* l1TriggerResultDO = m_evtStore->accessData(l1TriggerResult.clid(),l1TriggerResult.key()); - if (l1TriggerResultDO == nullptr) {sc = StatusCode::FAILURE;} - if (check("Failed to retrieve the L1 Trigger Result DataObject for RewriteLVL1", - HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) { - return sc; - } - - sc = m_outputCnvSvc->createRep(l1TriggerResultDO,l1addr); - if (sc.isFailure()) {delete l1addr;} - if (check("Conversion service failed to convert L1 Trigger Result for RewriteLVL1", - HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) { - return sc; - } - } - // Legacy (Run-2) L1 simulation result - if (not 
m_roibResultRHKey.empty()) { - auto roibResult = SG::makeHandle(m_roibResultRHKey, *eventContext); - if (!roibResult.isValid()) {sc = StatusCode::FAILURE;} - if (check("Failed to retrieve the RoIBResult for RewriteLVL1", - HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) { - return sc; - } - - DataObject* roibResultDO = m_evtStore->accessData(roibResult.clid(),roibResult.key()); - if (roibResultDO == nullptr) {sc = StatusCode::FAILURE;} - if (check("Failed to retrieve the RoIBResult DataObject for RewriteLVL1", - HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) { - return sc; - } - - sc = m_outputCnvSvc->createRep(roibResultDO,l1addrLegacy); - if (sc.isFailure()) {delete l1addrLegacy;} - if (check("Conversion service failed to convert RoIBResult for RewriteLVL1", - HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) { - return sc; - } - } - } - - // Save event processing time before sending output - bool eventAccepted = !hltResult->getStreamTags().empty(); - auto eventTime = std::chrono::steady_clock::now() - m_eventTimerStartPoint[eventContext->slot()]; - int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count(); - - // Commit output (write/send the output data) - the arguments are currently not used - sc = m_outputCnvSvc->commitOutput("",true); - if (sc.isFailure()) {delete addr;} - if (check("Conversion service failed to commitOutput", HLT::OnlineErrorCode::OUTPUT_SEND_FAILURE)) {return sc;} - - // The output has been sent out, the ByteStreamAddress can be deleted - delete addr; - delete l1addr; - delete l1addrLegacy; - - //------------------------------------------------------------------------ - // Reset the timeout flag and the timer, and mark the slot as idle - //------------------------------------------------------------------------ - resetEventTimer(*eventContext, /*processing=*/ false); - - //-------------------------------------------------------------------------- - // Clear the slot - //-------------------------------------------------------------------------- - ATH_MSG_DEBUG("Clearing slot " << eventContext->slot() - << " (event " << eventContext->evt() << ") of the whiteboard"); - - sc = clearWBSlot(eventContext->slot()); - if (check("Whiteboard slot " + std::to_string(eventContext->slot()) + " could not be properly cleared", - HLT::OnlineErrorCode::AFTER_RESULT_SENT)) { - return sc; - } - - ATH_MSG_DEBUG("Finished processing " << (eventAccepted ? "accepted" : "rejected") - << " event with context " << *eventContext - << " which took " << eventTimeMillisec << " ms"); - - // Only now after store clearing we can allow the slot to be filled again, - // so we increment m_freeSlots and notify the input thread - ++m_freeSlots; - if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) { - m_inputThread->cond().notify_all(); - } - - // Fill the time monitoring histograms - auto monTimeAny = Monitored::Scalar<int64_t>("TotalTime", eventTimeMillisec); - auto monTimeAcc = Monitored::Scalar<int64_t>(eventAccepted ? 
"TotalTimeAccepted" : "TotalTimeRejected", eventTimeMillisec); - Monitored::Group(m_monTool, monTimeAny, monTimeAcc); - - // Set ThreadLocalContext to an invalid context as we entering a context-less environment - Gaudi::Hive::setCurrentContext( EventContext() ); - - // Delete the EventContext which was created when calling executeEvent( EventContext(*eventContext) ) - delete eventContext; - - return StatusCode::SUCCESS; -} diff --git a/HLT/Trigger/TrigControl/TrigServices/src/HltAsyncEventLoopMgr.h b/HLT/Trigger/TrigControl/TrigServices/src/HltAsyncEventLoopMgr.h deleted file mode 100644 index ac6f60dd145eb5a3c30ade373fd20ee63afa26ba..0000000000000000000000000000000000000000 --- a/HLT/Trigger/TrigControl/TrigServices/src/HltAsyncEventLoopMgr.h +++ /dev/null @@ -1,380 +0,0 @@ -/* - Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration -*/ - -#ifndef TRIGSERVICES_HLTASYNCEVENTLOOPMGR_H -#define TRIGSERVICES_HLTASYNCEVENTLOOPMGR_H - -// Local includes -#include "EventLoopUtils.h" -#include "TrigSORFromPtreeHelper.h" - -// Trigger includes -#include "TrigKernel/ITrigEventLoopMgr.h" -#include "TrigOutputHandling/HLTResultMTMaker.h" -#include "TrigSteeringEvent/OnlineErrorCode.h" -#include "TrigSteerMonitor/ISchedulerMonSvc.h" -#include "TrigSteerMonitor/ITrigErrorMonTool.h" -#include "TrigT1Result/RoIBResult.h" -#include "xAODTrigger/TrigCompositeContainer.h" - -// Athena includes -#include "AthenaBaseComps/AthService.h" -#include "AthenaKernel/EventContextClid.h" -#include "AthenaKernel/Timeout.h" -#include "AthenaMonitoringKernel/Monitored.h" -#include "CxxUtils/checker_macros.h" -#include "xAODEventInfo/EventInfo.h" -#include "StoreGate/ReadHandleKey.h" -#include "StoreGate/WriteHandleKey.h" - -// Gaudi includes -#include "GaudiKernel/EventIDBase.h" // number_type -#include "GaudiKernel/IEventProcessor.h" -#include "GaudiKernel/IEvtSelector.h" -#include "GaudiKernel/IConversionSvc.h" -#include "GaudiKernel/IAlgResourcePool.h" -#include "GaudiKernel/IAlgExecStateSvc.h" -#include "GaudiKernel/IHiveWhiteBoard.h" -#include "GaudiKernel/IScheduler.h" -#include "GaudiKernel/IIoComponentMgr.h" -#include "GaudiKernel/SmartIF.h" -#include "Gaudi/Interfaces/IOptionsSvc.h" - -// TBB includes -#include "tbb/concurrent_queue.h" -#include "tbb/task_group.h" - -// System includes -#include <atomic> -#include <chrono> -#include <condition_variable> -#include <memory> - -// Forward declarations -class CondAttrListCollection; -class IAlgorithm; -class IIncidentSvc; -class StoreGateSvc; -class TrigCOOLUpdateHelper; - -namespace coral { - class AttributeList; -} -namespace HLT { - class HLTResultMT; -} - -/** @class HltAsyncEventLoopMgr - * @brief AthenaMT event loop manager for running HLT online - **/ -class HltAsyncEventLoopMgr : public extends<AthService, ITrigEventLoopMgr, IEventProcessor>, - public Athena::TimeoutMaster -{ - -public: - - /// Standard constructor - HltAsyncEventLoopMgr(const std::string& name, ISvcLocator* svcLoc); - /// Standard destructor - virtual ~HltAsyncEventLoopMgr() noexcept override; - - // Copy and move not allowed - HltAsyncEventLoopMgr(const HltAsyncEventLoopMgr&) = delete; - HltAsyncEventLoopMgr(HltAsyncEventLoopMgr&&) = delete; - HltAsyncEventLoopMgr& operator=(const HltAsyncEventLoopMgr&) = delete; - HltAsyncEventLoopMgr& operator=(HltAsyncEventLoopMgr&&) = delete; - - /// @name Gaudi state transitions (overriden from AthService) - ///@{ - virtual StatusCode initialize() override; - virtual StatusCode stop() override; - virtual StatusCode finalize() 
-  ///@}
-
-  /// @name State transitions of ITrigEventLoopMgr interface
-  ///@{
-  virtual StatusCode prepareForStart (const boost::property_tree::ptree &) override;
-  virtual StatusCode prepareForRun ATLAS_NOT_THREAD_SAFE (const boost::property_tree::ptree& pt) override;
-  virtual StatusCode hltUpdateAfterFork(const boost::property_tree::ptree& pt) override;
-  ///@}
-
-  /**
-   * Implementation of IEventProcessor::executeRun which calls IEventProcessor::nextEvent
-   * @param maxevt number of events to process, -1 means all
-   */
-  virtual StatusCode executeRun(int maxevt=-1) override;
-
-  /**
-   * Implementation of IEventProcessor::nextEvent which implements the event loop
-   * @param maxevt number of events to process, -1 means all
-   */
-  virtual StatusCode nextEvent(int maxevt=-1) override;
-
-  /**
-   * Implementation of IEventProcessor::executeEvent which processes a single event
-   * @param ctx the EventContext of the event to process
-   */
-  virtual StatusCode executeEvent( EventContext &&ctx ) override;
-
-  /**
-   * Create an EventContext object
-   */
-  virtual EventContext createEventContext() override;
-
-  /**
-   * Implementation of IEventProcessor::stopRun (obsolete for online running)
-   */
-  virtual StatusCode stopRun() override;
-
-private:
-  // ------------------------- Helper types ------------------------------------
-  /// Flags and counters steering the main event loop execution
-  struct EventLoopStatus {
-    /// Event source has more events
-    std::atomic<bool> eventsAvailable{true};
-    /// No more events available and all ongoing processing has finished
-    std::atomic<bool> loopEnded{false};
-    /// Condition variable to notify the main thread of the end of the event loop
-    std::condition_variable loopEndedCond;
-    /// Mutex to notify the main thread of the end of the event loop
-    std::mutex loopEndedMutex;
-    /// Max lumiblock number seen in the loop
-    std::atomic<EventIDBase::number_type> maxLB{0};
-    /// Condition variable to synchronize COOL updates
-    std::condition_variable coolUpdateCond;
-    /// Mutex to synchronize COOL updates
-    std::mutex coolUpdateMutex;
-    /// COOL update ongoing
-    bool coolUpdateOngoing{false};
-    /// Event exit status code
-    StatusCode exitCode{StatusCode::SUCCESS};
-  };
-
-  // ------------------------- Helper methods ----------------------------------
-
-  /// Read DataFlow configuration properties
-  void updateDFProps();
-
-  // Update internally kept data from new sor
-  void updateInternal(const coral::AttributeList & sor_attrlist);
-
-  // Update internally kept data from new sor
-  void updateMetadataStore(const coral::AttributeList & sor_attrlist) const;
-
-  /// Set magnetic field currents from ptree
-  StatusCode updateMagField(const boost::property_tree::ptree& pt) const;
-
-  /// Clear per-event stores
-  StatusCode clearTemporaryStores();
-
-  /// Update the detector mask
-  void updateDetMask(const std::pair<uint64_t, uint64_t>& dm);
-
-  /// Extract the single attr list off the SOR CondAttrListCollection
-  const coral::AttributeList& getSorAttrList() const;
-
-  /// Print the SOR record
-  void printSORAttrList(const coral::AttributeList& atr) const;
-
-  /// Execute optional algs/sequences
-  StatusCode execAtStart(const EventContext& ctx) const;
-
-  /** @brief Handle a failure to process an event
-   *  @return FAILURE means the event loop was flagged to stop (no new events will be requested)
-   **/
-  StatusCode failedEvent(HLT::OnlineErrorCode errorCode,
-                         const EventContext& eventContext);
-
-  /// Reset the timeout flag and the timer, and mark the slot as
busy or idle according to the second argument - void resetEventTimer(const EventContext& eventContext, bool processing); - - /// Clear an event slot in the whiteboard - StatusCode clearWBSlot(size_t evtSlot) const; - - /// @name Methods executed by LoopThreads - ///@{ - /// The method executed by the input handling thread - void inputThreadCallback(); - - /// The method executed by the output handling thread - void outputThreadCallback(); - - /// The method executed by the event timeout monitoring thread - void eventTimerCallback(); - ///@} - - /// @name Methods executed by TBB tasks - ///@{ - /// Perform all start-of-event actions for a single new event and push it to the scheduler - StatusCode startNextEvent(); - - /// Perform all end-of-event actions for a single event popped out from the scheduler - StatusCode processFinishedEvent(); - ///@} - - // ------------------------- Handles to required services/tools -------------- - ServiceHandle<IIncidentSvc> m_incidentSvc{this, "IncidentSvc", "IncidentSvc"}; - ServiceHandle<Gaudi::Interfaces::IOptionsSvc> m_jobOptionsSvc{this, "JobOptionsSvc", "JobOptionsSvc"}; - ServiceHandle<StoreGateSvc> m_evtStore{this, "EventStore", "StoreGateSvc"}; - ServiceHandle<StoreGateSvc> m_detectorStore{this, "DetectorStore", "DetectorStore"}; - ServiceHandle<StoreGateSvc> m_inputMetaDataStore{this, "InputMetaDataStore", "StoreGateSvc/InputMetaDataStore"}; - ServiceHandle<IIoComponentMgr> m_ioCompMgr{this, "IoComponentMgr", "IoComponentMgr"}; - ServiceHandle<IEvtSelector> m_evtSelector{this, "EvtSel", "EvtSel"}; - ServiceHandle<IConversionSvc> m_outputCnvSvc{this, "OutputCnvSvc", "OutputCnvSvc"}; - ServiceHandle<ISchedulerMonSvc> m_schedulerMonSvc{this, "SchedulerMonSvc", "SchedulerMonSvc"}; - ToolHandle<TrigCOOLUpdateHelper> m_coolHelper{this, "CoolUpdateTool", "TrigCOOLUpdateHelper"}; - ToolHandle<HLTResultMTMaker> m_hltResultMaker{this, "ResultMaker", "HLTResultMTMaker"}; - ToolHandle<GenericMonitoringTool> m_monTool{this, "MonTool", "", "Monitoring tool"}; - ToolHandle<ITrigErrorMonTool> m_errorMonTool{this, "TrigErrorMonTool", "TrigErrorMonTool", "Error monitoring tool"}; - - SmartIF<IHiveWhiteBoard> m_whiteboard; - SmartIF<IAlgResourcePool> m_algResourcePool; - SmartIF<IAlgExecStateSvc> m_aess; - SmartIF<IScheduler> m_schedulerSvc; - - std::unique_ptr<TrigSORFromPtreeHelper> m_sorHelper; - - // ------------------------- Other properties -------------------------------------- - Gaudi::Property<std::string> m_schedulerName{ - this, "SchedulerSvc", "AvalancheSchedulerSvc", "Name of the scheduler"}; - - Gaudi::Property<std::string> m_whiteboardName{ - this, "WhiteboardSvc", "EventDataSvc", "Name of the Whiteboard"}; - - Gaudi::Property<float> m_hardTimeout{ - this, "HardTimeout", 10*60*1000/*=10min*/, "Hard event processing timeout in milliseconds"}; - - Gaudi::Property<float> m_softTimeoutFraction{ - this, "SoftTimeoutFraction", 0.8, "Fraction of the hard timeout to be set as the soft timeout"}; - - Gaudi::Property<unsigned int> m_timeoutThreadIntervalMs{ - this, "TimeoutThreadIntervalMs", 1000, "How often the timeout thread checks for soft timeout, in milliseconds"}; - - Gaudi::Property<bool> m_traceOnTimeout{ - this, "TraceOnTimeout", true, - "Print a stack trace on the first soft timeout (might take a while, holding all threads)"}; - - Gaudi::Property<int> m_maxParallelIOTasks{ - this, "MaxParallelIOTasks", -1, - "Maximum number of I/O tasks which can be executed in parallel. 
" - "If <=0 then the number of scheduler threads is used."}; - - Gaudi::Property<int> m_maxIOWakeUpIntervalMs{ - this, "MaxIOWakeUpIntervalMs", -1, - "Maximum time input or output handling thread will sleep unless notified. Negative value (default) means no limit, " - "i.e. threads will only wake up on notifications. Zero means threads will never wait for notifications. " - "Positive value means the number of milliseconds after which a thread will wake up if it's not notified earlier."}; - - Gaudi::Property<int> m_maxFrameworkErrors{ - this, "MaxFrameworkErrors", 10, - "Tolerable number of recovered framework errors before exiting (<0 means all are tolerated)"}; - - Gaudi::Property<std::string> m_fwkErrorDebugStreamName{ - this, "FwkErrorDebugStreamName", "HLTMissingData", - "Debug stream name for events with HLT framework errors"}; - - Gaudi::Property<std::string> m_algErrorDebugStreamName{ - this, "AlgErrorDebugStreamName", "HltError", - "Debug stream name for events with HLT algorithm errors"}; - - Gaudi::Property<std::string> m_timeoutDebugStreamName{ - this, "TimeoutDebugStreamName", "HltTimeout", - "Debug stream name for events with HLT timeout"}; - - Gaudi::Property<std::string> m_truncationDebugStreamName{ - this, "TruncationDebugStreamName", "TruncatedHLTResult", - "Debug stream name for events with HLT result truncation"}; - - Gaudi::Property<std::string> m_sorPath{ - this, "SORPath", "/TDAQ/RunCtrl/SOR_Params", "Path to StartOfRun parameters in detector store"}; - - Gaudi::Property<std::vector<std::string>> m_execAtStart{ - this, "execAtStart", {}, "List of algorithms/sequences to execute during prepareForRun"}; - - Gaudi::Property<bool> m_setMagFieldFromPtree{ - this, "setMagFieldFromPtree", true, "Read magnet currents from ptree"}; - - Gaudi::Property<unsigned int> m_forceRunNumber{ - this, "forceRunNumber", 0, "Override run number"}; - - Gaudi::Property<unsigned int> m_forceLumiblock{ - this, "forceLumiblock", 0, "Override lumiblock number"}; - - Gaudi::Property<unsigned long long> m_forceSOR_ns{ - this, "forceStartOfRunTime", 0, "Override SOR time (epoch in nano-seconds)"}; - - Gaudi::Property<bool> m_rewriteLVL1{ - this, "RewriteLVL1", false, - "Encode L1 results to ByteStream and write to the output. 
Possible only with athenaHLT, not online."}; - - Gaudi::Property<bool> m_monitorScheduler{ - this, "MonitorScheduler", false, "Enable SchedulerMonSvc to collect scheduler status data in online histograms"}; - - SG::WriteHandleKey<EventContext> m_eventContextWHKey{ - this, "EventContextWHKey", "EventContext", "StoreGate key for recording EventContext"}; - - SG::ReadHandleKey<xAOD::EventInfo> m_eventInfoRHKey{ - this, "EventInfoRHKey", "EventInfo", "StoreGate key for reading xAOD::EventInfo"}; - - SG::ReadHandleKey<xAOD::TrigCompositeContainer> m_l1TriggerResultRHKey{ - this, "L1TriggerResultRHKey", "", "StoreGate key for reading L1TriggerResult for RewriteLVL1"}; - - SG::ReadHandleKey<ROIB::RoIBResult> m_roibResultRHKey{ - this, "RoIBResultRHKey", "", "StoreGate key for reading RoIBResult for RewriteLVL1 with legacy (Run-2) L1 simulation"}; - - SG::ReadHandleKey<HLT::HLTResultMT> m_hltResultRHKey; ///< StoreGate key for reading the HLT result - - // ------------------------- Other private members --------------------------- - /// typedef used for detector mask fields - typedef EventIDBase::number_type numt; - /** - * Detector mask0,1,2,3 - bit field indicating which TTC zones have been built into the event, - * one bit per zone, 128 bit total, significance increases from first to last - */ - std::tuple<numt, numt, numt, numt> m_detector_mask{0xffffffff, 0xffffffff, 0, 0}; - - /// "Event" context of current run with dummy event/slot number - EventContext m_currentRunCtx{0,0}; - /// Event counter used for local bookkeeping; incremental per instance of HltAsyncEventLoopMgr, unrelated to global_id - std::atomic<size_t> m_localEventNumber{0}; - /// Event selector context - IEvtSelector::Context* m_evtSelContext{nullptr}; - /// Vector of event start-processing time stamps in each slot - std::vector<std::chrono::steady_clock::time_point> m_eventTimerStartPoint; - /// Vector of time stamps telling when each scheduler slot was freed - std::vector<std::chrono::steady_clock::time_point> m_freeSlotStartPoint; - /// Vector of flags to tell if a slot is idle or processing - std::vector<bool> m_isSlotProcessing; // be aware of vector<bool> specialisation - /// Number of free slots used to synchronise input/output tasks - std::atomic<size_t> m_freeSlots{0}; - /// Input handling thread (triggers reading new events) - std::unique_ptr<HLT::LoopThread> m_inputThread; - /// Output handling thread (triggers post-processing of finished events) - std::unique_ptr<HLT::LoopThread> m_outputThread; - /// Timeout thread - std::unique_ptr<HLT::LoopThread> m_timeoutThread; - /// Soft timeout value set to HardTimeout*SoftTimeoutFraction at initialisation - std::chrono::milliseconds m_softTimeoutValue{0}; - /// Task group to execute parallel I/O tasks asynchronously - tbb::task_group m_parallelIOTaskGroup; - /// Queue limiting the number of parallel I/O tasks - tbb::concurrent_bounded_queue<bool> m_parallelIOQueue; - /// Queue of events ready for output processing - tbb::concurrent_bounded_queue<EventContext*> m_finishedEventsQueue; - /// Object keeping track of the event loop status - EventLoopStatus m_loopStatus{}; - /// Flag set when a soft timeout produces a stack trace, to avoid producing multiple traces - bool m_timeoutTraceGenerated{false}; - /// Counter of framework errors - std::atomic<int> m_nFrameworkErrors{0}; - /// Application name - std::string m_applicationName; - /// Worker ID - int m_workerID{0}; - /// Worker PID - int m_workerPID{0}; - -}; - -#endif // TRIGSERVICES_HLTASYNCEVENTLOOPMGR_H diff --git 
a/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.cxx b/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.cxx
index 97a33efbec8d59cfe2d5480f3aab0e3e1f6f1159..e288ede3e275cccd1b2335f97bc5b439079d7689 100644
--- a/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.cxx
+++ b/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.cxx
@@ -2,11 +2,13 @@
   Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
 */
 
-// Trigger includes
+// Local includes
 #include "HltEventLoopMgr.h"
 #include "TrigCOOLUpdateHelper.h"
-#include "TrigKernel/HltExceptions.h"
 #include "TrigRDBManager.h"
+
+// Trigger includes
+#include "TrigKernel/HltExceptions.h"
 #include "TrigSteeringEvent/HLTResultMT.h"
 
 // Athena includes
@@ -31,14 +33,12 @@
 #include "eformat/StreamTag.h"
 #include "owl/time.h"
 
-// Boost includes
-#include <filesystem>
-
 // ROOT includes
 #include "TROOT.h"
 #include "TSystem.h"
 
 // System includes
+#include <filesystem>
 #include <sstream>
 #include <string>
 
@@ -67,6 +67,15 @@ using namespace boost::property_tree;
 HltEventLoopMgr::HltEventLoopMgr(const std::string& name, ISvcLocator* svcLoc)
 : base_class(name, svcLoc) {}
 
+// =============================================================================
+// Standard destructor
+// =============================================================================
+HltEventLoopMgr::~HltEventLoopMgr() noexcept
+{
+  // tbb::task_group destructor throws if wait() was never called
+  m_parallelIOTaskGroup.wait();
+}
+
 // =============================================================================
 // Reimplementation of AthService::initialize (IStateful interface)
 // =============================================================================
@@ -95,6 +104,7 @@ StatusCode HltEventLoopMgr::initialize()
   ATH_MSG_INFO(" ---> HardTimeout = " << m_hardTimeout.value());
   ATH_MSG_INFO(" ---> SoftTimeoutFraction = " << m_softTimeoutFraction.value());
   ATH_MSG_INFO(" ---> SoftTimeoutValue = " << m_softTimeoutValue.count());
+  ATH_MSG_INFO(" ---> TimeoutThreadIntervalMs = " << m_timeoutThreadIntervalMs.value());
   ATH_MSG_INFO(" ---> TraceOnTimeout = " << m_traceOnTimeout.value());
   ATH_MSG_INFO(" ---> MaxFrameworkErrors = " << m_maxFrameworkErrors.value());
   ATH_MSG_INFO(" ---> FwkErrorDebugStreamName = " << m_fwkErrorDebugStreamName.value());
@@ -103,10 +113,11 @@ StatusCode HltEventLoopMgr::initialize()
   ATH_MSG_INFO(" ---> TruncationDebugStreamName = " << m_truncationDebugStreamName.value());
   ATH_MSG_INFO(" ---> SORPath = " << m_sorPath.value());
   ATH_MSG_INFO(" ---> setMagFieldFromPtree = " << m_setMagFieldFromPtree.value());
+  ATH_MSG_INFO(" ---> execAtStart = " << m_execAtStart.value());
   ATH_MSG_INFO(" ---> forceRunNumber = " << m_forceRunNumber.value());
+  ATH_MSG_INFO(" ---> forceLumiblock = " << m_forceLumiblock.value());
   ATH_MSG_INFO(" ---> forceStartOfRunTime = " << m_forceSOR_ns.value());
   ATH_MSG_INFO(" ---> RewriteLVL1 = " << m_rewriteLVL1.value());
-  ATH_MSG_INFO(" ---> PopAllMode = " << m_popAll.value());
   ATH_MSG_INFO(" ---> EventContextWHKey = " << m_eventContextWHKey.key());
   ATH_MSG_INFO(" ---> EventInfoRHKey = " << m_eventInfoRHKey.key());
@@ -140,8 +151,7 @@ StatusCode HltEventLoopMgr::initialize()
     ATH_CHECK(m_maxParallelIOTasks.fromString(threads));
   }
   ATH_MSG_INFO(" ---> MaxParallelIOTasks = " << m_maxParallelIOTasks.value());
-  ATH_MSG_INFO(" ---> PopFromSchedulerTimeout = " << m_popFromSchedulerTimeout.value());
-  ATH_MSG_INFO(" ---> PopFromSchedulerQueryInterval = " << m_popFromSchedulerQueryInterval.value());
+  ATH_MSG_INFO(" 
---> MaxIOWakeUpIntervalMs = " << m_maxIOWakeUpIntervalMs.value()); //---------------------------------------------------------------------------- // Setup all Hive services for multithreaded event processing with the exception of SchedulerSvc, @@ -455,29 +465,21 @@ StatusCode HltEventLoopMgr::hltUpdateAfterFork(const ptree& /*pt*/) } ATH_CHECK(m_ioCompMgr->io_reinitialize()); - // Start the timeout thread - ATH_MSG_DEBUG("Starting the timeout thread"); - m_timeoutThread = std::make_unique<std::thread>(std::bind(&HltEventLoopMgr::runEventTimer,this)); + const size_t numSlots = m_whiteboard->getNumberOfStores(); + m_freeSlots = numSlots; // Initialise vector of time points for event timeout monitoring - { - std::unique_lock<std::mutex> lock(m_timeoutMutex); - m_eventTimerStartPoint.clear(); - m_eventTimerStartPoint.resize(m_whiteboard->getNumberOfStores(), std::chrono::steady_clock::now()); - m_isSlotProcessing.resize(m_whiteboard->getNumberOfStores(), false); - } - m_timeoutCond.notify_all(); + m_eventTimerStartPoint.clear(); + m_eventTimerStartPoint.resize(numSlots, std::chrono::steady_clock::now()); + m_isSlotProcessing.resize(numSlots, false); // Initialise vector of time points for free slots monitoring m_freeSlotStartPoint.clear(); - m_freeSlotStartPoint.resize(m_whiteboard->getNumberOfStores(), std::chrono::steady_clock::now()); + m_freeSlotStartPoint.resize(numSlots, std::chrono::steady_clock::now()); - // Initialise arena and queues used in parallel I/O steering - m_parallelIOTaskArena = std::make_unique<tbb::task_arena>(int(tbb::task_arena::automatic), 0); - m_parallelIOQueue.set_capacity(static_cast<size_t>(m_maxParallelIOTasks.value())); - m_finishedEventsQueue.set_capacity(m_whiteboard->getNumberOfStores()); - m_drainSchedulerStatusQueue.set_capacity(m_whiteboard->getNumberOfStores()); - m_startNextEventStatusQueue.set_capacity(m_whiteboard->getNumberOfStores()); + // Initialise the queues used in parallel I/O steering + m_parallelIOQueue.set_capacity(static_cast<decltype(m_parallelIOQueue)::size_type>(m_maxParallelIOTasks.value())); + m_finishedEventsQueue.set_capacity(static_cast<decltype(m_finishedEventsQueue)::size_type>(numSlots)); // Fire incident to update listeners after forking m_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_workerID, m_workerPID, name(), m_currentRunCtx)); @@ -511,17 +513,6 @@ StatusCode HltEventLoopMgr::executeRun(int maxevt) if (m_monitorScheduler) ATH_CHECK(m_schedulerMonSvc->stopMonitoring()); - // Stop the timer thread - { - ATH_MSG_DEBUG("Stopping the timeout thread"); - std::unique_lock<std::mutex> lock(m_timeoutMutex); - m_runEventTimer = false; - } - m_timeoutCond.notify_all(); - m_timeoutThread->join(); - m_timeoutThread.reset(); - ATH_MSG_DEBUG("The timeout thread finished"); - ATH_MSG_VERBOSE("end of " << __FUNCTION__); return sc; } @@ -534,94 +525,40 @@ StatusCode HltEventLoopMgr::nextEvent(int /*maxevt*/) { ATH_MSG_VERBOSE("start of " << __FUNCTION__); - ATH_MSG_INFO("Starting loop on events"); - - EventLoopStatus loopStatus{}; - - while (!loopStatus.loopEnded) { - ATH_MSG_DEBUG("Free processing slots = " << m_schedulerSvc->freeSlots()); - ATH_MSG_DEBUG("Free event data slots = " << m_whiteboard->freeSlots()); - - if (m_schedulerSvc->freeSlots() != m_whiteboard->freeSlots()) { - // Starvation detected - try to recover and return FAILURE if the recovery fails. 
This can only happen if there - // is an unhandled error after popping an event from the scheduler and before clearing the event data slot for - // this finished event. It's an extra protection in the unlikely case that failedEvent doesn't cover all errors. - ATH_CHECK(recoverFromStarvation()); - } - - // Decide what to do in this event loop step - bool do_fill_scheduler = m_schedulerSvc->freeSlots()>0 && loopStatus.eventsAvailable && !loopStatus.triggerOnHold; - bool do_drain_scheduler = !do_fill_scheduler; - // Clear the trigger_on_hold flag - loopStatus.triggerOnHold.store(false, std::memory_order_relaxed); - - // Read in and start processing another event - if (do_fill_scheduler) { - size_t numSlotsToFill = m_schedulerSvc->freeSlots(); - ATH_MSG_DEBUG("Free slots = " << numSlotsToFill << ". Reading new event(s) to fill the slot(s)."); - - // Fill all free slots with new events - for (size_t i=0; i<numSlotsToFill; ++i) { - auto task = [mgr=this, &loopStatus](){ - StatusCode sc = StatusCode::SUCCESS; - try { - sc = mgr->startNextEvent(loopStatus); - } - catch (const std::exception& e) { - mgr->error() << "Exception caught in startNextEvent: " << e.what() << endmsg; - sc = StatusCode::FAILURE; - } - catch (...) { - mgr->error() << "Exception caught in startNextEvent" << endmsg; - sc = StatusCode::FAILURE; - } - mgr->m_startNextEventStatusQueue.push(sc); - // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item - // is popped, we only use the queue size to limit the number of tasks running in parallel - bool popIOQueue{false}; - mgr->m_parallelIOQueue.pop(popIOQueue); - }; - - // Push one item to the parallel I/O queue to increment its size - the value doesn't matter, - // we only use the queue size and benefit from the blocking push call here to limit the number - // of tasks running in parallel. Once we can push to the queue, we can enqueue the task to the arena. - m_parallelIOQueue.push(true); - m_parallelIOTaskArena->enqueue(std::move(task)); - } + // Start the event timer thread + ATH_MSG_DEBUG("Starting the timeout thread"); + m_timeoutThread = std::make_unique<HLT::LoopThread>([this]{return eventTimerCallback();}, m_timeoutThreadIntervalMs.value()); + m_timeoutThread->start(); - // Wait until startNextEvent is done for all slots - std::vector<StatusCode> statusCodes; - while (statusCodes.size() < numSlotsToFill) { - StatusCode sc = StatusCode::FAILURE; - m_startNextEventStatusQueue.pop(sc); - statusCodes.push_back(sc); - } + // Start the event loop + ATH_MSG_INFO("Starting loop on events"); + std::unique_lock<std::mutex> lock{m_loopStatus.loopEndedMutex}; + m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();}, m_maxIOWakeUpIntervalMs.value()); + m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, m_maxIOWakeUpIntervalMs.value()); + m_outputThread->start(); + m_inputThread->start(); + + // Wait for event loop to end. The condition means the main input and output threads flagged they have + // nothing else to do and will exit asynchronously (later than the wait here ends) + ATH_MSG_DEBUG("Event loop started, the main thread is going to sleep until it finishes"); + m_loopStatus.loopEndedCond.wait(lock, [this](){return m_loopStatus.loopEnded.load();}); + ATH_MSG_INFO("All events processed, finalising the event loop"); + + // Wait for the I/O TBB tasks and main I/O threads to finish. 
Note the TBB tasks need to finish first
+  // because they may notify the condition variables in the main I/O threads. The lifetime of the condition
+  // variables must span beyond any I/O TBB task.
+  ATH_MSG_DEBUG("Waiting for all I/O tasks and threads to return");
+  m_parallelIOTaskGroup.wait();
+  m_inputThread.reset();
+  m_outputThread.reset();
+
+  // Stop the event timer thread
+  ATH_MSG_DEBUG("All I/O threads and tasks finished. Stopping the timeout thread");
+  m_timeoutThread->stop();
+  m_timeoutThread.reset();
+  ATH_MSG_DEBUG("The timeout thread finished");
-
-      // Check the startNextEvent status code for all slots
-      for (StatusCode sc : statusCodes) {
-        if (sc.isFailure()) {return sc;}
-      }
-    } // End of if(do_fill_scheduler)
-
-    // Wait for events to finish processing and write their output
-    if (do_drain_scheduler) {
-      ATH_MSG_DEBUG("No free slots or no more events to process - draining the scheduler");
-      DrainSchedulerStatusCode drainResult = drainScheduler();
-      if (drainResult==DrainSchedulerStatusCode::FAILURE) {
-        ATH_MSG_ERROR("Error in draining scheduler, exiting the event loop");
-        return StatusCode::FAILURE;
-      }
-      if (drainResult==DrainSchedulerStatusCode::RECOVERABLE) {
-        ATH_MSG_WARNING("Recoverable error in draining scheduler, continuing the event loop");
-        continue;
-      }
-      if (drainResult==DrainSchedulerStatusCode::SCHEDULER_EMPTY && !loopStatus.eventsAvailable) {
-        ATH_MSG_INFO("All events processed, finalising the event loop");
-        loopStatus.loopEnded = true;
-      }
-      // else drainResult is SUCCESS or NO_EVENT, so we just continue
-    }
-  }
+  ATH_MSG_INFO("Finished loop on events");
 
   ATH_MSG_VERBOSE("end of " << __FUNCTION__);
   return StatusCode::SUCCESS;
@@ -883,55 +820,70 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev
   // Used by MsgSvc (and possibly others but not relevant here)
   Gaudi::Hive::setCurrentContext(eventContext);
 
-  auto drainAllAndProceed = [&]() -> StatusCode {
-    resetEventTimer(eventContext, /*processing=*/ false); // stop the timeout monitoring for the failed slot
-    ATH_CHECK(drainAllSlots()); // break the event loop on failure
-    if ( m_maxFrameworkErrors.value()>=0 && ((++m_nFrameworkErrors)>m_maxFrameworkErrors.value()) ) {
-      ATH_MSG_ERROR("The number of tolerable framework errors for this HltEventLoopMgr instance, which is "
-                    << m_maxFrameworkErrors.value() << ", was exceeded. Exiting the event loop.");
-      return StatusCode::FAILURE; // break the event loop
-    }
-    return StatusCode::SUCCESS; // continue the event loop
+  auto returnFailureAndStopEventLoop = [this]() -> StatusCode {
+    ATH_MSG_INFO("Stopping event loop due to failure");
+    // Change the loop exit code to FAILURE
+    m_loopStatus.exitCode = StatusCode::FAILURE;
+
+    // Flag eventsAvailable=false, which will cause the I/O threads to finish all the ongoing event processing
+    // and then stop. We cannot flag loopEnded=true here yet, because it would finish the I/O threads while
+    // there might still be events being processed, and they would crash when finished.
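The comments above describe a deliberate two-phase shutdown: eventsAvailable=false (set just below) only stops new events from being fetched, while loopEnded=true is reserved for the output thread once the scheduler has drained, with the main thread sleeping on loopEndedCond in nextEvent until then. A compressed sketch of that hand-off, with hypothetical names and none of the real threads' other duties:

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

struct LoopStatus {
  std::atomic<bool> eventsAvailable{true}; // phase 1: stop accepting new events
  std::atomic<bool> loopEnded{false};      // phase 2: nothing left in flight
  std::condition_variable loopEndedCond;
  std::mutex loopEndedMutex;
};

// Output-thread side: declare the end only when all slots are free again.
void endLoopIfDrained(LoopStatus& s, std::size_t freeSlots, std::size_t nSlots) {
  if (!s.eventsAvailable && freeSlots == nSlots) {
    s.loopEnded = true;
    s.loopEndedCond.notify_all(); // wake the main thread
  }
}

// Main-thread side: sleep until the output thread declares the end.
void waitForLoopEnd(LoopStatus& s) {
  std::unique_lock<std::mutex> lock{s.loopEndedMutex};
  s.loopEndedCond.wait(lock, [&s] { return s.loopEnded.load(); });
}

Here in failedEvent only phase one happens, starting with the flag on the next line.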
+ m_loopStatus.eventsAvailable = false; + + // Inform the caller the failure could not be handled cleanly and the event loop will stop + return StatusCode::FAILURE; }; //---------------------------------------------------------------------------- - // Handle cases where we can only try to clear all slots and continue + // Handle framework errors by printing an informative message and breaking the loop //---------------------------------------------------------------------------- - if (errorCode==HLT::OnlineErrorCode::BEFORE_NEXT_EVENT) { ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode - << " meaning there was a framework error before requesting a new event. No output will be produced and" - << " all slots of this HltEventLoopMgr instance will be drained before proceeding."); - return drainAllAndProceed(); + << " meaning there was a framework error before requesting a new event. No output will be produced for this event" + << " and the event loop will exit after all ongoing processing is finished."); + return returnFailureAndStopEventLoop(); + } + if (errorCode==HLT::OnlineErrorCode::CANNOT_RETRIEVE_EVENT) { + ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode + << " meaning a new event could not be correctly read. No output will be produced for this event." + << " The event loop will exit after all ongoing processing is finished."); + return returnFailureAndStopEventLoop(); } if (errorCode==HLT::OnlineErrorCode::AFTER_RESULT_SENT) { ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode << " meaning there was a framework error after HLT result was already sent out." - << " All slots of this HltEventLoopMgr instance will be drained before proceeding."); - return drainAllAndProceed(); + << " The event loop will exit after all ongoing processing is finished."); + return returnFailureAndStopEventLoop(); } if (errorCode==HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT) { - ATH_MSG_ERROR("Failed to access the slot for the processed event, cannot produce output. OnlineErrorCode=" << errorCode - << ". All slots of this HltEventLoopMgr instance will be drained before proceeding, then either the loop will" - << " exit with a failure code or the failed event will reach a hard timeout."); - return drainAllAndProceed(); + ATH_MSG_ERROR("Failed to access the slot for the processed event, cannot produce output. OnlineErrorCode=" + << errorCode << ". The event loop will exit after all ongoing processing is finished unless the failed event" + << " reaches a hard timeout sooner and this process is killed."); + return returnFailureAndStopEventLoop(); + } + if (errorCode==HLT::OnlineErrorCode::SCHEDULING_FAILURE) { + // Here we cannot be certain if the scheduler started processing the event or not. If yes, the output thread + // will finalise the event as normal. If not, the event will eventually reach a hard timeout and this process + // is killed, or we exit the process without ever producing output for this event (needs to be handled upstream). + ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode + << ". Cannot determine if the event processing started or not and whether a decision for this event will be" + << " produced. 
The event loop will exit after all ongoing processing is finished, which may or may"
+      << " not include the problematic event.");
+    return returnFailureAndStopEventLoop();
+  }
+  if (errorCode==HLT::OnlineErrorCode::SCHEDULER_POP_FAILURE) {
+    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
+      << " meaning the Scheduler returned FAILURE when asked to give a finished event. Will keep trying to"
+      << " pop further events if there are any still in the scheduler, but this may keep repeating until"
+      << " this process is killed by hard timeout or other means. If all ongoing processing manages to finish"
+      << " then the event loop will exit.");
+    return returnFailureAndStopEventLoop();
   }
   if (!eventContext.valid()) {
     ATH_MSG_ERROR("Failure occurred with an invalid EventContext. Likely there was a framework error before"
                   << " requesting a new event or after sending the result of a finished event. OnlineErrorCode=" << errorCode
-                  << ". All slots of this HltEventLoopMgr instance will be drained before proceeding.");
-    return drainAllAndProceed();
-  }
-
-  //----------------------------------------------------------------------------
-  // In case of event source failure, drain the scheduler and break the loop
-  //----------------------------------------------------------------------------
-  if (errorCode==HLT::OnlineErrorCode::CANNOT_RETRIEVE_EVENT) {
-    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
-      << " meaning a new event could not be correctly read. No output will be produced for this event."
-      << " All slots of this HltEventLoopMgr instance will be drained and the loop will exit.");
-    ATH_CHECK(drainAllSlots());
-    return StatusCode::FAILURE;
+                  << ". The event loop will exit after all ongoing processing is finished.");
+    return returnFailureAndStopEventLoop();
   }
 
   //----------------------------------------------------------------------------
@@ -941,20 +893,6 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev
     return failedEvent(HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT,eventContext);
   }
 
-  //----------------------------------------------------------------------------
-  // Handle SCHEDULING_FAILURE
-  //----------------------------------------------------------------------------
-  if (errorCode==HLT::OnlineErrorCode::SCHEDULING_FAILURE) {
-    // Here we cannot be certain if the scheduler started processing the event or not, so we can only try to drain
-    // the scheduler and continue. Trying to create a debug stream result for this event and clear the event slot may
-    // lead to further problems if the event is being processed
-    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
-      << ". Cannot determine if the event processing started or not. Current local event number is "
-      << eventContext.evt() << ", slot " << eventContext.slot()
-      << ". All slots of this HltEventLoopMgr instance will be drained before proceeding.");
-    return drainAllAndProceed();
-  }
-
   //----------------------------------------------------------------------------
   // Define a debug stream tag for the HLT result
   //----------------------------------------------------------------------------
@@ -1006,9 +944,8 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev
     // Avoid infinite loop
     ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. "
                   << "Cannot force-accept this event from HLT side, will rely on data collector to do this. 
" - << "All slots of this HltEventLoopMgr instance will be drained and the loop will exit."); - ATH_CHECK(drainAllSlots()); - return StatusCode::FAILURE; + << "The event loop will exit after all ongoing processing is finished."); + return returnFailureAndStopEventLoop(); } ATH_MSG_ERROR("Failed to build or record the HLT Result in event store while handling a failed event. " << "Trying again with skipped filling of the result contents (except debug stream tag)."); @@ -1030,9 +967,8 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev if (m_outputCnvSvc->connectOutput("").isFailure()) { ATH_MSG_ERROR("The output conversion service failed in connectOutput() while handling a failed event. " << "Cannot force-accept this event from HLT side, will rely on data collector to do this. " - << "All slots of this HltEventLoopMgr instance will be drained and the loop will exit."); - ATH_CHECK(drainAllSlots()); - return StatusCode::FAILURE; + << "The event loop will exit after all ongoing processing is finished."); + return returnFailureAndStopEventLoop(); } DataObject* hltResultDO = m_evtStore->accessData(hltResultWH.clid(),hltResultWH.key()); @@ -1041,9 +977,8 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev // Avoid infinite loop ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. " << "Cannot force-accept this event from HLT side, will rely on data collector to do this. " - << "All slots of this HltEventLoopMgr instance will be drained and the loop will exit."); - ATH_CHECK(drainAllSlots()); - return StatusCode::FAILURE; + << "The event loop will exit after all ongoing processing is finished."); + return returnFailureAndStopEventLoop(); } ATH_MSG_ERROR("Failed to retrieve DataObject for the HLT result object while handling a failed event. " << "Trying again with skipped filling of the result contents (except debug stream tag)."); @@ -1054,19 +989,17 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev if (m_outputCnvSvc->createRep(hltResultDO,addr).isFailure() || addr == nullptr) { ATH_MSG_ERROR("Conversion of HLT result object to the output format failed while handling a failed event. " << "Cannot force-accept this event from HLT side, will rely on data collector to do this. " - << "All slots of this HltEventLoopMgr instance will be drained and the loop will exit."); + << "The event loop will exit after all ongoing processing is finished."); delete addr; - ATH_CHECK(drainAllSlots()); - return StatusCode::FAILURE; + return returnFailureAndStopEventLoop(); } if (m_outputCnvSvc->commitOutput("",true).isFailure()) { ATH_MSG_ERROR("The output conversion service failed in commitOutput() while handling a failed event. " << "Cannot force-accept this event from HLT side, will rely on data collector to do this. 
" - << "All slots of this HltEventLoopMgr instance will be drained and the loop will exit."); + << "The event loop will exit after all ongoing processing is finished."); delete addr; - ATH_CHECK(drainAllSlots()); - return StatusCode::FAILURE; + return returnFailureAndStopEventLoop(); } // The output has been sent out, the ByteStreamAddress can be deleted @@ -1085,6 +1018,13 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev if (clearWBSlot(eventContext.slot()).isFailure()) return failedEvent(HLT::OnlineErrorCode::AFTER_RESULT_SENT,eventContextCopy); + // Only now after store clearing we can allow the slot to be filled again, + // so we increment m_freeSlots and notify the input thread + ++m_freeSlots; + if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) { + m_inputThread->cond().notify_all(); + } + //---------------------------------------------------------------------------- // Finish handling the failed event //---------------------------------------------------------------------------- @@ -1096,9 +1036,8 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev << " was successfully handled, but the number of tolerable framework errors for this HltEventLoopMgr instance," << " which is " << m_maxFrameworkErrors.value() << ", was exceeded. Current local event number is " << eventContextCopy.evt() << ", slot " << eventContextCopy.slot() - << ". All slots of this HltEventLoopMgr instance will be drained and the loop will exit."); - ATH_CHECK(drainAllSlots()); - return StatusCode::FAILURE; + << ". The event loop will exit after all ongoing processing is finished."); + return returnFailureAndStopEventLoop(); } } @@ -1107,33 +1046,29 @@ StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const Ev << " Current local event number is " << eventContextCopy.evt() << ", slot " << eventContextCopy.slot()); ATH_MSG_VERBOSE("end of " << __FUNCTION__); - return StatusCode::SUCCESS; // continue the event loop + return StatusCode::SUCCESS; // error handling succeeded, event loop may continue } // ============================================================================= -void HltEventLoopMgr::runEventTimer() +void HltEventLoopMgr::eventTimerCallback() { ATH_MSG_VERBOSE("start of " << __FUNCTION__); - std::unique_lock<std::mutex> lock(m_timeoutMutex); - while (m_runEventTimer) { - m_timeoutCond.wait_for(lock,std::chrono::seconds(1)); - auto now=std::chrono::steady_clock::now(); - for (size_t i=0; i<m_eventTimerStartPoint.size(); ++i) { - // iterate over all slots and check for timeout - if (!m_isSlotProcessing.at(i)) continue; - if (now > m_eventTimerStartPoint.at(i) + m_softTimeoutValue) { - EventContext ctx(0,i); // we only need the slot number for Athena::Timeout instance - // don't duplicate the actions if the timeout was already reached - if (!Athena::Timeout::instance(ctx).reached()) { - ATH_MSG_ERROR("Soft timeout in slot " << i << ". 
@@ -1145,16 +1080,203 @@ void HltEventLoopMgr::resetEventTimer(const EventContext& eventContext, bool processing)
 {
   if (!eventContext.valid()) {return;}
   {
-    std::unique_lock<std::mutex> lock(m_timeoutMutex);
+    std::unique_lock<std::mutex> lock(m_timeoutThread->mutex());
     m_eventTimerStartPoint[eventContext.slot()] = std::chrono::steady_clock::now();
     m_isSlotProcessing[eventContext.slot()] = processing;
     resetTimeout(Athena::Timeout::instance(eventContext));
   }
-  m_timeoutCond.notify_all();
+  m_timeoutThread->cond().notify_all();
 }

 // =============================================================================
-StatusCode HltEventLoopMgr::startNextEvent(EventLoopStatus& loopStatus)
+StatusCode HltEventLoopMgr::clearWBSlot(size_t evtSlot) const
+{
+  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
+  auto monTime = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_clearStore");
+  StatusCode sc = m_whiteboard->clearStore(evtSlot);
+  Monitored::Group(m_monTool, monTime);
+  if( !sc.isSuccess() ) {
+    ATH_MSG_WARNING("Clear of event data store failed");
+  }
+  ATH_MSG_VERBOSE("end of " << __FUNCTION__ << ", returning m_whiteboard->freeStore(evtSlot=" << evtSlot << ")");
+  return m_whiteboard->freeStore(evtSlot);
+}
+
+// =============================================================================
+void HltEventLoopMgr::inputThreadCallback() {
+  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
+  if (m_loopStatus.loopEnded) {
+    ATH_MSG_VERBOSE("Event loop ended, stopping the input thread and returning from " << __FUNCTION__);
+    m_inputThread->stop();
+    // Notify the output thread which may still be waiting for events
+    m_outputThread->cond().notify_all();
+    return;
+  }
+
+  // Early exit conditions
+  if (!m_loopStatus.eventsAvailable) {
+    ATH_MSG_VERBOSE("No more events, flagging the event loop as finished, stopping the input thread"
+                    << " and returning from " << __FUNCTION__);
+    m_inputThread->stop();
+    // Notify the output thread which may still be waiting for events
+    m_outputThread->cond().notify_all();
+    return;
+  }
+  const size_t numSlotsToFill = m_freeSlots.load();
+  if (numSlotsToFill==0) {
+    ATH_MSG_VERBOSE("No free slots, returning from " << __FUNCTION__);
+    return;
+  }
+  m_freeSlots -= numSlotsToFill;
+
+  // Read in and start processing another event
+  ATH_MSG_DEBUG("Free slots = " << numSlotsToFill << ". Reading new event(s) to fill the slot(s).");
+
+  // Fill all free slots with new events
+  for (size_t i=0; i<numSlotsToFill; ++i) {
+    auto task = [mgr=this](){
+      StatusCode sc = StatusCode::SUCCESS;
+      try {
+        sc = mgr->startNextEvent();
+      }
+      catch (const std::exception& e) {
+        mgr->error() << "Exception caught in startNextEvent: " << e.what() << endmsg;
+        sc = StatusCode::FAILURE;
+      }
+      catch (...) {
+        mgr->error() << "Exception caught in startNextEvent" << endmsg;
+        sc = StatusCode::FAILURE;
+      }
+      if (sc.isFailure()) {
+        mgr->error() << "startNextEvent failed, stopping the event loop" << endmsg;
+        mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
+        mgr->m_loopStatus.eventsAvailable = false;
+        return;
+      }
+      // Pop one item from the parallel I/O queue to decrement its size - it doesn't matter which item
+      // is popped, we only use the queue size to limit the number of tasks running in parallel
+      bool popIOQueue{false};
+      mgr->m_parallelIOQueue.pop(popIOQueue);
+    };
+
+    // Push one item to the parallel I/O queue to increment its size - the value doesn't matter,
+    // we only use the queue size and benefit from the blocking push call here to limit the number
+    // of tasks running in parallel. Once we can push to the queue, we can schedule the task.
+    m_parallelIOQueue.push(true);
+    m_parallelIOTaskGroup.run(std::move(task));
+  }
+  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
+}
+
+// =============================================================================
+void HltEventLoopMgr::outputThreadCallback() {
+  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
+  const size_t nslots = m_isSlotProcessing.size(); // size is fixed in hltUpdateAfterFork after configuring scheduler
+  if (m_schedulerSvc->freeSlots() == nslots) {
+    if (m_loopStatus.eventsAvailable) {
+      ATH_MSG_DEBUG("There are currently no events being processed by the Scheduler, returning from " << __FUNCTION__);
+    } else {
+      ATH_MSG_DEBUG("No more events to process and scheduler is empty, stopping the event loop and output thread");
+      if (!m_loopStatus.loopEnded && m_outputThread!=nullptr) {
+        m_outputThread->stop();
+      }
+      // Notify the input thread which may still be waiting for free slots
+      if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
+        m_inputThread->cond().notify_all();
+      }
+      // Notify the main thread that the loop ended - this is the only place able to do this!
+      m_loopStatus.loopEnded = true;
+      m_loopStatus.loopEndedCond.notify_all();
+    }
+    return;
+  }
+
+  //----------------------------------------------------------------------------
+  // Pop events from the Scheduler
+  //----------------------------------------------------------------------------
+  std::vector<EventContext*> finishedEvtContexts;
+  EventContext* finishedEvtContext(nullptr);
+  const auto popStartTime = std::chrono::steady_clock::now();
+
+  // Pop one event from the scheduler (blocking call)
+  ATH_MSG_DEBUG("Waiting for a finished event from the Scheduler");
+  if (m_schedulerSvc->popFinishedEvent(finishedEvtContext).isFailure()) {
+    failedEvent(HLT::OnlineErrorCode::SCHEDULER_POP_FAILURE, EventContext()).ignore();
+    delete finishedEvtContext;
+    return;
+  }
+  ATH_MSG_DEBUG("Scheduler returned a finished event: " << finishedEvtContext);
+  finishedEvtContexts.push_back(finishedEvtContext);
+
+  // See if more events are available (non-blocking call)
+  while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
+    ATH_MSG_DEBUG("Scheduler returned a finished event: " << *finishedEvtContext);
+    finishedEvtContexts.push_back(finishedEvtContext);
+  }
+  const auto popSpentTime = std::chrono::steady_clock::now() - popStartTime;
+  const auto popSpentTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(popSpentTime).count();
+  Monitored::Scalar<int64_t> monPopSchedulerTime{"PopSchedulerTime", popSpentTimeMs};
+  Monitored::Scalar<size_t> monPopSchedulerNumEvt{"PopSchedulerNumEvt", finishedEvtContexts.size()};
+  Monitored::Group{m_monTool, monPopSchedulerNumEvt, monPopSchedulerTime};
+
+  //----------------------------------------------------------------------------
+  // Post-process the finished events
+  //----------------------------------------------------------------------------
+  const size_t nFinishedEvents = finishedEvtContexts.size();
+  ATH_MSG_DEBUG("Number of finished events to post-process: " << nFinishedEvents);
+
+  // Push all post-processing tasks to TBB
+  for (EventContext* thisFinishedEvtContext : finishedEvtContexts) {
+    // Reset free slot timer for monitoring
+    if (thisFinishedEvtContext != nullptr) {
+      m_freeSlotStartPoint[thisFinishedEvtContext->slot()] = std::chrono::steady_clock::now();
+    }
+
+    // Create and enqueue the task
+    m_finishedEventsQueue.push(thisFinishedEvtContext);
+    auto task = [mgr=this](){
+      StatusCode sc = StatusCode::SUCCESS;
+      try {
+        sc = mgr->processFinishedEvent();
+      }
+      catch (const std::exception& e) {
+        mgr->error() << "Exception caught in processFinishedEvent: " << e.what() << endmsg;
+        sc = StatusCode::FAILURE;
+      }
+      catch (...) {
+        mgr->error() << "Exception caught in processFinishedEvent" << endmsg;
+        sc = StatusCode::FAILURE;
+      }
+
+      if (sc.isFailure()) {
+        mgr->error() << "processFinishedEvent failed, stopping the event loop" << endmsg;
+        mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
+        mgr->m_loopStatus.eventsAvailable = false;
+      }
+
+      // Pop one item from the parallel I/O queue to decrement its size - it doesn't matter which item
+      // is popped, we only use the queue size to limit the number of tasks running in parallel
+      bool popIOQueue{false};
+      mgr->m_parallelIOQueue.pop(popIOQueue);
+
+      // Wake up the output thread if it's sleeping - this prevents a deadlock after the last event
+      // when the input thread has already finished and is no longer waking up the output thread. Spurious
+      // wake-ups during the event loop from this notification should have a negligible effect on CPU load.
+      mgr->m_outputThread->cond().notify_one();
+    };
+
+    // Push one item to the parallel I/O queue to increment its size - the value doesn't matter,
+    // we only use the queue size and benefit from the blocking push call here to limit the number
+    // of tasks running in parallel. Once we can push to the queue, we can schedule the task.
+    m_parallelIOQueue.push(true);
+    m_parallelIOTaskGroup.run(std::move(task));
+  }

+  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
+}
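`outputThreadCallback()` above blocks for one finished event and then drains, without blocking, whatever else the scheduler has already finished, so a whole batch is post-processed per wake-up. The same shape, with a plain TBB queue standing in for the Gaudi scheduler interface (`popFinishedEvent`/`tryPopFinishedEvent`):

```cpp
// Sketch of the "block for one, then drain the rest" retrieval pattern.
#include <vector>
#include <tbb/concurrent_queue.h>

std::vector<int> popAtLeastOne(tbb::concurrent_bounded_queue<int>& queue) {
  std::vector<int> batch;
  int item{};
  queue.pop(item);             // blocking: wait until one item is available
  batch.push_back(item);
  while (queue.try_pop(item))  // non-blocking: drain whatever else is ready
    batch.push_back(item);
  return batch;                // post-process the whole batch in one go
}
```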
+
+// =============================================================================
+StatusCode HltEventLoopMgr::startNextEvent()
 {
   StatusCode sc = StatusCode::SUCCESS;
   auto check = [this, &sc](std::string&& errmsg, HLT::OnlineErrorCode errcode, const EventContext& eventContext) {
@@ -1224,25 +1346,29 @@
     // Get the next event
     //------------------------------------------------------------------------
     try {
-      sc = m_evtSelector->next(*m_evtSelContext);
+      bool noEventsTemporarily{false};
+      do {
+        try {
+          noEventsTemporarily = false;
+          sc = m_evtSelector->next(*m_evtSelContext);
+        } catch (const hltonl::Exception::NoEventsTemporarily& e) {
+          ATH_MSG_DEBUG("No new input events available temporarily, requesting again");
+          noEventsTemporarily = true;
+        }
+      } while (noEventsTemporarily);
     }
     catch (const hltonl::Exception::NoMoreEvents& e) {
       sc = StatusCode::SUCCESS;
-      loopStatus.eventsAvailable = false;
+      m_loopStatus.eventsAvailable = false;
       sc = clearWBSlot(eventContext->slot());
       if (sc.isFailure()) {
         ATH_MSG_WARNING("Failed to clear the whiteboard slot " << eventContext->slot()
                         << " after NoMoreEvents detected");
       }
-      return StatusCode::SUCCESS;
-    }
-    catch (const hltonl::Exception::NoEventsTemporarily& e) {
-      sc = StatusCode::SUCCESS;
-      loopStatus.triggerOnHold = true;
-      sc = clearWBSlot(eventContext->slot());
-      if (sc.isFailure()) {
-        ATH_MSG_WARNING("Failed to clear the whiteboard slot " << eventContext->slot()
-                        << " after NoEventsTemporarily detected");
+      // Increment m_freeSlots after clearing the store and notify the input thread
+      ++m_freeSlots;
+      if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
+        m_inputThread->cond().notify_all();
       }
       return StatusCode::SUCCESS;
     }
@@ -1318,32 +1444,32 @@
   EventIDBase::number_type oldMaxLB{0}, newMaxLB{0};
   bool updatedLB{false};
   do {
-    oldMaxLB = loopStatus.maxLB.load();
+    oldMaxLB = m_loopStatus.maxLB.load();
     newMaxLB = std::max(oldMaxLB, eventContext->eventID().lumi_block());
     updatedLB = newMaxLB > oldMaxLB;
-  } while (updatedLB && !loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB));
-  loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB);
+  } while (updatedLB && !m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB));
+  m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB);
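The do/while above is an atomic "fetch-max" on the maximum lumiblock number: keep retrying the compare-and-swap until either our candidate is no longer larger or the exchange succeeds. The idiom in isolation:

```cpp
// Standalone sketch of a lock-free fetch-max via compare_exchange_strong.
#include <atomic>
#include <cstdint>

void updateMax(std::atomic<uint32_t>& maxValue, uint32_t candidate) {
  uint32_t current = maxValue.load();
  // On failure, compare_exchange_strong reloads 'current' with the latest
  // value, so the loop re-checks whether 'candidate' is still larger.
  while (candidate > current &&
         !maxValue.compare_exchange_strong(current, candidate)) {}
}
```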

   // Wait in case a COOL update is ongoing to avoid executeEvent
   // reading conditions data while they are being updated.
   {
-    std::unique_lock<std::mutex> lock(loopStatus.coolUpdateMutex);
-    loopStatus.coolUpdateCond.wait(lock, [&]{return !loopStatus.coolUpdateOngoing;});
+    std::unique_lock<std::mutex> lock(m_loopStatus.coolUpdateMutex);
+    m_loopStatus.coolUpdateCond.wait(lock, [&]{return !m_loopStatus.coolUpdateOngoing;});
   }

   // Do COOL updates (if needed) and notify other threads about it
   if (updatedLB) {
     {
-      std::lock_guard<std::mutex> lock(loopStatus.coolUpdateMutex);
-      loopStatus.coolUpdateOngoing = true;
+      std::lock_guard<std::mutex> lock(m_loopStatus.coolUpdateMutex);
+      m_loopStatus.coolUpdateOngoing = true;
       sc = m_coolHelper->hltCoolUpdate(*eventContext);
       if (check("Failure during COOL update", HLT::OnlineErrorCode::COOL_UPDATE, *eventContext)) {
-        loopStatus.coolUpdateOngoing = false;
+        m_loopStatus.coolUpdateOngoing = false;
         return sc;
       }
-      loopStatus.coolUpdateOngoing = false;
+      m_loopStatus.coolUpdateOngoing = false;
     }
-    loopStatus.coolUpdateCond.notify_all();
+    m_loopStatus.coolUpdateCond.notify_all();
   }

   //------------------------------------------------------------------------
@@ -1357,6 +1483,8 @@
                 HLT::OnlineErrorCode::SCHEDULING_FAILURE, *eventContext)) {
     return sc;
   }
+  // Notify the output thread to start waiting for a finished event
+  m_outputThread->cond().notify_one();

   //------------------------------------------------------------------------
   // Set ThreadLocalContext to an invalid context
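The `coolUpdateMutex`/`coolUpdateCond` pair above gates event processing while a conditions update is ongoing. A minimal standalone version of the gate (field names mirror `EventLoopStatus` but the snippet is illustrative):

```cpp
// Condition-variable gate: processing waits while an update is ongoing;
// the updating thread toggles the flag under the mutex and notifies all.
#include <condition_variable>
#include <mutex>

struct UpdateGate {
  std::mutex mtx;
  std::condition_variable cond;
  bool updateOngoing{false};
};

void waitForUpdate(UpdateGate& gate) {
  std::unique_lock<std::mutex> lock(gate.mtx);
  gate.cond.wait(lock, [&]{ return !gate.updateOngoing; });  // spurious wake-ups are harmless
}

void runUpdate(UpdateGate& gate) {
  {
    std::lock_guard<std::mutex> lock(gate.mtx);
    gate.updateOngoing = true;
    // ... perform the conditions update here ...
    gate.updateOngoing = false;
  }
  gate.cond.notify_all();  // release any threads blocked in waitForUpdate()
}
```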
@@ -1368,150 +1496,20 @@
 }

 // =============================================================================
-/**
- * @brief Retrieves finished events from the scheduler, processes their output and cleans up the slots
- * @return SUCCESS if at least one event was finished, NO_EVENT if there were no finished events before timeout,
- * SCHEDULER_EMPTY if there are no events being processed, RECOVERABLE if there was an error which was handled
- * correctly, FAILURE if the error should break the event loop
- **/
-HltEventLoopMgr::DrainSchedulerStatusCode HltEventLoopMgr::drainScheduler()
-{
-  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
-
-  //----------------------------------------------------------------------------
-  // Pop events from the Scheduler
-  //----------------------------------------------------------------------------
-  std::vector<EventContext*> finishedEvtContexts;
-  EventContext* finishedEvtContext(nullptr);
-  const size_t nslots = m_isSlotProcessing.size(); // size is fixed in hltUpdateAfterFork after configuring scheduler
-
-  ATH_MSG_DEBUG("Waiting for a finished event from the Scheduler");
-  const auto popStartTime = std::chrono::steady_clock::now();
-  auto popSpentTimeMs = [&popStartTime]() {
-    const auto popSpentTime = std::chrono::steady_clock::now() - popStartTime;
-    const auto popSpentTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(popSpentTime);
-    return popSpentTimeMs.count();
-  };
-  auto monPopScheduler = [](const ToolHandle<GenericMonitoringTool>& monTool, const size_t nEvents, const int64_t popTime) {
-    Monitored::Scalar<int64_t> monPopSchedulerNumEvt("PopSchedulerNumEvt", nEvents);
-    Monitored::Scalar<int64_t> monPopSchedulerTime("PopSchedulerTime", popTime);
-    Monitored::Group(monTool, monPopSchedulerNumEvt, monPopSchedulerTime);
-  };
-  while (true) {
-    if (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
-      ATH_MSG_DEBUG("Scheduler returned a finished event: " << finishedEvtContext);
-      finishedEvtContexts.push_back(finishedEvtContext);
-      if (!m_popAll) {
-        // Already got one event and we don't want more in PopAll=False mode
-        break;
-      }
-    } else {
-      // Got no new finished event in the last query
-      if (!finishedEvtContexts.empty()) {
-        // Already got some finished events and there aren't any more available
-        break;
-      }
-      if (m_schedulerSvc->freeSlots() == nslots) {
-        // There are no events being processed by the Scheduler
-        ATH_MSG_DEBUG("Scheduler empty");
-        monPopScheduler(m_monTool, finishedEvtContexts.size(), popSpentTimeMs());
-        return DrainSchedulerStatusCode::SCHEDULER_EMPTY;
-      }
-      const auto popTime = popSpentTimeMs();
-      if (popTime > m_popFromSchedulerTimeout.value()) {
-        // Got no finished events and ran past the timeout
-        ATH_MSG_DEBUG("PopFromSchedulerTimeout reached, drainScheduler() returns NO_EVENT");
-        monPopScheduler(m_monTool, finishedEvtContexts.size(), popTime);
-        return DrainSchedulerStatusCode::NO_EVENT;
-      }
-      std::this_thread::sleep_for(std::chrono::milliseconds(m_popFromSchedulerQueryInterval.value()));
-    }
-  }
-  monPopScheduler(m_monTool, finishedEvtContexts.size(), popSpentTimeMs());
-
-  //----------------------------------------------------------------------------
-  // Post-process the finished events
-  //----------------------------------------------------------------------------
-  const size_t nFinishedEvents = finishedEvtContexts.size();
-  ATH_MSG_DEBUG("Number of finished events to post-process: " << nFinishedEvents);
-
-  // Push all post-processing tasks to TBB
-  for (EventContext* thisFinishedEvtContext : finishedEvtContexts) {
-    // Reset free slot timer for monitoring
-    if (thisFinishedEvtContext != nullptr) {
-      m_freeSlotStartPoint[thisFinishedEvtContext->slot()] = std::chrono::steady_clock::now();
-    }
-
-    // Create and enqueue the task
-    m_finishedEventsQueue.push(thisFinishedEvtContext);
-    auto task = [mgr=this](){
-      DrainSchedulerStatusCode sc = DrainSchedulerStatusCode::INVALID;
-      try {
-        sc = mgr->processFinishedEvent();
-      }
-      catch (const std::exception& e) {
-        mgr->error() << "Exception caught in processFinishedEvent: " << e.what() << endmsg;
-        sc = DrainSchedulerStatusCode::FAILURE;
-      }
-      catch (...) {
-        mgr->error() << "Exception caught in processFinishedEvent" << endmsg;
-        sc = DrainSchedulerStatusCode::FAILURE;
-      }
-      mgr->m_drainSchedulerStatusQueue.push(sc);
-
-      // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item
-      // is popped, we only use the queue size to limit the number of tasks running in parallel
-      bool popIOQueue{false};
-      mgr->m_parallelIOQueue.pop(popIOQueue);
-    };
-
-    // Push one item to the parallel I/O queue to increment its size - the value doesn't matter,
-    // we only use the queue size and benefit from the blocking push call here to limit the number
-    // of tasks running in parallel. Once we can push to the queue, we can enqueue the task to the arena.
-    m_parallelIOQueue.push(true);
-    m_parallelIOTaskArena->enqueue(std::move(task));
-  }
-
-  // Wait until post-processing is done for all events
-  std::vector<DrainSchedulerStatusCode> statusCodes;
-  while (statusCodes.size() < nFinishedEvents) {
-    DrainSchedulerStatusCode sc = DrainSchedulerStatusCode::INVALID;
-    m_drainSchedulerStatusQueue.pop(sc);
-    if (sc == DrainSchedulerStatusCode::INVALID) {
-      ATH_MSG_ERROR("DrainSchedulerStatusCode::INVALID popped from the queue");
-      continue;
-    }
-    statusCodes.push_back(sc);
-  }
-
-  // Check the post-processing status code for all events
-  DrainSchedulerStatusCode worstCode = DrainSchedulerStatusCode::SUCCESS;
-  for (DrainSchedulerStatusCode sc : statusCodes) {
-    if (static_cast<int>(sc) < static_cast<int>(worstCode)) {
-      worstCode = sc;
-    }
-  }
-
-  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
-  return worstCode;
-}
-
-// =============================================================================
-HltEventLoopMgr::DrainSchedulerStatusCode HltEventLoopMgr::processFinishedEvent()
+StatusCode HltEventLoopMgr::processFinishedEvent()
 {
   EventContext* eventContext{nullptr};
   m_finishedEventsQueue.pop(eventContext);

   StatusCode sc = StatusCode::SUCCESS;
-  DrainSchedulerStatusCode rc = DrainSchedulerStatusCode::SUCCESS;
   auto check = [this, &sc, &eventContext](std::string&& errmsg, HLT::OnlineErrorCode errcode) {
-    if (sc.isSuccess()) {return DrainSchedulerStatusCode::SUCCESS;}
+    if (sc.isSuccess()) {return false;}
     ATH_MSG_ERROR(errmsg);
     const EventContext& eventContextRef = (eventContext==nullptr) ? EventContext() : *eventContext;
     sc = failedEvent(errcode, eventContextRef);
     Gaudi::Hive::setCurrentContext(EventContext());
     delete eventContext;
-    return (sc.isSuccess() ? DrainSchedulerStatusCode::RECOVERABLE : DrainSchedulerStatusCode::FAILURE);
+    return true;
   };

   //--------------------------------------------------------------------------
@@ -1520,8 +1518,10 @@
   // Check if the EventContext object exists
   if (eventContext == nullptr) {
     sc = StatusCode::FAILURE;
-    return check("Detected nullptr EventContext while finalising a processed event",
-                 HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT);
+    if (check("Detected nullptr EventContext while finalising a processed event",
+              HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT)) {
+      return sc;
+    }
   }

   // Set ThreadLocalContext to the currently processed finished context
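The refactoring above collapses the two status variables (`sc` and `rc`) into one: `check()` now returns a `bool` meaning "bail out", and the caller returns the `StatusCode` that `check()` itself updated. The idiom in miniature, with `handleFailure()` standing in for `failedEvent()`:

```cpp
// Sketch of the error-routing lambda idiom: a captured status flag shares
// one line of error handling across many steps.
#include <iostream>
#include <string>

bool handleFailure(const std::string& msg) {
  std::cerr << "ERROR " << msg << '\n';  // routed to failedEvent() in the real code
  return true;                           // true = caller should return immediately
}

int main() {
  bool ok = true;
  auto check = [&ok](std::string&& errmsg) {
    if (ok) { return false; }            // nothing to do, keep going
    return handleFailure(errmsg);
  };

  ok = true;   // step 1 succeeded
  if (check("step 1 failed")) { return 1; }
  ok = false;  // step 2 failed
  if (check("step 2 failed")) { return 1; }  // bails out here
}
```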
@@ -1531,19 +1531,21 @@
   if (m_aess->eventStatus(*eventContext) != EventStatus::Success) {
     sc = StatusCode::FAILURE;
     auto algErrors = m_errorMonTool->algExecErrors(*eventContext);
-    HLT::OnlineErrorCode errCode = isTimedOut(algErrors) ?
-      HLT::OnlineErrorCode::TIMEOUT : HLT::OnlineErrorCode::PROCESSING_FAILURE;
-    rc = check("Processing event with context " + toString(*eventContext) + \
-               " failed with status " + toString(m_aess->eventStatus(*eventContext)),
-               errCode);
-    if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+    const HLT::OnlineErrorCode errCode = isTimedOut(algErrors) ?
+      HLT::OnlineErrorCode::TIMEOUT : HLT::OnlineErrorCode::PROCESSING_FAILURE;
+    if (check("Processing event with context " + toString(*eventContext) + \
+              " failed with status " + toString(m_aess->eventStatus(*eventContext)),
+              errCode)) {
+      return sc;
+    }
   }

   // Select the whiteboard slot
   sc = m_whiteboard->selectStore(eventContext->slot());
-  rc = check("Failed to select event store slot " + std::to_string(eventContext->slot()),
-             HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;};
+  if (check("Failed to select event store slot " + std::to_string(eventContext->slot()),
+            HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT)) {
+    return sc;
+  }

   // Fire EndProcessing incident - some services may depend on this
   m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *eventContext));
@@ -1553,36 +1555,30 @@ HltEventLoopMgr::DrainSchedulerStatusCode HltEventLoopMgr::processFinishedEvent(
   //--------------------------------------------------------------------------
   // Call the result builder to record HLTResultMT in SG
   sc = m_hltResultMaker->makeResult(*eventContext);
-  rc = check("Failed to create the HLT result object", HLT::OnlineErrorCode::NO_HLT_RESULT);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("Failed to create the HLT result object", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}

   // Connect output (create the output container) - the argument is currently not used
   sc = m_outputCnvSvc->connectOutput("");
-  rc = check("Conversion service failed to connectOutput", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("Conversion service failed to connectOutput", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;}

   // Retrieve the HLT result and the corresponding DataObject
   auto hltResult = SG::makeHandle(m_hltResultRHKey,*eventContext);
   if (!hltResult.isValid()) {sc = StatusCode::FAILURE;}
-  rc = check("Failed to retrieve the HLT result", HLT::OnlineErrorCode::NO_HLT_RESULT);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("Failed to retrieve the HLT result", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}

   DataObject* hltResultDO = m_evtStore->accessData(hltResult.clid(),hltResult.key());
   if (hltResultDO == nullptr) {sc = StatusCode::FAILURE;}
-  rc = check("Failed to retrieve the HLTResult DataObject", HLT::OnlineErrorCode::NO_HLT_RESULT);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("Failed to retrieve the HLTResult DataObject", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}

   // Check for result truncation
   if (!hltResult->getTruncatedModuleIds().empty() && hltResult->severeTruncation()) {sc = StatusCode::FAILURE;}
-  rc = check("HLT result truncation", HLT::OnlineErrorCode::RESULT_TRUNCATION);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("HLT result truncation", HLT::OnlineErrorCode::RESULT_TRUNCATION)) {return sc;}

   // Convert the HLT result to the output data format
   IOpaqueAddress* addr = nullptr;
   sc = m_outputCnvSvc->createRep(hltResultDO,addr);
   if (sc.isFailure()) {delete addr;}
-  rc = check("Conversion service failed to convert HLTResult", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("Conversion service failed to convert HLTResult", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;}

   // Retrieve and convert the L1 result to the output data format
   IOpaqueAddress* l1addr = nullptr;
@@ -1592,41 +1588,47 @@ HltEventLoopMgr::DrainSchedulerStatusCode HltEventLoopMgr::processFinishedEvent(
   if (not m_l1TriggerResultRHKey.empty()) {
     auto l1TriggerResult = SG::makeHandle(m_l1TriggerResultRHKey, *eventContext);
     if (!l1TriggerResult.isValid()) {sc = StatusCode::FAILURE;}
-    rc = check("Failed to retrieve the L1 Trigger Result for RewriteLVL1",
-               HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-    if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+    if (check("Failed to retrieve the L1 Trigger Result for RewriteLVL1",
+              HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {
+      return sc;
+    }

     DataObject* l1TriggerResultDO = m_evtStore->accessData(l1TriggerResult.clid(),l1TriggerResult.key());
     if (l1TriggerResultDO == nullptr) {sc = StatusCode::FAILURE;}
-    rc = check("Failed to retrieve the L1 Trigger Result DataObject for RewriteLVL1",
-               HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-    if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+    if (check("Failed to retrieve the L1 Trigger Result DataObject for RewriteLVL1",
+              HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {
+      return sc;
+    }

     sc = m_outputCnvSvc->createRep(l1TriggerResultDO,l1addr);
     if (sc.isFailure()) {delete l1addr;}
-    rc = check("Conversion service failed to convert L1 Trigger Result for RewriteLVL1",
-               HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-    if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+    if (check("Conversion service failed to convert L1 Trigger Result for RewriteLVL1",
+              HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {
+      return sc;
+    }
   }

   // Legacy (Run-2) L1 simulation result
   if (not m_roibResultRHKey.empty()) {
     auto roibResult = SG::makeHandle(m_roibResultRHKey, *eventContext);
     if (!roibResult.isValid()) {sc = StatusCode::FAILURE;}
-    rc = check("Failed to retrieve the RoIBResult for RewriteLVL1",
-               HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-    if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+    if (check("Failed to retrieve the RoIBResult for RewriteLVL1",
+              HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {
+      return sc;
+    }

     DataObject* roibResultDO = m_evtStore->accessData(roibResult.clid(),roibResult.key());
     if (roibResultDO == nullptr) {sc = StatusCode::FAILURE;}
-    rc = check("Failed to retrieve the RoIBResult DataObject for RewriteLVL1",
-               HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-    if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+    if (check("Failed to retrieve the RoIBResult DataObject for RewriteLVL1",
+              HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {
+      return sc;
+    }

     sc = m_outputCnvSvc->createRep(roibResultDO,l1addrLegacy);
     if (sc.isFailure()) {delete l1addrLegacy;}
-    rc = check("Conversion service failed to convert RoIBResult for RewriteLVL1",
-               HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE);
-    if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+    if (check("Conversion service failed to convert RoIBResult for RewriteLVL1",
+              HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {
+      return sc;
+    }
   }
 }

@@ -1638,8 +1640,7 @@ HltEventLoopMgr::DrainSchedulerStatusCode HltEventLoopMgr::processFinishedEvent(
   // Commit output (write/send the output data) - the arguments are currently not used
   sc = m_outputCnvSvc->commitOutput("",true);
   if (sc.isFailure()) {delete addr;}
-  rc = check("Conversion service failed to commitOutput", HLT::OnlineErrorCode::OUTPUT_SEND_FAILURE);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("Conversion service failed to commitOutput", HLT::OnlineErrorCode::OUTPUT_SEND_FAILURE)) {return sc;}

   // The output has been sent out, the ByteStreamAddress can be deleted
   delete addr;
@@ -1658,14 +1659,22 @@ HltEventLoopMgr::DrainSchedulerStatusCode HltEventLoopMgr::processFinishedEvent(
                 << " (event " << eventContext->evt() << ") of the whiteboard");

   sc = clearWBSlot(eventContext->slot());
-  rc = check("Whiteboard slot " + std::to_string(eventContext->slot()) + " could not be properly cleared",
-             HLT::OnlineErrorCode::AFTER_RESULT_SENT);
-  if (rc != DrainSchedulerStatusCode::SUCCESS) {return rc;}
+  if (check("Whiteboard slot " + std::to_string(eventContext->slot()) + " could not be properly cleared",
+            HLT::OnlineErrorCode::AFTER_RESULT_SENT)) {
+    return sc;
+  }

   ATH_MSG_DEBUG("Finished processing " << (eventAccepted ? "accepted" : "rejected")
                 << " event with context " << *eventContext
                 << " which took " << eventTimeMillisec << " ms");

+  // Only now, after clearing the store, can we allow the slot to be filled again,
+  // so we increment m_freeSlots and notify the input thread
+  ++m_freeSlots;
+  if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
+    m_inputThread->cond().notify_all();
+  }
+
   // Fill the time monitoring histograms
   auto monTimeAny = Monitored::Scalar<int64_t>("TotalTime", eventTimeMillisec);
   auto monTimeAcc = Monitored::Scalar<int64_t>(eventAccepted ? "TotalTimeAccepted" : "TotalTimeRejected", eventTimeMillisec);
@@ -1677,79 +1686,5 @@ HltEventLoopMgr::DrainSchedulerStatusCode HltEventLoopMgr::processFinishedEvent(
   // Delete the EventContext which was created when calling executeEvent( EventContext(*eventContext) )
   delete eventContext;

-  return rc;
-}
-
-// =============================================================================
-StatusCode HltEventLoopMgr::clearWBSlot(size_t evtSlot) const
-{
-  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
-  auto monTime = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_clearStore");
-  StatusCode sc = m_whiteboard->clearStore(evtSlot);
-  Monitored::Group(m_monTool, monTime);
-  if( !sc.isSuccess() ) {
-    ATH_MSG_WARNING("Clear of event data store failed");
-  }
-  ATH_MSG_VERBOSE("end of " << __FUNCTION__ << ", returning m_whiteboard->freeStore(evtSlot=" << evtSlot << ")");
-  return m_whiteboard->freeStore(evtSlot);
-}
-
-// =============================================================================
-StatusCode HltEventLoopMgr::recoverFromStarvation()
-{
-  auto freeSlotsScheduler = m_schedulerSvc->freeSlots();
-  auto freeSlotsWhiteboard = m_whiteboard->freeSlots();
-  if (freeSlotsScheduler == freeSlotsWhiteboard) {
-    ATH_MSG_WARNING("Starvation recovery was requested but not needed, so it was not attempted. "
-                    << "This method should not have been called.");
-    return StatusCode::SUCCESS;
-  }
-
-  if (drainAllSlots().isFailure()) {
-    ATH_MSG_ERROR("Starvation recovery failed. Scheduler saw " << freeSlotsScheduler << " free slots,"
-                  << " whereas whiteboard saw " << freeSlotsWhiteboard << " free slots. Total number of slots is "
-                  << m_isSlotProcessing.size() << ". Now scheduler sees " << m_schedulerSvc->freeSlots()
-                  << " free slots, whereas whiteboard sees " << m_whiteboard->freeSlots() << " free slots");
-    return StatusCode::FAILURE;
-  }
-  else {
-    ATH_MSG_WARNING("Starvation detected, but successfully recovered. Scheduler saw " << freeSlotsScheduler
-                    << " free slots, whereas whiteboard saw " << freeSlotsWhiteboard << " free slots. All slots have been cleared,"
-                    << " now scheduler sees " << m_schedulerSvc->freeSlots() << " free slots and whiteboard sees "
-                    << m_whiteboard->freeSlots() << " free slots");
-    return StatusCode::SUCCESS;
-  }
-}
-
-// =============================================================================
-StatusCode HltEventLoopMgr::drainAllSlots()
-{
-  size_t nslots = m_isSlotProcessing.size(); // size is fixed in hltUpdateAfterFork after configuring scheduler
-
-  // First try to drain the scheduler to free all processing slots
-  DrainSchedulerStatusCode drainResult = DrainSchedulerStatusCode::SUCCESS;
-  do {
-    drainResult = drainScheduler();
-    // fail on recoverable, because it means an error while handling an error
-    // (drainAllSlots is a "clean up on failure" method)
-    if (drainResult == DrainSchedulerStatusCode::FAILURE || drainResult == DrainSchedulerStatusCode::RECOVERABLE) {
-      ATH_MSG_ERROR("Failed to drain the scheduler");
-      return StatusCode::FAILURE;
-    }
-  } while (drainResult != DrainSchedulerStatusCode::SCHEDULER_EMPTY); // while there were still events to finish
-
-  // Now try to clear all event data slots (should have no effect if done already)
-  for (size_t islot=0; islot<nslots; ++islot) {
-    if (clearWBSlot(islot).isFailure()) {
-      ATH_MSG_ERROR("Failed to clear whiteboard slot " << islot);
-      return StatusCode::FAILURE;
-    }
-  }
-
-  // Check if the cleanup succeeded
-  if (m_schedulerSvc->freeSlots() == nslots && m_whiteboard->freeSlots() == nslots) {
-    return StatusCode::SUCCESS;
-  }
-
-  return StatusCode::FAILURE;
+  return StatusCode::SUCCESS;
 }
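Both `failedEvent()` and `processFinishedEvent()` now publish a freed slot only after clearing its store and then wake the input thread, while `inputThreadCallback()` claims all currently free slots at once. A sketch of that handshake with illustrative names (the real code uses `HLT::LoopThread` and whiteboard slots):

```cpp
// Free-slot handshake: the releasing side clears the store first, only then
// makes the slot visible and notifies; the claiming side takes all at once.
#include <atomic>
#include <condition_variable>
#include <mutex>

std::atomic<size_t> freeSlots{0};
std::mutex inputMutex;
std::condition_variable inputCond;

void releaseSlot() {
  // clearStore(slot) must complete before the slot becomes visible as free
  ++freeSlots;
  inputCond.notify_all();  // the input thread may be waiting for a free slot
}

size_t claimAllFreeSlots() {
  std::unique_lock<std::mutex> lock(inputMutex);
  inputCond.wait(lock, []{ return freeSlots.load() > 0; });
  return freeSlots.exchange(0);  // claim every currently free slot at once
}
```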
diff --git a/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.h b/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.h
index 44bc272e471e26a1334b5308b27ded493d1fb07e..a37df4c87d216efdeeb56b1aecae4e3b295fb05d 100644
--- a/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.h
+++ b/HLT/Trigger/TrigControl/TrigServices/src/HltEventLoopMgr.h
@@ -1,12 +1,15 @@
 /*
-  Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
 */

 #ifndef TRIGSERVICES_HLTEVENTLOOPMGR_H
 #define TRIGSERVICES_HLTEVENTLOOPMGR_H

-// Trigger includes
+// Local includes
+#include "EventLoopUtils.h"
 #include "TrigSORFromPtreeHelper.h"
+
+// Trigger includes
 #include "TrigKernel/ITrigEventLoopMgr.h"
 #include "TrigOutputHandling/HLTResultMTMaker.h"
 #include "TrigSteeringEvent/OnlineErrorCode.h"
@@ -40,14 +43,13 @@

 // TBB includes
 #include "tbb/concurrent_queue.h"
-#include "tbb/task_arena.h"
+#include "tbb/task_group.h"

 // System includes
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
 #include <memory>
-#include <thread>

 // Forward declarations
 class CondAttrListCollection;
@@ -75,7 +77,13 @@ public:
   /// Standard constructor
   HltEventLoopMgr(const std::string& name, ISvcLocator* svcLoc);
   /// Standard destructor
-  virtual ~HltEventLoopMgr() = default;
+  virtual ~HltEventLoopMgr() noexcept override;
+
+  // Copy and move not allowed
+  HltEventLoopMgr(const HltEventLoopMgr&) = delete;
+  HltEventLoopMgr(HltEventLoopMgr&&) = delete;
+  HltEventLoopMgr& operator=(const HltEventLoopMgr&) = delete;
+  HltEventLoopMgr& operator=(HltEventLoopMgr&&) = delete;

   /// @name Gaudi state transitions (overriden from AthService)
   ///@{
@@ -125,10 +133,12 @@ private:
   struct EventLoopStatus {
     /// Event source has more events
     std::atomic<bool> eventsAvailable{true};
-    /// Event source temporarily paused providing events
-    std::atomic<bool> triggerOnHold{false};
     /// No more events available and all ongoing processing has finished
     std::atomic<bool> loopEnded{false};
+    /// Condition variable to notify the main thread of the end of the event loop
+    std::condition_variable loopEndedCond;
+    /// Mutex to notify the main thread of the end of the event loop
+    std::mutex loopEndedMutex;
     /// Max lumiblock number seen in the loop
     std::atomic<EventIDBase::number_type> maxLB{0};
     /// Condition variable to synchronize COOL updates
@@ -137,9 +147,9 @@ private:
     std::mutex coolUpdateMutex;
     /// COOL update ongoing
     bool coolUpdateOngoing{false};
+    /// Exit code of the event loop
+    StatusCode exitCode{StatusCode::SUCCESS};
   };
-  /// Enum type returned by the drainScheduler method
-  enum class DrainSchedulerStatusCode : int {INVALID=-3, FAILURE=-2, RECOVERABLE=-1, SCHEDULER_EMPTY=0, NO_EVENT=1, SUCCESS=2};

   // ------------------------- Helper methods ----------------------------------
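The new `loopEndedCond`/`loopEndedMutex` pair replaces polling for the end of the loop. The waiting side is outside this excerpt, so the following is an assumption about intended usage rather than code from the patch: the main thread blocks until `outputThreadCallback()` sets `loopEnded` and notifies.

```cpp
// Hypothetical consumer of loopEndedCond/loopEndedMutex (not in this diff).
#include <atomic>
#include <condition_variable>
#include <mutex>

struct EventLoopStatusSketch {
  std::atomic<bool> loopEnded{false};
  std::condition_variable loopEndedCond;
  std::mutex loopEndedMutex;
};

void waitForLoopEnd(EventLoopStatusSketch& status) {
  std::unique_lock<std::mutex> lock(status.loopEndedMutex);
  // the predicate is re-checked on every wake-up, so spurious wake-ups are harmless
  status.loopEndedCond.wait(lock, [&]{ return status.loopEnded.load(); });
}
```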
@@ -171,36 +181,37 @@ private:
   StatusCode execAtStart(const EventContext& ctx) const;

   /** @brief Handle a failure to process an event
-   * @return FAILURE breaks the event loop
+   * @return FAILURE means the event loop was flagged to stop (no new events will be requested)
   **/
   StatusCode failedEvent(HLT::OnlineErrorCode errorCode, const EventContext& eventContext);

-  /// The method executed by the event timeout monitoring thread
-  void runEventTimer();
-
   /// Reset the timeout flag and the timer, and mark the slot as busy or idle according to the second argument
   void resetEventTimer(const EventContext& eventContext, bool processing);

-  /// Perform all start-of-event actions for a single new event and push it to the scheduler
-  StatusCode startNextEvent(EventLoopStatus& loopStatus);
+  /// Clear an event slot in the whiteboard
+  StatusCode clearWBSlot(size_t evtSlot) const;

-  /// Drain the scheduler from all actions that may be queued
-  DrainSchedulerStatusCode drainScheduler();
+  /// @name Methods executed by LoopThreads
+  ///@{
+  /// The method executed by the input handling thread
+  void inputThreadCallback();

-  /// Perform all end-of-event actions for a single event popped out from the scheduler
-  DrainSchedulerStatusCode processFinishedEvent();
+  /// The method executed by the output handling thread
+  void outputThreadCallback();

-  /// Clear an event slot in the whiteboard
-  StatusCode clearWBSlot(size_t evtSlot) const;
+  /// The method executed by the event timeout monitoring thread
+  void eventTimerCallback();
+  ///@}

-  /// Try to recover from a situation where scheduler and whiteboard see different number of free slots
-  StatusCode recoverFromStarvation();
+  /// @name Methods executed by TBB tasks
+  ///@{
+  /// Perform all start-of-event actions for a single new event and push it to the scheduler
+  StatusCode startNextEvent();

-  /** @brief Try to drain the scheduler and clear all event data slots.
-   * Method of the last resort, used in attempts to recover from framework errors
-   **/
-  StatusCode drainAllSlots();
+  /// Perform all end-of-event actions for a single event popped out from the scheduler
+  StatusCode processFinishedEvent();
+  ///@}

   // ------------------------- Handles to required services/tools --------------
   ServiceHandle<IIncidentSvc> m_incidentSvc{this, "IncidentSvc", "IncidentSvc"};
@@ -237,24 +248,24 @@ private:
   Gaudi::Property<float> m_softTimeoutFraction{
     this, "SoftTimeoutFraction", 0.8, "Fraction of the hard timeout to be set as the soft timeout"};

+  Gaudi::Property<unsigned int> m_timeoutThreadIntervalMs{
+    this, "TimeoutThreadIntervalMs", 1000, "How often the timeout thread checks for soft timeout, in milliseconds"};
+
   Gaudi::Property<bool> m_traceOnTimeout{
     this, "TraceOnTimeout", true,
     "Print a stack trace on the first soft timeout (might take a while, holding all threads)"};

-  Gaudi::Property<int> m_popFromSchedulerTimeout{
-    this, "PopFromSchedulerTimeout", 200,
-    "Maximum time in milliseconds to wait for a finished event before checking "
-    "if there are free slots to refill in the meantime"};
-
-  Gaudi::Property<int> m_popFromSchedulerQueryInterval{
-    this, "PopFromSchedulerQueryInterval", 5,
-    "Time to wait before asking again in case the Scheduler doesn't have a finished event available"};
-
   Gaudi::Property<int> m_maxParallelIOTasks{
     this, "MaxParallelIOTasks", -1,
     "Maximum number of I/O tasks which can be executed in parallel. "
     "If <=0 then the number of scheduler threads is used."};

+  Gaudi::Property<int> m_maxIOWakeUpIntervalMs{
+    this, "MaxIOWakeUpIntervalMs", -1,
+    "Maximum time input or output handling thread will sleep unless notified. Negative value (default) means no limit, "
+    "i.e. threads will only wake up on notifications. Zero means threads will never wait for notifications. "
+    "Positive value means the number of milliseconds after which a thread will wake up if it's not notified earlier."};
+
   Gaudi::Property<int> m_maxFrameworkErrors{
     this, "MaxFrameworkErrors", 10,
     "Tolerable number of recovered framework errors before exiting (<0 means all are tolerated)"};
@@ -297,10 +308,6 @@ private:
     this, "RewriteLVL1", false,
     "Encode L1 results to ByteStream and write to the output. Possible only with athenaHLT, not online."};

-  Gaudi::Property<bool> m_popAll{
-    this, "PopAllMode", true, "If true, pop all finished events from scheduler and process all results before filling "
-    "the slots again. If false, pop only one and refill the slot before popping another finished event."};
-
   Gaudi::Property<bool> m_monitorScheduler{
     this, "MonitorScheduler", false, "Enable SchedulerMonSvc to collect scheduler status data in online histograms"};
@@ -339,28 +346,26 @@ private:
   std::vector<std::chrono::steady_clock::time_point> m_freeSlotStartPoint;
   /// Vector of flags to tell if a slot is idle or processing
   std::vector<bool> m_isSlotProcessing; // be aware of vector<bool> specialisation
-  /// Timeout mutex
-  std::mutex m_timeoutMutex;
-  /// Timeout condition variable
-  std::condition_variable m_timeoutCond;
+  /// Number of free slots used to synchronise input/output tasks
+  std::atomic<size_t> m_freeSlots{0};
+  /// Input handling thread (triggers reading new events)
+  std::unique_ptr<HLT::LoopThread> m_inputThread;
+  /// Output handling thread (triggers post-processing of finished events)
+  std::unique_ptr<HLT::LoopThread> m_outputThread;
   /// Timeout thread
-  std::unique_ptr<std::thread> m_timeoutThread;
+  std::unique_ptr<HLT::LoopThread> m_timeoutThread;
   /// Soft timeout value set to HardTimeout*SoftTimeoutFraction at initialisation
   std::chrono::milliseconds m_softTimeoutValue{0};
-  /// Task arena to enqueue parallel I/O tasks
-  std::unique_ptr<tbb::task_arena> m_parallelIOTaskArena;
+  /// Task group to execute parallel I/O tasks asynchronously
+  tbb::task_group m_parallelIOTaskGroup;
   /// Queue limiting the number of parallel I/O tasks
   tbb::concurrent_bounded_queue<bool> m_parallelIOQueue;
   /// Queue of events ready for output processing
   tbb::concurrent_bounded_queue<EventContext*> m_finishedEventsQueue;
-  /// Queue of result codes of output processing
-  tbb::concurrent_bounded_queue<DrainSchedulerStatusCode> m_drainSchedulerStatusQueue;
-  /// Queue of result codes of startNextEvent
-  tbb::concurrent_bounded_queue<StatusCode> m_startNextEventStatusQueue;
+  /// Object keeping track of the event loop status
+  EventLoopStatus m_loopStatus{};
   /// Flag set when a soft timeout produces a stack trace, to avoid producing multiple traces
   bool m_timeoutTraceGenerated{false};
-  /// Flag set to false if timer thread should be stopped
-  std::atomic<bool> m_runEventTimer{true};
   /// Counter of framework errors
   std::atomic<int> m_nFrameworkErrors{0};
   /// Application name
diff --git a/HLT/Trigger/TrigControl/TrigServices/src/components/TrigServices_entries.cxx b/HLT/Trigger/TrigControl/TrigServices/src/components/TrigServices_entries.cxx
index d7c75b8c796920fc26c2a1525f8d4f5254976f85..14bba4d4a1dd93d9f11b95e149b62bf0bc67f947 100644
--- a/HLT/Trigger/TrigControl/TrigServices/src/components/TrigServices_entries.cxx
+++ b/HLT/Trigger/TrigControl/TrigServices/src/components/TrigServices_entries.cxx
@@ -1,13 +1,11 @@
 #include "../TrigMessageSvc.h"
 #include "../TrigMonTHistSvc.h"
 #include "../HltEventLoopMgr.h"
-#include "../HltAsyncEventLoopMgr.h"
 #include "../HltROBDataProviderSvc.h"
 #include "../TrigCOOLUpdateHelper.h"

 DECLARE_COMPONENT( TrigMessageSvc )
 DECLARE_COMPONENT( TrigMonTHistSvc )
 DECLARE_COMPONENT( HltEventLoopMgr )
-DECLARE_COMPONENT( HltAsyncEventLoopMgr )
 DECLARE_COMPONENT( HltROBDataProviderSvc )
 DECLARE_COMPONENT( TrigCOOLUpdateHelper )
diff --git a/Trigger/TriggerCommon/TriggerJobOpts/python/TriggerConfigFlags.py b/Trigger/TriggerCommon/TriggerJobOpts/python/TriggerConfigFlags.py
index a91743e1640092e295cf49082039f159ca046302..46ba109cd2e7b887f6b0e2ef66147c9d8f3d4f3a 100644
--- a/Trigger/TriggerCommon/TriggerJobOpts/python/TriggerConfigFlags.py
+++ b/Trigger/TriggerCommon/TriggerJobOpts/python/TriggerConfigFlags.py
@@ -60,9 +60,6 @@ def createTriggerFlags(doTriggerRecoFlags):
     flags.addFlag('Trigger.enableL1CaloLegacy', True,
                   help='enable Run-2 L1Calo simulation and/or decoding')

-    flags.addFlag('Trigger.enableAsyncIO', True,
-                  help='enable HltAsyncEventLoopMgr instead of HltEventLoopMgr')
-
     # L1MuonSim category
     flags.addFlag('Trigger.L1MuonSim.EmulateNSW', False,
                   help='enable emulation tool for NSW-TGC coincidence')