Commit da139e8b authored by Atlas-Software Librarian's avatar Atlas-Software Librarian Committed by Graeme Stewart
Browse files

'CMakeLists.txt' (OffloadSvc-00-00-08)

	* Tagging OffloadSvc-00-00-07.
	* src/OffloadSvc.cxx (initialize): Fix coverity warning
	(unreachable code).
parent 4f033f7a
################################################################################
# Package: OffloadSvc
################################################################################
# Declare the package name:
atlas_subdir( OffloadSvc )
# Declare the package's dependencies:
atlas_depends_on_subdirs( PUBLIC
Control/AthenaBaseComps
GaudiKernel )
# External dependencies:
find_package( APE )
find_package( yampl )
# this line failed automatic conversion in cmt2cmake :
# action checkreq "echo 'skipping checkreq'"
# Component(s) in the package:
atlas_add_component( OffloadSvc
src/OffloadSvc.cxx
src/IOffloadSvc.cxx
src/components/*.cxx
INCLUDE_DIRS ${YAMPL_INCLUDE_DIRS} ${APE_INCLUDE_DIRS}
LINK_LIBRARIES ${YAMPL_LIBRARIES} ${APE_LIBRARIES} AthenaBaseComps GaudiKernel )
# Install files from the package:
atlas_install_headers( OffloadSvc )
......@@ -47,7 +47,7 @@ class IOffloadSvc : virtual public IService
/// Delivers important informations
//virtual StatusCode qotd( std::string& quote ) = 0;
virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token)=0;
virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token, bool requiresResponse=true)=0;
virtual StatusCode receiveData(std::unique_ptr<APE::BufferContainer> &buff, int token, int timeOut=-1)=0;
/// identifier for the framework
static const InterfaceID& interfaceID();
......
......@@ -19,10 +19,12 @@
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <memory>
// FrameWork includes
#include "AthenaBaseComps/AthService.h"
#include "GaudiKernel/IIncidentListener.h"
#include "GaudiKernel/ServiceHandle.h"
// OffloadManagerSvc includes
#include "OffloadSvc/IOffloadSvc.h"
#include "yampl/SocketFactory.h"
......@@ -33,8 +35,9 @@ template <class TT> class SvcFactory;
namespace APE{
class BufferContainer;
}
class IIncidentSvc;
class OffloadSvc : virtual public IOffloadSvc, public AthService{
class OffloadSvc : virtual public IOffloadSvc, public AthService,public virtual IIncidentListener{
protected:
friend class SvcFactory<OffloadSvc>;
......@@ -65,11 +68,9 @@ public:
///////////////////////////////////////////////////////////////////
static const InterfaceID& interfaceID();
// /** The very important message of the day
// */
// StatusCode qotd( std::string& quote );
virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token);
virtual StatusCode sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token,bool requiresResponse=true);
virtual StatusCode receiveData(std::unique_ptr<APE::BufferContainer> &buff, int token, int timeOut=-1);
virtual void handle(const Incident &);
///////////////////////////////////////////////////////////////////
// Private methods:
///////////////////////////////////////////////////////////////////
......@@ -77,7 +78,8 @@ private:
/// Default constructor:
OffloadSvc();
bool openCommChannel(bool postFork=false);
bool closeCommChannel(bool preFork=false);
///////////////////////////////////////////////////////////////////
// Private data:
///////////////////////////////////////////////////////////////////
......@@ -91,15 +93,18 @@ private:
size_t uploadSize;
size_t downloadSize;
};
std::string m_connName;
std::string m_connType;
std::string m_commChannelSend;
std::string m_commChannelRecv;
bool m_useUID;
bool m_isConnected;
std::map<int,OffloadSvc::TransferStats> m_stats;
yampl::ISocket *m_mySocket,*m_downSock;
std::shared_ptr<yampl::ISocket> m_sendSock,m_recvSock;
std::queue<int> m_tokens;
int m_maxTokens;
std::condition_variable m_tCond;
std::mutex m_cMutex;
};
/// I/O operators
......
......@@ -12,11 +12,13 @@
// FrameWork includes
#include "GaudiKernel/Property.h"
#include "GaudiKernel/SvcFactory.h"
#include "GaudiKernel/IIncidentSvc.h"
// OffloadManagerSvc includes
#include "OffloadSvc/OffloadSvc.h"
#include "APE/BufferContainer.hpp"
#include "APE/BufferAccessor.hpp"
#include "APE/ServerDefines.hpp"
#include <sys/types.h>
#include <unistd.h>
#include <thread>
......@@ -26,11 +28,18 @@
////////////////
OffloadSvc::OffloadSvc( const std::string& name, ISvcLocator* pSvcLocator ) :
AthService ( name, pSvcLocator ){
declareProperty( "ConnectionName",
m_connName = "apeSock", "Name of the APE socket");
declareProperty( "ConnectionType",
m_connType = "SHM", "Connection type [SHM,PIPE,ZMQ]");
declareProperty( "CommChannelSend",
m_commChannelSend = "apeSock", "Name of the APE socket for sending request");
declareProperty( "CommChannelRecv",
m_commChannelRecv = "apeSock_upload", "Name of the APE socket for receiving results");
declareProperty( "CommChannelUserUID",
m_useUID=true, "Whether to add uid to comm channels");
m_isConnected=false;
m_mySocket=0;
m_downSock=0;
m_sendSock=0;
m_recvSock=0;
m_maxTokens=0;
//std::cout<<__PRETTY_FUNCTION__<<"Constructing Service SAMI"<<std::endl;
......@@ -41,9 +50,15 @@ OffloadSvc::OffloadSvc( const std::string& name, ISvcLocator* pSvcLocator ) :
OffloadSvc::~OffloadSvc(){
}
StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token){
//std::cout<<__PRETTY_FUNCTION__<<"Received SendData SAMI"<<std::endl;
// MsgStream athenaLog(msgSvc(), name());
StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int &token,bool requiresResponse){
if(!m_isConnected){
msg(MSG::ERROR)<<"APE Server communication was not open. Initiating communication"<<endreq;
bool rc=openCommChannel();
if(!rc){
msg(MSG::FATAL)<<"APE Server communication Failed"<<endreq;
return StatusCode::FAILURE;
}
}
if(!buff){
msg(MSG::ERROR)<<"BufferContainer is 0 "<<endreq;
return StatusCode::FAILURE;
......@@ -52,7 +67,6 @@ StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int
msg(MSG::ERROR)<<"Accelerator Process Extension connection is not open!"<<endreq;
return StatusCode::FAILURE;
}
//msg(MSG::DEBUG)<<"Received sendData request with algorithm ="<<buff->getAlgorithm()<<" and payloadsize="<<buff->getPayloadSize()<<" totalSize="<<buff->getTransferSize());
if(m_tokens.size()){
token=m_tokens.front();
m_tokens.pop();
......@@ -69,38 +83,50 @@ StatusCode OffloadSvc::sendData(std::unique_ptr<APE::BufferContainer> &buff, int
<<" TransferSize="<<buff->getTransferSize()
<<" userBuffer="<<buff->getBuffer()
<<endreq;
TransferStats ts;
ts.sendStart=std::chrono::system_clock::now();
m_mySocket->send(APE::BufferAccessor::getRawBuffer(*buff),buff->getTransferSize());
ts.sendEnd=std::chrono::system_clock::now();
ts.uploadSize=buff->getTransferSize();
ts.downloadSize = 0;
m_stats[token]=ts;
return StatusCode::SUCCESS;
if(requiresResponse){// will get reply
TransferStats ts;
ts.sendStart=std::chrono::system_clock::now();
m_sendSock->send(APE::BufferAccessor::getRawBuffer(*buff),buff->getTransferSize());
ts.sendEnd=std::chrono::system_clock::now();
ts.uploadSize=buff->getTransferSize();
ts.downloadSize = 0;
m_stats[token]=ts;
}else{// doesn't expect reply
m_sendSock->send(APE::BufferAccessor::getRawBuffer(*buff),buff->getTransferSize());
m_tokens.push(token);
}
return StatusCode::SUCCESS;
}
StatusCode OffloadSvc::receiveData(std::unique_ptr<APE::BufferContainer> &buff, int token, int timeOut){
if(!m_isConnected){
msg(MSG::ERROR)<<"APE Server communication was not open. Initiating communication"<<endreq;
bool rc=openCommChannel();
if(!rc){
msg(MSG::FATAL)<<"APE Server communication Failed"<<endreq;
return StatusCode::FAILURE;
}
}
ssize_t recvdSize=0;
//MsgStream athenaLog(msgSvc(), name());
void* rawBuffer=APE::BufferAccessor::getRawBuffer(*buff);
if(timeOut>0){
ATH_MSG_WARNING("Timeout parameter might not be implemented yet");
recvdSize=m_downSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut);
recvdSize=m_recvSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut);
if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){
ATH_MSG_WARNING("Received possibly corrupt data. Trying again");
recvdSize=m_downSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut);
recvdSize=m_recvSock->tryRecv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff),timeOut);
}
if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){
ATH_MSG_ERROR("Received corrupt data on both trials");
return StatusCode::FAILURE;
}
}else{
recvdSize=m_downSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff));
recvdSize=m_recvSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff));
if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){
ATH_MSG_WARNING("Received possibly corrupt data. Trying again");
recvdSize=m_downSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff));
recvdSize=m_recvSock->recv(rawBuffer,APE::BufferAccessor::getRawBufferSize(*buff));
}
if(recvdSize < static_cast<ssize_t>(sizeof(APE::APEHeaders))){
ATH_MSG_ERROR("Received corrupt data on both trials");
......@@ -155,71 +181,21 @@ StatusCode OffloadSvc::receiveData(std::unique_ptr<APE::BufferContainer> &buff,
StatusCode OffloadSvc::initialize()
{
ATH_MSG_INFO ("Initializing " << name() << "...");
try{
char buff[1000];
uid_t userUID=geteuid();
snprintf(buff,1000,"%s_%u",m_connName.c_str(),userUID);
ATH_MSG_INFO ("Upload connection Name=: [" << buff << "]");
yampl::Channel apeChannel(buff,yampl::LOCAL_SHM);
snprintf(buff,1000,"%s_%u_upload",m_connName.c_str(),userUID);
ATH_MSG_INFO ("download connection Name=: [" << buff << "]");
yampl::Channel downChannel(buff,yampl::LOCAL_SHM);
yampl::SocketFactory sf;
m_mySocket=sf.createClientSocket(apeChannel);
m_downSock=sf.createClientSocket(downChannel);
std::thread *t=new std::thread([&,this](){
APE::BufferContainer b(sizeof(int));
b.setModule(0xffffffff);//Openning connection;
b.setAlgorithm(0xefffffff);//Openning connection;
*(int*)b.getBuffer()=(int)getpid();
//ATH_MSG_INFO("Trying to open connections");
this->m_mySocket->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize());
this->m_downSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize());
this->m_isConnected=true;
this->m_tCond.notify_all();
//ATH_MSG_INFO("Connections opened");
}
);
std::unique_lock<std::mutex> lock(m_cMutex);
ATH_MSG_INFO("Waiting for connections");
if(m_tCond.wait_for(lock,std::chrono::milliseconds(3000))==std::cv_status::timeout){
ATH_MSG_ERROR("Error while opening connection to APE! Timeout while trying to open the connection");
t->detach();
delete t;
return StatusCode::FAILURE;
}else{
ATH_MSG_INFO ("Connection successfully opened");
if(m_isConnected){
t->join();
delete t;
}
return StatusCode::SUCCESS;
}
}catch(std::exception &ex){
ATH_MSG_ERROR("Error while opening connection to APE! "<<ex.what());
m_isConnected=false;
return StatusCode::FAILURE;
IIncidentSvc* p_incSvc;
auto sc=service("IncidentSvc",p_incSvc);
if(sc!=StatusCode::SUCCESS){
return sc;
}
ATH_MSG_INFO ("Connection successfully opened");
return StatusCode::SUCCESS;
p_incSvc->addListener(this,"PostFork",-1l);
p_incSvc->addListener(this,"BeforeFork",-100l);
return sc;
}
StatusCode OffloadSvc::finalize()
{
ATH_MSG_INFO ("Finalizing " << name() << "...");
if(m_isConnected){
ATH_MSG_INFO ("closing connection to APE server");
APE::BufferContainer b(sizeof(int));
b.setModule(0xffffffff);//Openning connection;
b.setAlgorithm(0xffffffff);//Closing connection;
*(int*)b.getBuffer()=(int)getpid();
m_mySocket->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize());
delete m_mySocket;
delete m_downSock;
m_mySocket=0;
m_downSock=0;
}
return StatusCode::SUCCESS;
bool rv=closeCommChannel();
return (rv?StatusCode::SUCCESS:StatusCode::FAILURE);
}
// Query the interfaces.
......@@ -239,3 +215,109 @@ OffloadSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) {
return StatusCode::SUCCESS;
}
void OffloadSvc::handle(const Incident& inc){
ATH_MSG_INFO("Got incident "<<inc.type()<<" from "<<inc.source());
if(inc.type()=="BeginRun"){
}else if(inc.type()=="BeforeFork"){
if(m_isConnected){
closeCommChannel(true);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}else if(inc.type()=="PostFork"){
if(!m_isConnected){
openCommChannel(true);
}
}
}
bool OffloadSvc::openCommChannel(bool postFork){
if(!m_isConnected){
try{
char buff[1000];
uid_t userUID=geteuid();
auto chanType=yampl::LOCAL_SHM;
if(m_connType=="PIPE"){
chanType=yampl::LOCAL_PIPE;
}else if(m_connType=="ZMQ"){
chanType=yampl::DISTRIBUTED;
}
if(m_commChannelSend.empty())return false;
if(m_commChannelRecv.empty())return false;
if(chanType!=yampl::DISTRIBUTED && m_useUID){
snprintf(buff,1000,"%s_%u",m_commChannelSend.c_str(),userUID);
}else{
snprintf(buff,1000,"%s",m_commChannelSend.c_str());
}
ATH_MSG_INFO("Send channel = "<<buff);
yampl::Channel channelSend(buff,chanType);
if(chanType!=yampl::DISTRIBUTED && m_useUID){
snprintf(buff,1000,"%s_%u",m_commChannelRecv.c_str(),userUID);
}else{
snprintf(buff,1000,"%s",m_commChannelRecv.c_str());
}
ATH_MSG_INFO("Receive channel = "<<buff);
yampl::Channel channelRecv(buff,chanType);
yampl::SocketFactory factory;
m_sendSock.reset(factory.createClientSocket(channelSend));
m_recvSock.reset(factory.createClientSocket(channelRecv));
std::thread *t=new std::thread([&,this](){
APE::BufferContainer b(sizeof(int));
b.setModule(SERVER_MODULE);//Openning connection;
if(postFork){
b.setAlgorithm(ALG_OPENING_AFTER_FORKING);//Openning connection;
}else{
b.setAlgorithm(ALG_OPENING_CONNECTION);//Openning connection;
}
*(int*)b.getBuffer()=(int)getpid();
//ATH_MSG_INFO("Trying to open connections");
this->m_sendSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize());
this->m_recvSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize());
this->m_isConnected=true;
this->m_tCond.notify_all();
//ATH_MSG_INFO("Connections opened");
}
);
std::unique_lock<std::mutex> lock(m_cMutex);
ATH_MSG_INFO("Waiting for connections");
if(m_tCond.wait_for(lock,std::chrono::milliseconds(3000))==std::cv_status::timeout){
ATH_MSG_ERROR("Error while opening connection to APE! Timeout while trying to open the connection");
t->detach();
delete t;
return false;
}else{
ATH_MSG_INFO ("Connection successfully opened to "<<m_commChannelSend);
if(m_isConnected){
t->join();
delete t;
}
return true;
}
}catch(std::exception &ex){
ATH_MSG_ERROR("Error while opening connection to APE! "<<ex.what());
m_isConnected=false;
}
}
return false;
}
bool OffloadSvc::closeCommChannel(bool preFork){
if(m_isConnected){
ATH_MSG_INFO ("closing connection to APE server");
APE::BufferContainer b(sizeof(int));
b.setModule(SERVER_MODULE);//Openning connection;
if(preFork){
b.setAlgorithm(ALG_CLOSING_FOR_FORKING);//Closing connection for reopen;
}else{
b.setAlgorithm(ALG_CLOSING_CONNECTION);//Closing connection;
}
*(int*)b.getBuffer()=(int)getpid();
m_sendSock->send(APE::BufferAccessor::getRawBuffer(b),b.getTransferSize());
m_sendSock.reset();
m_recvSock.reset();
m_isConnected=false;
return true;
}
return false;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment