diff --git a/AtlasTest/TestTools/share/post.sh b/AtlasTest/TestTools/share/post.sh index ecb60552395b5a70d502d389c81d3a1497ee69d8..c457699ac1f8113f5728ab19b0d6fba62bd766dc 100755 --- a/AtlasTest/TestTools/share/post.sh +++ b/AtlasTest/TestTools/share/post.sh @@ -260,6 +260,9 @@ PP="$PP"'|filling address for' # MetaInputLoader addresses and SIDs PP="$PP"'|MetaInputLoader *INFO ( address|.*is still valid for|.*and sid)' +# Message useless for judging test success +PP="$PP"'|^FileMgr +DEBUG Successfully registered handler for tech' + ########################################### END ##################################################### # Always use default ignore list diff --git a/Control/AthenaKernel/AthenaKernel/IMetaDataSvc.h b/Control/AthenaKernel/AthenaKernel/IMetaDataSvc.h new file mode 100644 index 0000000000000000000000000000000000000000..ec82de1182d47ea34d36ea20ad36c5f245305acf --- /dev/null +++ b/Control/AthenaKernel/AthenaKernel/IMetaDataSvc.h @@ -0,0 +1,116 @@ +/* + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration +*/ + +#ifndef ATHENAKERNEL_IMETADATASVC_H +#define ATHENAKERNEL_IMETADATASVC_H + +/** @file IMetaDataSvc.h + * @brief This file contains the class definition for the IMetaDataSvc class. + * @author Marcin Nowak + **/ + +#include "GaudiKernel/INamedInterface.h" +#include "AthenaKernel/MetaCont.h" + +#include <string> +#include <mutex> + +/** @class IMetaDataSvc + * @brief This class provides the interface for MetaDataSvc + **/ +class IMetaDataSvc : virtual public ::INamedInterface { + +public: // Non-static members + + + /// used by AthenaPoolCnvSvc + virtual StatusCode shmProxy(const std::string& filename) = 0; + + + // ======= Methods for handling metadata objects stored in MetaContainers (EventService) + template <typename T, class TKEY> + T* tryRetrieve (const TKEY& key) const; + + template <typename T, class TKEY> + const T* tryConstRetrieve(const TKEY& key) const; + + /// Record an object with a key. + template <typename T, typename TKEY> + StatusCode record(T* p2BRegistered, const TKEY& key); + + /// Remove object with this type+key + template <typename T, typename TKEY> + StatusCode remove(const TKEY& key, bool ignoreIfAbsent=false); + + /// The output MetaData Store + virtual StoreGateSvc* outputDataStore() const = 0; + + /// rangeID for the current EventContext - used to index MetaContainrs - + virtual const std::string currentRangeID() const = 0; + + /// Gaudi boilerplate + static const InterfaceID& interfaceID(); + +private: // Data + std::mutex m_mutex; +}; + + +inline const InterfaceID& IMetaDataSvc::interfaceID() { + static const InterfaceID IID("IMetaDataSvc", 1, 0); + return(IID); +} + +/** + * @brief Retrieve an object of type @c T from MetaDataStore + * Return 0 if not found. Don't print any WARNINGs + * @param key The key to use for the lookup. + **/ +template <typename T, class TKEY> +T* IMetaDataSvc::tryRetrieve (const TKEY& key) const +{ + const MetaCont<T>* container = outputDataStore()->tryRetrieve< MetaCont<T> >(key); + if( container ) { + return container->get( currentRangeID() ); + } + return nullptr; +} + +template <typename T, class TKEY> +const T* IMetaDataSvc::tryConstRetrieve (const TKEY& key) const +{ + const MetaCont<T>* container = outputDataStore()->tryRetrieve< MetaCont<T> >(key); + if( container ) { + return container->get( currentRangeID() ); + } + return nullptr; +} + +template <typename T, typename TKEY> +StatusCode IMetaDataSvc::record(T* pObject, const TKEY& key) +{ + std::lock_guard lock(m_mutex); + MetaCont<T>* container = outputDataStore()->tryRetrieve< MetaCont<T> >(key); + if( !container ) { + auto cont_uptr = std::make_unique< MetaCont<T> >(); + if( cont_uptr->insert( currentRangeID() , pObject) ) { + return outputDataStore()->record( std::move(cont_uptr), key ); + } + return StatusCode::FAILURE; + } + if( container->insert( currentRangeID() , pObject) ) return StatusCode::SUCCESS; + return StatusCode::FAILURE; +} + +template <typename T, class TKEY> +StatusCode IMetaDataSvc::remove(const TKEY& key, bool ignoreIfAbsent) +{ + std::lock_guard lock(m_mutex); + // change erase to setting nullptr? + MetaCont<T>* container = outputDataStore()->tryRetrieve< MetaCont<T> >(key); + if( container and container->erase( currentRangeID() ) ) return StatusCode::SUCCESS; + return ignoreIfAbsent? StatusCode::SUCCESS : StatusCode::FAILURE; +} + +#endif diff --git a/Control/AthenaKernel/AthenaKernel/IMetadataTransition.h b/Control/AthenaKernel/AthenaKernel/IMetadataTransition.h deleted file mode 100644 index b0e6f49c5bac8a06424a4c171c904af7b07f9001..0000000000000000000000000000000000000000 --- a/Control/AthenaKernel/AthenaKernel/IMetadataTransition.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration -*/ - -#ifndef ATHENAKERNEL_IMETADATATRANSITION_H -#define ATHENAKERNEL_IMETADATATRANSITION_H - -/** @file IMetadataTransition.h - * @brief This file contains the class definition for the IMetadataTransition class. - * @author Peter van Gemmeren <gemmeren@anl.gov> - * $Id: IMetadataTransition.h,v 1.2 2007-07-30 19:06:50 gemmeren Exp $ - **/ - -#include "GaudiKernel/INamedInterface.h" -#include "GaudiKernel/Incident.h" -#include <string> - -/** @class IMetadataTransition - * @brief This class provides the interface for MetadataTransitions. - **/ -class IMetadataTransition : virtual public ::INamedInterface { - -public: // Non-static members - - /// Function called when a new input file is opened - virtual StatusCode newMetadataSource(const Incident&) = 0; - - /// Function called when the currently open input file got completely - /// processed - virtual StatusCode retireMetadataSource(const Incident&) = 0; - - /// Function called when the tool should write out its metadata - virtual StatusCode prepareOutput() = 0; - - virtual StatusCode shmProxy(const std::string& filename) = 0; - - /// Gaudi boilerplate - static const InterfaceID& interfaceID(); -private: // Data -}; - -inline const InterfaceID& IMetadataTransition::interfaceID() { - static const InterfaceID IID("IMetadataTransition", 1, 0); - return(IID); -} - -#endif - diff --git a/Control/AthenaKernel/AthenaKernel/MetaCont.h b/Control/AthenaKernel/AthenaKernel/MetaCont.h index ff6fea20ef1e298d28bfbacb356090e7bb7af822..e7618ae8b606bc54663ff8c18511bcaa1fe1df11 100644 --- a/Control/AthenaKernel/AthenaKernel/MetaCont.h +++ b/Control/AthenaKernel/AthenaKernel/MetaCont.h @@ -28,6 +28,7 @@ class MetaContBase { virtual ~MetaContBase(){}; virtual bool insert(const SourceID& sid, void* obj) = 0; + virtual size_t erase(const SourceID& sid) = 0; virtual int entries() const { return 0; } virtual bool valid(const SourceID& sid) const = 0; @@ -36,6 +37,8 @@ class MetaContBase { virtual void list(std::ostringstream& stream) const = 0; + virtual void* getAsVoid(const SourceID& sid) const = 0; + private: }; @@ -51,19 +54,25 @@ class MetaCont: public MetaContBase { ~MetaCont(); // Virtual functions - virtual bool insert(const SourceID& sid, void* obj) override; + virtual bool insert(const SourceID& sid, void* obj) override final; + virtual size_t erase(const SourceID& sid) override final; + virtual int entries() const override; - virtual bool valid(const SourceID& sid) const override; + virtual bool valid(const SourceID& sid) const override final; - virtual std::vector<SourceID> sources() const override; + virtual std::vector<SourceID> sources() const override final; - virtual void list(std::ostringstream& stream) const override; + virtual void list(std::ostringstream& stream) const override final; // Non-virtual functions bool insert(const SourceID& sid, T* t); + + /// various Get methods bool find(const SourceID& sid, T*& t) const; - const T* get(const SourceID& sid) const; + T* get(const SourceID& sid) const; + + void* getAsVoid(const SourceID& sid) const override final { return get(sid); } private: @@ -112,6 +121,14 @@ bool MetaCont<T>::insert(const SourceID& sid, void* obj) { /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ +template <typename T> +size_t MetaCont<T>::erase(const SourceID& sid) { + std::lock_guard<std::mutex> lock(m_mut); + return m_metaSet.erase(sid); +} + +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + template <typename T> bool MetaCont<T>::insert(const SourceID& sid, T* t) { std::lock_guard<std::mutex> lock(m_mut); @@ -145,7 +162,7 @@ bool MetaCont<T>::find(const SourceID& sid, T*& t) const { /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ template <typename T> -const T* MetaCont<T>::get(const SourceID& sid) const { +T* MetaCont<T>::get(const SourceID& sid) const { std::lock_guard<std::mutex> lock(m_mut); typename MetaContSet::const_iterator itr = m_metaSet.find(sid); diff --git a/Control/AthenaServices/src/AthenaOutputStream.cxx b/Control/AthenaServices/src/AthenaOutputStream.cxx index ec733590e94640da25c49554d0b1f533fd0353a3..cadbc700470df9c716f52d41396c7239ed586085 100644 --- a/Control/AthenaServices/src/AthenaOutputStream.cxx +++ b/Control/AthenaServices/src/AthenaOutputStream.cxx @@ -700,7 +700,8 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) } else { item_key = item.key(); } - ATH_MSG_DEBUG("addItemObjects(" << item.id() << ",\"" << item.key() << "\") called"); + CLID item_id = item.id(); + ATH_MSG_DEBUG("addItemObjects(" << item_id << ",\"" << item_key << "\") called"); ATH_MSG_DEBUG(" Key:" << item_key ); if( aux_attr.size() ) { ATH_MSG_DEBUG(" Aux Attr:" << aux_attr ); @@ -709,7 +710,7 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) std::set<std::string> clidKeys; for (SG::IFolder::const_iterator iter = m_decoder->begin(), iterEnd = m_decoder->end(); iter != iterEnd; iter++) { - if (iter->id() == item.id()) { + if (iter->id() == item_id) { clidKeys.insert(iter->key()); } } @@ -727,7 +728,7 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) for (SG::IFolder::const_iterator iter = m_compressionDecoderHigh->begin(), iterEnd = m_compressionDecoderHigh->end(); iter != iterEnd; iter++) { // First match the IDs for early rejection. - if (iter->id() != item.id()) { + if (iter->id() != item_id) { continue; } // Then find the compression item key and the compression list string @@ -754,7 +755,7 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) for (SG::IFolder::const_iterator iter = m_compressionDecoderLow->begin(), iterEnd = m_compressionDecoderLow->end(); iter != iterEnd; iter++) { // First match the IDs for early rejection. - if (iter->id() != item.id()) { + if (iter->id() != item_id) { continue; } // Then find the compression item key and the compression list string @@ -790,9 +791,11 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) } } + // For MetaData objects of type T that are kept in MetaContainers get the MetaCont<T> ID + const CLID remapped_item_id = m_metaDataSvc->remapMetaContCLID( item_id ); SG::ConstProxyIterator iter, end; // Look for the clid in storegate - if (((*m_currentStore)->proxyRange(item.id(), iter, end)).isSuccess()) { + if (((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) { bool added = false, removed = false; // For item list entry // Check for wildcard within string, i.e. 'xxx*yyy', and save the matching parts @@ -841,27 +844,45 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) if (keyMatch && !xkeyMatch) { if (m_forceRead && itemProxy->isValid()) { if (nullptr == itemProxy->accessData()) { - ATH_MSG_ERROR(" Could not get data object for id " << item.id() << ",\"" << itemProxy->name()); + ATH_MSG_ERROR(" Could not get data object for id " << remapped_item_id << ",\"" << itemProxy->name()); } } if (nullptr != itemProxy->object()) { if( std::find(m_objects.begin(), m_objects.end(), itemProxy->object()) == m_objects.end() && std::find(m_altObjects.begin(), m_altObjects.end(), itemProxy->object()) == m_altObjects.end() ) { - if (item.exact()) { + if( item_id != remapped_item_id ) { + // For MetaCont<T>: - + // create a temporary DataObject for an entry in the container to pass to CnvSvc + DataBucketBase* dbb = static_cast<DataBucketBase*>( itemProxy->object() ); + const MetaContBase* metaCont = static_cast<MetaContBase*>( dbb->cast( ClassID_traits<MetaContBase>::ID() ) ); + ATH_MSG_INFO("MN: metaCont after cast=" << metaCont ); + if( metaCont ) { + void* obj = metaCont->getAsVoid( m_outSeqSvc->currentRangeID() ); + ATH_MSG_INFO("MN: got object" << obj ); + auto altbucket = std::make_unique<AltDataBucket>( + obj, item_id, *CLIDRegistry::CLIDToTypeinfo(item_id), *itemProxy ); + m_objects.push_back( altbucket.get() ); + m_ownedObjects.push_back( std::move(altbucket) ); + m_altObjects.push_back( itemProxy->object() ); // only for duplicate prevention + } else { + ATH_MSG_ERROR("Failed to retrieve object from MetaCont with key=" << item_key << " for EventRangeID=" << m_outSeqSvc->currentRangeID() ); + return; + } + } else if (item.exact()) { // If the exact flag is set, make a new DataObject // holding the object as the requested type. DataBucketBase* dbb = dynamic_cast<DataBucketBase*> (itemProxy->object()); if (!dbb) std::abort(); - void* ptr = dbb->cast (item.id()); + void* ptr = dbb->cast (item_id); if (!ptr) { // Hard cast ptr = dbb->object(); } auto altbucket = std::make_unique<AltDataBucket> - (ptr, item.id(), - *CLIDRegistry::CLIDToTypeinfo (item.id()), + (ptr, item_id, + *CLIDRegistry::CLIDToTypeinfo (item_id), *itemProxy); m_objects.push_back(altbucket.get()); m_ownedObjects.push_back (std::move(altbucket)); @@ -869,16 +890,16 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) } else m_objects.push_back(itemProxy->object()); - ATH_MSG_DEBUG(" Added object " << item.id() << ",\"" << itemProxy->name() << "\""); + ATH_MSG_DEBUG(" Added object " << item_id << ",\"" << itemProxy->name() << "\""); } // Build ItemListSvc string std::string tn; std::stringstream tns; - if (!m_pCLIDSvc->getTypeNameOfID(item.id(), tn).isSuccess()) { + if (!m_pCLIDSvc->getTypeNameOfID(item_id, tn).isSuccess()) { ATH_MSG_ERROR(" Could not get type name for id " - << item.id() << ",\"" << itemProxy->name()); - tns << item.id() << '_' << itemProxy->name(); + << item_id << ",\"" << itemProxy->name()); + tns << item_id << '_' << itemProxy->name(); } else { tn += '_' + itemProxy->name(); tns << tn; @@ -960,14 +981,14 @@ void AthenaOutputStream::addItemObjects(const SG::FolderItem& item) } } // proxy loop if (!added && !removed) { - ATH_MSG_DEBUG(" No object matching " << item.id() << ",\"" << item_key << "\" found"); + ATH_MSG_DEBUG(" No object matching " << item_id << ",\"" << item_key << "\" found"); } else if (removed) { ATH_MSG_DEBUG(" Object being excluded based on property setting " - << item.id() << ",\"" << item_key << "\". Skipping"); + << item_id << ",\"" << item_key << "\". Skipping"); } } else { ATH_MSG_DEBUG(" Failed to receive proxy iterators from StoreGate for " - << item.id() << ",\"" << item_key << "\". Skipping"); + << item_id << ",\"" << item_key << "\". Skipping"); } } diff --git a/Control/AthenaServices/src/MetaDataSvc.cxx b/Control/AthenaServices/src/MetaDataSvc.cxx index 186e6f9797c6c408771bd393ee36895e01c8c382..3a96556106056cd046bfec0976e37bf611a0d3ec 100644 --- a/Control/AthenaServices/src/MetaDataSvc.cxx +++ b/Control/AthenaServices/src/MetaDataSvc.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ /** @file MetaDataSvc.cxx @@ -24,6 +24,8 @@ #include "SGTools/SGVersionedKey.h" #include "PersistentDataModel/DataHeader.h" +#include "OutputStreamSequencerSvc.h" + #include <vector> #include <sstream> @@ -34,6 +36,7 @@ MetaDataSvc::MetaDataSvc(const std::string& name, ISvcLocator* pSvcLocator) : :: m_addrCrtr("AthenaPoolCnvSvc", name), m_fileMgr("FileMgr", name), m_incSvc("IncidentSvc", name), + m_outSeqSvc("OutputStreamSequencerSvc", name), m_storageType(0L), m_clearedInputDataStore(true), m_clearedOutputDataStore(false), @@ -157,6 +160,9 @@ StatusCode MetaDataSvc::initialize() { } } } + // retrieve the output sequences service (EventService) if available + m_outSeqSvc.retrieve().ignore(); + return(StatusCode::SUCCESS); } //__________________________________________________________________________ @@ -197,8 +203,8 @@ StatusCode MetaDataSvc::stop() { StatusCode MetaDataSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) { if (riid == this->interfaceID()) { *ppvInterface = this; - } else if (riid == IMetadataTransition::interfaceID()) { - *ppvInterface = dynamic_cast<IMetadataTransition*>(this); + } else if (riid == IMetaDataSvc::interfaceID()) { + *ppvInterface = dynamic_cast<IMetaDataSvc*>(this); } else { // Interface is not directly available: try out a base class return(::AthService::queryInterface(riid, ppvInterface)); @@ -603,3 +609,18 @@ StatusCode MetaDataSvc::initInputMetaDataStore(const std::string& fileName) { return(StatusCode::SUCCESS); } + +const std::string MetaDataSvc::currentRangeID() const +{ + return m_outSeqSvc.isValid()? m_outSeqSvc->currentRangeID() : ""; +} + + +CLID MetaDataSvc::remapMetaContCLID( const CLID& item_id ) const +{ + // for now just a simple dumb if + if( item_id == 167728019 ) { + return 167729019; // MetaCont<EventStreamInfo> CLID + } + return item_id; +} diff --git a/Control/AthenaServices/src/MetaDataSvc.h b/Control/AthenaServices/src/MetaDataSvc.h index 24dbc206eb3a2bcc04f33e9492299da949501d35..2809547d1192ed85117e16e9b3693acf95d4d0c7 100644 --- a/Control/AthenaServices/src/MetaDataSvc.h +++ b/Control/AthenaServices/src/MetaDataSvc.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef ATHENASERVICES_METADATASVC_H @@ -19,7 +19,7 @@ #include "AthenaKernel/IAddressProvider.h" #include "AthenaBaseComps/AthService.h" #include "AthenaKernel/IMetaDataTool.h" -#include "AthenaKernel/IMetadataTransition.h" +#include "AthenaKernel/IMetaDataSvc.h" #include "boost/bind.hpp" @@ -29,6 +29,7 @@ class IAddressCreator; class StoreGateSvc; class IAlgTool; +class OutputStreamSequencerSvc; namespace Io { class FileAttr; @@ -44,7 +45,7 @@ template <class TYPE> class SvcFactory; class ATLAS_CHECK_THREAD_SAFETY MetaDataSvc : public ::AthService, virtual public IAddressProvider, virtual public IIncidentListener, - virtual public IMetadataTransition, + virtual public IMetaDataSvc, virtual public IIoComponent { // Allow the factory class access to the constructor friend class SvcFactory<MetaDataSvc>; @@ -68,14 +69,14 @@ public: // Non-static members /// Required of all Gaudi services: see Gaudi documentation for details /// Function called when a new metadata source becomes available - virtual StatusCode newMetadataSource(const Incident&) override; + virtual StatusCode newMetadataSource(const Incident&); /// Function called when a metadata source is closed - virtual StatusCode retireMetadataSource(const Incident&) override; + virtual StatusCode retireMetadataSource(const Incident&); /// Function called when the current state of metadata must be made /// ready for output - virtual StatusCode prepareOutput() override; + virtual StatusCode prepareOutput(); /// version of prepareOutput() for parallel streams virtual StatusCode prepareOutput(const std::string& outputName); @@ -108,6 +109,12 @@ public: // Non-static members StatusCode rootOpenAction(FILEMGR_CALLBACK_ARGS); + virtual StoreGateSvc* outputDataStore() const override final { return &*m_outputDataStore; } + + virtual const std::string currentRangeID() const override final; + + CLID remapMetaContCLID( const CLID& item_id ) const; + private: /// Add proxy to input metadata store - can be called directly or via BeginInputFile incident StatusCode addProxyToInputMetaDataStore(const std::string& tokenStr); @@ -120,7 +127,8 @@ private: // data ServiceHandle<IAddressCreator> m_addrCrtr; ServiceHandle<IFileMgr> m_fileMgr; ServiceHandle<IIncidentSvc> m_incSvc; - + ServiceHandle<OutputStreamSequencerSvc> m_outSeqSvc; + long m_storageType; bool m_clearedInputDataStore; bool m_clearedOutputDataStore; @@ -136,6 +144,5 @@ private: // properties /// MetaDataTools, vector with the MetaData tools. ToolHandleArray<IMetaDataTool> m_metaDataTools; }; - + #endif - diff --git a/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx b/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx index f678820a9a3978650cc7fae054d2a266d2d029ee..d9a6e189fa6f264a49968017fcd4d3bf9603db27 100644 --- a/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx +++ b/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ /** @file OutputStreamSequencerSvc.cxx @@ -34,12 +34,6 @@ StatusCode OutputStreamSequencerSvc::initialize() { ATH_MSG_FATAL("Cannot initialize AthService base class."); return(StatusCode::FAILURE); } - - // Retrieve MetaDataSvc - if (!m_metaDataSvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get MetaDataSvc."); - return(StatusCode::FAILURE); - } // Set to be listener for end of event ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name()); if (!incsvc.retrieve().isSuccess()) { @@ -48,6 +42,7 @@ StatusCode OutputStreamSequencerSvc::initialize() { } if( !incidentName().empty() ) { incsvc->addListener(this, incidentName(), 100); + incsvc->addListener(this, IncidentType::BeginProcessing, 100); } if( inConcurrentEventsMode() ) { ATH_MSG_DEBUG("Concurrent events mode"); @@ -55,6 +50,8 @@ StatusCode OutputStreamSequencerSvc::initialize() { ATH_MSG_VERBOSE("Sequential events mode"); } + // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet + // m_rangeIDinSlot.resize( ); m_finishedRange = m_fnToRangeId.end(); return(StatusCode::SUCCESS); @@ -92,32 +89,56 @@ bool OutputStreamSequencerSvc::inUse() const { //__________________________________________________________________________ void OutputStreamSequencerSvc::handle(const Incident& inc) { - // process NextEventRange ATH_MSG_INFO("handle incident type " << inc.type()); - // finish the old range if needed - if( m_fileSequenceNumber >= 0 and !inConcurrentEventsMode() ) { - // When processing events sequentially (threads<2) write metadata on the NextRange incident - // but ignore the first incident because it only starts the first sequence - ATH_MSG_DEBUG("MetaData transition"); - if (!m_metaDataSvc->transitionMetaDataFile().isSuccess()) { - ATH_MSG_FATAL("Cannot transition MetaDataSvc."); + auto slot = Gaudi::Hive::currentContext().slot(); + ATH_MSG_INFO("MN: SLOT in seq handle=" << slot); + if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0; + + if( inc.type() == incidentName() ) { // NextEventRange + // finish the old range if needed + if( m_fileSequenceNumber >= 0 and !inConcurrentEventsMode() ) { + // When processing events sequentially (threads<2) write metadata on the NextRange incident + // but ignore the first incident because it only starts the first sequence + ATH_MSG_DEBUG("MetaData transition"); + // Retrieve MetaDataSvc + if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) { + throw GaudiException("Cannot get MetaDataSvc", name(), StatusCode::FAILURE); + } + if( !m_metaDataSvc->transitionMetaDataFile().isSuccess() ) { + throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE); + } } + // start a new range + std::lock_guard lockg( m_mutex ); + std::string rangeID; + m_fileSequenceNumber++; + const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc); + if (fileInc != nullptr) { + rangeID = fileInc->fileName(); + ATH_MSG_DEBUG("Requested (through incident) next event range filename extension: " << rangeID); + } + if( rangeID.empty() ) { + std::ostringstream n; + n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber; + rangeID = n.str(); + ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID); + } + if( slot >= m_rangeIDinSlot.size() ) { + // MN - late resize, is there a better place for it? + m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) ); + } + m_rangeIDinSlot[ slot ] = rangeID; + // remember range ID for next events in the same range + m_currentRangeID = rangeID; } - // start a new range - m_currentRangeID.clear(); - m_fileSequenceNumber++; - const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc); - if (fileInc != nullptr) { - m_currentRangeID = fileInc->fileName(); - ATH_MSG_DEBUG("Requested (through incident) next event range filename extension: " << m_currentRangeID); - } - if( m_currentRangeID.empty() ) { - std::ostringstream n; - n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber; - m_currentRangeID = n.str(); - ATH_MSG_DEBUG("Default next event range filename extension: " << m_currentRangeID); + else if( inc.type() == IncidentType::BeginProcessing ) { + // new event start - assing current rangeId to its slot + ATH_MSG_INFO("MN: assigne rangeID = " << m_currentRangeID << " to slot " << slot); + std::lock_guard lockg( m_mutex ); + m_rangeIDinSlot[ slot ] = m_currentRangeID; } + } //__________________________________________________________________________ @@ -127,6 +148,8 @@ std::string OutputStreamSequencerSvc::buildSequenceFileName(const std::string& o // Event sequences not in use, just return the original filename return orgFileName; } + std::string rangeID = currentRangeID(); + std::lock_guard lockg( m_mutex ); // build the full output file name for this event range std::string fileNameCore = orgFileName, fileNameExt; std::size_t sepPos = orgFileName.find("["); @@ -135,20 +158,34 @@ std::string OutputStreamSequencerSvc::buildSequenceFileName(const std::string& o fileNameExt = orgFileName.substr(sepPos); } std::ostringstream n; - n << fileNameCore << "." << m_currentRangeID << fileNameExt; - m_fnToRangeId.insert(std::pair<std::string,std::string>(n.str(),m_currentRangeID)); + n << fileNameCore << "." << rangeID << fileNameExt; + m_fnToRangeId.insert( std::pair(n.str(), rangeID) ); return n.str(); } + +std::string OutputStreamSequencerSvc::currentRangeID() const +{ + if( !inUse() ) return ""; + auto slot = Gaudi::Hive::currentContext().slot(); + if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0; + std::lock_guard lockg( m_mutex ); + if( slot >= m_rangeIDinSlot.size() ) return ""; + return m_rangeIDinSlot[ slot ]; +} + + void OutputStreamSequencerSvc::publishRangeReport(const std::string& outputFile) { - m_finishedRange = m_fnToRangeId.find(outputFile); + std::lock_guard lockg( m_mutex ); + m_finishedRange = m_fnToRangeId.find(outputFile); } OutputStreamSequencerSvc::RangeReport_ptr OutputStreamSequencerSvc::getRangeReport() { RangeReport_ptr report; + std::lock_guard lockg( m_mutex ); if(m_finishedRange!=m_fnToRangeId.end()) { report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first); m_fnToRangeId.erase(m_finishedRange); diff --git a/Control/AthenaServices/src/OutputStreamSequencerSvc.h b/Control/AthenaServices/src/OutputStreamSequencerSvc.h index 0b43715ba4d502d465cfcfe248aa208292131097..893bd27b04cf2a830c723103f0dae000a23fb526 100644 --- a/Control/AthenaServices/src/OutputStreamSequencerSvc.h +++ b/Control/AthenaServices/src/OutputStreamSequencerSvc.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef OUTPUTSTREAMSEQUENCERSVC_H @@ -17,6 +17,7 @@ #include <memory> #include <map> +#include <mutex> // Forward declarations class MetaDataSvc; @@ -46,14 +47,14 @@ public: // Constructor and Destructor public: // Non-static members /// Required of all Gaudi services: - StatusCode initialize(); + virtual StatusCode initialize() override final; /// Required of all Gaudi services: - StatusCode finalize(); + virtual StatusCode finalize() override final; /// Required of all Gaudi services: see Gaudi documentation for details - StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface); + virtual StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface) override final; /// Incident service handle - void handle(const Incident& /*inc*/); + virtual void handle(const Incident& /*inc*/) override final; /// Returns sequenced file name for output stream std::string buildSequenceFileName(const std::string&); @@ -62,7 +63,10 @@ public: // Non-static members /// The name of the incident that starts a new event sequence std::string incidentName() const { return m_incidentName.value(); } - + + /// The current Event Range ID (only one range is + std::string currentRangeID() const; + /// Is the service in active use? (true after the first range incident is handled) bool inUse() const; @@ -75,8 +79,11 @@ private: // data /// The event sequence number int m_fileSequenceNumber; - /// Current EventRange ID constructed on the NextRange incident + /// Current EventRange ID constructed on the last NextRange incident std::string m_currentRangeID; + + /// EventRange ID for all slots + std::vector<std::string> m_rangeIDinSlot; private: // properties /// SequenceIncidentName, incident name for triggering file sequencing. @@ -84,6 +91,8 @@ private: // properties std::map<std::string,std::string> m_fnToRangeId; std::map<std::string,std::string>::iterator m_finishedRange; + + mutable std::mutex m_mutex; }; #endif diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx index e4d1fc2b680eca6eddf51336d1c8e0aaebfac1fc..081a7b67d03d8814e6e8415cf8dd7e62f5aeab50 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx @@ -19,7 +19,7 @@ #include "AthenaKernel/IAthenaSerializeSvc.h" #include "AthenaKernel/IAthenaOutputStreamTool.h" -#include "AthenaKernel/IMetadataTransition.h" +#include "AthenaKernel/IMetaDataSvc.h" #include "PersistentDataModel/Placement.h" #include "PersistentDataModel/Token.h" #include "PersistentDataModel/TokenAddress.h" @@ -447,10 +447,10 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe m_metadataClient = num; } // Retrieve MetaDataSvc - ServiceHandle<IMetadataTransition> metadataTransition("MetaDataSvc", name()); - ATH_CHECK(metadataTransition.retrieve()); - if(!metadataTransition->shmProxy(std::string(placementStr) + "[NUM=" + oss2.str() + "]").isSuccess()) { - ATH_MSG_FATAL("IMetadataTransition::shmProxy() failed!"); + ServiceHandle<IMetaDataSvc> metadataSvc("MetaDataSvc", name()); + ATH_CHECK(metadataSvc.retrieve()); + if(!metadataSvc->shmProxy(std::string(placementStr) + "[NUM=" + oss2.str() + "]").isSuccess()) { + ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!"); return abortSharedWrClients(num); } } else { diff --git a/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.cxx b/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.cxx index f6220d92acb5c908156fa344777f30cd328f836a..462cecda784ed2539a84345c27b075803770b15e 100644 --- a/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.cxx +++ b/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.cxx @@ -24,7 +24,7 @@ MakeEventStreamInfo::MakeEventStreamInfo(const std::string& type, const std::string& name, const IInterface* parent) : ::AthAlgTool(type, name, parent), - m_metaDataStore("StoreGateSvc/MetaDataStore", name), + m_metaDataSvc("MetaDataSvc", name), m_eventStore("StoreGateSvc", name) { // Declare IAthenaOutputStreamTool interface @@ -42,8 +42,8 @@ MakeEventStreamInfo::~MakeEventStreamInfo() { StatusCode MakeEventStreamInfo::initialize() { ATH_MSG_INFO("Initializing " << name() << " - package version " << PACKAGE_VERSION); // Locate the MetaDataStore - if (!m_metaDataStore.retrieve().isSuccess()) { - ATH_MSG_FATAL("Could not find MetaDataStore"); + if (!m_metaDataSvc.retrieve().isSuccess()) { + ATH_MSG_FATAL("Could not find MetaDataSvc"); return(StatusCode::FAILURE); } if (!m_eventStore.retrieve().isSuccess()) { @@ -56,16 +56,10 @@ StatusCode MakeEventStreamInfo::initialize() { //___________________________________________________________________________ StatusCode MakeEventStreamInfo::postInitialize() { // Remove EventStreamInfo with same key if it exists - if (m_metaDataStore->contains<EventStreamInfo>(m_key.value())) { - const EventStreamInfo* pEventStream = nullptr; - if (!m_metaDataStore->retrieve(pEventStream, m_key.value()).isSuccess()) { - ATH_MSG_ERROR("Unable to retrieve EventStreamInfo object"); - return(StatusCode::FAILURE); - } - if (!m_metaDataStore->removeDataAndProxy(pEventStream).isSuccess()) { - ATH_MSG_ERROR("Unable to remove proxy for EventStreamInfo object"); - return(StatusCode::FAILURE); - } + bool ignoreIfAbsent = true; + if( !m_metaDataSvc->remove<EventStreamInfo>(m_key.value(), ignoreIfAbsent).isSuccess() ) { + ATH_MSG_ERROR("Unable to remove EventStreamInfo with key " << m_key.value()); + return StatusCode::FAILURE; } return(StatusCode::SUCCESS); } @@ -105,18 +99,16 @@ StatusCode MakeEventStreamInfo::postExecute() { return(StatusCode::FAILURE); } } - if (!m_metaDataStore->contains<EventStreamInfo>(m_key.value())) { - EventStreamInfo* pEventStream = new EventStreamInfo(); - if (m_metaDataStore->record(pEventStream, m_key.value()).isFailure()) { + + EventStreamInfo* pEventStream = m_metaDataSvc->tryRetrieve<EventStreamInfo>(m_key.value()); + if( !pEventStream ) { + pEventStream = new EventStreamInfo(); + if( m_metaDataSvc->record(pEventStream, m_key.value() ).isFailure()) { ATH_MSG_ERROR("Could not register EventStreamInfo object"); + delete pEventStream; return(StatusCode::FAILURE); } } - EventStreamInfo* pEventStream = nullptr; - if (!m_metaDataStore->retrieve(pEventStream, m_key.value()).isSuccess()) { - ATH_MSG_ERROR("Unable to retrieve EventStreamInfo object"); - return(StatusCode::FAILURE); - } pEventStream->addEvent(); pEventStream->insertProcessingTag(dataHeader->getProcessTag()); pEventStream->insertLumiBlockNumber( lumiN ); @@ -130,9 +122,9 @@ StatusCode MakeEventStreamInfo::postExecute() { } //___________________________________________________________________________ StatusCode MakeEventStreamInfo::preFinalize() { - if (!m_metaDataStore->contains<EventStreamInfo>(m_key.value())) { + if( !m_metaDataSvc->tryRetrieve<EventStreamInfo>(m_key.value()) ) { EventStreamInfo* pEventStream = new EventStreamInfo(); - if (m_metaDataStore->record(pEventStream, m_key.value()).isFailure()) { + if( m_metaDataSvc->record(pEventStream, m_key.value()).isFailure() ) { ATH_MSG_ERROR("Could not register EventStreamInfo object"); return(StatusCode::FAILURE); } @@ -143,7 +135,7 @@ StatusCode MakeEventStreamInfo::preFinalize() { StatusCode MakeEventStreamInfo::finalize() { ATH_MSG_DEBUG("in finalize()"); // release the MetaDataStore - if (!m_metaDataStore.release().isSuccess()) { + if (!m_metaDataSvc.release().isSuccess()) { ATH_MSG_WARNING("Could not release MetaDataStore"); } if (!m_eventStore.release().isSuccess()) { diff --git a/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.h b/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.h index e8af0a5cbdbb9a03903808b5ec43560c618e512a..d8dda11f4ae2b68bf780fe8aacf636cbb20dab96 100644 --- a/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.h +++ b/Database/AthenaPOOL/OutputStreamAthenaPool/src/MakeEventStreamInfo.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef MAKEEVENTSTREAMINFO_H @@ -14,6 +14,7 @@ #include "AthenaBaseComps/AthAlgTool.h" #include "GaudiKernel/ServiceHandle.h" +#include "AthenaKernel/IMetaDataSvc.h" #include <string> @@ -53,7 +54,7 @@ private: StringProperty m_oEventInfoKey; /// Pointer to the data stores - ServiceHandle<StoreGateSvc> m_metaDataStore; - ServiceHandle<StoreGateSvc> m_eventStore; + ServiceHandle<IMetaDataSvc> m_metaDataSvc; + ServiceHandle<StoreGateSvc> m_eventStore; }; #endif