diff --git a/Offloading/OffloadSvc/CMakeLists.txt b/Offloading/OffloadSvc/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..00774c7ddb92aaec7628ddc80390aae4d7cf94b8 --- /dev/null +++ b/Offloading/OffloadSvc/CMakeLists.txt @@ -0,0 +1,30 @@ +################################################################################ +# Package: OffloadSvc +################################################################################ + +# Declare the package name: +atlas_subdir( OffloadSvc ) + +# Declare the package's dependencies: +atlas_depends_on_subdirs( PUBLIC + Control/AthenaBaseComps + GaudiKernel ) + +# External dependencies: +find_package( APE ) +find_package( yampl ) + +# this line failed automatic conversion in cmt2cmake : +# action checkreq "echo 'skipping checkreq'" + +# Component(s) in the package: +atlas_add_component( OffloadSvc + src/OffloadSvc.cxx + src/IOffloadSvc.cxx + src/components/*.cxx + INCLUDE_DIRS ${YAMPL_INCLUDE_DIRS} ${APE_INCLUDE_DIRS} + LINK_LIBRARIES ${YAMPL_LIBRARIES} ${APE_LIBRARIES} AthenaBaseComps GaudiKernel ) + +# Install files from the package: +atlas_install_headers( OffloadSvc ) + diff --git a/Offloading/OffloadSvc/OffloadSvc/IOffloadSvc.h b/Offloading/OffloadSvc/OffloadSvc/IOffloadSvc.h index 8e721c402b9d77769e6f943242a7dbba30905ec6..cd3184091a14da1b2cf82efa4752902bceafda3e 100644 --- a/Offloading/OffloadSvc/OffloadSvc/IOffloadSvc.h +++ b/Offloading/OffloadSvc/OffloadSvc/IOffloadSvc.h @@ -47,7 +47,7 @@ class IOffloadSvc : virtual public IService /// Delivers important informations //virtual StatusCode qotd( std::string& quote ) = 0; - virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token)=0; + virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token, bool requiresResponse=true)=0; virtual StatusCode receiveData(std::unique_ptr<APE::BufferContainer> &buff, int token, int timeOut=-1)=0; /// identifier for the framework static const InterfaceID& interfaceID(); diff --git a/Offloading/OffloadSvc/OffloadSvc/OffloadSvc.h b/Offloading/OffloadSvc/OffloadSvc/OffloadSvc.h index a9458a1c4cbf5d2bd83cc91188a34631fa6edb5a..78ce6ca3beb51dc3f92fa698c3515d67cfffa7a2 100644 --- a/Offloading/OffloadSvc/OffloadSvc/OffloadSvc.h +++ b/Offloading/OffloadSvc/OffloadSvc/OffloadSvc.h @@ -19,10 +19,12 @@ #include <chrono> #include <mutex> #include <condition_variable> +#include <memory> // FrameWork includes #include "AthenaBaseComps/AthService.h" - +#include "GaudiKernel/IIncidentListener.h" +#include "GaudiKernel/ServiceHandle.h" // OffloadManagerSvc includes #include "OffloadSvc/IOffloadSvc.h" #include "yampl/SocketFactory.h" @@ -33,8 +35,9 @@ template <class TT> class SvcFactory; namespace APE{ class BufferContainer; } +class IIncidentSvc; -class OffloadSvc : virtual public IOffloadSvc, public AthService{ +class OffloadSvc : virtual public IOffloadSvc, public AthService,public virtual IIncidentListener{ protected: friend class SvcFactory<OffloadSvc>; @@ -65,11 +68,9 @@ public: /////////////////////////////////////////////////////////////////// static const InterfaceID& interfaceID(); - // /** The very important message of the day - // */ - // StatusCode qotd( std::string& quote ); - virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token); + virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token,bool requiresResponse=true); virtual StatusCode receiveData(std::unique_ptr<APE::BufferContainer> &buff, int token, int timeOut=-1); + virtual void handle(const Incident &); /////////////////////////////////////////////////////////////////// // Private methods: /////////////////////////////////////////////////////////////////// @@ -77,7 +78,8 @@ private: /// Default constructor: OffloadSvc(); - + bool openCommChannel(bool postFork=false); + bool closeCommChannel(bool preFork=false); /////////////////////////////////////////////////////////////////// // Private data: /////////////////////////////////////////////////////////////////// @@ -91,15 +93,18 @@ private: size_t uploadSize; size_t downloadSize; }; - - std::string m_connName; + std::string m_connType; + std::string m_commChannelSend; + std::string m_commChannelRecv; + bool m_useUID; bool m_isConnected; std::map<int,OffloadSvc::TransferStats> m_stats; - yampl::ISocket *m_mySocket,*m_downSock; + std::shared_ptr<yampl::ISocket> m_sendSock,m_recvSock; std::queue<int> m_tokens; int m_maxTokens; std::condition_variable m_tCond; std::mutex m_cMutex; + }; /// I/O operators diff --git a/Offloading/OffloadSvc/src/OffloadSvc.cxx b/Offloading/OffloadSvc/src/OffloadSvc.cxx index 8ac02c7162dbf16297622ebf7c8d656ce08c6331..d3049263ca6230b8b56375db768d77e671314c4f 100644 --- a/Offloading/OffloadSvc/src/OffloadSvc.cxx +++ b/Offloading/OffloadSvc/src/OffloadSvc.cxx @@ -12,11 +12,13 @@ // FrameWork includes #include "GaudiKernel/Property.h" #include "GaudiKernel/SvcFactory.h" +#include "GaudiKernel/IIncidentSvc.h" // OffloadManagerSvc includes #include "OffloadSvc/OffloadSvc.h" #include "APE/BufferContainer.hpp" #include "APE/BufferAccessor.hpp" +#include "APE/ServerDefines.hpp" #include <sys/types.h> #include <unistd.h> #include <thread> @@ -26,11 +28,18 @@ //////////////// OffloadSvc::OffloadSvc( const std::string& name, ISvcLocator* pSvcLocator ) : AthService ( name, pSvcLocator ){ - declareProperty( "ConnectionName", - m_connName = "apeSock", "Name of the APE socket"); + declareProperty( "ConnectionType", + m_connType = "SHM", "Connection type [SHM,PIPE,ZMQ]"); + declareProperty( "CommChannelSend", + m_commChannelSend = "apeSock", "Name of the APE socket for sending request"); + declareProperty( "CommChannelRecv", + m_commChannelRecv = "apeSock_upload", "Name of the APE socket for receiving results"); + declareProperty( "CommChannelUserUID", + m_useUID=true, "Whether to add uid to comm channels"); + m_isConnected=false; - m_mySocket=0; - m_downSock=0; + m_sendSock=0; + m_recvSock=0; m_maxTokens=0; //std::cout<<__PRETTY_FUNCTION__<<"Constructing Service SAMI"<<std::endl; @@ -41,9 +50,15 @@ OffloadSvc::OffloadSvc( const std::string& name, ISvcLocator* pSvcLocator ) : OffloadSvc::~OffloadSvc(){ } -StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token){ - //std::cout<<__PRETTY_FUNCTION__<<"Received SendData SAMI"<<std::endl; - // MsgStream athenaLog(msgSvc(), name()); +StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token,bool requiresResponse){ + if(!m_isConnected){ + msg(MSG::ERROR)<<"APE Server communication was not open. Initiating communication"<<endreq; + bool rc=openCommChannel(); + if(!rc){ + msg(MSG::FATAL)<<"APE Server communication Failed"<<endreq; + return StatusCode::FAILURE; + } + } if(!buff){ msg(MSG::ERROR)<<"BufferContainer is 0 "<<endreq; return StatusCode::FAILURE; @@ -52,7 +67,6 @@ StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int msg(MSG::ERROR)<<"Accelerator Process Extension connection is not open!"<<endreq; return StatusCode::FAILURE; } - //msg(MSG::DEBUG)<<"Received sendData request with algorithm ="<<buff->getAlgorithm()<<" and payloadsize="<<buff->getPayloadSize()<<" totalSize="<<buff->getTransferSize()); if(m_tokens.size()){ token=m_tokens.front(); m_tokens.pop(); @@ -69,38 +83,50 @@ StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int <<" TransferSize="<<buff->getTransferSize() <<" userBuffer="<<buff->getBuffer() <<endreq; - TransferStats ts; - - ts.sendStart=std::chrono::system_clock::now(); - m_mySocket->send(APE::BufferAccessor::getRawBuffer(*buff),buff->getTransferSize()); - ts.sendEnd=std::chrono::system_clock::now(); - ts.uploadSize=buff->getTransferSize(); - ts.downloadSize = 0; - m_stats[token]=ts; - return StatusCode::SUCCESS; + if(requiresResponse){// will get reply + TransferStats ts; + ts.sendStart=std::chrono::system_clock::now(); + m_sendSock->send(APE::BufferAccessor::getRawBuffer(*buff),buff->getTransferSize()); + ts.sendEnd=std::chrono::system_clock::now(); + ts.uploadSize=buff->getTransferSize(); + ts.downloadSize = 0; + m_stats[token]=ts; + }else{// doesn't expect reply + m_sendSock->send(APE::BufferAccessor::getRawBuffer(*buff),buff->getTransferSize()); + m_tokens.push(token); + } + return StatusCode::SUCCESS; } StatusCode OffloadSvc::receiveData(std::unique_ptr<APE::BufferContainer> &buff, int token, int timeOut){ + if(!m_isConnected){ + msg(MSG::ERROR)<<"APE Server communication was not open. Initiating communication"<<endreq; + bool rc=openCommChannel(); + if(!rc){ + msg(MSG::FATAL)<<"APE Server communication Failed"<<endreq; + return StatusCode::FAILURE; + } + } ssize_t recvdSize=0; //MsgStream athenaLog(msgSvc(), name()); void* rawBuffer=APE::BufferAccessor::getRawBuffer(*buff); if(timeOut>0){ ATH_MSG_WARNING("Timeout parameter might not be implemented yet"); - recvdSize=m_downSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut); + recvdSize=m_recvSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut); if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){ ATH_MSG_WARNING("Received possibly corrupt data. Trying again"); - recvdSize=m_downSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut); + recvdSize=m_recvSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut); } if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){ ATH_MSG_ERROR("Received corrupt data on both trials"); return StatusCode::FAILURE; } }else{ - recvdSize=m_downSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff)); + recvdSize=m_recvSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff)); if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){ ATH_MSG_WARNING("Received possibly corrupt data. Trying again"); - recvdSize=m_downSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff)); + recvdSize=m_recvSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff)); } if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){ ATH_MSG_ERROR("Received corrupt data on both trials"); @@ -155,71 +181,21 @@ StatusCode OffloadSvc::receiveData(std::unique_ptr<APE::BufferContainer> &buff, StatusCode OffloadSvc::initialize() { ATH_MSG_INFO ("Initializing " << name() << "..."); - try{ - char buff[1000]; - uid_t userUID=geteuid(); - snprintf(buff,1000,"%s_%u",m_connName.c_str(),userUID); - ATH_MSG_INFO ("Upload connection Name=: [" << buff << "]"); - yampl::Channel apeChannel(buff,yampl::LOCAL_SHM); - snprintf(buff,1000,"%s_%u_upload",m_connName.c_str(),userUID); - ATH_MSG_INFO ("download connection Name=: [" << buff << "]"); - yampl::Channel downChannel(buff,yampl::LOCAL_SHM); - yampl::SocketFactory sf; - m_mySocket=sf.createClientSocket(apeChannel); - m_downSock=sf.createClientSocket(downChannel); - std::thread *t=new std::thread([&,this](){ - APE::BufferContainer b(sizeof(int)); - b.setModule(0xffffffff);//Openning connection; - b.setAlgorithm(0xefffffff);//Openning connection; - *(int*)b.getBuffer()=(int)getpid(); - //ATH_MSG_INFO("Trying to open connections"); - this->m_mySocket->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize()); - this->m_downSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize()); - this->m_isConnected=true; - this->m_tCond.notify_all(); - //ATH_MSG_INFO("Connections opened"); - } - ); - std::unique_lock<std::mutex> lock(m_cMutex); - ATH_MSG_INFO("Waiting for connections"); - if(m_tCond.wait_for(lock,std::chrono::milliseconds(3000))==std::cv_status::timeout){ - ATH_MSG_ERROR("Error while opening connection to APE! Timeout while trying to open the connection"); - t->detach(); - delete t; - return StatusCode::FAILURE; - }else{ - ATH_MSG_INFO ("Connection successfully opened"); - if(m_isConnected){ - t->join(); - delete t; - } - return StatusCode::SUCCESS; - } - }catch(std::exception &ex){ - ATH_MSG_ERROR("Error while opening connection to APE! "<<ex.what()); - m_isConnected=false; - return StatusCode::FAILURE; + IIncidentSvc* p_incSvc; + auto sc=service("IncidentSvc",p_incSvc); + if(sc!=StatusCode::SUCCESS){ + return sc; } - ATH_MSG_INFO ("Connection successfully opened"); - return StatusCode::SUCCESS; + p_incSvc->addListener(this,"PostFork",-1l); + p_incSvc->addListener(this,"BeforeFork",-100l); + return sc; } StatusCode OffloadSvc::finalize() { ATH_MSG_INFO ("Finalizing " << name() << "..."); - if(m_isConnected){ - ATH_MSG_INFO ("closing connection to APE server"); - APE::BufferContainer b(sizeof(int)); - b.setModule(0xffffffff);//Openning connection; - b.setAlgorithm(0xffffffff);//Closing connection; - *(int*)b.getBuffer()=(int)getpid(); - m_mySocket->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize()); - delete m_mySocket; - delete m_downSock; - m_mySocket=0; - m_downSock=0; - } - return StatusCode::SUCCESS; + bool rv=closeCommChannel(); + return (rv?StatusCode::SUCCESS:StatusCode::FAILURE); } // Query the interfaces. @@ -239,3 +215,109 @@ OffloadSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) { return StatusCode::SUCCESS; } +void OffloadSvc::handle(const Incident& inc){ + ATH_MSG_INFO("Got incident "<<inc.type()<<" from "<<inc.source()); + if(inc.type()=="BeginRun"){ + + }else if(inc.type()=="BeforeFork"){ + if(m_isConnected){ + closeCommChannel(true); + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + }else if(inc.type()=="PostFork"){ + if(!m_isConnected){ + openCommChannel(true); + } + } +} + +bool OffloadSvc::openCommChannel(bool postFork){ + if(!m_isConnected){ + try{ + char buff[1000]; + uid_t userUID=geteuid(); + auto chanType=yampl::LOCAL_SHM; + if(m_connType=="PIPE"){ + chanType=yampl::LOCAL_PIPE; + }else if(m_connType=="ZMQ"){ + chanType=yampl::DISTRIBUTED; + } + if(m_commChannelSend.empty())return false; + if(m_commChannelRecv.empty())return false; + + if(chanType!=yampl::DISTRIBUTED && m_useUID){ + snprintf(buff,1000,"%s_%u",m_commChannelSend.c_str(),userUID); + }else{ + snprintf(buff,1000,"%s",m_commChannelSend.c_str()); + } + ATH_MSG_INFO("Send channel = "<<buff); + yampl::Channel channelSend(buff,chanType); + if(chanType!=yampl::DISTRIBUTED && m_useUID){ + snprintf(buff,1000,"%s_%u",m_commChannelRecv.c_str(),userUID); + }else{ + snprintf(buff,1000,"%s",m_commChannelRecv.c_str()); + } + ATH_MSG_INFO("Receive channel = "<<buff); + yampl::Channel channelRecv(buff,chanType); + yampl::SocketFactory factory; + m_sendSock.reset(factory.createClientSocket(channelSend)); + m_recvSock.reset(factory.createClientSocket(channelRecv)); + std::thread *t=new std::thread([&,this](){ + APE::BufferContainer b(sizeof(int)); + b.setModule(SERVER_MODULE);//Openning connection; + if(postFork){ + b.setAlgorithm(ALG_OPENING_AFTER_FORKING);//Openning connection; + }else{ + b.setAlgorithm(ALG_OPENING_CONNECTION);//Openning connection; + } + *(int*)b.getBuffer()=(int)getpid(); + //ATH_MSG_INFO("Trying to open connections"); + this->m_sendSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize()); + this->m_recvSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize()); + this->m_isConnected=true; + this->m_tCond.notify_all(); + //ATH_MSG_INFO("Connections opened"); + } + ); + std::unique_lock<std::mutex> lock(m_cMutex); + ATH_MSG_INFO("Waiting for connections"); + if(m_tCond.wait_for(lock,std::chrono::milliseconds(3000))==std::cv_status::timeout){ + ATH_MSG_ERROR("Error while opening connection to APE! Timeout while trying to open the connection"); + t->detach(); + delete t; + return false; + }else{ + ATH_MSG_INFO ("Connection successfully opened to "<<m_commChannelSend); + if(m_isConnected){ + t->join(); + delete t; + } + return true; + } + }catch(std::exception &ex){ + ATH_MSG_ERROR("Error while opening connection to APE! "<<ex.what()); + m_isConnected=false; + } + } + return false; +} + +bool OffloadSvc::closeCommChannel(bool preFork){ + if(m_isConnected){ + ATH_MSG_INFO ("closing connection to APE server"); + APE::BufferContainer b(sizeof(int)); + b.setModule(SERVER_MODULE);//Openning connection; + if(preFork){ + b.setAlgorithm(ALG_CLOSING_FOR_FORKING);//Closing connection for reopen; + }else{ + b.setAlgorithm(ALG_CLOSING_CONNECTION);//Closing connection; + } + *(int*)b.getBuffer()=(int)getpid(); + m_sendSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize()); + m_sendSock.reset(); + m_recvSock.reset(); + m_isConnected=false; + return true; + } + return false; +}