diff --git a/Control/AthenaServices/src/AthenaSharedMemoryTool.cxx b/Control/AthenaServices/src/AthenaSharedMemoryTool.cxx index 45deb50362d2b3ff2323618bf0f64b5d8e502ecc..66c5786483fbe2c806d9f6dbb1008845ce27bbb9 100644 --- a/Control/AthenaServices/src/AthenaSharedMemoryTool.cxx +++ b/Control/AthenaServices/src/AthenaSharedMemoryTool.cxx @@ -13,7 +13,7 @@ #include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> -const std::size_t maxTokenLength = 512; +const std::size_t maxTokenLength = 1024; struct ShareEventHeader { enum ProcessStatus { CLEARED, FILLED, LOCKED, UNLOCKED, PARTIAL, SHARED, UNKNOWN }; diff --git a/Control/AthenaServices/src/MetaDataSvc.cxx b/Control/AthenaServices/src/MetaDataSvc.cxx index b29fe0b7fe91b06630c8d19b62ddbd80aee5a717..221e7e964a2e912e02b5e4f4ee0f4dc4cf7d65b4 100644 --- a/Control/AthenaServices/src/MetaDataSvc.cxx +++ b/Control/AthenaServices/src/MetaDataSvc.cxx @@ -24,19 +24,54 @@ #include "SGTools/SGVersionedKey.h" #include "PersistentDataModel/DataHeader.h" +#include <vector> +#include <sstream> + //________________________________________________________________________________ MetaDataSvc::MetaDataSvc(const std::string& name, ISvcLocator* pSvcLocator) : ::AthService(name, pSvcLocator), m_inputDataStore("StoreGateSvc/InputMetaDataStore", name), m_outputDataStore("StoreGateSvc/MetaDataStore", name), m_addrCrtr("AthenaPoolCnvSvc", name), m_fileMgr("FileMgr", name), + m_incSvc("IncidentSvc", name), m_storageType(0L), m_clearedInputDataStore(true), - m_allowMetaDataStop(false) { + m_allowMetaDataStop(false), + m_persToClid(), + m_toolForClid(), + m_streamForKey() { // declare properties declareProperty("MetaDataContainer", m_metaDataCont = ""); declareProperty("MetaDataTools", m_metaDataTools); declareProperty("CnvSvc", m_addrCrtr = ServiceHandle<IAddressCreator>("AthenaPoolCnvSvc", name)); + // persistent class name to transient CLID map + m_persToClid.insert(std::pair<std::string, CLID>("DataHeader_p5", 222376821)); + m_persToClid.insert(std::pair<std::string, CLID>("EventStreamInfo_p3", 167728019)); + m_persToClid.insert(std::pair<std::string, CLID>("ByteStreamMetadataContainer_p1", 1076128893)); + m_persToClid.insert(std::pair<std::string, CLID>("IOVMetaDataContainer_p1", 1316383046)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::EventFormat_v1", 243004407)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::CutBookkeeperContainer_v1", 1234982351)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::CutBookkeeperAuxContainer_v1", 1147935274)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::TriggerMenuContainer_v1", 1107011239)); + m_persToClid.insert(std::pair<std::string, CLID>("DataVector<xAOD::TriggerMenu_v1>", 1107011239)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::TriggerMenuAuxContainer_v1", 1212409402)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::LumiBlockRangeContainer_v1", 1115934851)); + m_persToClid.insert(std::pair<std::string, CLID>("DataVector<xAOD::LumiBlockRange_v1>", 1115934851)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::LumiBlockRangeAuxContainer_v1", 1251061086)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::FileMetaData_v1", 178309087)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::FileMetaDataAuxInfo_v1", 73252552)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::RingSetConfContainer_v1", 1157997427)); + m_persToClid.insert(std::pair<std::string, CLID>("DataVector<xAOD::RingSetConf_v1>", 1157997427)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::RingSetConfAuxContainer_v1", 1307745126)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::TruthMetaDataContainer_v1", 1188015687)); + m_persToClid.insert(std::pair<std::string, CLID>("DataVector<xAOD::TruthMetaData_v1>", 1188015687)); + m_persToClid.insert(std::pair<std::string, CLID>("xAOD::TruthMetaDataAuxContainer_v1", 1094306618)); + // some classes need to have new/different tools added for metadata propagation + m_toolForClid.insert(std::pair<CLID, std::string>(167728019, "CopyEventStreamInfo")); + m_toolForClid.insert(std::pair<CLID, std::string>(243004407, "xAODMaker::EventFormatMetaDataTool")); + m_toolForClid.insert(std::pair<CLID, std::string>(1234982351, "BookkeeperTool")); + m_toolForClid.insert(std::pair<CLID, std::string>(1107011239, "xAODMaker::TriggerMenuMetaDataTool")); + m_toolForClid.insert(std::pair<CLID, std::string>(1115934851, "LumiBlockMetaDataTool")); } //__________________________________________________________________________ MetaDataSvc::~MetaDataSvc() { @@ -76,8 +111,7 @@ StatusCode MetaDataSvc::initialize() { return(StatusCode::FAILURE); } // Set to be listener for end of event - ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name()); - if (!incsvc.retrieve().isSuccess()) { + if (!m_incSvc.retrieve().isSuccess()) { ATH_MSG_FATAL("Cannot get IncidentSvc."); return(StatusCode::FAILURE); } @@ -85,12 +119,13 @@ StatusCode MetaDataSvc::initialize() { ATH_MSG_FATAL("Cannot get " << m_metaDataTools); return(StatusCode::FAILURE); } - incsvc->addListener(this, "FirstInputFile", 90); - incsvc->addListener(this, "BeginTagFile", 90); - incsvc->addListener(this, "BeginInputFile", 90); - incsvc->addListener(this, "EndInputFile", 10); - incsvc->addListener(this, "EndTagFile", 10); - incsvc->addListener(this, "LastInputFile", 10); + m_incSvc->addListener(this, "FirstInputFile", 90); + m_incSvc->addListener(this, "BeginTagFile", 90); + m_incSvc->addListener(this, "BeginInputFile", 90); + m_incSvc->addListener(this, "EndInputFile", 10); + m_incSvc->addListener(this, "EndTagFile", 10); + m_incSvc->addListener(this, "LastInputFile", 10); + m_incSvc->addListener(this, "ShmProxy", 90); // Register this service for 'I/O' events ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", this->name()); if (!iomgr.retrieve().isSuccess()) { @@ -123,6 +158,10 @@ StatusCode MetaDataSvc::initialize() { } //__________________________________________________________________________ StatusCode MetaDataSvc::finalize() { + // Release IncidentService + if (!m_incSvc.release().isSuccess()) { + ATH_MSG_WARNING("Cannot release IncidentService."); + } // Release FileMgr if (!m_fileMgr.release().isSuccess()) { ATH_MSG_WARNING("Cannot release FileMgr."); @@ -167,13 +206,8 @@ StatusCode MetaDataSvc::stop() { ATH_MSG_WARNING("Cannot release " << m_metaDataTools); } // Set to be listener for end of event - ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name()); - if (!incsvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get the IncidentSvc"); - return(StatusCode::FAILURE); - } Incident metaDataStopIncident(name(), "MetaDataStop"); - incsvc->fireIncident(metaDataStopIncident); + m_incSvc->fireIncident(metaDataStopIncident); return(StatusCode::SUCCESS); } //_______________________________________________________________________ @@ -214,29 +248,11 @@ StatusCode MetaDataSvc::loadAddresses(StoreID::type storeID, IAddressProvider::t for (std::vector<DataHeaderElement>::const_iterator dhIter = dataHeader->begin(), dhLast = dataHeader->end(); dhIter != dhLast; dhIter++) { const CLID clid = dhIter->getPrimaryClassID(); - SG::VersionedKey myVersObjKey(dhIter->getKey(), verNumber); - IOpaqueAddress* opqAddr = 0; - if (dhIter->getToken() == 0 - || !m_addrCrtr->createAddress(m_storageType, clid, dhIter->getToken()->toString(), opqAddr).isSuccess()) { - ATH_MSG_ERROR("Could not create IOpaqueAddress, will not read Metadata object"); - continue; - } - SG::TransientAddress* tadd = 0; - if (verNumber == 0 && clid != ClassID_traits<DataHeader>::ID()) { - tadd = new SG::TransientAddress(clid, dhIter->getKey(), opqAddr); - } else { - tadd = new SG::TransientAddress(clid, myVersObjKey, opqAddr); - } - if (tadd->clID() == ClassID_traits<DataHeader>::ID()) { - delete tadd; tadd = 0; - } else { - std::set<unsigned int> clids = dhIter->getClassIDs(); - for (std::set<unsigned int>::const_iterator iter = clids.begin(), last = clids.end(); - iter != last; iter++) { - tadd->setTransientID(*iter); - } - tadd->setAlias(dhIter->getAlias()); - tads.push_back(tadd); + if (clid != ClassID_traits<DataHeader>::ID()) { + SG::VersionedKey myVersObjKey(dhIter->getKey(), verNumber); + std::string key = dhIter->getKey(); + if (verNumber != 0) key = myVersObjKey; + tads.push_back(dhIter->getAddress(key)); } } } @@ -291,6 +307,16 @@ void MetaDataSvc::handle(const Incident& inc) { if (!m_metaDataTools.release().isSuccess()) { ATH_MSG_WARNING("Cannot release " << m_metaDataTools); } + } else if (inc.type() == "ShmProxy") { + if (!m_clearedInputDataStore) { + if (!m_inputDataStore->clearStore().isSuccess()) { + ATH_MSG_WARNING("Unable to clear input MetaData Proxies"); + } + m_clearedInputDataStore = true; + } + if (!addProxyToInputMetaDataStore(fileName).isSuccess()) { + ATH_MSG_WARNING("Unable to add proxy to InputMetaDataStore"); + } } } //__________________________________________________________________________ @@ -300,13 +326,8 @@ StatusCode MetaDataSvc::transitionMetaDataFile(bool ignoreInputFile) { return(StatusCode::FAILURE); } // Set to be listener for end of event - ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name()); - if (!incsvc.retrieve().isSuccess()) { - ATH_MSG_FATAL("Cannot get the IncidentSvc"); - return(StatusCode::FAILURE); - } Incident metaDataStopIncident(name(), "MetaDataStop"); - incsvc->fireIncident(metaDataStopIncident); + m_incSvc->fireIncident(metaDataStopIncident); if (!m_metaDataTools.release().isSuccess()) { ATH_MSG_WARNING("Cannot release " << m_metaDataTools); } @@ -343,6 +364,84 @@ StatusCode MetaDataSvc::rootOpenAction(FILEMGR_CALLBACK_ARGS) { return(StatusCode::SUCCESS); } //__________________________________________________________________________ +StatusCode MetaDataSvc::addProxyToInputMetaDataStore(const std::string& tokenStr) { + std::string fileName = tokenStr.substr(tokenStr.find("[FILE=") + 6); + fileName = fileName.substr(0, fileName.find(']')); + std::string className = tokenStr.substr(tokenStr.find("[PNAME=") + 7); + className = className.substr(0, className.find(']')); + std::string contName = tokenStr.substr(tokenStr.find("[CONT=") + 6); + contName = contName.substr(0, contName.find(']')); + std::size_t pos1 = contName.find('('); + std::string keyName = contName.substr(pos1 + 1, contName.size() - pos1 - 2); + std::size_t pos2 = keyName.find('/'); + if (pos2 != std::string::npos) keyName = keyName.substr(pos2 + 1); + std::string numName = tokenStr.substr(tokenStr.find("[NUM=") + 5); + numName = numName.substr(0, numName.find(']')); + unsigned long num = 0; + std::istringstream iss(numName); + iss >> num; + CLID clid = m_persToClid[className]; + const std::string par[2] = { "SHM" , className }; + const unsigned long ipar[2] = { num , 0 }; + IOpaqueAddress* opqAddr = nullptr; + if (clid == 167728019) { // EventStreamInfo, will change tool to combine input metadata, clearing things before... + bool foundTool = false; + for (ToolHandleArray<IAlgTool>::const_iterator iter = m_metaDataTools.begin(), iterEnd = m_metaDataTools.end(); iter != iterEnd; iter++) { + if ((*iter)->name() == "ToolSvc.CopyEventStreamInfo") foundTool = true; + } + if (!foundTool) { + ServiceHandle<IIncidentListener> cfSvc("CutFlowSvc", this->name()); // Disable CutFlowSvc by stopping its incidents. + if (cfSvc.retrieve().isSuccess()) { + m_incSvc->removeListener(cfSvc.get(), IncidentType::BeginInputFile); + m_incSvc->removeListener(cfSvc.get(), IncidentType::EndInputFile); + m_incSvc->removeListener(cfSvc.get(), IncidentType::EndRun); + m_incSvc->removeListener(cfSvc.get(), "StoreCleared"); + cfSvc.release().ignore(); + } + if (!m_outputDataStore->clearStore().isSuccess()) { + ATH_MSG_WARNING("Unable to clear output MetaData Proxies"); + } + } + } + const std::string toolName = m_toolForClid[clid]; + if (!toolName.empty()) { + bool foundTool = false; + for (ToolHandleArray<IAlgTool>::const_iterator iter = m_metaDataTools.begin(), iterEnd = m_metaDataTools.end(); iter != iterEnd; iter++) { + if ((*iter)->name() == "ToolSvc." + toolName) foundTool = true; + } + if (!foundTool) { + ToolHandle<IAlgTool> metadataTool(toolName); + m_metaDataTools.push_back(metadataTool); + if (!metadataTool.retrieve().isSuccess()) { + ATH_MSG_FATAL("Cannot get " << toolName); + return(StatusCode::FAILURE); + } + } + } + + if (!m_addrCrtr->createAddress(m_storageType, clid, par, ipar, opqAddr).isSuccess()) { + ATH_MSG_FATAL("addProxyToInputMetaDataStore: Cannot create address for " << tokenStr); + return(StatusCode::FAILURE); + } + if (m_inputDataStore->recordAddress(keyName, opqAddr).isFailure()) { + delete opqAddr; opqAddr = nullptr; + ATH_MSG_FATAL("addProxyToInputMetaDataStore: Cannot create proxy for " << tokenStr); + return(StatusCode::FAILURE); + } + if (m_inputDataStore->accessData(clid, keyName) == nullptr) { + ATH_MSG_FATAL("addProxyToInputMetaDataStore: Cannot access data for " << tokenStr); + return(StatusCode::FAILURE); + } + std::map<std::string, std::string>::const_iterator iter = m_streamForKey.find(keyName); + if (iter == m_streamForKey.end()) { + m_streamForKey.insert(std::pair<std::string, std::string>(keyName, fileName)); + } else if (fileName != iter->second) { // Remove duplicated objects + ATH_MSG_DEBUG("Resetting duplicate proxy for: " << clid << "#" << keyName << " from file: " << fileName); + m_inputDataStore->proxy(clid, keyName)->reset(); + } + return(StatusCode::SUCCESS); +} +//__________________________________________________________________________ StatusCode MetaDataSvc::initInputMetaDataStore(const std::string& fileName) { ATH_MSG_DEBUG("initInputMetaDataStore: file name " << fileName); m_clearedInputDataStore = false; diff --git a/Control/AthenaServices/src/MetaDataSvc.h b/Control/AthenaServices/src/MetaDataSvc.h index e4b93ebda5e748e306555ca06c2f859c0c257d3a..0d427edc7896c870456c8a37996b3cf08aced205 100644 --- a/Control/AthenaServices/src/MetaDataSvc.h +++ b/Control/AthenaServices/src/MetaDataSvc.h @@ -21,7 +21,7 @@ #include "boost/bind.hpp" -#include <vector> +#include <map> // Forward declarations class IAddressCreator; @@ -90,6 +90,8 @@ public: // Non-static members StatusCode rootOpenAction(FILEMGR_CALLBACK_ARGS); private: + /// Add proxy to input metadata store - can be called directly or via BeginInputFile incident + StatusCode addProxyToInputMetaDataStore(const std::string& tokenStr); /// Initialize input metadata store - can be called directly or via BeginInputFile incident StatusCode initInputMetaDataStore(const std::string& fileName); @@ -98,10 +100,14 @@ private: // data ServiceHandle<StoreGateSvc> m_outputDataStore; ServiceHandle<IAddressCreator> m_addrCrtr; ServiceHandle<IFileMgr> m_fileMgr; + ServiceHandle<IIncidentSvc> m_incSvc; long m_storageType; bool m_clearedInputDataStore; bool m_allowMetaDataStop; + std::map<std::string, CLID> m_persToClid; + std::map<CLID, std::string> m_toolForClid; + std::map<std::string, std::string> m_streamForKey; private: // properties /// MetaDataContainer, POOL container name for MetaData. diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx index 4bc258383d4e62659aff7b4faffb3a498814ade8..62416beea9d310d4161e10d4dd4a86a05b94f40d 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx @@ -10,6 +10,7 @@ #include "AthenaPoolCnvSvc.h" #include "GaudiKernel/ClassID.h" +#include "GaudiKernel/FileIncident.h" #include "GaudiKernel/IChronoStatSvc.h" #include "GaudiKernel/IOpaqueAddress.h" #include "GaudiKernel/IJobOptionsSvc.h" @@ -31,8 +32,6 @@ #include "AuxDiscoverySvc.h" -#include <set> - //______________________________________________________________________________ // Initialize the service. StatusCode AthenaPoolCnvSvc::initialize() { @@ -324,7 +323,8 @@ StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSp if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient()) { return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty() && m_streamServer == m_outputStreamingTool.size()) { + if (!m_outputStreamingTool.empty() + && (m_streamServer == m_outputStreamingTool.size() || !m_outputStreamingTool[0]->isServer())) { ATH_MSG_DEBUG("connectOutput SKIPPED for expired server."); return(StatusCode::SUCCESS); } @@ -372,7 +372,8 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe } return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty() && m_streamServer == m_outputStreamingTool.size()) { + if (!m_outputStreamingTool.empty() + && (m_streamServer == m_outputStreamingTool.size() || !m_outputStreamingTool[0]->isServer())) { ATH_MSG_DEBUG("commitOutput SKIPPED for expired server."); return(StatusCode::SUCCESS); } @@ -384,9 +385,8 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe int num = -1; StatusCode sc = m_outputStreamingTool[m_streamServer]->clearObject(&placementStr, num); if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 0 && num > 0) { - char* fileStr = strstr(placementStr, "[FILE=") + 6; - char* endPos = strpbrk(fileStr, "]"); *endPos = 0; - std::string fileName(fileStr); *endPos = ']'; + std::string fileName = strstr(placementStr, "[FILE="); + fileName = fileName.substr(6, fileName.find(']') - 6); if (!this->connectOutput(fileName).isSuccess()) { ATH_MSG_ERROR("Failed to connectOutput for " << fileName); return(StatusCode::FAILURE); @@ -399,80 +399,79 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe if (m_doChronoStat) { m_chronoStatSvc->chronoStart("cRep_" + objName); } + std::string tokenStr = placementStr; + std::string contName = strstr(placementStr, "[CONT="); + tokenStr = tokenStr.substr(0, tokenStr.find("[CONT=")) + contName.substr(contName.find(']') + 1); + contName = contName.substr(6, contName.find(']') - 6); std::string className = strstr(placementStr, "[PNAME="); className = className.substr(7, className.find(']') - 7); RootType classDesc = RootType::ByName(className); - // Get object - void* buffer = nullptr; - size_t nbytes = 0; - sc = m_outputStreamingTool[m_streamServer]->getObject(&buffer, nbytes, num); - while (sc.isRecoverable()) { - //usleep(100); - sc = m_outputStreamingTool[m_streamServer]->getObject(&buffer, nbytes, num); - } - if (!sc.isSuccess()) { - ATH_MSG_ERROR("Failed to get Data for " << placementStr); - return(StatusCode::FAILURE); - } - // Deserialize object void* obj = nullptr; - if (m_doChronoStat) { - m_chronoStatSvc->chronoStart("wDeser_ALL"); - } - obj = m_serializeSvc->deserialize(buffer, nbytes, classDesc); buffer = nullptr; - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("wDeser_ALL"); - m_chronoStatSvc->chronoStart("wAux_ALL"); - } - AuxDiscoverySvc auxDiscover; - if (!auxDiscover.receiveStore(const_cast<IAthenaSerializeSvc*>(m_serializeSvc.get()), dynamic_cast<const IAthenaIPCTool*>(m_outputStreamingTool[m_streamServer].get()), obj, num).isSuccess()) { - ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << placementStr); - return(StatusCode::FAILURE); - } - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("wAux_ALL"); - } - // Write object - Placement placement; - placement.fromString(placementStr); placementStr = nullptr; - const Token* token = this->registerForWrite(&placement, obj, classDesc); - if (token == nullptr) { - ATH_MSG_ERROR("Failed to write Data for: " << className); - return(StatusCode::FAILURE); - } - - // For DataHeaderForm, Token needs to be inserted to DataHeader Object - if (className == "DataHeaderForm_p5") { - GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(), token->toString(), placement.auxString()); - IConverter* cnv = converter(ClassID_traits<DataHeader>::ID()); - if (!cnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) { - ATH_MSG_ERROR("Failed updateRepRefs for obj = " << token->toString()); + std::string::size_type len = m_metadataContainerProp.value().size(); + // For Metadata fire incident to read object into store + if (len > 0 && contName.substr(0, len) == m_metadataContainerProp.value() + && contName.substr(len, 1) == "(") { + ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name()); + std::ostringstream oss; + oss << std::dec << num; + FileIncident proxyIncident(name(), "ShmProxy", std::string(placementStr) + "[NUM=" + oss.str() + "]"); + incSvc->fireIncident(proxyIncident); // Object will be pulled out of shared memory by setObjPtr() + } else { + Token readToken; + readToken.setOid(Token::OID_t(num, 0)); + readToken.setAuxString("[PNAME=" + className + "]"); + this->setObjPtr(obj, &readToken); // Pull/read Obbject out of shared memory + if (len > 0 && contName.substr(0, len) == m_metadataContainerProp.value()) { + ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name()); + if (contName.substr(len, 7) == "HdrForm") { + FileIncident beginInputIncident(name(), "BeginInputFile", "SHM"); + incSvc->fireIncident(beginInputIncident); + } else if (contName.substr(len, 3) == "Hdr") { + FileIncident endInputIncident(name(), "EndInputFile", "SHM"); + incSvc->fireIncident(endInputIncident); + } + } else { + // Write object + Placement placement; + placement.fromString(placementStr); placementStr = nullptr; + const Token* token = this->registerForWrite(&placement, obj, classDesc); + if (token == nullptr) { + ATH_MSG_ERROR("Failed to write Data for: " << className); + return(StatusCode::FAILURE); + } + tokenStr = token->toString(); delete token; token = nullptr; - return(StatusCode::FAILURE); - } - } else if (className != "Token" && !classDesc.IsFundamental()) { - commitCache.insert(std::pair<void*, RootType>(obj, classDesc)); - } - // Found DataHeader - if (className == "DataHeader_p5") { - GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(), token->toString(), placement.auxString()); - IConverter* cnv = converter(ClassID_traits<DataHeader>::ID()); - if (!cnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) { - ATH_MSG_ERROR("Failed updateRep for obj = " << token->toString()); - delete token; token = nullptr; - return(StatusCode::FAILURE); + // For DataHeaderForm, Token needs to be inserted to DataHeader Object + if (className == "DataHeaderForm_p5") { + GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(), tokenStr, placement.auxString()); + IConverter* cnv = converter(ClassID_traits<DataHeader>::ID()); + if (!cnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) { + ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr); + return(StatusCode::FAILURE); + } + // Found DataHeader + } else if (className == "DataHeader_p5") { + GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(), tokenStr, placement.auxString()); + IConverter* cnv = converter(ClassID_traits<DataHeader>::ID()); + if (!cnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) { + ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr); + return(StatusCode::FAILURE); + } + commitCache.insert(std::pair<void*, RootType>(obj, classDesc)); + } else if (className != "Token" && !classDesc.IsFundamental()) { + commitCache.insert(std::pair<void*, RootType>(obj, classDesc)); + } + } } // Send Token back to Client - sc = m_outputStreamingTool[m_streamServer]->lockObject(token->toString().c_str(), num); + sc = m_outputStreamingTool[m_streamServer]->lockObject(tokenStr.c_str(), num); if (!sc.isSuccess()) { - ATH_MSG_ERROR("Failed to lock Data for " << token->toString()); - delete token; token = nullptr; + ATH_MSG_ERROR("Failed to lock Data for " << tokenStr); return(StatusCode::FAILURE); } - delete token; token = nullptr; sc = m_outputStreamingTool[m_streamServer]->clearObject(&placementStr, num); while (sc.isRecoverable()) { sc = m_outputStreamingTool[m_streamServer]->clearObject(&placementStr, num); @@ -557,12 +556,18 @@ StatusCode AthenaPoolCnvSvc::disconnectOutput() { if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient()) { return(StatusCode::SUCCESS); } - if (!m_outputStreamingTool.empty() && m_streamServer == m_outputStreamingTool.size()) { - ATH_MSG_DEBUG("disconnectOutput SKIPPED for expired server."); - return(StatusCode::SUCCESS); - } if (!m_outputStreamingTool.empty()) { - m_streamServer = m_outputStreamingTool.size(); + if (m_metadataContainerProp.value().empty() && m_streamServer == m_outputStreamingTool.size()) { + ATH_MSG_DEBUG("disconnectOutput SKIPPED for expired server."); + return(StatusCode::SUCCESS); + } else if (!m_metadataContainerProp.value().empty() && m_streamServer < m_outputStreamingTool.size()) { + m_streamServer = m_outputStreamingTool.size(); + ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server."); + return(StatusCode::SUCCESS); + } else { + m_streamServer = m_outputStreamingTool.size(); + } + ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server: " << m_streamServer); } // Setting default 'TREE_MAX_SIZE' for ROOT to 1024 GB to avoid file chains. std::vector<std::string> maxFileSize; @@ -624,17 +629,15 @@ IPoolSvc* AthenaPoolCnvSvc::getPoolSvc() { } //______________________________________________________________________________ const Token* AthenaPoolCnvSvc::registerForWrite(const Placement* placement, - const void* obj, - const RootType& classDesc) const { + const void* obj, + const RootType& classDesc) const { if (m_doChronoStat) { m_chronoStatSvc->chronoStart("cRepR_ALL"); } const Token* token = nullptr; if (!m_outputStreamingTool.empty() && m_outputStreamingTool[0]->isClient()) { - std::string placementStr = placement->toString(); std::size_t streamClient = 0; - std::string fileName = placementStr.substr(placementStr.find("[FILE=") + 6); - fileName = fileName.substr(0, fileName.find(']')); + std::string fileName = placement->fileName(); for (std::vector<std::string>::const_iterator iter = m_streamClientFiles.begin(), last = m_streamClientFiles.end(); iter != last; iter++) { if (*iter == fileName) break; streamClient++; @@ -647,7 +650,13 @@ const Token* AthenaPoolCnvSvc::registerForWrite(const Placement* placement, } } // Lock object - placementStr = placementStr + "[PNAME=" + classDesc.Name() + "]"; + std::string placementStr = placement->toString(); + std::size_t formPos = placementStr.find("[FORM="); + if (formPos != std::string::npos) { + placementStr = placementStr.substr(0, formPos) + "[PNAME=" + classDesc.Name() + "]" + placementStr.substr(formPos); + } else { + placementStr += "[PNAME=" + classDesc.Name() + "]"; + } ATH_MSG_VERBOSE("Requesting write object for: " << placementStr); StatusCode sc = m_outputStreamingTool[streamClient]->lockObject(placementStr.c_str()); while (sc.isRecoverable()) { @@ -660,7 +669,7 @@ const Token* AthenaPoolCnvSvc::registerForWrite(const Placement* placement, } // Serialize object via ROOT const void* buffer = nullptr; - size_t nbytes = 0; + std::size_t nbytes = 0; bool own = true; if (classDesc.Name() == "Token") { nbytes = strlen(static_cast<const char*>(obj)) + 1; @@ -671,13 +680,7 @@ const Token* AthenaPoolCnvSvc::registerForWrite(const Placement* placement, buffer = obj; own = false; } else { - if (m_doChronoStat) { - m_chronoStatSvc->chronoStart("wSer_ALL"); - } buffer = m_serializeSvc->serialize(obj, classDesc, nbytes); - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("wSer_ALL"); - } } // Share object sc = m_outputStreamingTool[streamClient]->putObject(buffer, nbytes); @@ -687,19 +690,11 @@ const Token* AthenaPoolCnvSvc::registerForWrite(const Placement* placement, } if (own) { delete [] static_cast<const char*>(buffer); } buffer = nullptr; - std::string contName = strstr(placementStr.c_str(), "[CONT="); - contName = contName.substr(6, contName.find(']') - 6); - if (m_doChronoStat) { - m_chronoStatSvc->chronoStart("wAux_ALL"); - } AuxDiscoverySvc auxDiscover; - if (!auxDiscover.sendStore(const_cast<IAthenaSerializeSvc*>(m_serializeSvc.get()), dynamic_cast<const IAthenaIPCTool*>(m_outputStreamingTool[streamClient].get()), obj, pool::DbReflex::guid(classDesc), contName).isSuccess()) { + if (!auxDiscover.sendStore(const_cast<IAthenaSerializeSvc*>(m_serializeSvc.get()), dynamic_cast<const IAthenaIPCTool*>(m_outputStreamingTool[streamClient].get()), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) { ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr); return(nullptr); } - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("wAux_ALL"); - } if (!sc.isSuccess() || !m_outputStreamingTool[streamClient]->putObject(nullptr, 0).isSuccess()) { ATH_MSG_ERROR("Failed to put Data for " << placementStr); return(nullptr); @@ -718,15 +713,23 @@ const Token* AthenaPoolCnvSvc::registerForWrite(const Placement* placement, } Token* tempToken = new Token(); tempToken->fromString(tokenStr); tokenStr = nullptr; + tempToken->setClassID(pool::DbReflex::guid(classDesc)); token = tempToken; tempToken = nullptr; } else { - if (!m_outputStreamingTool.empty() && m_streamServer == m_outputStreamingTool.size()) { - ATH_MSG_DEBUG("registerForWrite SKIPPED for expired server."); + if (!m_outputStreamingTool.empty() && m_metadataContainerProp.value().empty() + && (m_streamServer == m_outputStreamingTool.size() || !m_outputStreamingTool[0]->isServer())) { + ATH_MSG_DEBUG("registerForWrite SKIPPED for expired server, Placement = " << placement->toString()); Token* tempToken = new Token(); tempToken->setClassID(pool::DbReflex::guid(classDesc)); - return(tempToken); + token = tempToken; tempToken = nullptr; + } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[0]->isServer()) { + ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString()); + Token* tempToken = new Token(); + tempToken->setClassID(pool::DbReflex::guid(classDesc)); + token = tempToken; tempToken = nullptr; + } else { + token = m_poolSvc->registerForWrite(placement, obj, classDesc); } - token = m_poolSvc->registerForWrite(placement, obj, classDesc); } if (m_doChronoStat) { m_chronoStatSvc->chronoStop("cRepR_ALL"); @@ -738,6 +741,41 @@ void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) const { if (m_doChronoStat) { m_chronoStatSvc->chronoStart("cObjR_ALL"); } + if (!m_outputStreamingTool.empty() && m_streamServer < m_outputStreamingTool.size() + && m_outputStreamingTool[m_streamServer]->isServer()) { + if (token->dbID() == Guid::null()) { + int num = token->oid().first; + // Get object from SHM + void* buffer = nullptr; + std::size_t nbytes = 0; + StatusCode sc = m_outputStreamingTool[m_streamServer]->getObject(&buffer, nbytes, num); + while (sc.isRecoverable()) { + //usleep(100); + sc = m_outputStreamingTool[m_streamServer]->getObject(&buffer, nbytes, num); + } + if (!sc.isSuccess()) { + ATH_MSG_ERROR("Failed to get Data for " << token->toString()); + return; + } + if (token->classID() != Guid::null()) { + // Deserialize object + RootType cltype(pool::DbReflex::forGuid(token->classID())); + obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr; + } else { + // Deserialize object + std::string className = token->auxString(); + className = className.substr(className.find("[PNAME=")); + className = className.substr(7, className.find(']') - 7); + RootType cltype(RootType::ByName(className)); + obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr; + } + AuxDiscoverySvc auxDiscover; + if (!auxDiscover.receiveStore(const_cast<IAthenaSerializeSvc*>(m_serializeSvc.get()), dynamic_cast<const IAthenaIPCTool*>(m_outputStreamingTool[m_streamServer].get()), obj, num).isSuccess()) { + ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString()); + } + return; + } + } if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) { ATH_MSG_VERBOSE("Requesting object for: " << token->toString()); if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) { @@ -748,7 +786,7 @@ void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) const { m_chronoStatSvc->chronoStart("gObj_ALL"); } void* buffer = nullptr; - size_t nbytes = 0; + std::size_t nbytes = 0; StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes); while (sc.isRecoverable()) { // sleep @@ -761,23 +799,11 @@ void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) const { ATH_MSG_WARNING("Failed to get Data for " << token->toString()); obj = nullptr; } else { - if (m_doChronoStat) { - m_chronoStatSvc->chronoStart("rDeser_ALL"); - } obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr; - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("rDeser_ALL"); - } - if (m_doChronoStat) { - m_chronoStatSvc->chronoStart("rAux_ALL"); - } AuxDiscoverySvc auxDiscover; if (!auxDiscover.receiveStore(const_cast<IAthenaSerializeSvc*>(m_serializeSvc.get()), dynamic_cast<const IAthenaIPCTool*>(m_inputStreamingTool.get()), obj).isSuccess()) { ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString()); } - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("rAux_ALL"); - } } } } else if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isServer()) { @@ -805,7 +831,13 @@ StatusCode AthenaPoolCnvSvc::createAddress(long svcType, return(StatusCode::FAILURE); } Token* token = nullptr; - if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) { + if (par[0] == "SHM") { + token = new Token(); + token->setOid(Token::OID_t(ip[0], ip[1])); + token->setAuxString("[PNAME=" + par[1] + "]"); + RootType classDesc = RootType::ByName(par[1]); + token->setClassID(pool::DbReflex::guid(classDesc)); + } else if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) { Token addressToken; addressToken.setDb(par[0].substr(4)); addressToken.setCont(par[1]); @@ -815,7 +847,7 @@ StatusCode AthenaPoolCnvSvc::createAddress(long svcType, return(StatusCode::FAILURE); } void* buffer = nullptr; - size_t nbytes = 0; + std::size_t nbytes = 0; StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes); while (sc.isRecoverable()) { // sleep @@ -894,7 +926,8 @@ StatusCode AthenaPoolCnvSvc::makeServer(int num) { num = -num; m_streamServer = int(num / 1024); num = num % 1024; - if (!m_outputStreamingTool.empty() && !m_outputStreamingTool[m_streamServer]->isServer()) { + if (!m_outputStreamingTool.empty() && m_streamServer < m_outputStreamingTool.size() + && !m_outputStreamingTool[m_streamServer]->isServer()) { ATH_MSG_DEBUG("makeServer: " << m_outputStreamingTool << " = " << num); if (m_outputStreamingTool[m_streamServer]->makeServer(num).isFailure()) { ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed"); @@ -954,35 +987,21 @@ StatusCode AthenaPoolCnvSvc::readData() const { } this->setObjPtr(instance, &token); // Serialize object via ROOT - if (m_doChronoStat) { - m_chronoStatSvc->chronoStart("rSer_ALL"); - } RootType cltype(pool::DbReflex::forGuid(token.classID())); void* buffer = nullptr; - size_t nbytes = 0; + std::size_t nbytes = 0; buffer = m_serializeSvc->serialize(instance, cltype, nbytes); - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("rSer_ALL"); - m_chronoStatSvc->chronoStart("pObj_ALL"); - } sc = m_inputStreamingTool->putObject(buffer, nbytes, num); delete [] static_cast<char*>(buffer); buffer = nullptr; if (!sc.isSuccess()) { ATH_MSG_ERROR("Could not share object for: " << token.toString()); return(StatusCode::FAILURE); } - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("pObj_ALL"); - m_chronoStatSvc->chronoStart("rAux_ALL"); - } AuxDiscoverySvc auxDiscover; if (!auxDiscover.sendStore(const_cast<IAthenaSerializeSvc*>(m_serializeSvc.get()), dynamic_cast<const IAthenaIPCTool*>(m_inputStreamingTool.get()), instance, token.classID(), token.contID(), num).isSuccess()) { ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString()); return(StatusCode::FAILURE); } - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("rAux_ALL"); - } cltype.Destruct(instance); instance = nullptr; if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) { ATH_MSG_ERROR("Could not share object for: " << token.toString()); @@ -1053,13 +1072,14 @@ AthenaPoolCnvSvc::AthenaPoolCnvSvc(const std::string& name, ISvcLocator* pSvcLoc declareProperty("SkipFirstChronoCommit", m_skipFirstChronoCommit = false); declareProperty("InputStreamingTool", m_inputStreamingTool); declareProperty("OutputStreamingTool", m_outputStreamingTool); + declareProperty("OutputMetadataContainer", m_metadataContainerProp); } //______________________________________________________________________________ AthenaPoolCnvSvc::~AthenaPoolCnvSvc() { } //__________________________________________________________________________ StatusCode AthenaPoolCnvSvc::decodeOutputSpec(std::string& fileSpec, - pool::DbType& outputTech) const { + pool::DbType& outputTech) const { outputTech = pool::ROOTTREE_StorageType; if (fileSpec.find("oracle") == 0 || fileSpec.find("mysql") == 0) { outputTech = pool::POOL_RDBMS_StorageType; @@ -1074,9 +1094,9 @@ StatusCode AthenaPoolCnvSvc::decodeOutputSpec(std::string& fileSpec, } //__________________________________________________________________________ void AthenaPoolCnvSvc::extractPoolAttributes(const StringArrayProperty& property, - std::vector<std::vector<std::string> >* contAttr, - std::vector<std::vector<std::string> >* dbAttr, - std::vector<std::vector<std::string> >* domAttr) const { + std::vector<std::vector<std::string> >* contAttr, + std::vector<std::vector<std::string> >* dbAttr, + std::vector<std::vector<std::string> >* domAttr) const { std::vector<std::string> opt; std::string attributeName, containerName, databaseName, valueString; for (std::vector<std::string>::const_iterator iter = property.value().begin(), @@ -1130,11 +1150,11 @@ void AthenaPoolCnvSvc::extractPoolAttributes(const StringArrayProperty& property } //__________________________________________________________________________ StatusCode AthenaPoolCnvSvc::processPoolAttributes(std::vector<std::vector<std::string> >& attr, - const std::string& fileName, - unsigned long contextId, - bool doGet, - bool doSet, - bool doClear) const { + const std::string& fileName, + unsigned long contextId, + bool doGet, + bool doSet, + bool doClear) const { bool retError = false; if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) doGet = false; for (std::vector<std::vector<std::string> >::iterator iter = attr.begin(), last = attr.end(); diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h index 51989ca09c8e231935ba1d98cc910bfcce8f2bce..d5dc74dc09c18f3cf2418eff92e24e5bb3080a41 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h @@ -251,6 +251,8 @@ private: // properties mutable CallMutex m_i_mut; mutable CallMutex m_o_mut; #endif + /// For SharedWriter to use MetadataSvc to merge data placed in a certain container + StringProperty m_metadataContainerProp; }; #endif diff --git a/Database/AthenaPOOL/AthenaPoolTools/CMakeLists.txt b/Database/AthenaPOOL/AthenaPoolTools/CMakeLists.txt index fd67852d5d1d47ed92c21065465cf05726ec0ef0..4a7a8eaaa429f5f7a3926836b91956c048b837ff 100644 --- a/Database/AthenaPOOL/AthenaPoolTools/CMakeLists.txt +++ b/Database/AthenaPOOL/AthenaPoolTools/CMakeLists.txt @@ -13,6 +13,7 @@ atlas_depends_on_subdirs( PRIVATE Database/PersistentDataModel Event/EventInfo Event/xAOD/xAODEventInfo + Event/xAOD/xAODCutFlow GaudiKernel TestPolicy ) @@ -21,7 +22,7 @@ atlas_add_component( AthenaPoolTools src/EventCount.cxx src/RequireUniqueEvent.cxx src/components/*.cxx - LINK_LIBRARIES AthenaBaseComps AthenaKernel StoreGateLib SGtests PersistentDataModel EventInfo xAODEventInfo GaudiKernel ) + LINK_LIBRARIES AthenaBaseComps AthenaKernel StoreGateLib SGtests PersistentDataModel EventInfo xAODEventInfo GaudiKernel xAODCutFlow ) # Install files from the package: atlas_install_joboptions( share/*.py ) diff --git a/Database/AthenaPOOL/AthenaPoolTools/src/EventCount.cxx b/Database/AthenaPOOL/AthenaPoolTools/src/EventCount.cxx index f3b817531a0e09127867ab823e239298113cc213..ef5a6682b8e4f6988d54fa5291a06a9818153a74 100755 --- a/Database/AthenaPOOL/AthenaPoolTools/src/EventCount.cxx +++ b/Database/AthenaPOOL/AthenaPoolTools/src/EventCount.cxx @@ -25,6 +25,8 @@ #include "StoreGate/StoreGateSvc.h" #include "AthenaKernel/IClassIDSvc.h" +#include "xAODCutFlow/CutBookkeeperContainer.h" + //--------------- Utility Struct Constructors ---------------- EventCount::ObjSum::ObjSum() : num(-1) {keys.clear();} @@ -84,6 +86,27 @@ void EventCount::handle(const Incident& inc) else { ATH_MSG_ERROR("Could not get file name at BeginInputFile"); } + ServiceHandle<StoreGateSvc> mdstore("StoreGateSvc/InputMetaDataStore", name()); + if (mdstore.retrieve().isSuccess()) { + const DataHandle<xAOD::CutBookkeeperContainer> compBook(NULL); + if (mdstore->retrieve(compBook, "CutBookkeepers").isSuccess()) { + ATH_MSG_INFO("CBK size = " << compBook->size()); + for (auto it = compBook->begin(); it != compBook->end(); ++it) { + ATH_MSG_INFO("CBK name= " << (*it)->name() << " stream=" << (*it)->inputStream() << " N=" << (*it)->nAcceptedEvents() << " W=" << (*it)->sumOfEventWeights()); + } + } else { + ATH_MSG_INFO("CBK No bookkeepers " << mdstore->dump()); + } + const DataHandle<xAOD::CutBookkeeperContainer> incompBook(NULL); + if (mdstore->retrieve(incompBook, "IncompleteCutBookkeepers").isSuccess()) { + ATH_MSG_INFO("CBK size = " << incompBook->size()); + for (auto it = incompBook->begin(); it != incompBook->end(); ++it) { + ATH_MSG_INFO("CBK name= " << (*it)->name() << " stream=" << (*it)->inputStream() << " N=" << (*it)->nAcceptedEvents() << " W=" << (*it)->sumOfEventWeights()); + } + } else { + ATH_MSG_INFO("CBK No bookkeepers " << mdstore->dump()); + } + } } } diff --git a/Database/AthenaPOOL/OutputStreamAthenaPool/src/CopyEventStreamInfo.cxx b/Database/AthenaPOOL/OutputStreamAthenaPool/src/CopyEventStreamInfo.cxx index e9abf388d04c90fd14b4390921a11e49e7a3321e..517b03c638490451b75022b5fed02531c81cb9e0 100644 --- a/Database/AthenaPOOL/OutputStreamAthenaPool/src/CopyEventStreamInfo.cxx +++ b/Database/AthenaPOOL/OutputStreamAthenaPool/src/CopyEventStreamInfo.cxx @@ -67,47 +67,61 @@ StatusCode CopyEventStreamInfo::finalize() { void CopyEventStreamInfo::handle(const Incident& inc) { ATH_MSG_DEBUG("handle() " << inc.type()); if (inc.type() == "BeginInputFile") { - if (m_inputMetaDataStore->contains<EventStreamInfo>(m_key)) { - std::list<SG::ObjectWithVersion<EventStreamInfo> > allVersions; - if (!m_inputMetaDataStore->retrieveAllVersions(allVersions, m_key).isSuccess()) { - ATH_MSG_ERROR("Could not retrieve all versions for EventStreamInfo"); - return; + std::vector<std::string> keys; + if (m_key.value().empty()) { + m_inputMetaDataStore->keys<EventStreamInfo>(keys); + } else { + keys.push_back(m_key); + } + for (const auto &key : keys) { + // Ignore versioned container + if (key.substr(0, 1) == ";" && key.substr(3, 1) == ";") { + ATH_MSG_VERBOSE( "Ignore versioned containe: " << key ); + continue; } - EventStreamInfo* evtStrInfo_out = 0; - for (std::list<SG::ObjectWithVersion<EventStreamInfo> >::const_iterator iter = allVersions.begin(), - iterEnd = allVersions.end(); iter != iterEnd; iter++) { - const EventStreamInfo* evtStrInfo_in = iter->dataObject; - if (!m_metaDataStore->contains<EventStreamInfo>(m_key)) { - evtStrInfo_out = new EventStreamInfo(*evtStrInfo_in); - if (!m_metaDataStore->record(evtStrInfo_out, m_key).isSuccess()) { - ATH_MSG_ERROR("Could not record DataObject: " << m_key); - return; - } - } else { - if (!m_metaDataStore->retrieve(evtStrInfo_out, m_key).isSuccess()) { - ATH_MSG_ERROR("Could not find DataObject in output: " << m_key); - return; - } - evtStrInfo_out->addEvent(evtStrInfo_in->getNumberOfEvents()); - for (std::set<unsigned int>::const_iterator elem = evtStrInfo_in->getRunNumbers().begin(), - lastElem = evtStrInfo_in->getRunNumbers().end(); elem != lastElem; elem++) { - evtStrInfo_out->insertRunNumber(*elem); - } - for (std::set<unsigned int>::const_iterator elem = evtStrInfo_in->getLumiBlockNumbers().begin(), - lastElem = evtStrInfo_in->getLumiBlockNumbers().end(); elem != lastElem; elem++) { - evtStrInfo_out->insertLumiBlockNumber(*elem); - } - for (std::set<std::string>::const_iterator elem = evtStrInfo_in->getProcessingTags().begin(), - lastElem = evtStrInfo_in->getProcessingTags().end(); elem != lastElem; elem++) { - evtStrInfo_out->insertProcessingTag(*elem); - } - for (std::set<std::pair<CLID, std::string> >::const_iterator elem = evtStrInfo_in->getItemList().begin(), - lastElem = evtStrInfo_in->getItemList().end(); elem != lastElem; elem++) { - evtStrInfo_out->insertItemList((*elem).first, (*elem).second); - } - for (std::set<EventType>::const_iterator elem = evtStrInfo_in->getEventTypes().begin(), - lastElem = evtStrInfo_in->getEventTypes().end(); elem != lastElem; elem++) { - evtStrInfo_out->insertEventType(*elem); + ATH_MSG_VERBOSE("Attempting to copy " << key); + if (m_inputMetaDataStore->contains<EventStreamInfo>(key)) { + std::list<SG::ObjectWithVersion<EventStreamInfo> > allVersions; + if (!m_inputMetaDataStore->retrieveAllVersions(allVersions, key).isSuccess()) { + ATH_MSG_ERROR("Could not retrieve all versions for EventStreamInfo"); + return; + } + EventStreamInfo* evtStrInfo_out = 0; + for (std::list<SG::ObjectWithVersion<EventStreamInfo> >::const_iterator iter = allVersions.begin(), + iterEnd = allVersions.end(); iter != iterEnd; iter++) { + const EventStreamInfo* evtStrInfo_in = iter->dataObject; + if (!m_metaDataStore->contains<EventStreamInfo>(key)) { + evtStrInfo_out = new EventStreamInfo(*evtStrInfo_in); + if (!m_metaDataStore->record(evtStrInfo_out, key).isSuccess()) { + ATH_MSG_ERROR("Could not record DataObject: " << key); + return; + } + } else { + if (!m_metaDataStore->retrieve(evtStrInfo_out, key).isSuccess()) { + ATH_MSG_ERROR("Could not find DataObject in output: " << key); + return; + } + evtStrInfo_out->addEvent(evtStrInfo_in->getNumberOfEvents()); + for (std::set<unsigned int>::const_iterator elem = evtStrInfo_in->getRunNumbers().begin(), + lastElem = evtStrInfo_in->getRunNumbers().end(); elem != lastElem; elem++) { + evtStrInfo_out->insertRunNumber(*elem); + } + for (std::set<unsigned int>::const_iterator elem = evtStrInfo_in->getLumiBlockNumbers().begin(), + lastElem = evtStrInfo_in->getLumiBlockNumbers().end(); elem != lastElem; elem++) { + evtStrInfo_out->insertLumiBlockNumber(*elem); + } + for (std::set<std::string>::const_iterator elem = evtStrInfo_in->getProcessingTags().begin(), + lastElem = evtStrInfo_in->getProcessingTags().end(); elem != lastElem; elem++) { + evtStrInfo_out->insertProcessingTag(*elem); + } + for (std::set<std::pair<CLID, std::string> >::const_iterator elem = evtStrInfo_in->getItemList().begin(), + lastElem = evtStrInfo_in->getItemList().end(); elem != lastElem; elem++) { + evtStrInfo_out->insertItemList((*elem).first, (*elem).second); + } + for (std::set<EventType>::const_iterator elem = evtStrInfo_in->getEventTypes().begin(), + lastElem = evtStrInfo_in->getEventTypes().end(); elem != lastElem; elem++) { + evtStrInfo_out->insertEventType(*elem); + } } } } diff --git a/Database/PersistentDataModel/src/Placement.cxx b/Database/PersistentDataModel/src/Placement.cxx index b931e26c463f0aa78cafade59c8aaa6317700c57..5cfbcb150110c79fb214bfa486e8daca8608a325 100755 --- a/Database/PersistentDataModel/src/Placement.cxx +++ b/Database/PersistentDataModel/src/Placement.cxx @@ -39,8 +39,14 @@ Placement& Placement::fromString(const std::string& source) { } else if (::strncmp(fmt_tech, p1, 6) == 0) { ::sscanf(p1, fmt_tech, &m_technology); } else { - m_auxString = p1; - break; + while (*(p2 + 1) == '[' && *(++p3) != 0 && *p3 != ']') { + p3 = ::strchr(p3, ']'); + } + char* p3mod = const_cast<char*>(p3); + *p3mod = 0; + m_auxString += p1; + m_auxString += "]"; + *p3mod = ']'; } } } diff --git a/Database/PersistentDataModel/src/Token.cxx b/Database/PersistentDataModel/src/Token.cxx index cc1715eee05325d7c546dfda9f6e91bdb1cadfa2..c99cfc5831ac2db14cfe5d8fac3af1da0d597a67 100755 --- a/Database/PersistentDataModel/src/Token.cxx +++ b/Database/PersistentDataModel/src/Token.cxx @@ -147,8 +147,14 @@ Token& Token::fromString(const std::string& source) { } else if (::strncmp(fmt_tech, p1, 6) == 0) { ::sscanf(p1, fmt_tech, &m_technology); } else { - m_auxString = p1; - break; + while (*(p2 + 1) == '[' && *(++p3) != 0 && *p3 != ']') { + p3 = ::strchr(p3, ']'); + } + char* p3mod = const_cast<char*>(p3); + *p3mod = 0; + m_auxString += p1; + m_auxString += "]"; + *p3mod = ']'; } } } diff --git a/Event/EventBookkeeperTools/src/CutFlowSvc.cxx b/Event/EventBookkeeperTools/src/CutFlowSvc.cxx index ae38cbfa5813dba6c93566ad819564e3c74a6dd0..90346148fd881fcda2cb7648536c9191bcf55966 100644 --- a/Event/EventBookkeeperTools/src/CutFlowSvc.cxx +++ b/Event/EventBookkeeperTools/src/CutFlowSvc.cxx @@ -971,6 +971,10 @@ CutFlowSvc::queryInterface( const InterfaceID& riid, void** ppvi ) *ppvi = static_cast<ICutFlowSvc*>(this); addRef(); // NB! : inrement the reference count! return StatusCode::SUCCESS; // RETURN + } else if ( IIncidentListener::interfaceID() == riid ) { + *ppvi = static_cast<IIncidentListener*>(this); + addRef(); // NB! : inrement the reference count! + return StatusCode::SUCCESS; // RETURN } // Interface is not directly availible: try out a base class return AthService::queryInterface( riid, ppvi );