Skip to content
Snippets Groups Projects
Commit 296e1e34 authored by Walter Lampl's avatar Walter Lampl
Browse files

Merge branch 'AthenaSharedIO_Improvements' into 'master'

Some improvements to SharedIO efficiency of dynamic aux store.

See merge request atlas/athena!39407
parents eb617635 451459ec
No related branches found
No related tags found
No related merge requests found
...@@ -356,7 +356,7 @@ StatusCode AthenaSharedMemoryTool::getObject(void** target, size_t& nbytes, int ...@@ -356,7 +356,7 @@ StatusCode AthenaSharedMemoryTool::getObject(void** target, size_t& nbytes, int
} }
//___________________________________________________________________________ //___________________________________________________________________________
StatusCode AthenaSharedMemoryTool::clearObject(char** tokenString, int& num) const { StatusCode AthenaSharedMemoryTool::clearObject(const char** tokenString, int& num) const {
if (m_isClient) { if (m_isClient) {
ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address()); ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) { if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
......
...@@ -53,7 +53,7 @@ public: ...@@ -53,7 +53,7 @@ public:
StatusCode putObject(const void* source, size_t nbytes, int num = 0) const; StatusCode putObject(const void* source, size_t nbytes, int num = 0) const;
StatusCode getObject(void** target, size_t& nbytes, int num = 0) const; StatusCode getObject(void** target, size_t& nbytes, int num = 0) const;
StatusCode clearObject(char** tokenString, int& num) const; StatusCode clearObject(const char** tokenString, int& num) const;
StatusCode lockObject(const char* tokenString, int num = 0) const; StatusCode lockObject(const char* tokenString, int num = 0) const;
private: private:
......
...@@ -235,7 +235,7 @@ StatusCode AthenaYamplTool::getObject(void**, size_t&, int) const { ...@@ -235,7 +235,7 @@ StatusCode AthenaYamplTool::getObject(void**, size_t&, int) const {
} }
//___________________________________________________________________________ //___________________________________________________________________________
StatusCode AthenaYamplTool::clearObject(char**, int&) const { StatusCode AthenaYamplTool::clearObject(const char**, int&) const {
return(StatusCode::FAILURE); return(StatusCode::FAILURE);
} }
......
...@@ -43,7 +43,7 @@ public: ...@@ -43,7 +43,7 @@ public:
StatusCode putObject(const void* source, size_t nbytes, int num = 0) const; StatusCode putObject(const void* source, size_t nbytes, int num = 0) const;
StatusCode getObject(void** target, size_t& nbytes, int num = 0) const; StatusCode getObject(void** target, size_t& nbytes, int num = 0) const;
StatusCode clearObject(char** tokenString, int& num) const; StatusCode clearObject(const char** tokenString, int& num) const;
StatusCode lockObject(const char* tokenString, int num = 0) const; StatusCode lockObject(const char* tokenString, int num = 0) const;
private: private:
......
...@@ -24,7 +24,7 @@ public: ...@@ -24,7 +24,7 @@ public:
virtual StatusCode putObject(const void* source, size_t nbytes, int num = 0) const = 0; virtual StatusCode putObject(const void* source, size_t nbytes, int num = 0) const = 0;
virtual StatusCode getObject(void** target, size_t& nbytes, int num = 0) const = 0; virtual StatusCode getObject(void** target, size_t& nbytes, int num = 0) const = 0;
virtual StatusCode clearObject(char** tokenString, int& num) const = 0; virtual StatusCode clearObject(const char** tokenString, int& num) const = 0;
virtual StatusCode lockObject(const char* tokenString, int num = 0) const = 0; virtual StatusCode lockObject(const char* tokenString, int num = 0) const = 0;
}; };
......
...@@ -424,7 +424,7 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe ...@@ -424,7 +424,7 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe
&& m_outputStreamingTool[m_streamServer]->isServer()) { && m_outputStreamingTool[m_streamServer]->isServer()) {
auto& streamingTool = m_outputStreamingTool[m_streamServer]; auto& streamingTool = m_outputStreamingTool[m_streamServer];
// Clear object to get Placements for all objects in a Stream // Clear object to get Placements for all objects in a Stream
char* placementStr = nullptr; const char* placementStr = nullptr;
int num = -1; int num = -1;
StatusCode sc = streamingTool->clearObject(&placementStr, num); StatusCode sc = streamingTool->clearObject(&placementStr, num);
if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) { if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
...@@ -778,7 +778,7 @@ Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj, ...@@ -778,7 +778,7 @@ Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj,
return(nullptr); return(nullptr);
} }
// Get Token back from Server // Get Token back from Server
char* tokenStr = nullptr; const char* tokenStr = nullptr;
int num = -1; int num = -1;
sc = m_outputStreamingTool[streamClient]->clearObject(&tokenStr, num); sc = m_outputStreamingTool[streamClient]->clearObject(&tokenStr, num);
while (sc.isRecoverable()) { while (sc.isRecoverable()) {
...@@ -893,7 +893,7 @@ void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) { ...@@ -893,7 +893,7 @@ void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) {
} }
} }
if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) { if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
ATH_MSG_VERBOSE("Requesting object for: " << token->toString()); ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) { if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
ATH_MSG_ERROR("Failed to lock Data for " << token->toString()); ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
obj = nullptr; obj = nullptr;
...@@ -979,7 +979,7 @@ StatusCode AthenaPoolCnvSvc::createAddress(long svcType, ...@@ -979,7 +979,7 @@ StatusCode AthenaPoolCnvSvc::createAddress(long svcType,
return(StatusCode::FAILURE); return(StatusCode::FAILURE);
} }
token = new Token(); token = new Token();
token->fromString(static_cast<char*>(buffer)); buffer = nullptr; token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
if (token->classID() == Guid::null()) { if (token->classID() == Guid::null()) {
delete token; token = nullptr; delete token; token = nullptr;
} }
...@@ -1118,7 +1118,7 @@ StatusCode AthenaPoolCnvSvc::readData() { ...@@ -1118,7 +1118,7 @@ StatusCode AthenaPoolCnvSvc::readData() {
if (m_inputStreamingTool.empty()) { if (m_inputStreamingTool.empty()) {
return(StatusCode::FAILURE); return(StatusCode::FAILURE);
} }
char* tokenStr = nullptr; const char* tokenStr = nullptr;
int num = -1; int num = -1;
StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num); StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) { if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
...@@ -1200,7 +1200,7 @@ StatusCode AthenaPoolCnvSvc::abortSharedWrClients(int client_n) ...@@ -1200,7 +1200,7 @@ StatusCode AthenaPoolCnvSvc::abortSharedWrClients(int client_n)
if (client_n >= 0) { if (client_n >= 0) {
sc = streamingTool->lockObject("ABORT", client_n); sc = streamingTool->lockObject("ABORT", client_n);
} }
char* dummy; const char* dummy;
sc = streamingTool->clearObject(&dummy, client_n); sc = streamingTool->clearObject(&dummy, client_n);
while (sc.isRecoverable()) { while (sc.isRecoverable()) {
sc = streamingTool->clearObject(&dummy, client_n); sc = streamingTool->clearObject(&dummy, client_n);
......
...@@ -166,25 +166,25 @@ StatusCode AuxDiscoverySvc::receiveStore(const IAthenaSerializeSvc* serSvc, cons ...@@ -166,25 +166,25 @@ StatusCode AuxDiscoverySvc::receiveStore(const IAthenaSerializeSvc* serSvc, cons
return(StatusCode::SUCCESS); return(StatusCode::SUCCESS);
} }
Guid classId; Guid classId;
classId.fromString(static_cast<char*>(buffer)); classId.fromString(static_cast<const char*>(buffer));
if (!ipcTool->getObject(&buffer, nbytes, num).isSuccess() || nbytes == 0) { if (!ipcTool->getObject(&buffer, nbytes, num).isSuccess() || nbytes == 0) {
return(StatusCode::FAILURE); return(StatusCode::FAILURE);
} }
const std::string contName = std::string(static_cast<char*>(buffer)); const std::string contName = std::string(static_cast<const char*>(buffer));
if (classId != Guid::null() && this->getAuxStore(obj, classId, contName)) { if (classId != Guid::null() && this->getAuxStore(obj, classId, contName)) {
void* attrName = nullptr; void* nameData = nullptr;
void* typeName = nullptr;
void* elemName = nullptr;
// StreamingTool owns buffer, will stay around until last dynamic attribute is copied // StreamingTool owns buffer, will stay around until last dynamic attribute is copied
while (ipcTool->getObject(&attrName, nbytes, num).isSuccess() && nbytes > 0 && while (ipcTool->getObject(&nameData, nbytes, num).isSuccess() && nbytes > 0) {
ipcTool->getObject(&typeName, nbytes, num).isSuccess() && nbytes > 0 && const char* del1 = static_cast<const char*>(memchr(nameData, '\n', nbytes));
ipcTool->getObject(&elemName, nbytes, num).isSuccess() && nbytes > 0) { const char* del2 = static_cast<const char*>(memchr(del1 + 1, '\n', nbytes - (del1 - static_cast<const char*>(nameData) - 1)));
const std::string dataStr(static_cast<const char*>(nameData));
const std::string& attrName = dataStr.substr(0, del1 - static_cast<const char*>(nameData));
const std::string& typeName = dataStr.substr(del1 - static_cast<const char*>(nameData) + 1, del2 - del1 - 1);
const std::string& elemName = dataStr.substr(del2 - static_cast<const char*>(nameData) + 1);
if (ipcTool->getObject(&buffer, nbytes, num).isSuccess()) { if (ipcTool->getObject(&buffer, nbytes, num).isSuccess()) {
SG::auxid_t auxid = this->getAuxID(static_cast<char*>(attrName), SG::auxid_t auxid = this->getAuxID(attrName, elemName, typeName);
static_cast<char*>(elemName),
static_cast<char*>(typeName));
if (auxid != SG::null_auxid) { if (auxid != SG::null_auxid) {
const RootType type(std::string(static_cast<char*>(typeName))); const RootType type(typeName);
void* dynAttr = nullptr; void* dynAttr = nullptr;
if (type.IsFundamental()) { if (type.IsFundamental()) {
dynAttr = new char[nbytes]; dynAttr = new char[nbytes];
...@@ -218,12 +218,8 @@ StatusCode AuxDiscoverySvc::sendStore(const IAthenaSerializeSvc* serSvc, ...@@ -218,12 +218,8 @@ StatusCode AuxDiscoverySvc::sendStore(const IAthenaSerializeSvc* serSvc,
} }
} }
for (SG::auxid_set_t::const_iterator iter = auxIDs.begin(), last = auxIDs.end(); iter != last; iter++) { for (SG::auxid_set_t::const_iterator iter = auxIDs.begin(), last = auxIDs.end(); iter != last; iter++) {
const std::string& attrName = this->getAttrName(*iter); const std::string& dataStr = this->getAttrName(*iter) + "\n" + this->getTypeName(*iter) + "\n" + this->getElemName(*iter);
const std::string& typeName = this->getTypeName(*iter); if (!ipcTool->putObject(dataStr.c_str(), dataStr.size() + 1, num).isSuccess()) {
const std::string& elemName = this->getElemName(*iter);
if (!ipcTool->putObject(attrName.c_str(), attrName.size() + 1, num).isSuccess() ||
!ipcTool->putObject(typeName.c_str(), typeName.size() + 1, num).isSuccess() ||
!ipcTool->putObject(elemName.c_str(), elemName.size() + 1, num).isSuccess()) {
return(StatusCode::FAILURE); return(StatusCode::FAILURE);
} }
const std::type_info* tip = this->getType(*iter); const std::type_info* tip = this->getType(*iter);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment