diff --git a/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx b/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx index f0104ed3333a0ba4d06591271d55b68813e07282..c8d9d8fe5d008bcf14044d1977c8d5f785131d90 100644 --- a/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx +++ b/Control/AthenaIPCTools/src/AthenaSharedMemoryTool.cxx @@ -68,12 +68,6 @@ StatusCode AthenaSharedMemoryTool::initialize() { std::ostringstream pidstr; pidstr << getpid(); m_sharedMemory.setValue(m_sharedMemory.value() + std::string("_") + pidstr.str()); - boost::interprocess::shared_memory_object::remove(m_sharedMemory.value().c_str()); - ATH_MSG_DEBUG("creating shared memory object with name \"" << m_sharedMemory.value() << "\""); - boost::interprocess::shared_memory_object shm(boost::interprocess::create_only, - m_sharedMemory.value().c_str(), - boost::interprocess::read_write); - shm.truncate(m_maxSize + m_maxDataClients * sizeof(ShareEventHeader)); return(StatusCode::SUCCESS); } @@ -109,7 +103,6 @@ StatusCode AthenaSharedMemoryTool::finalize() { //___________________________________________________________________________ StatusCode AthenaSharedMemoryTool::makeServer(int num) { - ATH_MSG_DEBUG("Creating shared memory object for writer."); if (m_isServer || m_isClient) { ATH_MSG_ERROR("Cannot make AthenaSharedMemoryTool a Server."); return(StatusCode::FAILURE); @@ -120,9 +113,11 @@ StatusCode AthenaSharedMemoryTool::makeServer(int num) { } m_num = num; m_isServer = true; - boost::interprocess::shared_memory_object shm(boost::interprocess::open_only, + ATH_MSG_DEBUG("Creating shared memory object with name \"" << m_sharedMemory.value() << "\""); + boost::interprocess::shared_memory_object shm(boost::interprocess::create_only, m_sharedMemory.value().c_str(), boost::interprocess::read_write); + shm.truncate(m_maxSize + m_maxDataClients * sizeof(ShareEventHeader)); m_payload = new boost::interprocess::mapped_region(shm, boost::interprocess::read_write, 0, m_maxSize); m_status = new boost::interprocess::mapped_region(shm, boost::interprocess::read_write, m_maxSize, num * sizeof(ShareEventHeader)); ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0, 0, "" }; @@ -139,7 +134,6 @@ bool AthenaSharedMemoryTool::isServer() const { //___________________________________________________________________________ StatusCode AthenaSharedMemoryTool::makeClient(int num) { - ATH_MSG_DEBUG("Creating shared memory object for Client."); if (m_isServer) { ATH_MSG_ERROR("Cannot make AthenaSharedMemoryTool a Client."); return(StatusCode::FAILURE); @@ -148,7 +142,8 @@ StatusCode AthenaSharedMemoryTool::makeClient(int num) { ATH_MSG_ERROR("Too many clients for AthenaSharedMemoryTool."); return(StatusCode::FAILURE); } - if (m_num > 0 && num <= 0) { + if (m_num > 0 && num <= 0) {// stop running client + ATH_MSG_DEBUG("Stop AthenaSharedMemoryTool Client."); m_num = -1; while (lockObject("stop").isRecoverable()) { usleep(100); @@ -157,15 +152,21 @@ StatusCode AthenaSharedMemoryTool::makeClient(int num) { while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) { usleep(100); } + delete m_payload ; m_payload = nullptr; + delete m_status ; m_status = nullptr; + m_isClient = false; + return(StatusCode::SUCCESS); } - if (!m_isClient) { - m_isClient = true; - boost::interprocess::shared_memory_object shm(boost::interprocess::open_only, - m_sharedMemory.value().c_str(), - boost::interprocess::read_write); - if (m_payload == nullptr) { + while (!m_isClient) { + try { // Check whether Server created shared memory object + boost::interprocess::shared_memory_object shm(boost::interprocess::open_only, + m_sharedMemory.value().c_str(), + boost::interprocess::read_write); + m_isClient = true; m_payload = new boost::interprocess::mapped_region(shm, boost::interprocess::read_write, 0, m_maxSize); m_status = new boost::interprocess::mapped_region(shm, boost::interprocess::read_write, m_maxSize + num * sizeof(ShareEventHeader), sizeof(ShareEventHeader)); + } catch (boost::interprocess::interprocess_exception e) { + usleep(100000); } } if (m_num <= 0 && num > 0) { diff --git a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx index 74fcd060642ce0e6b7f1eefaa1fcb5110fa3c09e..57327c0bbbf2728143d202c84703daeadab8ceed 100644 --- a/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx +++ b/Database/AthenaPOOL/AthenaPoolCnvSvc/src/AthenaPoolCnvSvc.cxx @@ -386,12 +386,12 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe std::string fileName; if (!m_outputStreamingTool.empty() && m_streamServer < m_outputStreamingTool.size() && m_outputStreamingTool[m_streamServer]->isServer()) { - auto streamingTool = m_outputStreamingTool[m_streamServer]; + auto& streamingTool = m_outputStreamingTool[m_streamServer]; // Clear object to get Placements for all objects in a Stream char* placementStr = nullptr; int num = -1; StatusCode sc = streamingTool->clearObject(&placementStr, num); - if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 0 && num > 0) { + if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) { fileName = strstr(placementStr, "[FILE="); fileName = fileName.substr(6, fileName.find(']') - 6); if (!this->connectOutput(fileName).isSuccess()) { @@ -458,7 +458,6 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe return abortSharedWrClients(num); } tokenStr = token->toString(); - if (className == "DataHeader_p6") { // Found DataHeader GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(), @@ -498,6 +497,9 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe } // Send Token back to Client sc = streamingTool->lockObject(tokenStr.c_str(), num); + while (sc.isRecoverable()) { + sc = streamingTool->lockObject(tokenStr.c_str(), num); + } if (!sc.isSuccess()) { ATH_MSG_ERROR("Failed to lock Data for " << tokenStr); return abortSharedWrClients(-1); @@ -523,6 +525,8 @@ StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpe } } placementStr = nullptr; + } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) { + return(StatusCode::RECOVERABLE); } else if (sc.isRecoverable() || num == -1) { return(StatusCode::RECOVERABLE); }