diff --git a/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ATLAS_CHECK_THREAD_SAFETY b/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ATLAS_CHECK_THREAD_SAFETY new file mode 100644 index 0000000000000000000000000000000000000000..dc9d13e70aff5e92140d7ffa19d81676a7a74d7f --- /dev/null +++ b/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ATLAS_CHECK_THREAD_SAFETY @@ -0,0 +1 @@ +Event/ByteStreamCnvSvc diff --git a/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamCnvSvc.h b/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamCnvSvc.h index 3afd7827940a8d42569143d13f9bd0500366d821..f3757af2ed92dc15f6c8338a282dd44506719880 100644 --- a/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamCnvSvc.h +++ b/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamCnvSvc.h @@ -1,12 +1,15 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef BYTESTREAMCNVSVC_BYTESTREAMCNVSVC_H #define BYTESTREAMCNVSVC_BYTESTREAMCNVSVC_H #include "ByteStreamCnvSvcBase/ByteStreamCnvSvcBase.h" +#include "ByteStreamCnvSvcBase/FullEventAssembler.h" #include "StoreGate/StoreGateSvc.h" +#include "AthenaKernel/SlotSpecificObj.h" +#include "GaudiKernel/ThreadLocalContext.h" #include "GaudiKernel/ServiceHandle.h" #include <map> @@ -15,7 +18,7 @@ class ByteStreamOutputSvc; class FullEventAssemblerBase; /** @class ByteStreamCnvSvc - @brief Gaudi COnversion Service class for ByteStream Persistency + @brief Gaudi Conversion Service class for ByteStream Persistency This class is responsible for converting data object to and from BS format It inherits from ByteStreamCnvSvcBase, which is used by HLT in online. @@ -38,22 +41,27 @@ public: virtual ~ByteStreamCnvSvc(); /// Gaudi Service Interface method implementations: - virtual StatusCode initialize(); - virtual StatusCode finalize(); + virtual StatusCode initialize() override; + virtual StatusCode finalize() override; /// Implements ConversionSvc's connectOutput - virtual StatusCode connectOutput(const std::string& t, const std::string& mode); - virtual StatusCode connectOutput(const std::string& t); + virtual StatusCode connectOutput(const std::string& t, const std::string& mode) override; + virtual StatusCode connectOutput(const std::string& t) override; /// Implements ConversionSvc's commitOutput - virtual StatusCode commitOutput(const std::string& outputConnection, bool b); + virtual StatusCode commitOutput(const std::string& outputConnection, bool b) override; - /// @brief Access to FullEventAssembler - template <class T> StatusCode getFullEventAssembler(T*&t, std::string nm); + /// Implementation of IByteStreamEventAccess: Get RawEvent + virtual RawEventWrite* getRawEvent() override + { + return m_slots->m_rawEventWrite.get(); + } + + /// @brief Access to FullEventAssembler + template <class T> StatusCode getFullEventAssembler(T*&t, const std::string& nm); protected: - /// Write the FEA to RawEvent. - void writeFEA(); + RawEventWrite* setRawEvent (std::unique_ptr<RawEventWrite> rawEventWrite); private: /// name of the service @@ -81,22 +89,43 @@ private: std::string m_userType; /// @brief common FEA, indexed by string key - std::map<std::string, FullEventAssemblerBase*> m_feaMap; + using FEAPtr_t = std::unique_ptr<FullEventAssemblerBase>; + using FEAMap_t = std::map<std::string, FEAPtr_t>; + + /// Slot-specific state. + struct SlotData + { + std::unique_ptr<RawEventWrite> m_rawEventWrite; + FEAMap_t m_feaMap; + std::vector<uint32_t> m_tagBuff; + std::vector<uint32_t> m_l1Buff; + std::vector<uint32_t> m_l2Buff; + std::vector<uint32_t> m_efBuff; + + void clear() + { + m_rawEventWrite.reset(); + m_feaMap.clear(); + m_tagBuff.clear(); + m_l1Buff.clear(); + m_l2Buff.clear(); + m_efBuff.clear(); + } + }; + SG::SlotSpecificObj<SlotData> m_slots; - /// Cache for serialised event header data - std::vector<std::unique_ptr<uint32_t[]>> m_serialiseCache; - - /// Add new array to the cache - uint32_t* newCachedArray(const size_t size) { - return m_serialiseCache.emplace_back(std::make_unique<uint32_t[]>(size)).get(); - } + /// Write the FEA to RawEvent. + void writeFEA (SlotData& slot); }; // Implementation of template method: -template <class T> StatusCode ByteStreamCnvSvc::getFullEventAssembler(T*& t, std::string nm) { - std::map<std::string, FullEventAssemblerBase*>::const_iterator it = m_feaMap.find(nm); - if (it != m_feaMap.end()) { - T* p = dynamic_cast<T*>((*it).second); +template <class T> StatusCode ByteStreamCnvSvc::getFullEventAssembler(T*& t, const std::string& nm) +{ + const EventContext& ctx = Gaudi::Hive::currentContext(); + FEAMap_t& feaMap = m_slots.get (ctx)->m_feaMap; + FEAPtr_t& fea = feaMap[nm]; + if (fea) { + T* p = dynamic_cast<T*>(fea.get()); if (p == 0) { ATH_MSG_WARNING(" Key = " << nm << " exists, but of different type"); return(StatusCode::FAILURE); @@ -106,8 +135,9 @@ template <class T> StatusCode ByteStreamCnvSvc::getFullEventAssembler(T*& t, std } // reach here if key does not exist - t = new T(); - m_feaMap[nm] = t; + auto ptr = std::make_unique<T>(); + t = ptr.get(); + fea = std::move (ptr); return(StatusCode::SUCCESS); } diff --git a/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamOutputSvc.h b/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamOutputSvc.h index 5d773c986ef7a985b72ecce2d12d7c137e296c23..dd7f676a719c4e21ee60d9727a445eed8b9c3735 100644 --- a/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamOutputSvc.h +++ b/Event/ByteStreamCnvSvc/ByteStreamCnvSvc/ByteStreamOutputSvc.h @@ -33,10 +33,10 @@ class ByteStreamOutputSvc : public ::AthService { static const InterfaceID& interfaceID(); /// virtual method for writing the event - virtual bool putEvent(RawEvent* re) = 0; + virtual bool putEvent(const RawEvent* re) = 0; /// context-aware method for writing the event - virtual bool putEvent(RawEvent* re, const EventContext& ctx) = 0; + virtual bool putEvent(const RawEvent* re, const EventContext& ctx) = 0; }; diff --git a/Event/ByteStreamCnvSvc/CMakeLists.txt b/Event/ByteStreamCnvSvc/CMakeLists.txt index 91e915fd767a1365619d7a61ab2b5cb9498cde87..7a5351199fc2960397b68640816d383da878eed1 100644 --- a/Event/ByteStreamCnvSvc/CMakeLists.txt +++ b/Event/ByteStreamCnvSvc/CMakeLists.txt @@ -28,17 +28,17 @@ atlas_add_component( ByteStreamCnvSvc # Executables in the package: atlas_add_executable( AtlFindBSEvent test/AtlFindBSEvent.cxx INCLUDE_DIRS ${TDAQ-COMMON_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS} - LINK_LIBRARIES ${TDAQ-COMMON_LIBRARIES} ${Boost_LIBRARIES} ) + LINK_LIBRARIES CxxUtils ${TDAQ-COMMON_LIBRARIES} ${Boost_LIBRARIES} ) atlas_add_executable( AtlCopyBSEvent test/AtlCopyBSEvent.cxx INCLUDE_DIRS ${CORAL_INCLUDE_DIRS} ${TDAQ-COMMON_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS} - LINK_LIBRARIES ${CORAL_LIBRARIES} ${TDAQ-COMMON_LIBRARIES} ${Boost_LIBRARIES} + LINK_LIBRARIES CxxUtils ${CORAL_LIBRARIES} ${TDAQ-COMMON_LIBRARIES} ${Boost_LIBRARIES} CollectionBase FileCatalog PersistentDataModel ) atlas_add_executable( AtlListBSEvents test/AtlListBSEvents.cxx INCLUDE_DIRS ${TDAQ-COMMON_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS} - LINK_LIBRARIES ${TDAQ-COMMON_LIBRARIES} ${Boost_LIBRARIES} ) + LINK_LIBRARIES CxxUtils ${TDAQ-COMMON_LIBRARIES} ${Boost_LIBRARIES} ) # Test(s) in the package: atlas_add_test( BSEventSelector diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamCnvSvc.cxx b/Event/ByteStreamCnvSvc/src/ByteStreamCnvSvc.cxx index fc5836a7b3904dcc2ab60a12bc27b57cb2650a7b..545260bc556d0000601196457e64fc6557015b61 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamCnvSvc.cxx +++ b/Event/ByteStreamCnvSvc/src/ByteStreamCnvSvc.cxx @@ -7,6 +7,7 @@ #include "ByteStreamCnvSvcBase/FullEventAssembler.h" #include "ByteStreamCnvSvcBase/ByteStreamAddress.h" +#include "StoreGate/StoreGateSvc.h" #include "xAODEventInfo/EventInfo.h" #include "xAODTrigger/TrigDecision.h" @@ -107,6 +108,9 @@ StatusCode ByteStreamCnvSvc::connectOutput(const std::string& t, const std::stri StatusCode ByteStreamCnvSvc::connectOutput(const std::string& /*t*/) { ATH_MSG_DEBUG("In connectOutput"); + const EventContext& ctx = Gaudi::Hive::currentContext(); + SlotData& slot = *m_slots.get(ctx); + // Get the EventInfo obj for run/event number const xAOD::EventInfo* evtInfo{nullptr}; ATH_CHECK( m_evtStore->retrieve(evtInfo) ); @@ -123,11 +127,10 @@ StatusCode ByteStreamCnvSvc::connectOutput(const std::string& /*t*/) { uint64_t global_id = event; uint16_t lumi_block = evtInfo->lumiBlock(); uint16_t bc_id = evtInfo->bcid(); - static uint8_t nevt = 0; - nevt = nevt%255; + uint8_t nevt = 0; // create an empty RawEvent eformat::helper::SourceIdentifier sid = eformat::helper::SourceIdentifier(eformat::FULL_SD_EVENT, nevt); - m_rawEventWrite = new RawEventWrite(sid.code(), bc_time_sec, bc_time_ns, global_id, run_type, run_no, lumi_block, lvl1_id, bc_id, lvl1_type); + RawEventWrite* re = setRawEvent (std::make_unique<RawEventWrite>(sid.code(), bc_time_sec, bc_time_ns, global_id, run_type, run_no, lumi_block, lvl1_id, bc_id, lvl1_type)); // set stream tags std::vector<eformat::helper::StreamTag> on_streamTags; @@ -135,9 +138,9 @@ StatusCode ByteStreamCnvSvc::connectOutput(const std::string& /*t*/) { on_streamTags.emplace_back(sTag.name(), sTag.type(), sTag.obeysLumiblock(), sTag.robs(), detsOnline(sTag.dets())); } uint32_t nStreamTagWords = eformat::helper::size_word(on_streamTags); - uint32_t* sTagBuff = newCachedArray(nStreamTagWords); - eformat::helper::encode(on_streamTags, nStreamTagWords, sTagBuff); - m_rawEventWrite->stream_tag(nStreamTagWords, sTagBuff); + slot.m_tagBuff.resize (nStreamTagWords); + eformat::helper::encode(on_streamTags, nStreamTagWords, slot.m_tagBuff.data()); + re->stream_tag(nStreamTagWords, slot.m_tagBuff.data()); // try to get TrigDecision const xAOD::TrigDecision *trigDecision{nullptr}; @@ -152,34 +155,32 @@ StatusCode ByteStreamCnvSvc::connectOutput(const std::string& /*t*/) { const std::vector<uint32_t> &tav = trigDecision->tav(); const size_t l1TotSize = tbp.size() + tap.size() + tav.size(); if (l1TotSize > 0) { - uint32_t* l1Buff = newCachedArray(l1TotSize); + slot.m_l1Buff.resize (l1TotSize); size_t l1Size{0}; for (const uint32_t tb : tbp) { - l1Buff[l1Size++] = tb; + slot.m_l1Buff[l1Size++] = tb; } for (const uint32_t tb : tap) { - l1Buff[l1Size++] = tb; + slot.m_l1Buff[l1Size++] = tb; } for (const uint32_t tb : tav) { - l1Buff[l1Size++] = tb; + slot.m_l1Buff[l1Size++] = tb; } - m_rawEventWrite->lvl1_trigger_info(l1TotSize, l1Buff); + re->lvl1_trigger_info(l1TotSize, slot.m_l1Buff.data()); } // LVL2 info const std::vector<uint32_t>& lvl2PP = trigDecision->lvl2PassedPhysics(); if (lvl2PP.size() > 0) { - uint32_t* l2Buff = newCachedArray(lvl2PP.size()); - std::copy(lvl2PP.begin(), lvl2PP.end(), l2Buff); - m_rawEventWrite->lvl2_trigger_info(lvl2PP.size(), l2Buff); + slot.m_l2Buff = lvl2PP; + re->lvl2_trigger_info(lvl2PP.size(), slot.m_l2Buff.data()); } // EF info const std::vector<uint32_t>& efPP = trigDecision->efPassedPhysics(); if (efPP.size() > 0) { - uint32_t* efBuff = newCachedArray(efPP.size()); - std::copy(efPP.begin(), efPP.end(), efBuff); - m_rawEventWrite->event_filter_info(efPP.size(), efBuff); + slot.m_efBuff = efPP; + re->event_filter_info(efPP.size(), slot.m_efBuff.data()); } return(StatusCode::SUCCESS); @@ -188,21 +189,26 @@ StatusCode ByteStreamCnvSvc::connectOutput(const std::string& /*t*/) { StatusCode ByteStreamCnvSvc::commitOutput(const std::string& outputConnection, bool /*b*/) { ATH_MSG_DEBUG("In flushOutput " << outputConnection); + const EventContext& ctx = Gaudi::Hive::currentContext(); + SlotData& slot = *m_slots.get(ctx); + if (m_ioSvcMap.size() == 0) { ATH_MSG_ERROR("ByteStreamCnvSvc not configure for output"); return(StatusCode::FAILURE); } - writeFEA(); + + writeFEA (slot); // convert RawEventWrite to RawEvent - uint32_t rawSize = m_rawEventWrite->size_word(); - uint32_t* buffer = newCachedArray(rawSize); - uint32_t count = eformat::write::copy(*(m_rawEventWrite->bind()), buffer, rawSize); + RawEventWrite* re = slot.m_rawEventWrite.get(); + uint32_t rawSize = re->size_word(); + std::vector<uint32_t> buffer (rawSize); + uint32_t count = eformat::write::copy(*(re->bind()), buffer.data(), rawSize); if (count != rawSize) { ATH_MSG_ERROR("Memcopy failed"); return(StatusCode::FAILURE); } - RawEvent rawEvent(buffer); + RawEvent rawEvent(buffer.data()); // check validity try { rawEvent.check_tree(); @@ -224,25 +230,27 @@ StatusCode ByteStreamCnvSvc::commitOutput(const std::string& outputConnection, b return(StatusCode::FAILURE); } } - // delete RawEventWrite - delete m_rawEventWrite; m_rawEventWrite = 0; - // delete FEA - for (std::map<std::string, FullEventAssemblerBase*>::const_iterator it = m_feaMap.begin(), - itE = m_feaMap.end(); it != itE; it++) { - delete it->second; - } - m_feaMap.clear(); - // delete cache - m_serialiseCache.clear(); + // Clear slot-specific data. + slot.clear(); return(StatusCode::SUCCESS); } -void ByteStreamCnvSvc::writeFEA() { - ATH_MSG_DEBUG("before FEAMAP size = " << m_feaMap.size()); - for (std::map<std::string, FullEventAssemblerBase*>::const_iterator it = m_feaMap.begin(), - itE = m_feaMap.end(); it != itE; it++) { +void ByteStreamCnvSvc::writeFEA (SlotData& slot) +{ + FEAMap_t& feaMap = slot.m_feaMap; + ATH_MSG_DEBUG("before FEAMAP size = " << feaMap.size()); + for (auto& p : feaMap) { MsgStream log(msgSvc(), name()); - (*it).second->fill(m_rawEventWrite, log); + p.second->fill(slot.m_rawEventWrite.get(), log); } - ATH_MSG_DEBUG("after FEAMAP size = " << m_feaMap.size()); + ATH_MSG_DEBUG("after FEAMAP size = " << feaMap.size()); } + +RawEventWrite* +ByteStreamCnvSvc::setRawEvent (std::unique_ptr<RawEventWrite> rawEventWrite) +{ + RawEventWrite* ptr = rawEventWrite.get(); + m_slots->m_rawEventWrite = std::move (rawEventWrite); + return ptr; +} + diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageInputSvc.h b/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageInputSvc.h index d3a793f19dd838d6171e09c1fceef67b612ddfe4..c0a878753c4ea4487e0e81f95bc88cbc4eeb171e 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageInputSvc.h +++ b/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageInputSvc.h @@ -83,7 +83,7 @@ private: // data std::unique_ptr<EventStorage::DataReader> m_reader; //!< DataReader from EventStorage - mutable std::vector<long long int> m_evtOffsets; //!< offset for event i in that file + std::vector<long long int> m_evtOffsets; //!< offset for event i in that file unsigned int m_evtInFile; long long int m_evtFileOffset; //!< last read in event offset within a file, can be -1 // Event back navigation info diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.cxx b/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.cxx index 41bdde3d539e07c627860f1afffe257cc59a9905..d6ef377e9692f46959dcebab0c83a9527fe35eab 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.cxx +++ b/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.cxx @@ -171,7 +171,7 @@ ByteStreamEventStorageOutputSvc::initDataWriterContents( bool -ByteStreamEventStorageOutputSvc::putEvent(RawEvent* re) { +ByteStreamEventStorageOutputSvc::putEvent(const RawEvent* re) { // Read the next event. return putEvent(re, Gaudi::Hive::currentContext()); } @@ -179,7 +179,7 @@ ByteStreamEventStorageOutputSvc::putEvent(RawEvent* re) { bool ByteStreamEventStorageOutputSvc::putEvent( - RawEvent* re, const EventContext& ctx) { + const RawEvent* re, const EventContext& ctx) { // Read the next event. using OFFLINE_FRAGMENTS_NAMESPACE::DataType; using OFFLINE_FRAGMENTS_NAMESPACE::PointerType; diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.h b/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.h index fb784c48e69e0657f7948a3731463a0caadc609e..3cf481b72fd0411e9d068ac3a9c023b594a03673 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.h +++ b/Event/ByteStreamCnvSvc/src/ByteStreamEventStorageOutputSvc.h @@ -69,8 +69,8 @@ class ByteStreamEventStorageOutputSvc : /// Implementation of the ByteStreamOutputSvc interface method putEvent. - bool putEvent(RawEvent* re) override; - bool putEvent(RawEvent* re, const EventContext& ctx) override; + virtual bool putEvent(const RawEvent* re) override; + virtual bool putEvent(const RawEvent* re, const EventContext& ctx) override; // Callback method to reinitialize the internal state of the component // for I/O purposes (e.g. upon @c fork(2)) diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.cxx b/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.cxx index 506706b4da22bbc88cf972f3e324b2fb9fd77e15..1028f743deb617f780fa85807f9ea93a57519847 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.cxx +++ b/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.cxx @@ -77,9 +77,9 @@ uint32_t ByteStreamMergeOutputSvc::reducedROBid(uint32_t source_id) { } // Read the next event. -bool ByteStreamMergeOutputSvc::putEvent(RawEvent* newEvent) { +bool ByteStreamMergeOutputSvc::putEvent(const RawEvent* newEvent) { // get original event - RawEvent* orgEvent = (RawEvent*)m_inSvc->currentEvent(); + const RawEvent* orgEvent = m_inSvc->currentEvent(); ATH_MSG_DEBUG("original BS size = " << 4 * orgEvent->fragment_size_word()); ATH_MSG_DEBUG("athena BS size = " << 4 * newEvent->fragment_size_word()); @@ -120,7 +120,7 @@ bool ByteStreamMergeOutputSvc::putEvent(RawEvent* newEvent) { } RawEventWrite* mergedEventWrite = new RawEventWrite(); // copy header - RawEvent *event = orgEvent; + const RawEvent *event = orgEvent; if (m_overwriteHeader) { event = newEvent; } @@ -171,7 +171,7 @@ bool ByteStreamMergeOutputSvc::putEvent(RawEvent* newEvent) { return(true); } -bool ByteStreamMergeOutputSvc::putEvent(RawEvent* /*re*/, const EventContext& /*ctx*/) { +bool ByteStreamMergeOutputSvc::putEvent(const RawEvent* /*re*/, const EventContext& /*ctx*/) { ATH_MSG_FATAL(name() << " does not implement the context-aware putEvent method"); return false; } diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.h b/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.h index 9794a8a8332a7557514104efa96fb53de093a639..a2b5df066fedcaba2f448f045cecab55e61a08d0 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.h +++ b/Event/ByteStreamCnvSvc/src/ByteStreamMergeOutputSvc.h @@ -29,8 +29,8 @@ public: virtual StatusCode initialize() override; /// Implementation of the ByteStreamOutputSvc interface methods. - virtual bool putEvent(RawEvent* re) override; - virtual bool putEvent(RawEvent* re, const EventContext& ctx) override; + virtual bool putEvent(const RawEvent* re) override; + virtual bool putEvent(const RawEvent* re, const EventContext& ctx) override; /// Required of all Gaudi services: see Gaudi documentation for details StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface) override; diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx b/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx index a562870a4f4afd7bd72f18769c6c357fedc0c444..d8de560338e6ba02717f6f6118805decfb1bfcb2 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx +++ b/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ /** @file ByteStreamOutputStreamCopyTool.cxx @@ -86,12 +86,11 @@ StatusCode ByteStreamOutputStreamCopyTool::connectOutput(const std::string& /*ou StatusCode ByteStreamOutputStreamCopyTool::commitOutput(bool/* doCommit*/) { MsgStream log(msgSvc(), name()); log << MSG::DEBUG << "In commitOutput" << endmsg; - const RawEvent* re_c = m_inputSvc->currentEvent() ; - if(!re_c){ + const RawEvent* re = m_inputSvc->currentEvent() ; + if(!re){ log << MSG::ERROR << " failed to get the current event from ByteStreamInputSvc " << endmsg; return StatusCode::FAILURE ; } - RawEvent* re = const_cast<RawEvent*>(re_c); if( ! m_outputSvc->putEvent(re) ) { log << MSG::ERROR << " failed to write event to ByteStreamOutputSvc " << endmsg; diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.cxx b/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.cxx index 517ece2152b7ad6aa69a2739ed2d654bc3fce95e..468cfe1b4704c735c6f72aafef8be35bccf85184 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.cxx +++ b/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.cxx @@ -36,12 +36,12 @@ StatusCode ByteStreamRDP_OutputSvc::initialize() { } // Receive the next event without explicit context -bool ByteStreamRDP_OutputSvc::putEvent(RawEvent* re) { +bool ByteStreamRDP_OutputSvc::putEvent(const RawEvent* re) { return putEvent(re, Gaudi::Hive::currentContext()); } // Receive the next event -bool ByteStreamRDP_OutputSvc::putEvent(RawEvent* re, const EventContext& ctx) { +bool ByteStreamRDP_OutputSvc::putEvent(const RawEvent* re, const EventContext& ctx) { EventCache* cache = m_eventsCache.get(ctx); cache->releaseEvent(); const uint32_t reSize = re->fragment_size_word(); diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.h b/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.h index af2d3e1c4eda170085c6c015769655ce63cdb154..87757e2caf4a43b37734db5bc81c8269b556c8bd 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.h +++ b/Event/ByteStreamCnvSvc/src/ByteStreamRDP_OutputSvc.h @@ -31,8 +31,8 @@ public: virtual StatusCode initialize() override; /// Implementation of the ByteStreamOutputSvc interface methods. - virtual bool putEvent(RawEvent* re) override; - virtual bool putEvent(RawEvent* re, const EventContext& ctx) override; + virtual bool putEvent(const RawEvent* re) override; + virtual bool putEvent(const RawEvent* re, const EventContext& ctx) override; /// Required of all Gaudi services: see Gaudi documentation for details StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface) override; diff --git a/Event/ByteStreamCnvSvc/src/EventContextByteStream.cxx b/Event/ByteStreamCnvSvc/src/EventContextByteStream.cxx index d5041d91927ddf7b352e56d7a5b68d70d7573c13..7239a67c622c573913e3a7a8e3f652bfe7bad39f 100644 --- a/Event/ByteStreamCnvSvc/src/EventContextByteStream.cxx +++ b/Event/ByteStreamCnvSvc/src/EventContextByteStream.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ /** @file EventContextByteStream.cxx @@ -9,6 +9,7 @@ **/ #include "EventContextByteStream.h" +#include "CxxUtils/checker_macros.h" //________________________________________________________________________________ EventContextByteStream::EventContextByteStream(const IEvtSelector* selector) : m_evtSelector(selector) { @@ -22,5 +23,6 @@ EventContextByteStream::~EventContextByteStream() { } //________________________________________________________________________________ void* EventContextByteStream::identifier() const { - return((void*)(m_evtSelector)); + IEvtSelector* id ATLAS_THREAD_SAFE = const_cast<IEvtSelector*> (m_evtSelector); + return id; } diff --git a/Event/ByteStreamCnvSvc/src/EventContextByteStream.h b/Event/ByteStreamCnvSvc/src/EventContextByteStream.h index 53cb88882899cddf16741a7a6f4b9bc79f35280a..33a09ae6f5e4fa8b901b5469b7ae41b842f3873f 100644 --- a/Event/ByteStreamCnvSvc/src/EventContextByteStream.h +++ b/Event/ByteStreamCnvSvc/src/EventContextByteStream.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef EVENTCONTEXTBYTESTREAM_H diff --git a/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.cxx b/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.cxx index d12ea084e81e3cd415300de53f70969e976628d2..3eeaf3b09b8732fac25b68d5a2b19fdf53c138c0 100644 --- a/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.cxx +++ b/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #include "EventInfoByteStreamAuxCnv.h" @@ -278,7 +278,7 @@ StatusCode EventInfoByteStreamAuxCnv::createRep(DataObject* /*pObj*/, IOpaqueAdd return StatusCode::SUCCESS; } -const char* EventInfoByteStreamAuxCnv::ascTime(unsigned int tstamp) +std::string EventInfoByteStreamAuxCnv::ascTime(unsigned int tstamp) { struct tm t; t.tm_sec = tstamp; @@ -291,6 +291,9 @@ const char* EventInfoByteStreamAuxCnv::ascTime(unsigned int tstamp) t.tm_yday = 00; t.tm_isdst = 0; time_t ct = mktime(&t); - tm* t2 = gmtime(&ct); - return(asctime(t2)); + struct tm t2; + gmtime_r(&ct, &t2); + char buf[32]; + asctime_r (&t2, buf); + return std::string (buf); } diff --git a/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.h b/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.h index b201e9846f67de84433ac33eace9bbe54b80de56..3f0c988f133bbcf501e854646d41d10ec76c52a0 100644 --- a/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.h +++ b/Event/ByteStreamCnvSvc/src/EventInfoByteStreamAuxCnv.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef EVENTINFOBYTESTREAMAUXCNV_H @@ -46,7 +46,7 @@ class EventInfoByteStreamAuxCnv : public Converter, public AthMessaging static const CLID& classID(); private: - const char* ascTime(unsigned int t); //!< convert timestamp to ascii time. + std::string ascTime(unsigned int t); //!< convert timestamp to ascii time. ByteStreamCnvSvc* m_ByteStreamCnvSvc; //!< pointer to BS CnvSvc ServiceHandle<IROBDataProviderSvc> m_robDataProvider; //!< RODDataProviderSvc handle ServiceHandle<StoreGateSvc> m_mdSvc; //!< TDS handle diff --git a/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.cxx b/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.cxx index 8ecff3c9a53f9860fa21a8c9c69b9ef96dc0339c..b481be19b4e4710ae29959caa60f4b1b6a008ef8 100644 --- a/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.cxx +++ b/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.cxx @@ -52,8 +52,9 @@ EventSelectorByteStream::EventSelectorByteStream( /******************************************************************************/ void EventSelectorByteStream::inputCollectionsHandler(Gaudi::Details::PropertyBase&) { + lock_t lock (m_mutex); if (this->FSMState() != Gaudi::StateMachine::OFFLINE) { - this->reinit().ignore(); + this->reinit(lock).ignore(); } } @@ -158,12 +159,13 @@ StatusCode EventSelectorByteStream::initialize() { } // Must happen before trying to open a file - StatusCode risc = this->reinit(); + lock_t lock (m_mutex); + StatusCode risc = this->reinit(lock); return risc; } //__________________________________________________________________________ -StatusCode EventSelectorByteStream::reinit() { +StatusCode EventSelectorByteStream::reinit(lock_t& /*lock*/) { ATH_MSG_INFO("reinitialization..."); // reset markers if (m_inputCollectionsProp.value().size()>0) { @@ -198,6 +200,7 @@ StatusCode EventSelectorByteStream::reinit() { //________________________________________________________________________________ StatusCode EventSelectorByteStream::start() { ATH_MSG_DEBUG("Calling EventSelectorByteStream::start()"); + lock_t lock (m_mutex); // If file based input then fire appropriate incidents if (m_filebased) { if (!m_firstFileFired) { @@ -207,7 +210,7 @@ StatusCode EventSelectorByteStream::start() { } // try to open a file - if (this->openNewRun().isFailure()) { + if (this->openNewRun(lock).isFailure()) { ATH_MSG_FATAL("Unable to open any file in initialize"); return(StatusCode::FAILURE); } @@ -272,14 +275,14 @@ StatusCode EventSelectorByteStream::finalize() { return(AthService::finalize()); } -void EventSelectorByteStream::nextFile() const { +void EventSelectorByteStream::nextFile(lock_t& /*lock*/) const { FileIncident endInputFileIncident(name(), "EndInputFile", "BSF:" + *m_inputCollectionsIterator); m_incidentSvc->fireIncident(endInputFileIncident); ++m_inputCollectionsIterator; ++m_fileCount; } -StatusCode EventSelectorByteStream::openNewRun() const { +StatusCode EventSelectorByteStream::openNewRun(lock_t& lock) const { // Should be protected upstream, but this is further protection if (!m_filebased) { ATH_MSG_ERROR("cannot open new run for non-filebased inputs"); @@ -310,16 +313,16 @@ StatusCode EventSelectorByteStream::openNewRun() const { if (nev == 0) { ATH_MSG_WARNING("no events in file " << blockname << " try next"); if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true); - this->nextFile(); - return openNewRun(); + this->nextFile(lock); + return openNewRun(lock); // check if skipping all events in that file (minus events already skipped) } else if (m_skipEvents.value() - m_NumEvents > nev) { ATH_MSG_WARNING("skipping more events " << m_skipEvents.value() - m_NumEvents << "(" << nev <<") than in file " << *m_inputCollectionsIterator << ", try next"); m_NumEvents += nev; m_numEvt[m_fileCount] = nev; if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true); - this->nextFile(); - return openNewRun(); + this->nextFile(lock); + return openNewRun(lock); } ATH_MSG_DEBUG("Opened block/file " << blockname); @@ -335,7 +338,13 @@ StatusCode EventSelectorByteStream::createContext(IEvtSelector::Context*& it) co } StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const { - static int n_bad_events = 0; // cross loop counter of bad events + lock_t lock (m_mutex); + return nextImpl (it, lock); +} +StatusCode EventSelectorByteStream::nextImpl(IEvtSelector::Context& it, + lock_t& lock) const +{ + static std::atomic<int> n_bad_events = 0; // cross loop counter of bad events // Check if this is an athenaMP client process if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) { void* source = 0; @@ -362,7 +371,7 @@ StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const { // Find an event to return for (;;) { bool badEvent{}; - StatusCode sc = nextHandleFileTransition(it); + StatusCode sc = nextHandleFileTransitionImpl(it, lock); if (sc.isRecoverable()) { badEvent = true; } else if (sc.isFailure()) { @@ -374,9 +383,9 @@ StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const { // check bad event flag and handle as configured if (badEvent) { - ++n_bad_events; - ATH_MSG_INFO("Bad event encountered, current count at " << n_bad_events); - bool toomany = (m_maxBadEvts >= 0 && n_bad_events > m_maxBadEvts); + int nbad = ++n_bad_events; + ATH_MSG_INFO("Bad event encountered, current count at " << nbad); + bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts); if (toomany) {ATH_MSG_FATAL("too many bad events ");} if (!m_procBadEvent || toomany) { // End of file @@ -396,7 +405,7 @@ StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const { (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) { StatusCode status(StatusCode::SUCCESS); // Build event info attribute list - if (recordAttributeList().isFailure()) ATH_MSG_WARNING("Unable to build event info att list"); + if (recordAttributeListImpl(lock).isFailure()) ATH_MSG_WARNING("Unable to build event info att list"); for (std::vector<ToolHandle<IAthenaSelectorTool> >::const_iterator iter = m_helperTools.begin(), last = m_helperTools.end(); iter != last; iter++) { StatusCode toolStatus = (*iter)->postNext(); @@ -428,10 +437,10 @@ StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const { catch (const ByteStreamExceptions::badFragmentData&) { ATH_MSG_ERROR("badFragment data encountered"); - ++n_bad_events; - ATH_MSG_INFO("Bad event encountered, current count at " << n_bad_events); + int nbad = ++n_bad_events; + ATH_MSG_INFO("Bad event encountered, current count at " << nbad); - bool toomany = (m_maxBadEvts >= 0 && n_bad_events > m_maxBadEvts); + bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts); if (toomany) {ATH_MSG_FATAL("too many bad events ");} if (!m_procBadEvent || toomany) { // End of file @@ -465,13 +474,14 @@ StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const { //________________________________________________________________________________ StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const { + lock_t lock (m_mutex); if (jump > 0) { if ( m_NumEvents+jump != m_skipEvents.value()) { // Save initial event count unsigned int cntr = m_NumEvents; // In case NumEvents increments multiple times in a single next call while (m_NumEvents+1 <= cntr + jump) { - if (!next(ctxt).isSuccess()) { + if (!nextImpl(ctxt, lock).isSuccess()) { return(StatusCode::FAILURE); } } @@ -487,14 +497,20 @@ StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) //________________________________________________________________________________ StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& it) const +{ + lock_t lock (m_mutex); + return nextHandleFileTransitionImpl (it, lock); +} +StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& it, + lock_t& lock) const { const RawEvent* pre{}; bool badEvent{}; // if event source not ready from init, try next file if (m_filebased && !m_eventSource->ready()) { // next file - this->nextFile(); - if (this->openNewRun().isFailure()) { + this->nextFile(lock); + if (this->openNewRun(lock).isFailure()) { ATH_MSG_DEBUG("Event source found no more valid files left in input list"); m_NumEvents = -1; return StatusCode::FAILURE; @@ -537,7 +553,13 @@ StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Conte } //________________________________________________________________________________ -StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& /*ctxt*/) const { +StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt) const +{ + lock_t lock (m_mutex); + return previousImpl (ctxt, lock); +} +StatusCode EventSelectorByteStream::previousImpl(IEvtSelector::Context& /*ctxt*/, + lock_t& /*lock*/) const { ATH_MSG_DEBUG(" ... previous"); const RawEvent* pre = 0; bool badEvent(false); @@ -593,9 +615,10 @@ StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& /*ctxt*/) co } //________________________________________________________________________________ StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt, int jump) const { + lock_t lock (m_mutex); if (jump > 0) { for (int i = 0; i < jump; i++) { - if (!previous(ctxt).isSuccess()) { + if (!previousImpl(ctxt, lock).isSuccess()) { return(StatusCode::FAILURE); } } @@ -624,13 +647,14 @@ StatusCode EventSelectorByteStream::resetCriteria(const std::string& /*criteria* //__________________________________________________________________________ StatusCode EventSelectorByteStream::seek(Context& it, int evtNum) const { + lock_t lock (m_mutex); // Check that input is seekable if (!m_filebased) { ATH_MSG_ERROR("Input not seekable, choose different input svc"); return StatusCode::FAILURE; } // find the file index with that event - long fileNum = findEvent(evtNum); + long fileNum = findEvent(evtNum, lock); if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) { fileNum = m_fileCount; } @@ -676,6 +700,11 @@ StatusCode EventSelectorByteStream::seek(Context& it, int evtNum) const { } StatusCode EventSelectorByteStream::recordAttributeList() const +{ + lock_t lock (m_mutex); + return recordAttributeListImpl (lock); +} +StatusCode EventSelectorByteStream::recordAttributeListImpl(lock_t& lock) const { std::string listName("EventInfoAtts"); @@ -696,7 +725,7 @@ StatusCode EventSelectorByteStream::recordAttributeList() const auto attrList = std::make_unique<AthenaAttributeList>(*spec); // fill the attr list - ATH_CHECK(fillAttributeList(attrList.get(), "", false)); + ATH_CHECK(fillAttributeListImpl(attrList.get(), "", false, lock)); // put result in event store if (eventStore()->record(std::move(attrList), listName).isFailure()) { @@ -706,7 +735,13 @@ StatusCode EventSelectorByteStream::recordAttributeList() const return StatusCode::SUCCESS; } -StatusCode EventSelectorByteStream::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool /* copySource */) const +StatusCode EventSelectorByteStream::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const +{ + lock_t lock (m_mutex); + return fillAttributeListImpl (attrList, suffix, copySource, lock); +} +StatusCode EventSelectorByteStream::fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool /* copySource */, + lock_t& /*lock*/) const { attrList->extend("RunNumber" + suffix, "unsigned int"); attrList->extend("EventNumber" + suffix, "unsigned long long"); @@ -797,7 +832,7 @@ StatusCode EventSelectorByteStream::fillAttributeList(coral::AttributeList *attr } //__________________________________________________________________________ -int EventSelectorByteStream::findEvent(int evtNum) const { +int EventSelectorByteStream::findEvent(int evtNum, lock_t& /*lock*/) const { // Loop over file event counts //ATH_MSG_INFO("try to find evnum = " << evtNum << " in " << m_numEvt.size() << " files"); for (size_t i = 0; i < m_inputCollectionsProp.value().size(); i++) { @@ -837,6 +872,7 @@ int EventSelectorByteStream::findEvent(int evtNum) const { //__________________________________________________________________________ int EventSelectorByteStream::curEvent (const Context& /*it*/) const { // event counter in IEvtSelectorSeek interface + lock_t lock (m_mutex); return int(m_NumEvents); } @@ -847,6 +883,7 @@ int EventSelectorByteStream::size (Context& /*it*/) const { //________________________________________________________________________________ StatusCode EventSelectorByteStream::makeServer(int /*num*/) { + lock_t lock (m_mutex); if (m_eventStreamingTool.empty()) { return(StatusCode::FAILURE); } @@ -855,6 +892,7 @@ StatusCode EventSelectorByteStream::makeServer(int /*num*/) { //________________________________________________________________________________ StatusCode EventSelectorByteStream::makeClient(int /*num*/) { + lock_t lock (m_mutex); if (m_eventStreamingTool.empty()) { return(StatusCode::FAILURE); } @@ -863,6 +901,7 @@ StatusCode EventSelectorByteStream::makeClient(int /*num*/) { //________________________________________________________________________________ StatusCode EventSelectorByteStream::share(int evtNum) { + lock_t lock (m_mutex); if (m_eventStreamingTool.empty()) { return(StatusCode::FAILURE); } @@ -879,6 +918,7 @@ StatusCode EventSelectorByteStream::share(int evtNum) { //________________________________________________________________________________ StatusCode EventSelectorByteStream::readEvent(int maxevt) { + lock_t lock (m_mutex); if (m_eventStreamingTool.empty()) { ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()"); return(StatusCode::FAILURE); @@ -886,7 +926,7 @@ StatusCode EventSelectorByteStream::readEvent(int maxevt) { ATH_MSG_VERBOSE("Called read Event " << maxevt); for (int i = 0; i < maxevt || maxevt == -1; ++i) { const RawEvent* pre = 0; - if (this->next(*m_beginIter).isSuccess()) { + if (this->nextImpl(*m_beginIter, lock).isSuccess()) { pre = m_eventSource->currentEvent(); } else { if (m_NumEvents == -1) { @@ -962,6 +1002,7 @@ StatusCode EventSelectorByteStream::queryInterface(const InterfaceID& riid, void } //________________________________________________________________________________ StatusCode EventSelectorByteStream::io_reinit() { + lock_t lock (m_mutex); ATH_MSG_INFO("I/O reinitialization..."); ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name()); if (!iomgr.retrieve().isSuccess()) { @@ -1003,12 +1044,13 @@ StatusCode EventSelectorByteStream::io_reinit() { m_beginFileFired = false; m_inputCollectionsProp = inputCollections; - return(this->reinit()); + return(this->reinit(lock)); } //__________________________________________________________________________ void EventSelectorByteStream::syncEventCount(int count) const { + lock_t lock (m_mutex); m_NumEvents = count; } diff --git a/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.h b/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.h index c8e583015f15dc81b76d6148838dc2c192ebcfcb..922d42e6e470ddd52ec7757785da515fb9f37e13 100644 --- a/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.h +++ b/Event/ByteStreamCnvSvc/src/EventSelectorByteStream.h @@ -29,6 +29,8 @@ #include "AthenaBaseComps/AthService.h" #include "ByteStreamData/RawEvent.h" +#include "CxxUtils/checker_macros.h" +#include <mutex> // Forward declarations. class ISvcLocator; @@ -124,6 +126,9 @@ public: virtual StatusCode io_reinit() override; protected: + typedef std::mutex mutex_t; + typedef std::lock_guard<mutex_t> lock_t; + //------------------------------------------------- // ISecondaryEventSelector /// Handle file transition at the next iteration @@ -138,45 +143,59 @@ protected: virtual bool disconnectIfFinished(const SG::SourceID &fid) const override; private: // internal member functions + StatusCode nextImpl(Context& it, lock_t& lock) const; + StatusCode previousImpl(Context& it, lock_t& lock) const; + StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context& it, + lock_t& lock) const; + StatusCode recordAttributeListImpl(std::lock_guard<std::mutex>& lock) const; + StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, + lock_t& lock) const; + /// Reinitialize the service when a @c fork() occured/was-issued - StatusCode reinit(); - StatusCode openNewRun() const; - void nextFile() const; + StatusCode reinit(lock_t& lock); + StatusCode openNewRun(lock_t& lock) const; + void nextFile(lock_t& lock) const; /// Search for event with number evtNum. - int findEvent(int evtNum) const; + int findEvent(int evtNum, lock_t& lock) const; StoreGateSvc* eventStore() const; private: // properties + // FIXME: A Gaudi EventSelector is not meant to have mutable state. + // Any needed state is meant to be kept in the Context object. + // So the mutable members here should instead be kept in + // EventContextByteStream. However, making that work would + // require redesigning other Athena interfaces. So for now, + // just add a mutex to protect access to them. + mutable mutex_t m_mutex; + mutable int m_fileCount ATLAS_THREAD_SAFE = 0; //!< number of files to process. + mutable std::vector<int> m_numEvt ATLAS_THREAD_SAFE; + mutable std::vector<int> m_firstEvt ATLAS_THREAD_SAFE; + mutable std::vector<std::string>::const_iterator m_inputCollectionsIterator ATLAS_THREAD_SAFE; + mutable std::vector<long> m_skipEventSequence ATLAS_THREAD_SAFE; + mutable long m_NumEvents ATLAS_THREAD_SAFE = 0; // Number of Events read so far. + mutable ToolHandle<IAthenaIPCTool> m_eventStreamingTool ATLAS_THREAD_SAFE {this, "SharedMemoryTool", "", ""}; + /// IsSecondary, know if this is an instance of secondary event selector Gaudi::Property<bool> m_isSecondary{this, "IsSecondary", false, ""}; Gaudi::Property<std::string> m_eventSourceName{this, "ByteStreamInputSvc", "", ""}; Gaudi::Property<bool> m_procBadEvent{this, "ProcessBadEvent", false, ""}; //!< process bad events, which fail check_tree(). Gaudi::Property<int> m_maxBadEvts{this, "MaxBadEvents", -1, ""}; //!< number of bad events allowed before quitting. - mutable int m_fileCount{}; //!< number of files to process. - - mutable std::vector<int> m_numEvt; - mutable std::vector<int> m_firstEvt; EventContextByteStream* m_beginIter{}; EventContextByteStream* m_endIter{}; ByteStreamInputSvc* m_eventSource{}; Gaudi::Property<std::vector<std::string>> m_inputCollectionsProp{this, "Input", {}, ""}; - mutable std::vector<std::string>::const_iterator m_inputCollectionsIterator; void inputCollectionsHandler(Gaudi::Details::PropertyBase&); ServiceHandle<IIncidentSvc> m_incidentSvc{this, "IncidentSvc", "IncidentSvc", ""}; ServiceHandle<ActiveStoreSvc> m_activeStoreSvc; Gaudi::Property<long> m_skipEvents{this, "SkipEvents", 0, ""}; // Number of events to skip at the beginning Gaudi::Property<std::vector<long>> m_skipEventSequenceProp{this, "SkipEventSequence", {}, ""}; - mutable std::vector<long> m_skipEventSequence; bool m_firstFileFired{}; bool m_beginFileFired{}; bool m_inputCollectionsFromIS{}; - mutable long m_NumEvents{}; // Number of Events read so far. - - mutable ToolHandle<IAthenaIPCTool> m_eventStreamingTool{this, "SharedMemoryTool", "", ""}; /// HelperTools, vector of names of AlgTools that are executed by the EventSelector ToolHandleArray<IAthenaSelectorTool> m_helperTools{this}; diff --git a/Event/ByteStreamCnvSvc/test/AtlCopyBSEvent.cxx b/Event/ByteStreamCnvSvc/test/AtlCopyBSEvent.cxx index 70fed2d77daa49f58e37d2eb8bd32a987ca410f0..4af0f739421d2ca3a86ba9227f11c7d5b0b2c560 100644 --- a/Event/ByteStreamCnvSvc/test/AtlCopyBSEvent.cxx +++ b/Event/ByteStreamCnvSvc/test/AtlCopyBSEvent.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ /** @@ -40,9 +40,11 @@ #include "FileCatalog/IFileCatalog.h" +#include "CxxUtils/checker_macros.h" + void eventLoop(DataReader*, EventStorage::DataWriter*, unsigned&, const std::vector<uint32_t>*, uint32_t, bool, bool, bool, const std::vector<long long int>* = 0); -int main (int argc, char *argv[]) { +int main ATLAS_NOT_THREAD_SAFE (int argc, char *argv[]) { using namespace eformat; //Interpret arguments diff --git a/Event/ByteStreamCnvSvc/test/AtlFindBSEvent.cxx b/Event/ByteStreamCnvSvc/test/AtlFindBSEvent.cxx index a4f30216f9841a757cb29387e353bb5d5e1e9282..49257d6eea070a31dd03188854bdb65992168a70 100644 --- a/Event/ByteStreamCnvSvc/test/AtlFindBSEvent.cxx +++ b/Event/ByteStreamCnvSvc/test/AtlFindBSEvent.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ /** @@ -34,7 +34,9 @@ #include "EventStorage/pickDataReader.h" #include <time.h> -int main (int argc, char *argv[]) +#include "CxxUtils/checker_macros.h" + +int main ATLAS_NOT_THREAD_SAFE (int argc, char *argv[]) { using namespace eformat; diff --git a/Event/ByteStreamCnvSvc/test/AtlListBSEvents.cxx b/Event/ByteStreamCnvSvc/test/AtlListBSEvents.cxx index f38a1d04cec56e58fef2fede492a487ba86189de..4d7e8db67ebbcb92fb9e3bccd9df74111c6589cb 100644 --- a/Event/ByteStreamCnvSvc/test/AtlListBSEvents.cxx +++ b/Event/ByteStreamCnvSvc/test/AtlListBSEvents.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ /** @@ -35,7 +35,9 @@ #include "EventStorage/pickDataReader.h" #include <time.h> -int main (int argc, char *argv[]) +#include "CxxUtils/checker_macros.h" + +int main ATLAS_NOT_THREAD_SAFE (int argc, char *argv[]) { using namespace eformat; diff --git a/Event/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase.h b/Event/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase.h index eb20d2b92eac63509321d235ca7aec8543050fb0..d05293b33992bdd8b62ee0ce127a718485d2be50 100755 --- a/Event/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase.h +++ b/Event/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase/ByteStreamCnvSvcBase.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef BYTESTREAMCNVSVCBASE_BYTESTREAMCNVSVCBASE_H @@ -37,15 +37,10 @@ public: /// Checks whether an IOpaqueAddress is a GenericAddress virtual StatusCode updateServiceState(IOpaqueAddress* pAddress) override; - /// Implementation of IByteStreamEventAccess: Get RawEvent - virtual RawEventWrite* getRawEvent() override { return m_rawEventWrite; } - /// Implementation of IIncidentListener: Handle for EndEvent incidence virtual void handle(const Incident&) override; protected: // data - RawEventWrite* m_rawEventWrite; - std::vector<std::string> m_initCnvs; std::vector<std::string> m_ROD2ROBmap; }; diff --git a/Event/ByteStreamCnvSvcBase/src/ByteStreamCnvSvcBase.cxx b/Event/ByteStreamCnvSvcBase/src/ByteStreamCnvSvcBase.cxx index c231f5d85354b884e2e4fbded5c1a8f2ae445fd9..cadfc55bfa28589e4dd71b5c706f60840276a736 100755 --- a/Event/ByteStreamCnvSvcBase/src/ByteStreamCnvSvcBase.cxx +++ b/Event/ByteStreamCnvSvcBase/src/ByteStreamCnvSvcBase.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #include "ByteStreamCnvSvcBase/ByteStreamCnvSvcBase.h" @@ -16,8 +16,8 @@ //______________________________________________________________________________ ByteStreamCnvSvcBase::ByteStreamCnvSvcBase(const std::string& name, ISvcLocator* pSvcLocator) : - ::AthCnvSvc(name, pSvcLocator, ByteStreamAddress::storageType()), - m_rawEventWrite(0) { + ::AthCnvSvc(name, pSvcLocator, ByteStreamAddress::storageType()) +{ declareProperty("InitCnvs", m_initCnvs); // This property is used by Tile BS converter, not by this class. declareProperty("ROD2ROBmap", m_ROD2ROBmap); diff --git a/Event/ByteStreamCnvSvcBase/src/components/ByteStreamCnvSvcBase_entries.cxx b/Event/ByteStreamCnvSvcBase/src/components/ByteStreamCnvSvcBase_entries.cxx index 5d0e6d9f0c994445bca57feeb1966b55c802f266..dec5cd347e92e9138602683e2f03dd814e196884 100644 --- a/Event/ByteStreamCnvSvcBase/src/components/ByteStreamCnvSvcBase_entries.cxx +++ b/Event/ByteStreamCnvSvcBase/src/components/ByteStreamCnvSvcBase_entries.cxx @@ -1,9 +1,7 @@ -#include "ByteStreamCnvSvcBase/ByteStreamCnvSvcBase.h" #include "ByteStreamCnvSvcBase/ByteStreamAddressProviderSvc.h" #include "ByteStreamCnvSvcBase/ROBDataProviderSvc.h" #include "../ROBDataProviderMTTest.h" -DECLARE_COMPONENT( ByteStreamCnvSvcBase ) DECLARE_COMPONENT( ByteStreamAddressProviderSvc ) DECLARE_COMPONENT( ROBDataProviderSvc ) DECLARE_COMPONENT( ROBDataProviderMTTest ) diff --git a/HLT/Event/TrigByteStreamCnvSvc/src/TrigByteStreamCnvSvc.cxx b/HLT/Event/TrigByteStreamCnvSvc/src/TrigByteStreamCnvSvc.cxx index ffeda04b3f7de388f9eaf845a4c91492b9bc529f..56b749809ac314e6b3f8f5b91bb09ec3bd45c899 100644 --- a/HLT/Event/TrigByteStreamCnvSvc/src/TrigByteStreamCnvSvc.cxx +++ b/HLT/Event/TrigByteStreamCnvSvc/src/TrigByteStreamCnvSvc.cxx @@ -136,15 +136,15 @@ StatusCode TrigByteStreamCnvSvc::connectOutput(const std::string& /*outputFile*/ ATH_MSG_DEBUG("Creating new RawEventWrite for EventContext = " << *eventContext); // Create a new RawEventWrite and copy the header from the input RawEvent - m_rawEventWrite = new RawEventWrite; + RawEventWrite* re = setRawEvent (std::make_unique<RawEventWrite>()); const uint32_t* inputRawEvent = m_robDataProviderSvc->getEvent(*eventContext)->start(); if (!inputRawEvent) { ATH_MSG_ERROR("Input RawEvent is nullptr, cannot create output"); return StatusCode::FAILURE; } - m_rawEventWrite->copy_header(inputRawEvent); + re->copy_header(inputRawEvent); - ATH_MSG_VERBOSE("Created RawEventWrite pointer = " << m_rawEventWrite); + ATH_MSG_VERBOSE("Created RawEventWrite pointer = " << re); ATH_MSG_VERBOSE("end of " << __FUNCTION__); return StatusCode::SUCCESS; @@ -168,11 +168,13 @@ StatusCode TrigByteStreamCnvSvc::commitOutput(const std::string& /*outputFile*/, if (msgLvl(MSG::DEBUG)) printRawEvent(); + RawEventWrite* re = getRawEvent(); + // Serialise the output FullEventFragment std::unique_ptr<uint32_t[]> rawEventPtr; try { - const eformat::write::node_t* top = m_rawEventWrite->bind(); - uint32_t rawEventSize = m_rawEventWrite->size_word(); + const eformat::write::node_t* top = re->bind(); + uint32_t rawEventSize = re->size_word(); rawEventPtr = std::make_unique<uint32_t[]>(rawEventSize); uint32_t copiedSize = eformat::write::copy(*top,rawEventPtr.get(),rawEventSize); if(copiedSize!=rawEventSize) { @@ -213,8 +215,7 @@ StatusCode TrigByteStreamCnvSvc::commitOutput(const std::string& /*outputFile*/, result = StatusCode::FAILURE; } - delete m_rawEventWrite; - m_rawEventWrite = nullptr; + setRawEvent (std::unique_ptr<RawEventWrite>()); ATH_MSG_VERBOSE("end of " << __FUNCTION__); return result; @@ -346,44 +347,46 @@ void TrigByteStreamCnvSvc::monitorRawEvent(const std::unique_ptr<uint32_t[]>& ra // ============================================================================= void TrigByteStreamCnvSvc::printRawEvent() { - if (!m_rawEventWrite) { + RawEventWrite* re = getRawEvent(); + + if (!re) { ATH_MSG_WARNING("RawEventWrite pointer is null"); return; } std::ostringstream ss; ss << "Dumping header of the FullEventFragment with HLT result:" << std::endl; ss << "--> status = " - << printNWordsHex<uint32_t>(m_rawEventWrite->nstatus(), m_rawEventWrite->status()) + << printNWordsHex<uint32_t>(re->nstatus(), re->status()) << std::endl; - ss << "--> source_id = " << printWordHex<uint32_t>(m_rawEventWrite->source_id()) << std::endl; - ss << "--> checksum_type = " << printWordHex<uint32_t>(m_rawEventWrite->checksum_type()) << std::endl; - ss << "--> compression_type = " << printWordHex<uint32_t>(m_rawEventWrite->compression_type()) << std::endl; - ss << "--> compression_level = " << m_rawEventWrite->compression_level() << std::endl; - ss << "--> bc_time_seconds = " << m_rawEventWrite->bc_time_seconds() << std::endl; - ss << "--> bc_time_nanoseconds = " << m_rawEventWrite->bc_time_nanoseconds() << std::endl; - ss << "--> global_id = " << m_rawEventWrite->global_id() << std::endl; - ss << "--> run_type = " << m_rawEventWrite->run_type() << std::endl; - ss << "--> run_no = " << m_rawEventWrite->run_no() << std::endl; - ss << "--> lumi_block = " << m_rawEventWrite->lumi_block() << std::endl; - ss << "--> lvl1_id = " << m_rawEventWrite->lvl1_id() << std::endl; - ss << "--> bc_id = " << m_rawEventWrite->bc_id() << std::endl; - ss << "--> lvl1_trigger_type = " << printWordHex<uint8_t>(m_rawEventWrite->lvl1_trigger_type()) << std::endl; + ss << "--> source_id = " << printWordHex<uint32_t>(re->source_id()) << std::endl; + ss << "--> checksum_type = " << printWordHex<uint32_t>(re->checksum_type()) << std::endl; + ss << "--> compression_type = " << printWordHex<uint32_t>(re->compression_type()) << std::endl; + ss << "--> compression_level = " << re->compression_level() << std::endl; + ss << "--> bc_time_seconds = " << re->bc_time_seconds() << std::endl; + ss << "--> bc_time_nanoseconds = " << re->bc_time_nanoseconds() << std::endl; + ss << "--> global_id = " << re->global_id() << std::endl; + ss << "--> run_type = " << re->run_type() << std::endl; + ss << "--> run_no = " << re->run_no() << std::endl; + ss << "--> lumi_block = " << re->lumi_block() << std::endl; + ss << "--> lvl1_id = " << re->lvl1_id() << std::endl; + ss << "--> bc_id = " << re->bc_id() << std::endl; + ss << "--> lvl1_trigger_type = " << printWordHex<uint8_t>(re->lvl1_trigger_type()) << std::endl; ss << "--> lvl1_trigger_info = " - << printNWordsHex<uint32_t>(m_rawEventWrite->nlvl1_trigger_info(), m_rawEventWrite->lvl1_trigger_info()) + << printNWordsHex<uint32_t>(re->nlvl1_trigger_info(), re->lvl1_trigger_info()) << std::endl; ss << "--> lvl2_trigger_info = " - << printNWordsHex<uint32_t>(m_rawEventWrite->nlvl2_trigger_info(), m_rawEventWrite->lvl2_trigger_info()) + << printNWordsHex<uint32_t>(re->nlvl2_trigger_info(), re->lvl2_trigger_info()) << std::endl; ss << "--> event_filter_info = " - << printNWordsHex<uint32_t>(m_rawEventWrite->nevent_filter_info(), m_rawEventWrite->event_filter_info()) + << printNWordsHex<uint32_t>(re->nevent_filter_info(), re->event_filter_info()) << std::endl; ss << "--> hlt_info = " - << printNWordsHex<uint32_t>(m_rawEventWrite->nhlt_info(), m_rawEventWrite->hlt_info()) + << printNWordsHex<uint32_t>(re->nhlt_info(), re->hlt_info()) << std::endl; std::vector<eformat::helper::StreamTag> stream_tags; try { - eformat::helper::decode(m_rawEventWrite->nstream_tag(), m_rawEventWrite->stream_tag(), stream_tags); + eformat::helper::decode(re->nstream_tag(), re->stream_tag(), stream_tags); } catch (const std::exception& ex) { ATH_MSG_ERROR("StreamTag decoding failed, caught an unexpected std::exception " << ex.what());