diff --git a/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx b/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx index ae0382c34cbbfbea747ea785b2343ce08cfa04c6..5c46484f21e9407f51ba01b1eaaa1fbe604191de 100644 --- a/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx +++ b/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx @@ -65,9 +65,6 @@ StatusCode AthenaSharedMemoryTool::initialize() { ATH_MSG_FATAL("Cannot get IncidentSvc"); return(StatusCode::FAILURE); } - std::ostringstream pidstr; - pidstr << getpid(); - m_sharedMemory.setValue(m_sharedMemory.value() + std::string("_") + pidstr.str()); return(StatusCode::SUCCESS); } @@ -114,6 +111,7 @@ StatusCode AthenaSharedMemoryTool::makeServer(int num) { m_num = num; m_isServer = true; ATH_MSG_DEBUG("Creating shared memory object with name \"" << m_sharedMemory.value() << "\""); + boost::interprocess::shared_memory_object::remove(m_sharedMemory.value().c_str()); boost::interprocess::shared_memory_object shm(boost::interprocess::create_only, m_sharedMemory.value().c_str(), boost::interprocess::read_write); diff --git a/Control/AthenaIPCTools/src/AthenaSharedWriter.cxx b/Control/AthenaIPCTools/src/AthenaSharedWriter.cxx new file mode 100644 index 0000000000000000000000000000000000000000..0230a7147fbde7b455b2c6d08b20ce1aa4e7b3d2 --- /dev/null +++ b/Control/AthenaIPCTools/src/AthenaSharedWriter.cxx @@ -0,0 +1,76 @@ +/* + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration +*/ + +/** @file AthenaSharedWriter.cxx + * @brief This file contains the implementation for the AthenaSharedWriter class. + * @author Peter van Gemmeren <gemmeren@anl.gov> + **/ + +#include "AthenaBaseComps/AthCnvSvc.h" +#include "AthenaKernel/IDataShare.h" + +#include "AthenaSharedWriter.h" + +//___________________________________________________________________________ +AthenaSharedWriter::AthenaSharedWriter(const std::string& name, ISvcLocator* pSvcLocator) + : AthFilterAlgorithm(name, pSvcLocator) , m_cnvSvc(nullptr) { +} +//___________________________________________________________________________ +AthenaSharedWriter::~AthenaSharedWriter() { +} +//___________________________________________________________________________ +StatusCode AthenaSharedWriter::initialize() { + ATH_MSG_INFO("in initialize()"); + + StatusCode sc = serviceLocator()->service("AthenaPoolCnvSvc", m_cnvSvc); + if (sc.isFailure() || m_cnvSvc == nullptr) { + ATH_MSG_FATAL("Could not retrieve AthenaPoolCnvSvc"); + return StatusCode::FAILURE; + } + // Use IDataShare to make ConversionSvc a Share Server + IDataShare* dataShare = dynamic_cast<IDataShare*>(m_cnvSvc); + if (dataShare == nullptr || !dataShare->makeServer(-m_numberOfClients.value() - 1).isSuccess()) { + ATH_MSG_FATAL("Could not make AthenaPoolCnvSvc a share server: " << dataShare); + return StatusCode::FAILURE; + } else { + ATH_MSG_DEBUG("Successfully made the conversion service a share server"); + } + return StatusCode::SUCCESS; +} +//___________________________________________________________________________ +StatusCode AthenaSharedWriter::execute() { + ATH_MSG_DEBUG("in execute()"); + int counter = 0; + bool doCommit = false; + StatusCode sc = m_cnvSvc->commitOutput("", false); + ATH_MSG_VERBOSE("Start commitOutput loop"); + while (sc.isSuccess() || sc.isRecoverable()) { + if (sc.isRecoverable()) { + usleep(100); + } else { + counter++; + if (m_autoSend.value() > 0 && counter% m_autoSend.value() == 0) { + doCommit = true; + ATH_MSG_INFO("commitOutput sending data."); + } else { + doCommit = false; + } + } + sc = m_cnvSvc->commitOutput("", doCommit); + } + + AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc); + if (cnvSvc == nullptr || !cnvSvc->disconnectOutput("").isSuccess()) { + ATH_MSG_FATAL("Could not disconnectOutput"); + return StatusCode::FAILURE; + } + + setFilterPassed(false); // don't output events + return StatusCode::SUCCESS; +} +//___________________________________________________________________________ +StatusCode AthenaSharedWriter::finalize() { + ATH_MSG_INFO("in finalize()"); + return StatusCode::SUCCESS; +} diff --git a/Control/AthenaIPCTools/src/AthenaSharedWriter.h b/Control/AthenaIPCTools/src/AthenaSharedWriter.h new file mode 100644 index 0000000000000000000000000000000000000000..80562df0fb39c0d81774a3570845709ef2faa2d3 --- /dev/null +++ b/Control/AthenaIPCTools/src/AthenaSharedWriter.h @@ -0,0 +1,41 @@ +/* + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration +*/ + +#ifndef ATHENASHAREDWRITER_H +#define ATHENASHAREDWRITER_H + +/** @file AthenaSharedWriter.h + * @brief This file contains the class definition for the AthenaSharedWriter class. + * @author Peter van Gemmeren <gemmeren@anl.gov> + **/ + +#include "AthenaBaseComps/AthFilterAlgorithm.h" + +class IConversionSvc; + +/** @class AthenaSharedWriter + * @brief This class provides an example for writing event data objects to Pool. + **/ +class AthenaSharedWriter : public AthFilterAlgorithm { +public: // Constructor and Destructor + /// Standard Service Constructor + AthenaSharedWriter(const std::string& name, ISvcLocator* pSvcLocator); + /// Destructor + virtual ~AthenaSharedWriter(); + +public: +/// Gaudi Service Interface method implementations: + virtual StatusCode initialize() override; + virtual StatusCode execute () override; + virtual StatusCode finalize() override; + +private: // properties + IntegerProperty m_numberOfClients{this,"NumberOfClients",1}; + IntegerProperty m_autoSend{this,"AutoSend",-1}; + +private: + IConversionSvc* m_cnvSvc; +}; + +#endif diff --git a/Control/AthenaIPCTools/src/components/AthenaIPCTools_entries.cxx b/Control/AthenaIPCTools/src/components/AthenaIPCTools_entries.cxx index c0d8e999d83da4cb90af2db5892db16daf722c13..e87d47baee5563bc1510800b92ea67e8005e7651 100644 --- a/Control/AthenaIPCTools/src/components/AthenaIPCTools_entries.cxx +++ b/Control/AthenaIPCTools/src/components/AthenaIPCTools_entries.cxx @@ -1,6 +1,8 @@ +#include "../AthenaSharedWriter.h" #include "../AthenaSharedMemoryTool.h" #include "../AthenaYamplTool.h" +DECLARE_COMPONENT( AthenaSharedWriter ) DECLARE_COMPONENT( AthenaSharedMemoryTool ) DECLARE_COMPONENT( AthenaYamplTool ) diff --git a/Control/AthenaKernel/AthenaKernel/IAthenaOutputStreamTool.h b/Control/AthenaKernel/AthenaKernel/IAthenaOutputStreamTool.h index b14d4ecad6650b179b4eb03c3088142ba1805f53..7f7356bf929c43428a3f7699d764a7fabc18337b 100644 --- a/Control/AthenaKernel/AthenaKernel/IAthenaOutputStreamTool.h +++ b/Control/AthenaKernel/AthenaKernel/IAthenaOutputStreamTool.h @@ -85,7 +85,7 @@ public: /// Commit the output stream after having streamed out objects /// Must commitOutput AFTER streaming - virtual StatusCode commitOutput() = 0; + virtual StatusCode commitOutput(bool doCommit = false) = 0; /// Finalize the output stream after the last commit, e.g. in /// finalize diff --git a/Control/AthenaMP/python/PyComps.py b/Control/AthenaMP/python/PyComps.py index 6a2a659779abdd8d40addea2d08c0ff200a18896..36ec82c33772e9784a9b0cc12e40c846723a0aa9 100644 --- a/Control/AthenaMP/python/PyComps.py +++ b/Control/AthenaMP/python/PyComps.py @@ -105,14 +105,14 @@ class MpEvtLoopMgr(AthMpEvtLoopMgr): if use_shared_reader: from AthenaCommon.AppMgr import ServiceMgr as svcMgr from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool - svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool") + svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool", SharedMemoryName="EventStream"+str(os.getpid())) if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules: - svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool") + svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool", SharedMemoryName="InputStream"+str(os.getpid())) if use_shared_writer: if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules: from AthenaCommon.AppMgr import ServiceMgr as svcMgr from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool - svcMgr.AthenaPoolCnvSvc.OutputStreamingTool += [ AthenaSharedMemoryTool("OutputStreamingTool_0") ] + svcMgr.AthenaPoolCnvSvc.OutputStreamingTool += [ AthenaSharedMemoryTool("OutputStreamingTool_0", SharedMemoryName="OutputStream"+str(os.getpid())) ] from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader, diff --git a/Control/AthenaServices/src/AthenaOutputStream.cxx b/Control/AthenaServices/src/AthenaOutputStream.cxx index 08f9607bc043cd2bca2aa35395cc9e2c7136cbb2..ec733590e94640da25c49554d0b1f533fd0353a3 100644 --- a/Control/AthenaServices/src/AthenaOutputStream.cxx +++ b/Control/AthenaServices/src/AthenaOutputStream.cxx @@ -611,7 +611,7 @@ StatusCode AthenaOutputStream::write() { // lock.unlock(); // Connect the output file to the service - if (!streamer->connectOutput(outputFN).isSuccess()) { + if (!streamer->connectOutput(connectStr).isSuccess()) { ATH_MSG_FATAL("Could not connectOutput"); return StatusCode::FAILURE; } @@ -626,7 +626,12 @@ StatusCode AthenaOutputStream::write() { ATH_MSG_DEBUG("streamObjects failed."); } } - if (!streamer->commitOutput().isSuccess()) { + bool doCommit = false; + if (m_events % m_autoSend.value() == 0 && outputFN.find("?pmerge=") != std::string::npos) { + doCommit = true; + ATH_MSG_DEBUG("commitOutput sending data."); + } + if (!streamer->commitOutput(doCommit).isSuccess()) { ATH_MSG_FATAL("commitOutput failed."); failed = true; } diff --git a/Control/AthenaServices/src/AthenaOutputStream.h b/Control/AthenaServices/src/AthenaOutputStream.h index 6071cefba9841e00e3d9d40708bc4da82d1fc91d..5085056a2bf3667713c18d5b85a3d34e64986cbe 100644 --- a/Control/AthenaServices/src/AthenaOutputStream.h +++ b/Control/AthenaServices/src/AthenaOutputStream.h @@ -76,11 +76,13 @@ protected: ServiceHandle<OutputStreamSequencerSvc> m_outSeqSvc; /// Vector of item names - StringArrayProperty m_itemList{this,"ItemList",{},"List of items to write","Set<std::string>"}; + StringArrayProperty m_itemList{this,"ItemList",{},"List of items to write","Set<std::string>"}; /// Vector of item names StringArrayProperty m_metadataItemList; /// Vector of item names StringArrayProperty m_excludeList; + /// Number of commits after which to do a flush when using in memory files + IntegerProperty m_autoSend{this,"AutoSend",-1}; /// Vector of item names StringArrayProperty m_compressionListHigh; /// Vector of item names diff --git a/Control/AthenaServices/src/AthenaOutputStreamTool.cxx b/Control/AthenaServices/src/AthenaOutputStreamTool.cxx index c86e3a5280378ea47e825922d44469467cfe33d6..372fbd78089c464006e76fd0313975f7da135af7 100644 --- a/Control/AthenaServices/src/AthenaOutputStreamTool.cxx +++ b/Control/AthenaServices/src/AthenaOutputStreamTool.cxx @@ -305,11 +305,10 @@ StatusCode AthenaOutputStreamTool::connectOutput(const std::string& outputName) return(StatusCode::SUCCESS); } //__________________________________________________________________________ -StatusCode AthenaOutputStreamTool::commitOutput() { +StatusCode AthenaOutputStreamTool::commitOutput(bool doCommit) { ATH_MSG_DEBUG("In commitOutput"); - m_outputName.setValue(m_outputName.value().substr(0, m_outputName.value().find("["))); // Connect the output file to the service - if (m_conversionSvc->commitOutput(m_outputName.value(), false).isFailure()) { + if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) { ATH_MSG_ERROR("Unable to commit output " << m_outputName.value()); return(StatusCode::FAILURE); } diff --git a/Control/AthenaServices/src/AthenaOutputStreamTool.h b/Control/AthenaServices/src/AthenaOutputStreamTool.h index b8fb5c431b8f16e8ab5b1a86d7712d7e8827ffbe..51a370d4ecb4f0b424b5e30e2d43e1e703922b10 100644 --- a/Control/AthenaServices/src/AthenaOutputStreamTool.h +++ b/Control/AthenaServices/src/AthenaOutputStreamTool.h @@ -55,7 +55,7 @@ public: /// Commit the output stream after having streamed out objects /// Must commitOutput AFTER streaming - StatusCode commitOutput(); + StatusCode commitOutput(bool doCommit = false); /// Finalize the output stream after the last commit, e.g. in /// finalize diff --git a/Database/APR/PersistencySvc/src/UserDatabase.cpp b/Database/APR/PersistencySvc/src/UserDatabase.cpp index 181d442420578127ddfd3092f037216673403101..27a3c5cbe8f6eebaa46db62aaac39788657531d1 100644 --- a/Database/APR/PersistencySvc/src/UserDatabase.cpp +++ b/Database/APR/PersistencySvc/src/UserDatabase.cpp @@ -168,7 +168,7 @@ pool::PersistencySvc::UserDatabase::connectForWrite( const pool::DatabaseConnect // register in the catalog pool::DbType dbType( m_technology ); pool::DbType dbTypeMajor( dbType.majorType() ); - m_catalog.registerPFN( m_the_pfn, dbTypeMajor.storageName(), m_the_fid ); + m_catalog.registerPFN( m_the_pfn.substr(0, m_the_pfn.find("?")), dbTypeMajor.storageName(), m_the_fid ); DbPrint log("PersistencySvc::UserDB::connectForWrite()" ); log << DbPrintLvl::Debug << "registered PFN: " << m_the_pfn << " with FID:" << m_the_fid << endmsg; dbRegistered = true; diff --git a/Database/APR/RootStorageSvc/src/RootTreeIndexContainer.cpp b/Database/APR/RootStorageSvc/src/RootTreeIndexContainer.cpp index ee5faecda20afd8df9e339679bfd944a124520fe..5364d26c5f236beb2d201212bc3ee197511f8148 100644 --- a/Database/APR/RootStorageSvc/src/RootTreeIndexContainer.cpp +++ b/Database/APR/RootStorageSvc/src/RootTreeIndexContainer.cpp @@ -76,7 +76,7 @@ DbStatus RootTreeIndexContainer::transAct(Transaction::Action action) { DbStatus status = RootTreeContainer::transAct(action); if (action == Transaction::TRANSACT_FLUSH) { if (m_tree == nullptr) return Error; - if (m_index_ref != nullptr && m_tree->GetEntryNumberWithIndex(nextRecordId()) == -1) { + if (m_index_ref != nullptr && m_tree->GetEntries() > 0 && m_tree->GetEntryNumberWithIndex(nextRecordId()) == -1) { m_tree->BuildIndex("index_ref"); } } diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx index 57327c0bbbf2728143d202c84703daeadab8ceed..e4d1fc2b680eca6eddf51336d1c8e0aaebfac1fc 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx @@ -280,28 +280,36 @@ StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSp //______________________________________________________________________________ StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSpec) { // This is called before DataObjects are being converted. - std::string outputConnection = outputConnectionSpec; + std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find("[")); // Extract the technology int tech = m_dbType.type(); if (!decodeOutputSpec(outputConnection, tech).isSuccess()) { ATH_MSG_ERROR("connectOutput FAILED extract file name and technology."); return(StatusCode::FAILURE); } - if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient()) { + if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isServer() && !m_outputStreamingTool[0]->isClient()) { + m_outputStreamingTool[0]->makeClient(m_makeStreamingToolClient.value()).ignore(); + } + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() + && (!m_streamMetaDataOnly || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) { return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty()) { - if (m_streamServer == m_outputStreamingTool.size() || !m_outputStreamingTool[m_streamServer < m_outputStreamingTool.size() ? m_streamServer : 0]->isServer()) { + if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient()) { + if (m_streamMetaDataOnly && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) { + ATH_MSG_DEBUG("connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec); + return(StatusCode::SUCCESS); + } + if (!m_streamMetaDataOnly && (m_streamServer == m_outputStreamingTool.size() || !m_outputStreamingTool[m_streamServer < m_outputStreamingTool.size() ? m_streamServer : 0]->isServer())) { ATH_MSG_DEBUG("connectOutput SKIPPED for expired server."); return(StatusCode::SUCCESS); } std::size_t streamClient = 0; for (std::vector<std::string>::const_iterator iter = m_streamClientFiles.begin(), last = m_streamClientFiles.end(); iter != last; iter++) { - if (*iter == outputConnectionSpec) break; + if (*iter == outputConnection) break; streamClient++; } if (streamClient == m_streamClientFiles.size()) { - m_streamClientFiles.push_back(outputConnectionSpec); + m_streamClientFiles.push_back(outputConnection); } } @@ -338,15 +346,17 @@ StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSp //______________________________________________________________________________ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) { // This is called after all DataObjects are converted. - if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient()) { + std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find("[")); + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() + && (!m_streamMetaDataOnly || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) { std::size_t streamClient = 0; for (std::vector<std::string>::const_iterator iter = m_streamClientFiles.begin(), last = m_streamClientFiles.end(); iter != last; iter++) { - if (*iter == outputConnectionSpec) break; + if (*iter == outputConnection) break; streamClient++; } if (streamClient == m_streamClientFiles.size()) { if (m_streamClientFiles.size() < m_outputStreamingTool.size()) { - m_streamClientFiles.push_back(outputConnectionSpec); + m_streamClientFiles.push_back(outputConnection); } else { streamClient = 0; } @@ -356,35 +366,35 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe usleep(100); sc = m_outputStreamingTool[streamClient]->lockObject("release"); } - if (!this->cleanUp(outputConnectionSpec).isSuccess()) { + if (!this->cleanUp(outputConnection).isSuccess()) { ATH_MSG_ERROR("commitOutput FAILED to cleanup converters."); return(StatusCode::FAILURE); } return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty() && m_metadataContainerProp.value().empty() + if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient() && m_metadataContainerProp.value().empty() && m_streamServer == m_outputStreamingTool.size()) { ATH_MSG_DEBUG("commitOutput SKIPPED for expired server."); return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[m_streamServer < m_outputStreamingTool.size() ? m_streamServer : 0]->isServer()) { + if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient() && !m_outputStreamingTool[m_streamServer < m_outputStreamingTool.size() ? m_streamServer : 0]->isServer()) { ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server: " << m_streamServer << "."); return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty() && m_streamServer == m_outputStreamingTool.size()) { + if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient() && m_streamServer == m_outputStreamingTool.size()) { std::size_t streamClient = 0; for (std::vector<std::string>::const_iterator iter = m_streamClientFiles.begin(), last = m_streamClientFiles.end(); iter != last; iter++) { - if (*iter == outputConnectionSpec) break; + if (*iter == outputConnection) break; streamClient++; } if (streamClient == m_streamClientFiles.size()) { - ATH_MSG_DEBUG("commitOutput SKIPPED for unconnected file: " << outputConnectionSpec << "."); + ATH_MSG_DEBUG("commitOutput SKIPPED for unconnected file: " << outputConnection << "."); return(StatusCode::SUCCESS); } } std::map<void*, RootType> commitCache; std::string fileName; - if (!m_outputStreamingTool.empty() && m_streamServer < m_outputStreamingTool.size() + if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient() && m_streamServer < m_outputStreamingTool.size() && m_outputStreamingTool[m_streamServer]->isServer()) { auto& streamingTool = m_outputStreamingTool[m_streamServer]; // Clear object to get Placements for all objects in a Stream @@ -544,14 +554,17 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe } else { ATH_MSG_INFO("Failed to get Data for client: " << num); } - return StatusCode::FAILURE; + return(StatusCode::FAILURE); } } + if (m_streamMetaDataOnly && !fileName.empty()) { + ATH_MSG_DEBUG("commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec); + return(StatusCode::SUCCESS); + } if (m_doChronoStat) { m_chronoStatSvc->chronoStart("commitOutput"); } std::unique_lock<std::mutex> lock(m_mutex); - std::string outputConnection = outputConnectionSpec; if (outputConnection.empty()) { outputConnection = fileName; } @@ -588,7 +601,7 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe ATH_MSG_ERROR("commitOutput - caught exception: " << e.what()); return(StatusCode::FAILURE); } - if (!this->cleanUp(outputConnectionSpec).isSuccess()) { + if (!this->cleanUp(outputConnection).isSuccess()) { ATH_MSG_ERROR("commitOutput FAILED to cleanup converters."); return(StatusCode::FAILURE); } @@ -616,10 +629,12 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe //______________________________________________________________________________ StatusCode AthenaPoolCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) { - if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient()) { + std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find("[")); + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() + && (!m_streamMetaDataOnly || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) { return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty()) { + if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient()) { if (m_metadataContainerProp.value().empty() && m_streamServer == m_outputStreamingTool.size()) { ATH_MSG_DEBUG("disconnectOutput SKIPPED for expired server."); return(StatusCode::SUCCESS); @@ -632,7 +647,7 @@ StatusCode AthenaPoolCnvSvc::disconnectOutput(const std::string& outputConnectio } ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server: " << m_streamServer); } - return m_poolSvc->disconnect(outputContextId(outputConnectionSpec)); + return m_poolSvc->disconnect(outputContextId(outputConnection)); } //______________________________________________________________________________ @@ -650,8 +665,12 @@ Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj, if (m_doChronoStat) { m_chronoStatSvc->chronoStart("cRepR_ALL"); } + if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isServer() && !m_outputStreamingTool[0]->isClient()) { + m_outputStreamingTool[0]->makeClient(m_makeStreamingToolClient.value()).ignore(); + } Token* token = nullptr; - if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient()) { + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() + && (!m_streamMetaDataOnly || placement->containerName().substr(0, m_metadataContainerProp.value().size()) == m_metadataContainerProp.value())) { std::size_t streamClient = 0; std::string fileName = placement->fileName(); for (std::vector<std::string>::const_iterator iter = m_streamClientFiles.begin(), last = m_streamClientFiles.end(); iter != last; iter++) { @@ -737,19 +756,20 @@ Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj, tempToken->fromString(tokenStr); tokenStr = nullptr; tempToken->setClassID(pool::DbReflex::guid(classDesc)); token = tempToken; tempToken = nullptr; +// Client Write Request } else { - if (!m_outputStreamingTool.empty() && m_metadataContainerProp.value().empty() + if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient() && m_metadataContainerProp.value().empty() && (m_streamServer == m_outputStreamingTool.size() || !m_outputStreamingTool[m_streamServer < m_outputStreamingTool.size() ? m_streamServer : 0]->isServer())) { ATH_MSG_DEBUG("registerForWrite SKIPPED for expired server, Placement = " << placement->toString()); Token* tempToken = new Token(); tempToken->setClassID(pool::DbReflex::guid(classDesc)); token = tempToken; tempToken = nullptr; - } else if (!m_outputStreamingTool.empty() && m_streamServer != m_outputStreamingTool.size() && !m_outputStreamingTool[m_streamServer < m_outputStreamingTool.size() ? m_streamServer : 0]->isServer()) { + } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient() && m_streamServer != m_outputStreamingTool.size() && !m_outputStreamingTool[m_streamServer < m_outputStreamingTool.size() ? m_streamServer : 0]->isServer()) { ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString()); Token* tempToken = new Token(); tempToken->setClassID(pool::DbReflex::guid(classDesc)); token = tempToken; tempToken = nullptr; - } else if (!m_outputStreamingTool.empty() && m_streamServer == m_outputStreamingTool.size()) { + } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isClient() && m_streamServer == m_outputStreamingTool.size()) { std::size_t streamClient = 0; std::string fileName = placement->fileName(); for (std::vector<std::string>::const_iterator iter = m_streamClientFiles.begin(), last = m_streamClientFiles.end(); iter != last; iter++) { diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h index 291806cc2762bfb05097a275e060a4125b3455b2..17fc689c43323be76bb305d0137fa07208760deb 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h @@ -187,6 +187,8 @@ private: // data ToolHandleArray<IAthenaIPCTool> m_outputStreamingTool; //The following doesn't work because of Gaudi issue #122 //ToolHandleArray<IAthenaIPCTool> m_outputStreamingTool{this,"OutputStreamingTool", {} }; + IntegerProperty m_makeStreamingToolClient{this,"MakeStreamingToolClient",0}; + BooleanProperty m_streamMetaDataOnly{this,"StreamMetaDataOnly",false}; std::size_t m_streamServer=0; int m_metadataClient=0; diff --git a/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.cxx b/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.cxx index 0784c2c4898e0cb526e58a8201aa771d6d03dc76..2a7b1693cf1b808ddba2e594ede0309a1295149d 100755 --- a/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.cxx +++ b/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.cxx @@ -109,7 +109,7 @@ StatusCode RootOutputStreamTool::connectOutput(const std::string& outputName) { return StatusCode::SUCCESS; } -StatusCode RootOutputStreamTool::commitOutput() { +StatusCode RootOutputStreamTool::commitOutput(bool/* doCommit*/) { ATH_MSG_VERBOSE("commitOutput"); if (m_outputName.empty()) { ATH_MSG_ERROR("Unable to commit, no output connected."); diff --git a/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.h b/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.h index c6313d465a37ddd4ebb3fcdc827980714e0e5535..3865e2c8c6524aa652cff89bea907ebfb426490d 100755 --- a/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.h +++ b/Database/AthenaRoot/AthenaRootComps/src/RootOutputStreamTool.h @@ -54,7 +54,7 @@ public: /// Commit the output stream after having streamed out objects /// Must commitOutput AFTER streaming - StatusCode commitOutput(); + StatusCode commitOutput(bool doCommit = false); /// Finalize the output stream after the last commit, e.g. in /// finalize diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx b/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx index 2494f4b0515c274dadbc7daba619d9e26abc6026..a562870a4f4afd7bd72f18769c6c357fedc0c444 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx +++ b/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.cxx @@ -83,7 +83,7 @@ StatusCode ByteStreamOutputStreamCopyTool::connectOutput(const std::string& /*ou return(StatusCode::SUCCESS); } //__________________________________________________________________________ -StatusCode ByteStreamOutputStreamCopyTool::commitOutput() { +StatusCode ByteStreamOutputStreamCopyTool::commitOutput(bool/* doCommit*/) { MsgStream log(msgSvc(), name()); log << MSG::DEBUG << "In commitOutput" << endmsg; const RawEvent* re_c = m_inputSvc->currentEvent() ; diff --git a/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.h b/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.h index aa5fc3fd025fe3f27b13ef7316eeb9069c1eef51..2664ae81a741cf4560638bc6fb1ab52930e022aa 100644 --- a/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.h +++ b/Event/ByteStreamCnvSvc/src/ByteStreamOutputStreamCopyTool.h @@ -55,7 +55,7 @@ public: /// Commit the output stream after having streamed out objects /// Must commitOutput AFTER streaming - virtual StatusCode commitOutput() ; + virtual StatusCode commitOutput(bool doCommit = false) ; /// Finalize the output stream after the last commit, e.g. in /// finalize diff --git a/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.cxx b/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.cxx index 4c7b493ac812bf6e966afc3edde00af9039d7a0a..fa189a5539cde56418304f273bea6f517227f614 100644 --- a/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.cxx +++ b/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.cxx @@ -310,7 +310,7 @@ StatusCode ByteStreamMultipleOutputStreamCopyTool::connectOutput(const std::stri return StatusCode::SUCCESS; } //__________________________________________________________________________ -StatusCode ByteStreamMultipleOutputStreamCopyTool::commitOutput() { +StatusCode ByteStreamMultipleOutputStreamCopyTool::commitOutput(bool/* doCommit*/) { ATH_MSG_DEBUG( "In commitOutput" ); const RawEvent* re_c = m_inputSvc->currentEvent() ; if(!re_c){ diff --git a/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.h b/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.h index b417768974fc95421eafc878a594bdf10415c08f..e5485fe7f23609288986e42eca27c251fa155b23 100644 --- a/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.h +++ b/Event/EventOverlay/OverlayByteStreamAlgs/src/ByteStreamMultipleOutputStreamCopyTool.h @@ -66,7 +66,7 @@ public: /// Commit the output stream after having streamed out objects /// Must commitOutput AFTER streaming - virtual StatusCode commitOutput() ; + virtual StatusCode commitOutput(bool doCommit = false) ; /// Finalize the output stream after the last commit, e.g. in /// finalize