From 8ac47077c091bd84d41156dae76165712900af7c Mon Sep 17 00:00:00 2001 From: Peter Van Gemmeren <peter.van.gemmeren@cern.ch> Date: Wed, 9 Sep 2020 05:17:10 +0000 Subject: [PATCH] Migrate Shared I/O to use AthenaSharedWriterSvc --- .../AthenaIPCTools/src/AthenaSharedWriter.cxx | 57 ++---- .../AthenaIPCTools/src/AthenaSharedWriter.h | 9 +- .../AthenaKernel/IAthenaSharedWriterSvc.h | 17 ++ .../src/IAthenaSharedWriterSvc.cxx | 13 ++ .../AthenaMPTools/src/SharedWriterTool.cxx | 30 ++- .../AthenaServices/src/AthenaOutputStream.cxx | 5 +- .../AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx | 83 ++++---- .../AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h | 4 +- .../src/AthenaRootSharedWriter.cxx | 189 ------------------ .../src/AthenaRootSharedWriter.h | 47 ----- .../src/AthenaRootSharedWriterSvc.cxx | 174 ++++++++++++++++ .../src/AthenaRootSharedWriterSvc.h | 54 +++++ .../components/AthenaPoolCnvSvc_entries.cxx | 4 +- Database/IOVDbSvc/src/IOVDbSvc.cxx | 1 + 14 files changed, 358 insertions(+), 329 deletions(-) create mode 100644 Control/AthenaKernel/AthenaKernel/IAthenaSharedWriterSvc.h create mode 100644 Control/AthenaKernel/src/IAthenaSharedWriterSvc.cxx delete mode 100644 Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.cxx delete mode 100644 Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.h create mode 100644 Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.cxx create mode 100644 Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.h diff --git a/Control/AthenaIPCTools/src/AthenaSharedWriter.cxx b/Control/AthenaIPCTools/src/AthenaSharedWriter.cxx index 0230a7147fbd..74b6027d7e1e 100644 --- a/Control/AthenaIPCTools/src/AthenaSharedWriter.cxx +++ b/Control/AthenaIPCTools/src/AthenaSharedWriter.cxx @@ -7,70 +7,49 @@ * @author Peter van Gemmeren <gemmeren@anl.gov> **/ -#include "AthenaBaseComps/AthCnvSvc.h" -#include "AthenaKernel/IDataShare.h" - #include "AthenaSharedWriter.h" +#include "AthenaKernel/IAthenaSharedWriterSvc.h" +#include "AthenaKernel/IDataShare.h" +#include "AthenaBaseComps/AthCnvSvc.h" //___________________________________________________________________________ AthenaSharedWriter::AthenaSharedWriter(const std::string& name, ISvcLocator* pSvcLocator) - : AthFilterAlgorithm(name, pSvcLocator) , m_cnvSvc(nullptr) { -} -//___________________________________________________________________________ -AthenaSharedWriter::~AthenaSharedWriter() { + : AthFilterAlgorithm(name, pSvcLocator) { } //___________________________________________________________________________ StatusCode AthenaSharedWriter::initialize() { ATH_MSG_INFO("in initialize()"); - - StatusCode sc = serviceLocator()->service("AthenaPoolCnvSvc", m_cnvSvc); - if (sc.isFailure() || m_cnvSvc == nullptr) { - ATH_MSG_FATAL("Could not retrieve AthenaPoolCnvSvc"); - return StatusCode::FAILURE; - } + // Initialize IConversionSvc + ATH_CHECK(m_cnvSvc.retrieve()); // Use IDataShare to make ConversionSvc a Share Server - IDataShare* dataShare = dynamic_cast<IDataShare*>(m_cnvSvc); + IDataShare* dataShare = dynamic_cast<IDataShare*>(m_cnvSvc.get()); if (dataShare == nullptr || !dataShare->makeServer(-m_numberOfClients.value() - 1).isSuccess()) { ATH_MSG_FATAL("Could not make AthenaPoolCnvSvc a share server: " << dataShare); return StatusCode::FAILURE; } else { ATH_MSG_DEBUG("Successfully made the conversion service a share server"); } + // Initialize IAthenaSharedWriterSvc + ATH_CHECK(m_sharedWriterSvc.retrieve()); return StatusCode::SUCCESS; } //___________________________________________________________________________ StatusCode AthenaSharedWriter::execute() { ATH_MSG_DEBUG("in execute()"); - int counter = 0; - bool doCommit = false; - StatusCode sc = m_cnvSvc->commitOutput("", false); - ATH_MSG_VERBOSE("Start commitOutput loop"); - while (sc.isSuccess() || sc.isRecoverable()) { - if (sc.isRecoverable()) { - usleep(100); - } else { - counter++; - if (m_autoSend.value() > 0 && counter% m_autoSend.value() == 0) { - doCommit = true; - ATH_MSG_INFO("commitOutput sending data."); - } else { - doCommit = false; - } - } - sc = m_cnvSvc->commitOutput("", doCommit); - } - - AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc); - if (cnvSvc == nullptr || !cnvSvc->disconnectOutput("").isSuccess()) { - ATH_MSG_FATAL("Could not disconnectOutput"); - return StatusCode::FAILURE; - } - setFilterPassed(false); // don't output events return StatusCode::SUCCESS; } //___________________________________________________________________________ StatusCode AthenaSharedWriter::finalize() { ATH_MSG_INFO("in finalize()"); + AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc.get()); + if (cnvSvc == nullptr || !cnvSvc->disconnectOutput("").isSuccess()) { + ATH_MSG_FATAL("Could not disconnectOutput"); + return StatusCode::FAILURE; + } + // Release IAthenaSharedWriterSvc + if (!m_sharedWriterSvc.release().isSuccess()) { + ATH_MSG_WARNING("Could not release IAthenaSharedWriterSvc"); + } return StatusCode::SUCCESS; } diff --git a/Control/AthenaIPCTools/src/AthenaSharedWriter.h b/Control/AthenaIPCTools/src/AthenaSharedWriter.h index 80562df0fb39..ae8b297f4a12 100644 --- a/Control/AthenaIPCTools/src/AthenaSharedWriter.h +++ b/Control/AthenaIPCTools/src/AthenaSharedWriter.h @@ -11,8 +11,9 @@ **/ #include "AthenaBaseComps/AthFilterAlgorithm.h" +#include "AthenaKernel/IAthenaSharedWriterSvc.h" -class IConversionSvc; +#include "GaudiKernel/IConversionSvc.h" /** @class AthenaSharedWriter * @brief This class provides an example for writing event data objects to Pool. @@ -22,7 +23,7 @@ public: // Constructor and Destructor /// Standard Service Constructor AthenaSharedWriter(const std::string& name, ISvcLocator* pSvcLocator); /// Destructor - virtual ~AthenaSharedWriter(); + virtual ~AthenaSharedWriter() = default; public: /// Gaudi Service Interface method implementations: @@ -32,10 +33,10 @@ public: private: // properties IntegerProperty m_numberOfClients{this,"NumberOfClients",1}; - IntegerProperty m_autoSend{this,"AutoSend",-1}; private: - IConversionSvc* m_cnvSvc; + ServiceHandle<IConversionSvc> m_cnvSvc{this,"AthenaPoolCnvSvc","AthenaPoolCnvSvc"}; + ServiceHandle<IAthenaSharedWriterSvc> m_sharedWriterSvc{this,"AthenaRootSharedWriterSvc","AthenaRootSharedWriterSvc"}; }; #endif diff --git a/Control/AthenaKernel/AthenaKernel/IAthenaSharedWriterSvc.h b/Control/AthenaKernel/AthenaKernel/IAthenaSharedWriterSvc.h new file mode 100644 index 000000000000..43200e7b0c31 --- /dev/null +++ b/Control/AthenaKernel/AthenaKernel/IAthenaSharedWriterSvc.h @@ -0,0 +1,17 @@ +/* + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration +*/ + +#ifndef ATHENAKERNEL_IATHENASHAREDWRITERSVC_H +#define ATHENAKERNEL_IATHENASHAREDWRITERSVC_H + +#include "GaudiKernel/IService.h" + +static const InterfaceID IID_IAthenaSharedWriterSvc( "IAthenaSharedWriterSvc", 1, 0 ); + +class IAthenaSharedWriterSvc : virtual public ::IService { +public: + static const InterfaceID& interfaceID() { return IID_IAthenaSharedWriterSvc; } +}; + +#endif diff --git a/Control/AthenaKernel/src/IAthenaSharedWriterSvc.cxx b/Control/AthenaKernel/src/IAthenaSharedWriterSvc.cxx new file mode 100644 index 000000000000..905ccc16ef08 --- /dev/null +++ b/Control/AthenaKernel/src/IAthenaSharedWriterSvc.cxx @@ -0,0 +1,13 @@ +///////////////////////// -*- C++ -*- ///////////////////////////// + +/* + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration +*/ + +// IAthenaSharedWriterSvc.cxx +// Implementation file for class IAthenaSharedWriterSvc +// Author: P.van Gemmeren<gemmeren@anl.gov> +/////////////////////////////////////////////////////////////////// + +// AthenaKernel includes +#include "AthenaKernel/IAthenaSharedWriterSvc.h" diff --git a/Control/AthenaMPTools/src/SharedWriterTool.cxx b/Control/AthenaMPTools/src/SharedWriterTool.cxx index 7c49f4776df2..4b743a02e587 100644 --- a/Control/AthenaMPTools/src/SharedWriterTool.cxx +++ b/Control/AthenaMPTools/src/SharedWriterTool.cxx @@ -7,6 +7,7 @@ #include "AthenaKernel/IEventShare.h" #include "AthenaKernel/IDataShare.h" +#include "AthenaKernel/IAthenaSharedWriterSvc.h" #include "GaudiKernel/IEvtSelector.h" #include "GaudiKernel/IConversionSvc.h" #include "GaudiKernel/IIoComponentMgr.h" @@ -79,6 +80,24 @@ int SharedWriterTool::makePool(int /*maxevt*/, int nprocs, const std::string& to else { m_writer = writeClientsProp.value().size(); } + propertyName = "StreamMetaDataOnly"; + bool streamMetaDataOnly(false); + BooleanProperty streamMetaDataOnlyProp(propertyName,streamMetaDataOnly); + if(propertyServer->getProperty(&streamMetaDataOnlyProp).isFailure()) { + ATH_MSG_INFO("Conversion service does not have StreamMetaDataOnly property"); + } + else { + IService* poolSvc; + if(serviceLocator()->service("PoolSvc", poolSvc).isFailure() || poolSvc==0) { + ATH_MSG_ERROR("Error retrieving PoolSvc"); + } + else if(streamMetaDataOnlyProp.value()) { + propertyServer = dynamic_cast<IProperty*>(poolSvc); + if (propertyServer==0 || propertyServer->setProperty("FileOpen", "update").isFailure()) { + ATH_MSG_ERROR("Could not change PoolSvc FileOpen Property"); + } + } + } } // Create rank queue and fill it @@ -219,12 +238,11 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::exec_func() ATH_MSG_INFO("Exec function in the AthenaMP Shared Writer PID=" << getpid()); bool all_ok=true; - StatusCode sc = m_cnvSvc->commitOutput("", false); - while(sc.isSuccess() || sc.isRecoverable()) { - if (sc.isRecoverable()) { - usleep(100); - } - sc = m_cnvSvc->commitOutput("", false); + IAthenaSharedWriterSvc* sharedWriterSvc; + StatusCode sc = serviceLocator()->service("AthenaRootSharedWriterSvc", sharedWriterSvc); + if(sc.isFailure() || sharedWriterSvc==0) { + ATH_MSG_ERROR("Error retrieving AthenaRootSharedWriterSvc"); + all_ok=false; } AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc); if (cnvSvc == 0 || !cnvSvc->disconnectOutput("").isSuccess()) { diff --git a/Control/AthenaServices/src/AthenaOutputStream.cxx b/Control/AthenaServices/src/AthenaOutputStream.cxx index 6d521d15da2f..b754e0487888 100644 --- a/Control/AthenaServices/src/AthenaOutputStream.cxx +++ b/Control/AthenaServices/src/AthenaOutputStream.cxx @@ -605,7 +605,7 @@ StatusCode AthenaOutputStream::write() { } } bool doCommit = false; - if (m_events % m_autoSend.value() == 0 && outputFN.find("?pmerge=") != std::string::npos) { + if (m_autoSend.value() > 0 && m_events % m_autoSend.value() == 0) { doCommit = true; ATH_MSG_DEBUG("commitOutput sending data."); } @@ -1157,6 +1157,9 @@ StatusCode AthenaOutputStream::io_finalize() { ATH_MSG_FATAL("Cannot get the IncidentSvc"); return StatusCode::FAILURE; } + if (m_dataStore->clearStore().isSuccess()) { + ATH_MSG_WARNING("Cannot clear the DataStore"); + } incSvc->removeListener(this, "MetaDataStop"); return StatusCode::SUCCESS; } diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx index 43aa3e70f5fd..2a8d9cb1c4dc 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx @@ -36,53 +36,31 @@ //______________________________________________________________________________ // Initialize the service. StatusCode AthenaPoolCnvSvc::initialize() { - ATH_MSG_INFO("Initializing " << name() << " - package version " << PACKAGE_VERSION); - if (!::AthCnvSvc::initialize().isSuccess()) { - ATH_MSG_FATAL("Cannot initialize ConversionSvc base class."); - return(StatusCode::FAILURE); - } - // Initialize DataModelCompatSvc ServiceHandle<IService> dmcsvc("DataModelCompatSvc", this->name()); - if (!dmcsvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get DataModelCompatSvc."); - return(StatusCode::FAILURE); - } + ATH_CHECK(dmcsvc.retrieve()); // Retrieve PoolSvc - if (!m_poolSvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get PoolSvc."); - return(StatusCode::FAILURE); - } + ATH_CHECK(m_poolSvc.retrieve()); // Retrieve ChronoStatSvc - if (!m_chronoStatSvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get ChronoStatSvc."); - return(StatusCode::FAILURE); - } + ATH_CHECK(m_chronoStatSvc.retrieve()); // Retrieve ClassIDSvc - if (!m_clidSvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get ClassIDSvc."); - return(StatusCode::FAILURE); - } + ATH_CHECK(m_clidSvc.retrieve()); // Retrieve InputStreamingTool (if configured) if (!m_inputStreamingTool.empty()) { - if (!m_inputStreamingTool.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get Input AthenaIPCTool"); - return(StatusCode::FAILURE); - } + ATH_CHECK(m_inputStreamingTool.retrieve()); } // Retrieve OutputStreamingTool (if configured) if (!m_outputStreamingTool.empty()) { m_streamClientFiles = m_streamClientFilesProp.value(); - if (!m_outputStreamingTool.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get Output AthenaIPCTool"); - return(StatusCode::FAILURE); - } + ATH_CHECK(m_outputStreamingTool.retrieve()); } if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) { // Retrieve AthenaSerializeSvc - if (!m_serializeSvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get AthenaSerializeSvc."); - return(StatusCode::FAILURE); + ATH_CHECK(m_serializeSvc.retrieve()); + if (!m_outputStreamingTool.empty() && m_makeStreamingToolClient.value() == -1) { + // Initialize AthenaRootSharedWriter + ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name()); + ATH_CHECK(arswsvc.retrieve()); } } // Extracting MaxFileSizes for global default and map by Database name. @@ -158,7 +136,7 @@ StatusCode AthenaPoolCnvSvc::finalize() { m_cnvs.clear(); m_cnvs.shrink_to_fit(); - return(::AthCnvSvc::finalize()); + return(StatusCode::SUCCESS); } //_______________________________________________________________________ StatusCode AthenaPoolCnvSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) { @@ -314,6 +292,9 @@ StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSp } } + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() && m_streamMetaDataOnly) { + outputConnection = outputConnection + "?pmerge=localhost:1095"; + } unsigned int contextId = outputContextId(outputConnection); try { if (!m_poolSvc->connect(pool::ITransaction::UPDATE, contextId).isSuccess()) { @@ -335,6 +316,15 @@ StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSp m_domainAttr.push_back(maxFileSize); // Extracting OUTPUT POOL ItechnologySpecificAttributes for Domain, Database and Container. extractPoolAttributes(m_poolAttr, &m_containerAttr, &m_databaseAttr, &m_domainAttr); + for (std::vector<std::vector<std::string> >::iterator iter = m_databaseAttr.begin(), last = m_databaseAttr.end(); + iter != last; ++iter) { + const std::string& opt = (*iter)[0]; + std::string data = (*iter)[1]; + const std::string& file = (*iter)[2]; + if (opt == "TREE_AUTO_FLUSH" && data != "int" && data != "DbLonglong" && data != "double" && data != "string") { + m_fileFlushSetting[file] = atoi(data.c_str()); + } + } } if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) { ATH_MSG_DEBUG("connectOutput failed process POOL domain attributes."); @@ -575,6 +565,14 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe ATH_MSG_ERROR("connectOutput FAILED extract file name and technology."); return(StatusCode::FAILURE); } + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() && m_streamMetaDataOnly) { + m_fileCommitCounter[outputConnection] = m_fileCommitCounter[outputConnection] + 1; + if (m_fileFlushSetting[outputConnection] > 0 && m_fileCommitCounter[outputConnection] % m_fileFlushSetting[outputConnection] == 0) { + doCommit = true; + ATH_MSG_DEBUG("commitOutput sending data."); + } + outputConnection = outputConnection + "?pmerge=localhost:1095"; + } unsigned int contextId = outputContextId(outputConnection); if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) { ATH_MSG_DEBUG("commitOutput failed process POOL domain attributes."); @@ -648,7 +646,12 @@ StatusCode AthenaPoolCnvSvc::disconnectOutput(const std::string& outputConnectio } ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server: " << m_streamServer); } - return m_poolSvc->disconnect(outputContextId(outputConnection)); + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() && m_streamMetaDataOnly) { + outputConnection = outputConnection + "?pmerge=localhost:1095"; + } + unsigned int contextId = outputContextId(outputConnection); + StatusCode sc = m_poolSvc->disconnect(contextId); + return sc; } //______________________________________________________________________________ @@ -787,6 +790,9 @@ Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj, token = m_poolSvc->registerForWrite(placement, obj, classDesc); } } else { + if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient() && m_streamMetaDataOnly) { + placement->setFileName(placement->fileName() + "?pmerge=localhost:1095"); + } if (m_persSvcPerOutput) { char text[32]; ::sprintf(text, "[CTXT=%08X]", m_poolSvc->getOutputContext(placement->fileName())); @@ -1168,9 +1174,6 @@ AthenaPoolCnvSvc::AthenaPoolCnvSvc(const std::string& name, ISvcLocator* pSvcLoc { declareProperty("OutputStreamingTool", m_outputStreamingTool); } -//______________________________________________________________________________ -AthenaPoolCnvSvc::~AthenaPoolCnvSvc() { -} //__________________________________________________________________________ void AthenaPoolCnvSvc::extractPoolAttributes(const StringArrayProperty& property, std::vector<std::vector<std::string> >* contAttr, @@ -1265,8 +1268,8 @@ StatusCode AthenaPoolCnvSvc::processPoolAttributes(std::vector<std::vector<std:: std::string data = (*iter)[1]; const std::string& file = (*iter)[2]; const std::string& cont = (*iter)[3]; - if (!fileName.empty() && (file == fileName || (file.substr(0, 1) == "*" - && file.find("," + fileName + ",") == std::string::npos))) { + if (!fileName.empty() && (file == fileName.substr(0, fileName.find("?")) + || (file.substr(0, 1) == "*" && file.find("," + fileName + ",") == std::string::npos))) { if (data == "int" || data == "DbLonglong" || data == "double" || data == "string") { if (doGet) { if (!m_poolSvc->getAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), fileName, cont, contextId).isSuccess()) { diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h index be992df67047..8212f1a69e1e 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h @@ -159,7 +159,7 @@ public: /// Standard Service Constructor AthenaPoolCnvSvc(const std::string& name, ISvcLocator* pSvcLocator); /// Destructor - virtual ~AthenaPoolCnvSvc(); + virtual ~AthenaPoolCnvSvc() = default; private: // member functions /// Extract POOL ItechnologySpecificAttributes for Domain, Database and Container from property. @@ -210,6 +210,8 @@ private: // properties std::vector<std::vector<std::string> > m_databaseAttr; std::vector<std::vector<std::string> > m_containerAttr; std::vector<unsigned int> m_contextAttr; + std::map<std::string, int> m_fileCommitCounter; + std::map<std::string, int> m_fileFlushSetting; /// Input PoolAttributes, vector with names and values of technology specific attributes for POOL StringArrayProperty m_inputPoolAttr{this,"InputPoolAttributes",{}}; std::vector<std::vector<std::string> > m_inputAttr; diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.cxx deleted file mode 100644 index 6c06c685f5a9..000000000000 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.cxx +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration -*/ - -/** @file AthenaRootSharedWriter.cxx - * @brief This file contains the implementation for the AthenaRootSharedWriter class. - * @author Peter van Gemmeren <gemmeren@anl.gov> - **/ - -#include "AthenaBaseComps/AthCnvSvc.h" -#include "AthenaKernel/IDataShare.h" - -#include "AthenaRootSharedWriter.h" - -#include "TFile.h" -#include "TFileMerger.h" -#include "TMessage.h" -#include "TMemFile.h" -#include "TMonitor.h" -#include "TServerSocket.h" -#include "TSocket.h" - -/* Code from ROOT tutorials/net/parallelMergeServer.C, reduced to handle TTrees only */ - -struct ParallelFileMerger : public TObject -{ - TString fFilename; - TFileMerger fMerger; - - ParallelFileMerger(const char *filename) : fFilename(filename), fMerger(kFALSE, kTRUE) - { - fMerger.OutputFile(filename, "RECREATE"); - } - - ~ParallelFileMerger() - { - } - - ULong_t Hash() const - { - return fFilename.Hash(); - } - - const char* GetName() const - { - return fFilename; - } - - Bool_t MergeTrees(TFile *input) - { - fMerger.AddFile(input); - Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable); - return result; - } -}; - -//___________________________________________________________________________ -AthenaRootSharedWriter::AthenaRootSharedWriter(const std::string& name, ISvcLocator* pSvcLocator) - : AthFilterAlgorithm(name, pSvcLocator) , m_cnvSvc(nullptr) { -} -//___________________________________________________________________________ -AthenaRootSharedWriter::~AthenaRootSharedWriter() { -} -//___________________________________________________________________________ -StatusCode AthenaRootSharedWriter::initialize() { - ATH_MSG_INFO("in initialize()"); - - StatusCode sc = serviceLocator()->service("AthenaPoolCnvSvc", m_cnvSvc); - if (sc.isFailure() || m_cnvSvc == nullptr) { - ATH_MSG_FATAL("Could not retrieve AthenaPoolCnvSvc"); - return StatusCode::FAILURE; - } - // Use IDataShare to make ConversionSvc a Share Server - IDataShare* dataShare = dynamic_cast<IDataShare*>(m_cnvSvc); - if (dataShare == nullptr || !dataShare->makeServer(-m_numberOfClients.value() - 1).isSuccess()) { - ATH_MSG_FATAL("Could not make AthenaPoolCnvSvc a share server: " << dataShare); - return StatusCode::FAILURE; - } else { - ATH_MSG_DEBUG("Successfully made the conversion service a share server"); - } - - m_rootServerSocket = new TServerSocket(1095, true, 100); - if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) { - ATH_MSG_FATAL("Could not create ROOT TServerSocket"); - return StatusCode::FAILURE; - } - m_rootMonitor = new TMonitor; - m_rootMonitor->Add(m_rootServerSocket); - ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections!"); - - return StatusCode::SUCCESS; -} -//___________________________________________________________________________ -StatusCode AthenaRootSharedWriter::execute() { - ATH_MSG_DEBUG("in execute()"); - int counter = 0; - bool doCommit = false; - StatusCode sc = m_cnvSvc->commitOutput("", false); - ATH_MSG_VERBOSE("Start commitOutput loop"); - - int clientIndex = 0; - int clientCount = 0; - THashTable mergers; - - while (sc.isSuccess() || sc.isRecoverable()) { - if (sc.isRecoverable()) { - ATH_MSG_VERBOSE("commitOutput not doing anything."); - TSocket* socket = m_rootMonitor->Select(1); - if (socket == nullptr || socket == (TSocket*)-1) { - ATH_MSG_VERBOSE("ROOT Monitor did not do anything."); - } else { - ATH_MSG_DEBUG("ROOT Monitor got: " << socket); - if (socket->IsA() == TServerSocket::Class()) { - TSocket* client = ((TServerSocket*)socket)->Accept(); - client->Send(clientIndex, 0); - client->Send(1, 1); - ++clientIndex; - ++clientCount; - m_rootMonitor->Add(client); - ATH_MSG_INFO("ROOT Monitor add client: " << clientIndex << ", " << client); - } else { - TMessage* message; - socket->Recv(message); - if (message == nullptr) { - ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket); - } else if (message->What() == kMESS_STRING) { - char str[64]; - message->ReadString(str, 64); - ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str); - m_rootMonitor->Remove(socket); - ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent()); - socket->Close(); - --clientCount; - if (m_rootMonitor->GetActive() == 0 || clientCount == 0) { - ATH_MSG_INFO("ROOT Monitor: No more active clients..."); - } - } else if (message->What() == kMESS_ANY) { - long long length; - TString filename; - int clientId; - message->ReadInt(clientId); - message->ReadTString(filename); - message->ReadLong64(length); - ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length); - TMemFile *transient = new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"); - message->SetBufferOffset(message->Length() + length); - ParallelFileMerger* info = static_cast<ParallelFileMerger*>(mergers.FindObject(filename)); - if (!info) { - info = new ParallelFileMerger(filename); - mergers.Add(info); - ATH_MSG_DEBUG("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename); - } - info->MergeTrees(transient); - delete transient; transient = nullptr; - } - } - } - } else { - counter++; - if (m_autoSend.value() > 0 && counter% m_autoSend.value() == 0) { - doCommit = true; - ATH_MSG_INFO("commitOutput sending data."); - } else { - doCommit = false; - ATH_MSG_DEBUG("commitOutput not sending data."); - } - } - sc = m_cnvSvc->commitOutput("", doCommit); - } - ATH_MSG_INFO("End commitOutput loop"); - - mergers.Delete(); - - AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc); - if (cnvSvc == nullptr || !cnvSvc->disconnectOutput("").isSuccess()) { - ATH_MSG_FATAL("Could not disconnectOutput"); - return StatusCode::FAILURE; - } - - setFilterPassed(false); // don't output events - return StatusCode::SUCCESS; -} -//___________________________________________________________________________ -StatusCode AthenaRootSharedWriter::finalize() { - ATH_MSG_INFO("in finalize()"); - delete m_rootMonitor; m_rootMonitor = nullptr; - delete m_rootServerSocket; m_rootServerSocket = nullptr; - return StatusCode::SUCCESS; -} diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.h b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.h deleted file mode 100644 index 5b2c90051a19..000000000000 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriter.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration -*/ - -#ifndef ATHENAROOTSHAREDWRITER_H -#define ATHENAROOTSHAREDWRITER_H - -/** @file AthenaRootSharedWriter.h - * @brief This file contains the class definition for the AthenaRootSharedWriter class. - * @author Peter van Gemmeren <gemmeren@anl.gov> - **/ - -#include "AthenaBaseComps/AthFilterAlgorithm.h" - -class IConversionSvc; - -class TServerSocket; -class TMonitor; - -/** @class AthenaRootSharedWriter - * @brief This class provides an example for writing event data objects to Pool. - **/ -class AthenaRootSharedWriter : public AthFilterAlgorithm { -public: // Constructor and Destructor - /// Standard Service Constructor - AthenaRootSharedWriter(const std::string& name, ISvcLocator* pSvcLocator); - /// Destructor - virtual ~AthenaRootSharedWriter(); - -public: -/// Gaudi Service Interface method implementations: - virtual StatusCode initialize() override; - virtual StatusCode execute () override; - virtual StatusCode finalize() override; - -private: // properties - IntegerProperty m_numberOfClients{this,"NumberOfClients",1}; - IntegerProperty m_autoSend{this,"AutoSend",-1}; - -private: - IConversionSvc* m_cnvSvc; - - TServerSocket* m_rootServerSocket; - TMonitor* m_rootMonitor; -}; - -#endif diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.cxx new file mode 100644 index 000000000000..3b25a5de0263 --- /dev/null +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.cxx @@ -0,0 +1,174 @@ +/* + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration +*/ + +/** @file AthenaRootSharedWriterSvc.cxx + * @brief This file contains the implementation for the AthenaRootSharedWriterSvc class. + * @author Peter van Gemmeren <gemmeren@anl.gov> + **/ + +#include "AthenaRootSharedWriterSvc.h" + +#include "TFile.h" +#include "TFileMerger.h" +#include "TMessage.h" +#include "TMemFile.h" +#include "TMonitor.h" +#include "TServerSocket.h" +#include "TSocket.h" + +/* Code from ROOT tutorials/net/parallelMergeServer.C, reduced to handle TTrees only */ + +struct ParallelFileMerger : public TObject +{ + TString fFilename; + TFileMerger fMerger; + + ParallelFileMerger(const char *filename, int compress = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault) : fFilename(filename), fMerger(kFALSE, kTRUE) + { + fMerger.OutputFile(filename, "RECREATE", compress); + } + + ~ParallelFileMerger() + { + } + + ULong_t Hash() const + { + return fFilename.Hash(); + } + + const char* GetName() const + { + return fFilename; + } + + Bool_t MergeTrees(TFile *input) + { + fMerger.AddFile(input); + Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable | TFileMerger::kKeepCompression); + return result; + } +}; + +//___________________________________________________________________________ +AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc(const std::string& name, ISvcLocator* pSvcLocator) + : AthService(name, pSvcLocator) + , m_rootServerSocket(nullptr), m_rootMonitor(nullptr), m_rootMergers(), m_rootClientIndex(0), m_rootClientCount(0) { +} +//___________________________________________________________________________ +StatusCode AthenaRootSharedWriterSvc::initialize() { + ATH_MSG_INFO("in initialize()"); + + // Initialize IConversionSvc + ATH_CHECK(m_cnvSvc.retrieve()); + IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc.get()); + if (propertyServer == nullptr) { + ATH_MSG_ERROR("Unable to cast conversion service to IProperty"); + return StatusCode::FAILURE; + } else { + std::string propertyName = "StreamMetaDataOnly"; + bool streamMetaDataOnly(false); + BooleanProperty streamMetaDataOnlyProp(propertyName,streamMetaDataOnly); + if (propertyServer->getProperty(&streamMetaDataOnlyProp).isFailure()) { + ATH_MSG_INFO("Conversion service does not have StreamMetaDataOnly property"); + } else if(streamMetaDataOnlyProp.value()) { + m_rootServerSocket = new TServerSocket(1095, true, 100); + if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) { + ATH_MSG_FATAL("Could not create ROOT TServerSocket"); + return StatusCode::FAILURE; + } + m_rootMonitor = new TMonitor; + m_rootMonitor->Add(m_rootServerSocket); + ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections!"); + } + } + + return StatusCode::SUCCESS; +} +//___________________________________________________________________________ +StatusCode AthenaRootSharedWriterSvc::start() { + ATH_MSG_VERBOSE("Start commitOutput loop"); + StatusCode sc = m_cnvSvc->commitOutput("", false); + while (m_rootClientCount > 0 || (m_rootClientIndex == 0 && (sc.isSuccess() || sc.isRecoverable()))) { + if (sc.isSuccess()) { + ATH_MSG_VERBOSE("Success in commitOutput loop"); + } else if (m_rootMonitor != nullptr) { + TSocket* socket = m_rootMonitor->Select(1); + if (socket != nullptr && socket != (TSocket*)-1) { + ATH_MSG_DEBUG("ROOT Monitor got: " << socket); + if (socket->IsA() == TServerSocket::Class()) { + TSocket* client = ((TServerSocket*)socket)->Accept(); + client->Send(m_rootClientIndex, 0); + client->Send(1, 1); + ++m_rootClientIndex; + ++m_rootClientCount; + m_rootMonitor->Add(client); + ATH_MSG_INFO("ROOT Monitor add client: " << m_rootClientIndex << ", " << client); + } else { + TMessage* message; + socket->Recv(message); + if (message == nullptr) { + ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket); + } else if (message->What() == kMESS_STRING) { + char str[64]; + message->ReadString(str, 64); + ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str); + m_rootMonitor->Remove(socket); + ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent()); + socket->Close(); + --m_rootClientCount; + if (m_rootMonitor->GetActive() == 0 || m_rootClientCount == 0) { + ATH_MSG_INFO("ROOT Monitor: No more active clients..."); + } + } else if (message->What() == kMESS_ANY) { + long long length; + TString filename; + int clientId; + message->ReadInt(clientId); + message->ReadTString(filename); + message->ReadLong64(length); + ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length); + std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE")); + message->SetBufferOffset(message->Length() + length); + ParallelFileMerger* info = static_cast<ParallelFileMerger*>(m_rootMergers.FindObject(filename)); + if (!info) { + info = new ParallelFileMerger(filename, transient->GetCompressionSettings()); + m_rootMergers.Add(info); + ATH_MSG_INFO("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename); + } + info->MergeTrees(transient.get()); + } + } + } + } else if (m_rootMonitor == nullptr) { + usleep(100); + } + sc = m_cnvSvc->commitOutput("", false); + } + ATH_MSG_INFO("End commitOutput loop"); + return StatusCode::SUCCESS; +} +//___________________________________________________________________________ +StatusCode AthenaRootSharedWriterSvc::stop() { + m_rootMergers.Delete(); + return StatusCode::SUCCESS; +} +//___________________________________________________________________________ +StatusCode AthenaRootSharedWriterSvc::finalize() { + ATH_MSG_INFO("in finalize()"); + delete m_rootMonitor; m_rootMonitor = nullptr; + delete m_rootServerSocket; m_rootServerSocket = nullptr; + return StatusCode::SUCCESS; +} +//___________________________________________________________________________ +StatusCode AthenaRootSharedWriterSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) { + if ( IAthenaSharedWriterSvc::interfaceID().versionMatch(riid) ) { + *ppvInterface = (IAthenaSharedWriterSvc*)this; + } else { + // Interface is not directly available: try out a base class + return(AthService::queryInterface(riid, ppvInterface)); + } + addRef(); + return(StatusCode::SUCCESS); +} diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.h b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.h new file mode 100644 index 000000000000..dc9c989df341 --- /dev/null +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaRootSharedWriterSvc.h @@ -0,0 +1,54 @@ +/* + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration +*/ + +#ifndef ATHENAROOTSHAREDWRITERSVC_H +#define ATHENAROOTSHAREDWRITERSVC_H + +/** @file AthenaRootSharedWriterSvc.h + * @brief This file contains the class definition for the AthenaRootSharedWriterSvc class. + * @author Peter van Gemmeren <gemmeren@anl.gov> + **/ + +#include "AthenaBaseComps/AthService.h" +#include "AthenaKernel/IAthenaSharedWriterSvc.h" + +#include "GaudiKernel/IConversionSvc.h" + +#include "THashTable.h" + +class TServerSocket; +class TMonitor; + +/** @class AthenaRootSharedWriterSvc + * @brief This class provides an example for writing event data objects to Pool. + **/ +class AthenaRootSharedWriterSvc : public AthService, virtual public IAthenaSharedWriterSvc { + // Allow the factory class access to the constructor + friend class SvcFactory<AthenaRootSharedWriterSvc>; + +public: // Constructor and Destructor + /// Standard Service Constructor + AthenaRootSharedWriterSvc(const std::string& name, ISvcLocator* pSvcLocator); + /// Destructor + virtual ~AthenaRootSharedWriterSvc() = default; + +public: +/// Gaudi Service Interface method implementations: + virtual StatusCode initialize() override; + virtual StatusCode start() override; + virtual StatusCode stop() override; + virtual StatusCode finalize() override; + virtual StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface) override; + +private: + ServiceHandle<IConversionSvc> m_cnvSvc{this,"AthenaPoolCnvSvc","AthenaPoolCnvSvc"}; + + TServerSocket* m_rootServerSocket; + TMonitor* m_rootMonitor; + THashTable m_rootMergers; + int m_rootClientIndex; + int m_rootClientCount; +}; + +#endif diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/components/AthenaPoolCnvSvc_entries.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/components/AthenaPoolCnvSvc_entries.cxx index 383e6eb422e8..efd91d9b22e7 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/components/AthenaPoolCnvSvc_entries.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/components/AthenaPoolCnvSvc_entries.cxx @@ -1,13 +1,13 @@ #include "../AthenaPoolCnvSvc.h" #include "../AthenaRootSerializeSvc.h" -#include "../AthenaRootSharedWriter.h" +#include "../AthenaRootSharedWriterSvc.h" #include "../AthenaAttributeListCnv.h" #include "../CondAttrListCollCnv.h" #include "../CondAttrListVecCnv.h" DECLARE_COMPONENT( AthenaPoolCnvSvc ) DECLARE_COMPONENT( AthenaRootSerializeSvc ) -DECLARE_COMPONENT( AthenaRootSharedWriter ) +DECLARE_COMPONENT( AthenaRootSharedWriterSvc ) DECLARE_CONVERTER( AthenaAttributeListCnv ) DECLARE_CONVERTER( CondAttrListCollCnv ) DECLARE_CONVERTER( CondAttrListVecCnv ) diff --git a/Database/IOVDbSvc/src/IOVDbSvc.cxx b/Database/IOVDbSvc/src/IOVDbSvc.cxx index 859b92cf94cc..7c16b4062820 100644 --- a/Database/IOVDbSvc/src/IOVDbSvc.cxx +++ b/Database/IOVDbSvc/src/IOVDbSvc.cxx @@ -646,6 +646,7 @@ void IOVDbSvc::handle( const Incident& inc) { } if (m_par_managePoolConnections && m_poolPayloadRequested) { // reset POOL connection to close all open conditions POOL files + m_par_managePoolConnections.set(false); m_poolPayloadRequested=false; if (StatusCode::SUCCESS==m_h_poolSvc->disconnect(m_poolSvcContext)) { ATH_MSG_DEBUG( "Successfully closed input POOL connections"); -- GitLab