Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
atlas
athena
Commits
b5bc5a9d
Commit
b5bc5a9d
authored
Jul 28, 2016
by
Scott Snyder
Committed by
Graeme Stewart
Dec 02, 2016
Browse files
'endreq -> endmsg.' (OffloadSvc-00-00-09)
Former-commit-id:
800233e7
parent
049a8896
Changes
8
Hide whitespace changes
Inline
Side-by-side
Offloading/OffloadSvc/CMakeLists.txt
0 → 100644
View file @
b5bc5a9d
################################################################################
# Package: OffloadSvc
################################################################################
# Declare the package name:
atlas_subdir
(
OffloadSvc
)
# Declare the package's dependencies:
atlas_depends_on_subdirs
(
PUBLIC
Control/AthenaBaseComps
Control/SGTools
Control/StoreGate
Event/EventInfo
GaudiKernel
)
# External dependencies:
find_package
(
APE
)
find_package
(
yampl
)
# this line failed automatic conversion in cmt2cmake :
# action checkreq "echo 'skipping checkreq'"
# Component(s) in the package:
atlas_add_component
(
OffloadSvc
src/OffloadSvc.cxx
src/IOffloadSvc.cxx
src/components/*.cxx
INCLUDE_DIRS
${
YAMPL_INCLUDE_DIRS
}
${
APE_INCLUDE_DIRS
}
LINK_LIBRARIES
${
YAMPL_LIBRARIES
}
${
APE_LIBRARIES
}
AthenaBaseComps SGTools StoreGateLib SGtests EventInfo GaudiKernel
)
# Install files from the package:
atlas_install_headers
(
OffloadSvc
)
Offloading/OffloadSvc/OffloadSvc/IOffloadSvc.h
0 → 100644
View file @
b5bc5a9d
///////////////////////// -*- C++ -*- /////////////////////////////
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
// IOffloadSvc.h
// Header file for class IOffloadSvc
// Author: Sami Kama <sami.kama@cern.ch>
///////////////////////////////////////////////////////////////////
#ifndef OFFLOADSERVICE_IOFFLOADSVC_H
#define OFFLOADSERVICE_IOFFLOADSVC_H
/** @class IOffloadSvc
* This is the interface to a test service
*/
// STL includes
// FrameWork includes
#include <memory>
#include "GaudiKernel/IService.h"
// forward declaration
namespace
APE
{
class
BufferContainer
;
}
class
IOffloadSvc
:
virtual
public
IService
{
///////////////////////////////////////////////////////////////////
// Public methods:
///////////////////////////////////////////////////////////////////
public:
/** Destructor:
*/
virtual
~
IOffloadSvc
();
///////////////////////////////////////////////////////////////////
// Const methods:
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
// Non-const methods:
///////////////////////////////////////////////////////////////////
/// Delivers important informations
//virtual StatusCode qotd( std::string& quote ) = 0;
virtual
StatusCode
sendData
(
std
::
unique_ptr
<
APE
::
BufferContainer
>
&
buff
,
int
&
token
,
bool
requiresResponse
=
true
)
=
0
;
virtual
StatusCode
receiveData
(
std
::
unique_ptr
<
APE
::
BufferContainer
>
&
buff
,
int
token
,
int
timeOut
=-
1
)
=
0
;
/// identifier for the framework
static
const
InterfaceID
&
interfaceID
();
};
// I/O operators
//////////////////////
///////////////////////////////////////////////////////////////////
// Inline methods:
///////////////////////////////////////////////////////////////////
inline
const
InterfaceID
&
IOffloadSvc
::
interfaceID
()
{
static
const
InterfaceID
IID_IOffloadSvc
(
"IOffloadSvc"
,
1
,
0
);
return
IID_IOffloadSvc
;
}
#endif
Offloading/OffloadSvc/OffloadSvc/OffloadSvc.h
0 → 100644
View file @
b5bc5a9d
// -------------------- -*- C++ -*- --------------------
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
// OffloadSvc.h
// Header file for class OffloadSvc
// Author: Sami Kama <sami.kama@cern.ch>
// -----------------------------------------------------
#ifndef OFFLOADSERVICE_OFFLOADSVC_H
#define OFFLOADSERVICE_OFFLOADSVC_H
// STL includes
#include <string>
#include <queue>
#include <chrono>
#include <map>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <memory>
// FrameWork includes
#include "AthenaBaseComps/AthService.h"
#include "GaudiKernel/IIncidentListener.h"
#include "GaudiKernel/ServiceHandle.h"
// OffloadManagerSvc includes
#include "OffloadSvc/IOffloadSvc.h"
#include "yampl/SocketFactory.h"
#include "StoreGate/StoreGateSvc.h"
// Forward declaration
template
<
class
TT
>
class
SvcFactory
;
namespace
APE
{
class
BufferContainer
;
}
class
IIncidentSvc
;
class
OffloadSvc
:
virtual
public
IOffloadSvc
,
public
AthService
,
public
virtual
IIncidentListener
{
protected:
friend
class
SvcFactory
<
OffloadSvc
>
;
public:
/// Constructor with parameters:
OffloadSvc
(
const
std
::
string
&
name
,
ISvcLocator
*
pSvcLocator
);
/// Destructor:
virtual
~
OffloadSvc
();
/// Gaudi Service Implementation
//@{
StatusCode
initialize
();
StatusCode
finalize
();
virtual
StatusCode
queryInterface
(
const
InterfaceID
&
riid
,
void
**
ppvInterface
);
//@}
///////////////////////////////////////////////////////////////////
// Const methods:
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
// Non-const methods:
///////////////////////////////////////////////////////////////////
static
const
InterfaceID
&
interfaceID
();
virtual
StatusCode
sendData
(
std
::
unique_ptr
<
APE
::
BufferContainer
>
&
buff
,
int
&
token
,
bool
requiresResponse
=
true
);
virtual
StatusCode
receiveData
(
std
::
unique_ptr
<
APE
::
BufferContainer
>
&
buff
,
int
token
,
int
timeOut
=-
1
);
virtual
void
handle
(
const
Incident
&
);
///////////////////////////////////////////////////////////////////
// Private methods:
///////////////////////////////////////////////////////////////////
private:
/// Default constructor:
OffloadSvc
();
bool
openCommChannel
(
bool
postFork
=
false
);
bool
closeCommChannel
(
bool
preFork
=
false
);
///////////////////////////////////////////////////////////////////
// Private data:
///////////////////////////////////////////////////////////////////
private:
/// The quote of the day
//StringProperty m_qotd;
struct
TransferStats
{
std
::
chrono
::
system_clock
::
time_point
sendStart
;
std
::
chrono
::
system_clock
::
time_point
sendEnd
;
size_t
uploadSize
;
size_t
downloadSize
;
};
std
::
string
m_connType
;
std
::
string
m_commChannelSend
;
std
::
string
m_commChannelRecv
;
uint
m_forkDelay
;
bool
m_useUID
;
bool
m_isConnected
;
bool
m_dumpOffloadRequests
;
bool
m_dumpReplies
;
std
::
map
<
int
,
OffloadSvc
::
TransferStats
>
m_stats
;
std
::
shared_ptr
<
yampl
::
ISocket
>
m_sendSock
,
m_recvSock
;
std
::
queue
<
int
>
m_tokens
;
int
m_maxTokens
;
std
::
condition_variable
m_tCond
;
std
::
mutex
m_cMutex
;
uint64_t
m_currEvt
;
int
m_requestNumber
;
ServiceHandle
<
StoreGateSvc
>
m_evtStore
;
yampl
::
ISocketFactory
*
m_fact
;
};
/// I/O operators
//////////////////////
///////////////////////////////////////////////////////////////////
/// Inline methods:
///////////////////////////////////////////////////////////////////
inline
const
InterfaceID
&
OffloadSvc
::
interfaceID
()
{
return
IOffloadSvc
::
interfaceID
();
}
#endif
Offloading/OffloadSvc/cmt/requirements
0 → 100644
View file @
b5bc5a9d
package
OffloadSvc
author
Sami
Kama
<
sami
.
kama
@
cern
.
ch
>
use
AtlasPolicy
AtlasPolicy
-*
use
GaudiInterface
GaudiInterface
-*
External
use
AthenaBaseComps
AthenaBaseComps
-*
Control
use
StoreGate
StoreGate
-*
Control
use
EventInfo
EventInfo
-*
Event
use
SGTools
SGTools
-*
Control
library
OffloadSvc
"OffloadSvc.cxx \
IOffloadSvc.cxx"
\
-
s
=
components
*.
cxx
macro_append
OffloadSvc_shlibflags
" ${APE_linkopts} "
apply_pattern
component_library
apply_pattern
declare_joboptions
\
files
=
"*.py"
apply_pattern
declare_python_modules
\
files
=
"*.py"
action
checkreq
"echo 'skipping checkreq'"
use
APEGlue
APEGlue
-*
External
apply_pattern
install_runtime
Offloading/OffloadSvc/src/IOffloadSvc.cxx
0 → 100644
View file @
b5bc5a9d
///////////////////////// -*- C++ -*- /////////////////////////////
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
// IOffloadSvc.cxx
// Implementation file for class IOffloadSvc
// Author: Sami Kama <sami.kama@cern.ch
///////////////////////////////////////////////////////////////////
// OffloadManagerSvc includes
#include "OffloadSvc/IOffloadSvc.h"
///////////////////////////////////////////////////////////////////
// Public methods:
///////////////////////////////////////////////////////////////////
// Constructors
////////////////
// Destructor
///////////////
IOffloadSvc
::~
IOffloadSvc
()
{}
///////////////////////////////////////////////////////////////////
// Const methods:
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
// Non-const methods:
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
// Protected methods:
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
// Const methods:
///////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
// Non-const methods:
///////////////////////////////////////////////////////////////////
Offloading/OffloadSvc/src/OffloadSvc.cxx
0 → 100644
View file @
b5bc5a9d
///////////////////////// -*- C++ -*- /////////////////////////////
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
// OffloadSvc.cxx
// Implementation file for class OffloadSvc
// Author: Sami Kama <sami.kama@cern.ch>
///////////////////////////////////////////////////////////////////
// FrameWork includes
#include "GaudiKernel/Property.h"
#include "GaudiKernel/SvcFactory.h"
#include "GaudiKernel/IIncidentSvc.h"
// OffloadManagerSvc includes
#include "OffloadSvc/OffloadSvc.h"
#include "APE/BufferContainer.hpp"
#include "APE/BufferAccessor.hpp"
#include "APE/ServerDefines.hpp"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <thread>
#include <chrono>
#include <random>
#include "EventInfo/EventInfo.h"
#include "EventInfo/EventID.h"
#include "uuid/uuid.h"
// Constructors
////////////////
void
nullDealloc
(
void
*
,
void
*
){}
OffloadSvc
::
OffloadSvc
(
const
std
::
string
&
name
,
ISvcLocator
*
pSvcLocator
)
:
AthService
(
name
,
pSvcLocator
),
m_evtStore
(
"StoreGateSvc/StoreGateSvc"
,
name
){
declareProperty
(
"ConnectionType"
,
m_connType
=
"SHM"
,
"Connection type [SHM,PIPE,ZMQ]"
);
declareProperty
(
"CommChannelSend"
,
m_commChannelSend
=
"apeSock"
,
"Name of the APE socket for sending request"
);
declareProperty
(
"CommChannelRecv"
,
m_commChannelRecv
=
"apeSock_upload"
,
"Name of the APE socket for receiving results"
);
declareProperty
(
"CommChannelUserUID"
,
m_useUID
=
true
,
"Whether to add uid to comm channels"
);
declareProperty
(
"ForkDelay"
,
m_forkDelay
=
0
,
"Maximum of random range that will be waited before openning connection to server "
);
declareProperty
(
"DumpOffloadRequests"
,
m_dumpOffloadRequests
=
false
,
"Whether to write offload requests to a file"
);
declareProperty
(
"DumpReplies"
,
m_dumpReplies
=
false
,
"Whether to write responses to a file"
);
m_isConnected
=
false
;
m_sendSock
=
0
;
m_recvSock
=
0
;
m_maxTokens
=
0
;
m_currEvt
=
0
;
//std::cout<<__PRETTY_FUNCTION__<<"Constructing Service SAMI"<<std::endl;
m_evtStore
=
ServiceHandle
<
StoreGateSvc
>
(
"StoreGateSvc"
,
name
);
m_fact
=
0
;
}
// Destructor
///////////////
OffloadSvc
::~
OffloadSvc
(){
}
StatusCode
OffloadSvc
::
sendData
(
std
::
unique_ptr
<
APE
::
BufferContainer
>
&
buff
,
int
&
token
,
bool
requiresResponse
){
msg
(
MSG
::
VERBOSE
)
<<
"Offload request! pid="
<<
getpid
()
<<
endmsg
;
if
(
!
m_isConnected
){
msg
(
MSG
::
WARNING
)
<<
"APE Server communication was not open while offload was requested. Initiating communication!"
<<
endmsg
;
bool
rc
=
openCommChannel
();
if
(
!
rc
){
msg
(
MSG
::
FATAL
)
<<
"APE Server communication Failed"
<<
endmsg
;
return
StatusCode
::
FAILURE
;
}
}
if
(
!
buff
){
msg
(
MSG
::
ERROR
)
<<
"BufferContainer is 0 "
<<
endmsg
;
return
StatusCode
::
FAILURE
;
}
if
(
!
m_isConnected
){
msg
(
MSG
::
ERROR
)
<<
"Accelerator Process Extension connection is not open!"
<<
endmsg
;
return
StatusCode
::
FAILURE
;
}
if
(
m_tokens
.
size
()){
token
=
m_tokens
.
front
();
m_tokens
.
pop
();
}
else
{
token
=
m_maxTokens
;
m_maxTokens
++
;
}
APE
::
BufferAccessor
::
setToken
(
*
buff
,
token
);
msg
(
MSG
::
DEBUG
)
<<
" Sending Data, Algorithm="
<<
buff
->
getAlgorithm
()
<<
" token="
<<
buff
->
getToken
()
<<
" module="
<<
buff
->
getModule
()
<<
" payloadSize="
<<
buff
->
getPayloadSize
()
<<
" TransferSize="
<<
buff
->
getTransferSize
()
<<
" userBuffer="
<<
buff
->
getBuffer
()
<<
" needsReply="
<<
requiresResponse
<<
endmsg
;
if
(
requiresResponse
){
// will get reply
TransferStats
ts
;
ts
.
sendStart
=
std
::
chrono
::
system_clock
::
now
();
m_sendSock
->
send
(
APE
::
BufferAccessor
::
getRawBuffer
(
*
buff
),
buff
->
getTransferSize
());
ts
.
sendEnd
=
std
::
chrono
::
system_clock
::
now
();
ts
.
uploadSize
=
buff
->
getTransferSize
();
ts
.
downloadSize
=
0
;
m_stats
[
token
]
=
ts
;
}
else
{
// doesn't expect reply
m_sendSock
->
send
(
APE
::
BufferAccessor
::
getRawBuffer
(
*
buff
),
buff
->
getTransferSize
());
m_tokens
.
push
(
token
);
}
if
(
m_dumpOffloadRequests
){
const
EventInfo
*
eventInfo
;
StatusCode
scEv
=
m_evtStore
->
retrieve
(
eventInfo
);
uint64_t
eventNumber
=
0
;
if
(
scEv
.
isFailure
())
{
ATH_MSG_WARNING
(
"Could not retrieve event info"
);
}
else
{
eventNumber
=
eventInfo
->
event_ID
()
->
event_number
();
}
if
(
m_currEvt
!=
eventNumber
){
m_currEvt
=
eventNumber
;
m_requestNumber
=
0
;
}
ATH_MSG_DEBUG
(
" Event "
<<
eventNumber
);
char
fnamBuff
[
1024
];
snprintf
(
fnamBuff
,
1024
,
"Req-%08ld-%d-%d-%d-%d.dat"
,
eventNumber
,
(
buff
->
getModule
()),
(
buff
->
getAlgorithm
()),
m_requestNumber
,(
requiresResponse
?
1
:
0
));
int
fd
=
open
(
fnamBuff
,
O_CREAT
|
O_TRUNC
|
O_WRONLY
,(
S_IRWXU
^
S_IXUSR
)
|
(
S_IRWXG
^
S_IXGRP
)
|
(
S_IRWXO
^
S_IXOTH
));
write
(
fd
,
APE
::
BufferAccessor
::
getRawBuffer
(
*
buff
),
buff
->
getTransferSize
());
close
(
fd
);
m_requestNumber
++
;
}
return
StatusCode
::
SUCCESS
;
}
StatusCode
OffloadSvc
::
receiveData
(
std
::
unique_ptr
<
APE
::
BufferContainer
>
&
buff
,
int
token
,
int
timeOut
){
if
(
!
m_isConnected
){
msg
(
MSG
::
WARNING
)
<<
"APE Server communication was not open during data request. Initiating communication"
<<
endmsg
;
bool
rc
=
openCommChannel
();
if
(
!
rc
){
msg
(
MSG
::
FATAL
)
<<
"APE Server communication Failed"
<<
endmsg
;
return
StatusCode
::
FAILURE
;
}
}
ssize_t
recvdSize
=
0
;
//MsgStream athenaLog(msgSvc(), name());
void
*
rawBuffer
=
APE
::
BufferAccessor
::
getRawBuffer
(
*
buff
);
if
(
timeOut
>
0
){
ATH_MSG_WARNING
(
"Timeout parameter might not be implemented yet"
);
recvdSize
=
m_recvSock
->
tryRecv
(
rawBuffer
,
APE
::
BufferAccessor
::
getRawBufferSize
(
*
buff
),
timeOut
);
if
(
recvdSize
<
static_cast
<
ssize_t
>
(
sizeof
(
APE
::
APEHeaders
))){
ATH_MSG_WARNING
(
"Received possibly corrupt data. Trying again"
<<
getpid
());
recvdSize
=
m_recvSock
->
tryRecv
(
rawBuffer
,
APE
::
BufferAccessor
::
getRawBufferSize
(
*
buff
),
timeOut
);
}
if
(
recvdSize
<
static_cast
<
ssize_t
>
(
sizeof
(
APE
::
APEHeaders
))){
ATH_MSG_ERROR
(
"Received corrupt data on both trials"
);
return
StatusCode
::
FAILURE
;
}
}
else
{
recvdSize
=
m_recvSock
->
recv
(
rawBuffer
,
APE
::
BufferAccessor
::
getRawBufferSize
(
*
buff
));
if
(
recvdSize
<
static_cast
<
ssize_t
>
(
sizeof
(
APE
::
APEHeaders
))){
ATH_MSG_WARNING
(
"Received possibly corrupt data. Trying again "
<<
getpid
());
recvdSize
=
m_recvSock
->
recv
(
rawBuffer
,
APE
::
BufferAccessor
::
getRawBufferSize
(
*
buff
));
}
if
(
recvdSize
<
static_cast
<
ssize_t
>
(
sizeof
(
APE
::
APEHeaders
))){
ATH_MSG_ERROR
(
"Received corrupt data on both trials "
<<
getpid
());
return
StatusCode
::
FAILURE
;
}
}
auto
recvEnd
=
std
::
chrono
::
system_clock
::
now
();
msg
(
MSG
::
DEBUG
)
<<
" Receive Algorithm="
<<
buff
->
getAlgorithm
()
<<
" token="
<<
buff
->
getToken
()
<<
" module="
<<
buff
->
getModule
()
<<
" payloadSize="
<<
buff
->
getPayloadSize
()
<<
" TransferSize="
<<
buff
->
getTransferSize
()
<<
" userBuffer="
<<
buff
->
getBuffer
()
<<
" transferred bytes="
<<
recvdSize
<<
endmsg
;
auto
it
=
m_stats
.
find
(
buff
->
getToken
());
if
(
m_dumpReplies
){
const
EventInfo
*
eventInfo
;
StatusCode
scEv
=
m_evtStore
->
retrieve
(
eventInfo
);
uint64_t
eventNumber
=
0
;
if
(
scEv
.
isFailure
())
{
ATH_MSG_WARNING
(
"Could not retrieve event info"
);
}
else
{
eventNumber
=
eventInfo
->
event_ID
()
->
event_number
();
}
ATH_MSG_DEBUG
(
" Event "
<<
eventNumber
);
char
fnamBuff
[
1024
];
snprintf
(
fnamBuff
,
1024
,
"Resp-%08ld-%d-%d.dat"
,
eventNumber
,
buff
->
getModule
(),
buff
->
getAlgorithm
()
);
int
fd
=
open
(
fnamBuff
,
O_CREAT
|
O_TRUNC
|
O_WRONLY
,(
S_IRWXU
^
S_IXUSR
)
|
(
S_IRWXG
^
S_IXGRP
)
|
(
S_IRWXO
^
S_IXOTH
));
write
(
fd
,
APE
::
BufferAccessor
::
getRawBuffer
(
*
buff
),
buff
->
getTransferSize
());
close
(
fd
);
}
if
(
it
==
m_stats
.
end
()){
msg
(
MSG
::
ERROR
)
<<
" APE returned token "
<<
buff
->
getToken
()
<<
" However this token was not sent by this process!"
<<
endmsg
;
return
StatusCode
::
FAILURE
;
}
else
{
const
TransferStats
&
ts
=
it
->
second
;
auto
diff
=
recvEnd
-
it
->
second
.
sendStart
;
msg
(
MSG
::
DEBUG
)
<<
"Send request took "
<<
std
::
chrono
::
duration_cast
<
std
::
chrono
::
microseconds
>
(
ts
.
sendEnd
-
ts
.
sendStart
).
count
()
<<
"(usec). "
<<
"Time between send and receive requests of token "
<<
buff
->
getToken
()
<<
" is "
<<
std
::
chrono
::
duration_cast
<
std
::
chrono
::
milliseconds
>
(
diff
).
count
()
<<
"(ms). "
<<
"Total transfer size= "
<<
ts
.
uploadSize
<<
" + "
<<
recvdSize
<<
"= "
<<
ts
.
uploadSize
+
recvdSize
<<
" bytes. pid="
<<
getpid
()
<<
endmsg
;
m_tokens
.
push
(
buff
->
getToken
());
}
if
(
token
!=
buff
->
getToken
()){
ATH_MSG_ERROR
(
"Received token "
<<
buff
->
getToken
()
<<
" was expecting "
<<
token
);
return
StatusCode
::
FAILURE
;
}
if
(
recvdSize
<
static_cast
<
ssize_t
>
(
sizeof
(
APE
::
APEHeaders
))){
ATH_MSG_WARNING
(
"Receiving Failed"
);
return
StatusCode
::
FAILURE
;
}
else
if
(
recvdSize
==
0
){
ATH_MSG_WARNING
(
"Received size=0"
);
return
StatusCode
::
FAILURE
;
}
return
StatusCode
::
SUCCESS
;
}
// Athena Algorithm's Hooks
////////////////////////////
StatusCode
OffloadSvc
::
initialize
()
{
ATH_MSG_INFO
(
"Initializing "
<<
name
()
<<
"..."
);
IIncidentSvc
*
p_incSvc
;
auto
sc
=
service
(
"IncidentSvc"
,
p_incSvc
);
if
(
sc
!=
StatusCode
::
SUCCESS
){
return
sc
;
}
p_incSvc
->
addListener
(
this
,
"PostFork"
,
100000l
);
//do it first
p_incSvc
->
addListener
(
this
,
"BeforeFork"
,
-
10000l
);
// do it last
return
sc
;
}
StatusCode
OffloadSvc
::
finalize
()
{
ATH_MSG_INFO
(
"Finalizing "
<<
name
()
<<
"..."
);
bool
rv
=
true
;
if
(
m_isConnected
){
rv
=
closeCommChannel
();
}
return
(
rv
?
StatusCode
::
SUCCESS
:
StatusCode
::
FAILURE
);
}
// Query the interfaces.
// Input: riid, Requested interface ID
// ppvInterface, Pointer to requested interface
// Return: StatusCode indicating SUCCESS or FAILURE.
// N.B. Don't forget to release the interface after use!!!
StatusCode
OffloadSvc
::
queryInterface
(
const
InterfaceID
&
riid
,
void
**
ppvInterface
)
{
if
(
IOffloadSvc
::
interfaceID
().
versionMatch
(
riid
)
)
{
*
ppvInterface
=
dynamic_cast
<
IOffloadSvc
*>
(
this
);
}
else
{
// Interface is not directly available : try out a base class
return
AthService
::
queryInterface
(
riid
,
ppvInterface
);
}
addRef
();
return
StatusCode
::
SUCCESS
;
}
void
OffloadSvc
::
handle
(
const
Incident
&
inc
){
ATH_MSG_INFO
(
"Got incident "
<<
inc
.
type
()
<<
" from "
<<
inc
.
source
()
<<
" pid="
<<
getpid
());
if
(
inc
.
type
()
==
"BeginRun"
){
}
else
if
(
inc
.
type
()
==
"BeforeFork"
){
if
(
m_isConnected
){
closeCommChannel
(
true
);
}
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
1
));
}
else
if
(
inc
.
type
()