diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx index ff5bb1041d4cd954640fec1ef934f5482da291bf..812cb145b36f9f0884dd53f1465efb4b5fbac1eb 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx @@ -403,6 +403,17 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& /*outputConnectionS m_chronoStatSvc->chronoStop("wDeser_ALL"); } } + if (m_doChronoStat) { + m_chronoStatSvc->chronoStart("wAux_ALL"); + } + if (!receiveStore(m_outputStreamingTool.get(), obj, num).isSuccess()) { + ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << placementStr); + delete placementStr; placementStr = nullptr; + return(StatusCode::FAILURE); + } + if (m_doChronoStat) { + m_chronoStatSvc->chronoStop("wAux_ALL"); + } // Write object Placement placement; placement.fromString(placementStr); @@ -417,7 +428,7 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& /*outputConnectionS if (className == "DataHeaderForm_p5") { GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(), token->toString(), placement.auxString()); IConverter* cnv = converter(ClassID_traits<DataHeader>::ID()); - if (!cnv->updateRepRefs(&address, (DataObject*)obj).isSuccess()) { + if (!cnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) { ATH_MSG_ERROR("Failed updateRepRefs for obj = " << token->toString()); delete token; token = nullptr; return(StatusCode::FAILURE); @@ -430,7 +441,7 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& /*outputConnectionS if (className == "DataHeader_p5") { GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(), token->toString(), placement.auxString()); IConverter* cnv = converter(ClassID_traits<DataHeader>::ID()); - if (!cnv->updateRep(&address, (DataObject*)obj).isSuccess()) { + if (!cnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) { ATH_MSG_ERROR("Failed updateRep for obj = " << token->toString()); delete token; token = nullptr; return(StatusCode::FAILURE); @@ -640,8 +651,20 @@ const Token* AthenaPoolCnvSvc::registerForWrite(const Placement* placement, usleep(100); sc = m_outputStreamingTool->putObject(buffer, nbytes); } - if (own) { delete [] (char*)buffer; } + if (own) { delete [] static_cast<const char*>(buffer); } buffer = nullptr; + std::string contName = strstr(placementStr.c_str(), "[CONT="); + contName = contName.substr(6, contName.find(']') - 6); + if (m_doChronoStat) { + m_chronoStatSvc->chronoStart("wAux_ALL"); + } + if (!sendStore(m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), contName).isSuccess()) { + ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr); + return(nullptr); + } + if (m_doChronoStat) { + m_chronoStatSvc->chronoStop("wAux_ALL"); + } if (!sc.isSuccess() || !m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) { ATH_MSG_ERROR("Failed to put Data for " << placementStr); return(nullptr); @@ -695,7 +718,7 @@ void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) const { m_chronoStatSvc->chronoStop("gObj_ALL"); } if (!sc.isSuccess()) { - delete [] (char*)buffer; buffer = nullptr; + delete [] static_cast<char*>(buffer); buffer = nullptr; ATH_MSG_WARNING("Failed to get Data for " << token->toString()); obj = nullptr; } else { @@ -707,46 +730,14 @@ void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) const { m_chronoStatSvc->chronoStop("rDeser_ALL"); } } - nbytes = 1; // StreamingTool owns buffer, will stay around until last dynamic attribute is copied - sc = m_inputStreamingTool->getObject(&buffer, nbytes); - while (sc.isRecoverable() && nbytes > 0) { - // sleep - sc = m_inputStreamingTool->getObject(&buffer, nbytes); + if (m_doChronoStat) { + m_chronoStatSvc->chronoStart("rAux_ALL"); } - if (sc.isSuccess() && nbytes > 0) { - const_cast<Token*>(token)->setCont(std::string((char*)buffer)); + if (!receiveStore(m_inputStreamingTool.get(), obj).isSuccess()) { + ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString()); } - AuxDiscoverySvc auxDiscover; - if (auxDiscover.getAuxStore(obj, *token)) { - if (m_doChronoStat) { - m_chronoStatSvc->chronoStart("gObjD_ALL"); - } - void* attrName = nullptr; - void* typeName = nullptr; - void* elemName = nullptr; - // StreamingTool owns buffer, will stay around until last dynamic attribute is copied - while (m_inputStreamingTool->getObject(&attrName, nbytes).isSuccess() && nbytes > 0 && - m_inputStreamingTool->getObject(&typeName, nbytes).isSuccess() && nbytes > 0 && - m_inputStreamingTool->getObject(&elemName, nbytes).isSuccess() && nbytes > 0) { - nbytes = 0; - if (!m_inputStreamingTool->getObject(&buffer, nbytes).isSuccess()) { - ATH_MSG_WARNING("Failed to get dynamic attribute for " << (char*)attrName); - } else { - const RootType type(std::string((char*)typeName)); - void* dynAttr = nullptr; - if (type.IsFundamental()) { - dynAttr = buffer; buffer = nullptr; - } else { - dynAttr = m_serializeSvc->deserialize(buffer, nbytes, type); buffer = nullptr; - } - SG::auxid_t auxid = auxDiscover.getAuxID((char*)attrName, (char*)elemName, (char*)typeName); - auxDiscover.setData(auxid, dynAttr, type); - } - } - auxDiscover.setAuxStore(); - if (m_doChronoStat) { - m_chronoStatSvc->chronoStop("gObjD_ALL"); - } + if (m_doChronoStat) { + m_chronoStatSvc->chronoStop("rAux_ALL"); } } } else if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isServer()) { @@ -795,8 +786,8 @@ StatusCode AthenaPoolCnvSvc::createAddress(long svcType, return(StatusCode::FAILURE); } else { token = new Token(); - token->fromString((char*)buffer); - delete [] (char*)buffer; buffer = nullptr; + token->fromString(static_cast<char*>(buffer)); + delete [] static_cast<char*>(buffer); buffer = nullptr; if (token->classID() == Guid::null()) { delete token; token = nullptr; } @@ -934,7 +925,7 @@ StatusCode AthenaPoolCnvSvc::readData() const { m_chronoStatSvc->chronoStart("pObj_ALL"); } sc = m_inputStreamingTool->putObject(buffer, nbytes, num); - delete [] (char*)buffer; buffer = nullptr; + delete [] static_cast<char*>(buffer); buffer = nullptr; if (!sc.isSuccess()) { ATH_MSG_ERROR("Could not share object for: " << token.toString()); return(StatusCode::FAILURE); @@ -943,42 +934,9 @@ StatusCode AthenaPoolCnvSvc::readData() const { m_chronoStatSvc->chronoStop("pObj_ALL"); m_chronoStatSvc->chronoStart("rAux_ALL"); } - AuxDiscoverySvc auxDiscover; - const SG::auxid_set_t& auxIDs = auxDiscover.getAuxIDs(instance, token); - if (!auxIDs.empty()) { - const std::string contId = token.contID(); - if (!m_inputStreamingTool->putObject(contId.c_str(), contId.size() + 1, num).isSuccess()) { - ATH_MSG_ERROR("Could not share container ID for: " << token.toString()); - return(StatusCode::FAILURE); - } - } - for (SG::auxid_set_t::const_iterator iter = auxIDs.begin(), last = auxIDs.end(); iter != last; iter++) { - const std::string& attrName = auxDiscover.getAttrName(*iter); - const std::string& typeName = auxDiscover.getTypeName(*iter); - const std::string& elemName = auxDiscover.getElemName(*iter); - if (!m_inputStreamingTool->putObject(attrName.c_str(), attrName.size() + 1, num).isSuccess() || - !m_inputStreamingTool->putObject(typeName.c_str(), typeName.size() + 1, num).isSuccess() || - !m_inputStreamingTool->putObject(elemName.c_str(), elemName.size() + 1, num).isSuccess()) { - ATH_MSG_ERROR("Could not share object header for: " << token.toString()); - return(StatusCode::FAILURE); - } - const std::type_info* tip = auxDiscover.getType(*iter); - if (tip == nullptr) { - ATH_MSG_ERROR("Could not get type_info for: " << token.toString()); - return(StatusCode::FAILURE); - } - RootType type(*tip); - if (type.IsFundamental()) { - sc = m_inputStreamingTool->putObject(auxDiscover.getData(*iter), type.SizeOf(), num); - } else { - buffer = m_serializeSvc->serialize(auxDiscover.getData(*iter), type, nbytes); - sc = m_inputStreamingTool->putObject(buffer, nbytes, num); - delete [] (char*)buffer; buffer = nullptr; - } - if (!sc.isSuccess()) { - ATH_MSG_ERROR("Could not share object decoration for: " << token.toString()); - return(StatusCode::FAILURE); - } + if (!sendStore(m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) { + ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString()); + return(StatusCode::FAILURE); } if (m_doChronoStat) { m_chronoStatSvc->chronoStop("rAux_ALL"); @@ -1202,3 +1160,99 @@ StatusCode AthenaPoolCnvSvc::processPoolAttributes(std::vector<std::vector<std:: } return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS); } +//______________________________________________________________________________ +StatusCode AthenaPoolCnvSvc::receiveStore(const IAthenaIPCTool* tool, void* obj, int num) const { + void* buffer = nullptr; + size_t nbytes = 1; // StreamingTool owns buffer, will stay around until last dynamic attribute is copied + StatusCode sc = tool->getObject(&buffer, nbytes, num); + while (sc.isRecoverable() && nbytes > 0) { + sc = tool->getObject(&buffer, nbytes, num); + } + if (!sc.isSuccess() || nbytes == 0) { // No dynamic attributes + return(StatusCode::SUCCESS); + } + Guid classId; + classId.fromString(static_cast<char*>(buffer)); + if (!tool->getObject(&buffer, nbytes, num).isSuccess() || nbytes == 0) { + return(StatusCode::FAILURE); + } + const std::string contName = std::string(static_cast<char*>(buffer)); + AuxDiscoverySvc auxDiscover; + if (classId != Guid::null() && !contName.empty() && auxDiscover.getAuxStore(obj, classId, contName)) { + void* attrName = nullptr; + void* typeName = nullptr; + void* elemName = nullptr; + // StreamingTool owns buffer, will stay around until last dynamic attribute is copied + while (tool->getObject(&attrName, nbytes, num).isSuccess() && nbytes > 0 && + tool->getObject(&typeName, nbytes, num).isSuccess() && nbytes > 0 && + tool->getObject(&elemName, nbytes, num).isSuccess() && nbytes > 0) { + nbytes = 0; + if (tool->getObject(&buffer, nbytes, num).isSuccess()) { + const RootType type(std::string(static_cast<char*>(typeName))); + void* dynAttr = nullptr; + if (type.IsFundamental()) { + dynAttr = buffer; buffer = nullptr; + } else { + dynAttr = m_serializeSvc->deserialize(buffer, nbytes, type); buffer = nullptr; + } + SG::auxid_t auxid = auxDiscover.getAuxID(static_cast<char*>(attrName), + static_cast<char*>(elemName), + static_cast<char*>(typeName)); + auxDiscover.setData(auxid, dynAttr, type); + } + } + auxDiscover.setAuxStore(); + } + return(StatusCode::SUCCESS); +} +//______________________________________________________________________________ +StatusCode AthenaPoolCnvSvc::sendStore(const IAthenaIPCTool* tool, + const void* obj, + const Guid& classId, + const std::string& contName, + int num) const { + AuxDiscoverySvc auxDiscover; + const SG::auxid_set_t& auxIDs = auxDiscover.getAuxIDs(obj, classId, contName); + if (!auxIDs.empty()) { + const std::string& classIdStr = classId.toString(); + if (!tool->putObject(classIdStr.c_str(), classIdStr.size() + 1, num).isSuccess()) { + ATH_MSG_ERROR("Could not share classId: " << classIdStr); + return(StatusCode::FAILURE); + } + if (!tool->putObject(contName.c_str(), contName.size() + 1, num).isSuccess()) { + ATH_MSG_ERROR("Could not share contName: " << contName); + return(StatusCode::FAILURE); + } + } + for (SG::auxid_set_t::const_iterator iter = auxIDs.begin(), last = auxIDs.end(); iter != last; iter++) { + const std::string& attrName = auxDiscover.getAttrName(*iter); + const std::string& typeName = auxDiscover.getTypeName(*iter); + const std::string& elemName = auxDiscover.getElemName(*iter); + if (!tool->putObject(attrName.c_str(), attrName.size() + 1, num).isSuccess() || + !tool->putObject(typeName.c_str(), typeName.size() + 1, num).isSuccess() || + !tool->putObject(elemName.c_str(), elemName.size() + 1, num).isSuccess()) { + ATH_MSG_ERROR("Could not share object header for: " << contName); + return(StatusCode::FAILURE); + } + const std::type_info* tip = auxDiscover.getType(*iter); + if (tip == nullptr) { + ATH_MSG_ERROR("Could not get type_info for: " << contName); + return(StatusCode::FAILURE); + } + RootType type(*tip); + StatusCode sc = StatusCode::FAILURE; + if (type.IsFundamental()) { + sc = tool->putObject(auxDiscover.getData(*iter), type.SizeOf(), num); + } else { + size_t nbytes = 0; + void* buffer = m_serializeSvc->serialize(auxDiscover.getData(*iter), type, nbytes); + sc = tool->putObject(buffer, nbytes, num); + delete [] static_cast<char*>(buffer); buffer = nullptr; + } + if (!sc.isSuccess()) { + ATH_MSG_ERROR("Could not share object decoration for: " << contName); + return(StatusCode::FAILURE); + } + } + return(StatusCode::SUCCESS); +} diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h index 52a3f84798b5b9c79d4aa674e676c3212a68ee79..372229ae9c5daa45d30b17f2be55a4e7901abb9c 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.h @@ -28,6 +28,7 @@ class IAthenaSerializeSvc; class IChronoStatSvc; class IClassIDSvc; class IPoolSvc; +class Guid; template <class TYPE> class SvcFactory; @@ -182,6 +183,16 @@ private: // member functions bool doSet = true, bool doClear = true) const; + /// Receive dynamic aux store variables from streaming tool + StatusCode receiveStore(const IAthenaIPCTool* tool, void* obj, int num = 0) const; + + /// Send dynamic aux store variables to streaming tool + StatusCode sendStore(const IAthenaIPCTool* tool, + const void* obj, + const Guid& classId, + const std::string& contName, + int num = 0) const; + private: // data pool::DbType m_dbType; std::string m_outputConnectionSpec; diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolConverter.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolConverter.cxx index 1f0a759f56ab2f5bb629cfc0576fe90c7ea84400..69541c997b0f55f62727ae55748f354d7cb3769e 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolConverter.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolConverter.cxx @@ -54,7 +54,6 @@ long AthenaPoolConverter::repSvcType() const { } //__________________________________________________________________________ StatusCode AthenaPoolConverter::createObj(IOpaqueAddress* pAddr, DataObject*& pObj) { - //std::lock_guard<CallMutex> lock(m_conv_mut); TokenAddress* tokAddr = dynamic_cast<TokenAddress*>(pAddr); if (tokAddr == nullptr || tokAddr->getToken() == nullptr) { if (m_i_poolToken == nullptr) m_i_poolToken = new Token; @@ -86,7 +85,6 @@ StatusCode AthenaPoolConverter::createObj(IOpaqueAddress* pAddr, DataObject*& pO } //__________________________________________________________________________ StatusCode AthenaPoolConverter::createRep(DataObject* pObj, IOpaqueAddress*& pAddr) { - //std::lock_guard<CallMutex> lock(m_conv_mut); // Create a Pool object for DataObject m_o_poolToken = nullptr; try { diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.cxx index 2b2c65777e68988fdfa25d738fddb9eaac527a32..d9ef15c7ffea780e85cc518c7638785580632393 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.cxx @@ -16,7 +16,7 @@ #include "AthContainers/normalizedTypeinfoName.h" #include "AthContainersRoot/getDynamicAuxID.h" -#include "PersistentDataModel/Token.h" +#include "PersistentDataModel/Guid.h" #include "StorageSvc/DbTypeInfo.h" @@ -31,12 +31,12 @@ public: using SG::AuxStoreInternal::addVector; }; -bool AuxDiscoverySvc::getAuxStore(void* obj, const Token& token) { - pool::DbTypeInfo* info = pool::DbTypeInfo::create(token.classID()); // Needed for Properties and TClass +bool AuxDiscoverySvc::getAuxStore(void* obj, const Guid& classId, const std::string& contId) { + pool::DbTypeInfo* info = pool::DbTypeInfo::create(classId); // Needed for Properties and TClass if (info == nullptr) { return false; } - if ((token.contID().size() < 5 || token.contID().substr(token.contID().size() - 5, 4) != "Aux.") + if ((contId.size() < 5 || contId.substr(contId.size() - 5, 4) != "Aux.") && !info->clazz().Properties().HasProperty("IAuxStore")) { return false; } @@ -97,12 +97,12 @@ SG::auxid_t AuxDiscoverySvc::getAuxID(const std::string& attrName, const std::st return auxid; } -const SG::auxid_set_t& AuxDiscoverySvc::getAuxIDs(void* obj, const Token& token) { - pool::DbTypeInfo* info = pool::DbTypeInfo::create(token.classID()); // Needed for Properties and TClass +const SG::auxid_set_t& AuxDiscoverySvc::getAuxIDs(const void* obj, const Guid& classId, const std::string& contId) { + pool::DbTypeInfo* info = pool::DbTypeInfo::create(classId); // Needed for Properties and TClass if (info == nullptr) { return s_emptySet; } - if ((token.contID().size() < 5 || token.contID().substr(token.contID().size() - 5, 4) != "Aux.") + if ((contId.size() < 5 || contId.substr(contId.size() - 5, 4) != "Aux.") && !info->clazz().Properties().HasProperty("IAuxStore")) { return s_emptySet; } @@ -115,7 +115,7 @@ const SG::auxid_set_t& AuxDiscoverySvc::getAuxIDs(void* obj, const Token& token) if (storeTC == nullptr) { return s_emptySet; } - m_store = reinterpret_cast<SG::IAuxStoreIO*>((char*)obj + cl->GetBaseClassOffset(storeTC)); + m_store = reinterpret_cast<const SG::IAuxStoreIO*>((const char*)obj + cl->GetBaseClassOffset(storeTC)); if (m_store == nullptr) { return s_emptySet; } diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.h b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.h index 892d42202eb051c613b26e71545f73241ef6c198..5bd57591a99057649ae4fb192043084e4a279781 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.h +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AuxDiscoverySvc.h @@ -16,7 +16,7 @@ #include <string> // Forward declarations -class Token; +class Guid; class AthenaPoolAuxStore; namespace SG { class IAuxStoreIO; @@ -30,7 +30,7 @@ class AuxDiscoverySvc { public: AuxDiscoverySvc() : m_store(0), m_storeInt(0), m_storeHolder(0) {} - bool getAuxStore(void* obj, const Token& token); + bool getAuxStore(void* obj, const Guid& classId, const std::string& contId); bool setData(SG::auxid_t auxid, void* data, const RootType& type); @@ -38,7 +38,7 @@ public: SG::auxid_t getAuxID(const std::string& attrName, const std::string& elemName, const std::string& typeName); - const SG::auxid_set_t& getAuxIDs(void* obj, const Token& token); + const SG::auxid_set_t& getAuxIDs(const void* obj, const Guid& classId, const std::string& contId); const void* getData(SG::auxid_t auxid); @@ -51,7 +51,7 @@ public: std::string getElemName(SG::auxid_t auxid); private: // data - SG::IAuxStoreIO* m_store; + const SG::IAuxStoreIO* m_store; AthenaPoolAuxStore* m_storeInt; SG::IAuxStoreHolder* m_storeHolder; };