From 7368dd300707a86f7ee41176a6b2ba6524785ab9 Mon Sep 17 00:00:00 2001 From: cranshaw <Jack.Cranshaw@cern.ch> Date: Tue, 14 Aug 2018 10:35:14 -0500 Subject: [PATCH] First prototype of generic metadata tool Former-commit-id: 194a93ff365f0e3adcca5e715c3b038bab6f3b5c --- .../AthenaKernel/GenericMetadataTool.h | 104 ++++ .../AthenaKernel/GenericMetadataTool.icc | 511 ++++++++++++++++++ 2 files changed, 615 insertions(+) create mode 100644 Control/AthenaKernel/AthenaKernel/GenericMetadataTool.h create mode 100644 Control/AthenaKernel/AthenaKernel/GenericMetadataTool.icc diff --git a/Control/AthenaKernel/AthenaKernel/GenericMetadataTool.h b/Control/AthenaKernel/AthenaKernel/GenericMetadataTool.h new file mode 100644 index 000000000000..4acf9858e754 --- /dev/null +++ b/Control/AthenaKernel/AthenaKernel/GenericMetadataTool.h @@ -0,0 +1,104 @@ +/* + Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +*/ + +#ifndef GENERICMETADATATOOL_H +#define GENERICMETADATATOOL_H + +/** @file GenericMetadataTool.h + * @brief This file contains the class definition for the GenericMetadataTool class. + * @author Peter van Gemmeren <gemmeren@anl.gov> + * $Id: GenericMetadataTool.h 663679 2015-04-29 08:31:54Z krasznaa $ + **/ + +#include "AthenaBaseComps/AthAlgTool.h" +//#include "AsgTools/AsgMetadataTool.h" +#include "AthenaKernel/IMetaDataTool.h" +#include "GaudiKernel/IIncidentListener.h" +#include "GaudiKernel/ServiceHandle.h" +//#include "AthenaKernel/ICutFlowSvc.h" + +//#include "xAODCutFlow/CutBookkeeper.h" +//#include "xAODCutFlow/CutBookkeeperContainer.h" +//#include "xAODCutFlow/CutBookkeeperAuxContainer.h" + +#include <string> + +/** @class GenericMetadataTool + * @brief This class provides an example for reading with a ISelectorTool to veto events on AttributeList. + **/ + +template <typename T, typename U> +class GenericMetadataTool : public AthAlgTool, public virtual ::IMetaDataTool +{ +public: // Constructor and Destructor + /// Standard Service Constructor + GenericMetadataTool(const std::string& type, + const std::string& name, + const IInterface* parent); + /// Destructor + virtual ~GenericMetadataTool(); + +public: + virtual StatusCode metaDataStop(const SG::SourceID&); + virtual StatusCode beginInputFile(const SG::SourceID& sid = "Serial"); + virtual StatusCode endInputFile(const SG::SourceID& sid = "Serial"); + virtual StatusCode initialize(); + virtual StatusCode finalize(); +protected: + ServiceHandle<StoreGateSvc> inputMetaStore() const; + ServiceHandle<StoreGateSvc> outputMetaStore() const; + +private: + + /// Helper class to update a container with information from another one + //StatusCode updateContainer( T* contToUpdate, + // const T* otherCont ); + + StatusCode initOutputContainer(const std::string& sgkey); + + StatusCode buildAthenaInterface(const std::string& inputName, + const std::string& outputName, + const SG::SourceID& sid); + + /// Fill Cutflow information + StatusCode addProcessMetadata(); + + /// Pointer to cut flow svc + ServiceHandle<StoreGateSvc> m_inputMetaStore; + ServiceHandle<StoreGateSvc> m_outputMetaStore; + + /// The name of the output Container + std::string m_outputCollName; + + /// The name of the input Container + std::string m_inputCollName; + + /// The name of the process Container + std::string m_procMetaName; + + bool m_processMetadataTaken; + bool m_markIncomplete; + + /// List of source ids which have reached end file + std::set<SG::SourceID> m_fullreads; + std::set<SG::SourceID> m_read; + std::set<SG::SourceID> m_written; + +}; + +template <typename T, typename U> +inline ServiceHandle<StoreGateSvc> GenericMetadataTool<T,U>::inputMetaStore() const +{ + return m_inputMetaStore; +} + +template <typename T, typename U> +inline ServiceHandle<StoreGateSvc> GenericMetadataTool<T,U>::outputMetaStore() const +{ + return m_outputMetaStore; +} + +#include "GenericMetadataTool.icc" +#endif + diff --git a/Control/AthenaKernel/AthenaKernel/GenericMetadataTool.icc b/Control/AthenaKernel/AthenaKernel/GenericMetadataTool.icc new file mode 100644 index 000000000000..b8776344f30f --- /dev/null +++ b/Control/AthenaKernel/AthenaKernel/GenericMetadataTool.icc @@ -0,0 +1,511 @@ +///////////////////////// -*- C++ -*- ///////////////////////////// + +/* + Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +*/ + +// Implementation file for class GenericMetadataTool +// Authors: Joao Firmino da Costa <joao.costa@cern.ch> and David Cote <david.cote@cern.ch> +/////////////////////////////////////////////////////////////////// + +// STL include +#include <algorithm> + +#include "GaudiKernel/Incident.h" +#include "GaudiKernel/FileIncident.h" +#include "GaudiKernel/IIncidentSvc.h" +#include "AthenaKernel/MetaCont.h" +#include "AthenaKernel/ClassID_traits.h" +#include "AthenaKernel/errorcheck.h" +#include "StoreGate/WriteMetaHandle.h" +#include "AthenaBaseComps/AthCheckMacros.h" +#include "AthContainersInterfaces/IConstAuxStoreMeta.h" + +template <typename T, typename U> +GenericMetadataTool<T,U>::GenericMetadataTool(const std::string& type, + const std::string& name, + const IInterface* parent) + : AthAlgTool(type,name,parent), + m_inputMetaStore( "StoreGateSvc/InputMetaDataStore", name ), + m_outputMetaStore( "StoreGateSvc/MetaDataStore", name ), + m_processMetadataTaken(false), + m_markIncomplete(true) +{ + declareProperty("OutputCollName", m_outputCollName="GenericMetadataOutput", + "The default name of the container for output files"); + declareProperty("InputCollName", m_inputCollName = "GenericMetadataInput", + "The default name of the container for input files"); + declareProperty("ProcessMetadataCollName", m_procMetaName = "ProcessMetadata", + "The default name of the container for process meta"); + declareProperty("MarkIncomplete", m_markIncomplete = true, + "Defaults to filling both complete and incomplete bookkeepers"); + declareInterface< ::IMetaDataTool >( this ); +} + + + +template <typename T, typename U> +GenericMetadataTool<T,U>::~GenericMetadataTool() +{ +} + + + +template <typename T, typename U> +StatusCode +GenericMetadataTool<T,U>::initialize() +{ + ATH_MSG_DEBUG( "Initializing " << name() << " - package version " << PACKAGE_VERSION ); + + ATH_MSG_DEBUG("InputCollName = " << m_inputCollName); + ATH_MSG_DEBUG("OutputCollName = " << m_outputCollName); + ATH_MSG_DEBUG("ProcessMetadataCollName = " << m_procMetaName); + + return StatusCode::SUCCESS; +} + + + +template <typename T, typename U> +StatusCode GenericMetadataTool<T,U>::beginInputFile(const SG::SourceID& sid) +{ + ATH_MSG_DEBUG("beginInputFile " << this->name() << "\n" << outputMetaStore()->dump()); + //OPENING NEW INPUT FILE + //Things to do: + // 1) note that a file is currently opened + // 2) Load CutBookkeepers from input file + // 2a) if incomplete from input, directly propagate to output + // 2b) if complete from input, wait for EndInputFile to decide what to do in output + + const std::string storename("MetaDataStore+"); + if (m_inputCollName != "") { // are there inputs + // IF NO METACONT IN OUTPUT STORE YET + // Initialize MetaCont for incomplete and tmp containers in output store + // + std::string tmp_name = storename+m_outputCollName+"tmp"; + ATH_CHECK(buildAthenaInterface(m_inputCollName,tmp_name,sid)); + + // Do the following if we want incomplete processings marked + if (m_markIncomplete) { + std::string inc_name = storename+"Incomplete"+m_outputCollName; + std::string input_inc_name = "Incomplete"+m_inputCollName; + ATH_CHECK(buildAthenaInterface(input_inc_name,inc_name,sid)); + } + } // inputCollName if + + // reset cutflow taken marker + m_processMetadataTaken = false; + + m_read.insert(sid); + + return StatusCode::SUCCESS; +} + + +template <typename T, typename U> +StatusCode GenericMetadataTool<T,U>::endInputFile(const SG::SourceID& sid) +{ + // Add the sid to the list of complete sids + if (m_inputCollName != "") { // are there inputs + m_fullreads.insert(sid); + } + + return StatusCode::SUCCESS; +} + +template <typename T, typename U> +StatusCode GenericMetadataTool<T,U>::metaDataStop(const SG::SourceID&) +{ + const std::string storename("MetaDataStore+"); + if (m_inputCollName != "") { // are there inputs + //TERMINATING THE JOB (EndRun) + //Things to do: + // 1) Create new incomplete CutBookkeepers if relevant + // 2) Print cut flow summary + // 3) Write root file if requested + // Now retrieve pointers for the MetaConts + std::string tmp_name = storename+m_outputCollName+"tmpCont"; + const MetaCont<T>* tmp; + ATH_CHECK(outputMetaStore()->retrieve(tmp,tmp_name)); + T* outcom = new T(); + // ARSE + U* outcom_aux = new U(); + outcom->setStore(outcom_aux); + + if (m_markIncomplete) { + std::string inc_name = storename+"Incomplete"+m_outputCollName+"Cont"; + // Build incomplete container to fill + T* outinc = new T(); + // ARSE + U* outinc_aux = new U(); + outinc->setStore(outinc_aux); + // Check if there were any incomplete inputs + const MetaCont<T>* inc; + if(outputMetaStore()->retrieve(inc,inc_name).isSuccess()) { + + // Incomplete inputs can just be merged + auto sids_inc = inc->sources(); + T* contptr(nullptr); + for (auto it = sids_inc.begin(); it != sids_inc.end(); ++it) { + if (!inc->find(*it,contptr)) { + ATH_MSG_ERROR("Container sid list did not match contents"); + } else { + //ATH_CHECK(updateContainer(outinc,contptr)); + } + contptr = nullptr; + } + } else { + ATH_MSG_INFO("Did not find MetaCont for " << inc_name << ", assuming input had no incomplete bookkeepers"); + } + + // Loop over containers and mark based on end files seen + auto sids_tmp = tmp->sources(); + T* contptr(nullptr); + for (auto it = sids_tmp.begin(); it != sids_tmp.end(); ++it) { + if (!tmp->find(*it,contptr)) { + ATH_MSG_ERROR("Container sid list did not match contents"); + } else { + bool complete = std::find(m_fullreads.begin(), + m_fullreads.end(), + *it) != m_fullreads.end(); + bool not_written = std::find(m_written.begin(), + m_written.end(), + *it) == m_written.end(); + if (complete && not_written) { + //ATH_CHECK(updateContainer(outcom,contptr)); + } else { + //ATH_CHECK(updateContainer(outinc,contptr)); + } + } + } + + std::string incout_name = "Incomplete"+m_outputCollName; + // Do any cleanup + if (outputMetaStore()->contains(ClassID_traits<T>::ID(),incout_name) ) { + ATH_MSG_INFO("Cleaning up class for " << incout_name); + const T* tmpBook(nullptr); + if ( outputMetaStore()->retrieve(tmpBook,incout_name).isSuccess() ) { + const SG::IConstAuxStore* tmpBookAux = tmpBook->getConstStore(); + ATH_CHECK(outputMetaStore()->removeDataAndProxy(tmpBook)); + ATH_CHECK(outputMetaStore()->removeDataAndProxy(tmpBookAux)); + } + else ATH_MSG_INFO("StoreGate failed retrieve"); + } + ATH_CHECK(outputMetaStore()->record(outinc,incout_name)); + ATH_CHECK(outputMetaStore()->record(outinc_aux,incout_name+"Aux.")); + } // markIncomplete + else { + auto sids_tmp = tmp->sources(); + T* contptr(nullptr); + // just merge complete inputs into complete/output container + for (auto it = sids_tmp.begin(); it != sids_tmp.end(); ++it) { + if (!tmp->find(*it,contptr)) { + ATH_MSG_ERROR("Container sid list did not match contents"); + } else { + // default to not worrying about marking + //ATH_CHECK(updateContainer(outcom,contptr)); + } + } + } + + // Record container objects directly in store for output + if (outputMetaStore()->contains(ClassID_traits<T>::ID(),m_outputCollName)) { + ATH_MSG_INFO("Cleaning up class for " << m_outputCollName); + const T* tmpBook(nullptr); + if ( outputMetaStore()->retrieve(tmpBook,m_outputCollName).isSuccess() ) { + const SG::IConstAuxStore* tmpBookAux = tmpBook->getConstStore(); + ATH_CHECK(outputMetaStore()->removeDataAndProxy(tmpBook)); + ATH_CHECK(outputMetaStore()->removeDataAndProxy(tmpBookAux)); + } + else ATH_MSG_ERROR("StoreGate failed retrieve"); + } + ATH_CHECK(outputMetaStore()->record(outcom,m_outputCollName)); + ATH_CHECK(outputMetaStore()->record(outcom_aux,m_outputCollName+"Aux.")); + } // inputCollName if + + if (!m_processMetadataTaken) { + if (addProcessMetadata().isFailure()) { + ATH_MSG_ERROR("Could not add CutFlow information"); + } + } + else { + ATH_MSG_DEBUG("Process metadata written into container before metaDataStop"); + } + + // Reset after metadata stop + m_processMetadataTaken = false; + + if (m_inputCollName != "") { // are there inputs + // Copy read files into written files + //std::copy(m_read.begin(),m_read.end(),back_inserter(m_written)); + for (auto it = m_read.begin(); it != m_read.end(); ++it) { + m_written.insert(*it); + } + // Remove completes from read + for (auto it = m_fullreads.begin(); it != m_fullreads.end(); ++it) { + m_read.erase(*it); + //std::remove(m_read.begin(); m_read.end(), *it); + } + m_fullreads.clear(); + } // inputCollName if + + return StatusCode::SUCCESS; +} + + +template <typename T, typename U> +StatusCode +GenericMetadataTool<T,U>::finalize() +{ + ATH_MSG_DEBUG( "Finalizing " << name() << " - package version " << PACKAGE_VERSION ); + return StatusCode::SUCCESS; +} + + +template <typename T, typename U> +StatusCode GenericMetadataTool<T,U>::initOutputContainer( const std::string& sgkey) +{ + std::string key = sgkey; + // Create the primary container + // Put it in a MetaCont + // ARSE + //MetaCont<T>* mcont = new MetaCont<T>(DataObjID("xAOD::CutBookkeeperContainer",key)); + MetaCont<T>* mcont = new MetaCont<T>(DataObjID(ClassID_traits<T>::ID(),key)); + // Do the same for the auxiliary container + std::string auxkey(key+"Aux."); + // ARSE + //MetaCont<T>* acont = new MetaCont<T>(DataObjID("xAOD::CutBookkeeperAuxContainer",auxkey)); + MetaCont<T>* acont = new MetaCont<T>(DataObjID("xAOD::CutBookkeeperAuxContainer",auxkey)); + ATH_CHECK(outputMetaStore()->record(std::move(mcont),key)); + ATH_CHECK(outputMetaStore()->record(std::move(acont),auxkey)); + ATH_CHECK(outputMetaStore()->symLink + ( + ClassID_traits<MetaCont<T> >::ID(), + auxkey, + ClassID_traits<T>::ID() + )); + + return StatusCode::SUCCESS; +} + +//--------------------------------------------------------// +// MetaConts are only needed when reading in Athena +// This builds them and populates them with bookeepers from the input store +//--------------------------------------------------------// +template <typename T, typename U> +StatusCode GenericMetadataTool<T,U>::buildAthenaInterface(const std::string& inputName, + const std::string& outputName, + const SG::SourceID& sid) +{ + // Make sure the MetaCont is ready in the output store for outputName + // If not, then create it + std::string name = outputName+"Cont"; + if( !(outputMetaStore()->contains(ClassID_traits<MetaCont<T> >::ID(),name)) ) { + ATH_CHECK(this->initOutputContainer(name)); + } + else { + ATH_MSG_WARNING("incomplete collection already exists"); + } + + // Retrieve pointer for the MetaCont + MetaCont<T>* mc; + ATH_CHECK(outputMetaStore()->retrieve(mc,name)); + + // Make sure sid does not already exist in the MetaCont + if ( std::find(mc->sources().begin(),mc->sources().end(),sid) + != mc->sources().end() ) { + ATH_MSG_ERROR("Metadata already exists for sid " << sid); + return StatusCode::FAILURE; // Fail if sid already exists + } + + // Get the input bookkeeper of the input metadata store + const T* cbc; + if (inputMetaStore()->contains(ClassID_traits<T>::ID(),inputName) ) { + StatusCode ssc = inputMetaStore()->retrieve( cbc, inputName ); + if (ssc.isSuccess()) { + // Insert input bookkeeper into MetaCont for this sid + T* tostore = new T(*cbc); + if ( !mc->insert(sid,tostore) ) { + ATH_MSG_ERROR("Unable to insert " << inputName << " for " << sid << " with key " << name); + return StatusCode::FAILURE; // Fail if insert to mc fails + } + } + else { + ATH_MSG_ERROR("Could not retrieve class with name " << inputName << " in input store"); + return StatusCode::FAILURE; // Fail if store contains, but not retrieves + } + } + else { + ATH_MSG_WARNING("No " << inputName << " data in this file "); + } + + return StatusCode::SUCCESS; +} + +template <typename T, typename U> +StatusCode GenericMetadataTool<T,U>::addProcessMetadata() +{ + // Add the information from the current processing to the complete output + // --> same paradigm as original CutFlowSvc + // Get the complete bookkeeper collection of the output meta-data store + T* completeBook(NULL); + if( !(outputMetaStore()->retrieve( completeBook, m_outputCollName) ).isSuccess() ) { + ATH_MSG_ERROR( "Could not get output container from output MetaDataStore" ); + return StatusCode::FAILURE; + } + + // Get the bookkeeper from the current processing + T* fileCompleteBook(NULL); + if( outputMetaStore()->contains(ClassID_traits<T>::ID(),m_procMetaName) ) { + if( !(outputMetaStore()->retrieve( fileCompleteBook, m_procMetaName) ).isSuccess() ) { + ATH_MSG_WARNING( "Could not get process metadata from output MetaDataStore" ); + } + else { + // update the complete output with the complete input + //ATH_CHECK(this->updateContainer(completeBook,fileCompleteBook)); + } + } + else { + ATH_MSG_INFO("No process container " << m_procMetaName); + } + + return StatusCode::SUCCESS; +} + + +/* +namespace { + +xAOD::CutBookkeeper* +resolveLink (const xAOD::CutBookkeeper* old, + xAOD::CutBookkeeperContainer& contToUpdate, + const xAOD::CutBookkeeperContainer& otherCont, + const std::vector<size_t>& otherIndices) +{ + { + xAOD::CutBookkeeperContainer::iterator matchIter = + std::find( contToUpdate.begin(), + contToUpdate.end(), + old ); + if (matchIter != contToUpdate.end()) + return *matchIter; + } + + { + xAOD::CutBookkeeperContainer::const_iterator matchIter = + std::find( otherCont.begin(), + otherCont.end(), + old ); + if (matchIter != contToUpdate.end()) { + size_t pos = matchIter - otherCont.begin(); + return contToUpdate[otherIndices[pos]]; + } + } + + // If we didn't find it, we need to add it + xAOD::CutBookkeeper* newEBK = new xAOD::CutBookkeeper(); + if ( newEBK->usingPrivateStore() ) { newEBK->releasePrivateStore(); } + newEBK->makePrivateStore(old); + contToUpdate.push_back( newEBK ); + return newEBK; +} + + +} // anonymous namespace + +StatusCode +GenericMetadataTool::updateContainer( xAOD::CutBookkeeperContainer* contToUpdate, + const xAOD::CutBookkeeperContainer* otherCont ) +{ + ATH_MSG_DEBUG("calling updateContainer(...)" ); + ATH_MSG_VERBOSE("Have container to update with size=" << contToUpdate->size() + << ", and other container with size=" << otherCont->size() ); + + size_t origSize = contToUpdate->size(); + + // Vector of indices in contToUpdate of elements in otherCont. + std::vector< std::size_t > otherIndices (otherCont->size()); + // Loop through otherCont. + // If element already in contToUpdate, update event counts, otherwise create new element + for ( std::size_t i=0; i<otherCont->size(); ++i ) { + const xAOD::CutBookkeeper* otherEBK = otherCont->at(i); + ATH_MSG_VERBOSE("Looping through otherCont at index " << i); + ATH_MSG_VERBOSE("Have otherEBK with: name=" << otherEBK->name() + << ", cycle=" << otherEBK->cycle() + << ", nAcceptedEvents=" << otherEBK->nAcceptedEvents() + << ", inputStream=" << otherEBK->inputStream() ); + + + // Loop through the container to be updated (contToUpdate) and see if we find a match + bool foundEBKToUpdate(false); + for ( std::size_t j=0; j<contToUpdate->size(); ++j ) { + xAOD::CutBookkeeper* ebkToUpdate = contToUpdate->at(j); + // Check if they are identical, if so, update; else add otherEBK + if ( otherEBK->isEqualTo(ebkToUpdate) ) { + ebkToUpdate->setPayload( ebkToUpdate->payload() + otherEBK->payload() ); + otherIndices[i] = j; + foundEBKToUpdate = true; + break; + } + } // End: Inner loop over contToUpdate + if (!foundEBKToUpdate) { + xAOD::CutBookkeeper* newEBK = new xAOD::CutBookkeeper(); + if ( newEBK->usingPrivateStore() ) { newEBK->releasePrivateStore(); } + newEBK->makePrivateStore(otherEBK); + contToUpdate->push_back( newEBK ); + std::size_t ebIdx = newEBK->index(); + otherIndices[i] = ebIdx; + } + } // End: Outer loop over contToUpdate + + // Now, we still need to fix the cross-referencing of the newly added CutBookkkeepers + for ( std::size_t i=origSize; i<contToUpdate->size(); ++i ) { + xAOD::CutBookkeeper* ebkToModify = contToUpdate->at(i); + + // Parent check + if ( ebkToModify->hasParent() ) { + const xAOD::CutBookkeeper* oldParent = ebkToModify->parent(); + const xAOD::CutBookkeeper* newParent = resolveLink (oldParent, + *contToUpdate, + *otherCont, + otherIndices); + ebkToModify->setParent (newParent); + } // Done fixing parent + + // Children check + std::vector< xAOD::CutBookkeeper* > newChildren; + for ( std::size_t oldIdx=0; oldIdx<ebkToModify->nChildren(); ++oldIdx ) { + const xAOD::CutBookkeeper* oldEBK = ebkToModify->child(oldIdx); + newChildren.push_back (resolveLink (oldEBK, + *contToUpdate, + *otherCont, + otherIndices)); + } // Done fixing children + ebkToModify->setChildren (newChildren); + + // Used others check + std::vector< xAOD::CutBookkeeper* > newOthers; + for ( std::size_t oldIdx=0; oldIdx<ebkToModify->nUsedOthers(); ++oldIdx ) { + const xAOD::CutBookkeeper* oldEBK = ebkToModify->usedOther(oldIdx); + newOthers.push_back (resolveLink (oldEBK, + *contToUpdate, + *otherCont, + otherIndices)); + } // Done fixing used others + ebkToModify->setUsedOthers (newOthers); + + // Siblings check + std::vector< xAOD::CutBookkeeper* > newSiblings; + for ( std::size_t oldIdx=0; oldIdx<ebkToModify->nSiblings(); ++oldIdx ) { + const xAOD::CutBookkeeper* oldEBK = ebkToModify->sibling(oldIdx); + newSiblings.push_back (resolveLink (oldEBK, + *contToUpdate, + *otherCont, + otherIndices)); + } // Done fixing siblings + ebkToModify->setSiblings (newSiblings); + } // Done fixing all cross references + return StatusCode::SUCCESS; +} +*/ + -- GitLab