diff --git a/HLT/Services/JiveXMLServer/CMakeLists.txt b/HLT/Services/JiveXMLServer/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..032893be05ab323cd683e35d11e1acde626d346f --- /dev/null +++ b/HLT/Services/JiveXMLServer/CMakeLists.txt @@ -0,0 +1,31 @@ +################################################################################ +# Package: JiveXMLServer +################################################################################ + +# Declare the package name: +atlas_subdir( JiveXMLServer ) + +# Declare the package's dependencies: +atlas_depends_on_subdirs( PUBLIC + GaudiKernel + graphics/JiveXML ) + +# External dependencies: +find_package( tdaq ) +find_package( tdaq-common ) + +# this line failed automatic conversion in cmt2cmake : +# macro_remove_regexp AtlasCxxPolicy_pp_cppflags "'-DPACKAGE_VERSION=.*'" + +# Component(s) in the package: +atlas_add_library( JiveXMLServer + src/*.cxx + PUBLIC_HEADERS JiveXMLServer + INCLUDE_DIRS ${TDAQ-COMMON_INCLUDE_DIRS} ${TDAQ_INCLUDE_DIRS} + LINK_LIBRARIES ${TDAQ-COMMON_LIBRARIES} ${TDAQ_LIBRARIES} GaudiKernel JiveXMLLib ers ipc cmdline ) + +atlas_add_executable( jivexmlserver + bin/jivexmlserver.cc + INCLUDE_DIRS ${TDAQ-COMMON_INCLUDE_DIRS} ${TDAQ_INCLUDE_DIRS} + LINK_LIBRARIES ${TDAQ-COMMON_LIBRARIES} ${TDAQ_LIBRARIES} GaudiKernel JiveXMLLib JiveXMLServer ers ipc cmdline ) + diff --git a/HLT/Services/JiveXMLServer/JiveXMLServer/JiveXMLServer.h b/HLT/Services/JiveXMLServer/JiveXMLServer/JiveXMLServer.h new file mode 100644 index 0000000000000000000000000000000000000000..632835c263b1eb6813af8724a7f53a54e425f114 --- /dev/null +++ b/HLT/Services/JiveXMLServer/JiveXMLServer/JiveXMLServer.h @@ -0,0 +1,120 @@ +/* + Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +*/ + +#ifndef JIVEXML_JIVEXMLSERVER_H +#define JIVEXML_JIVEXMLSERVER_H + +/** + * @class JiveXMLServer + * @brief Controls and handles the JiveXML server threads + * This class creates, handles and controls the JiveXML server threads both for + * incoming and outgoing data. It also provides an interface for the threads to + * the TDAQ services, such as error logging. + * + * @author Sebastian Boeser <sboeser --at-- hep.ucl.ac.uk> + **/ + +#include <vector> +#include <string> + +//athena includes +#include <GaudiKernel/IMessageSvc.h> +#include <GaudiKernel/StatusCode.h> + +//JiveXML includes +#include <JiveXML/IServer.h> +#include <JiveXML/EventStream.h> + +//tdaq includes +#include <owl/semaphore.h> + +namespace JiveXML { + + //Forward declarations + class ThreadCollection; + + class JiveXMLServer: virtual public IServer { + + public: + //Constructor with port number + JiveXMLServer( int port = 0 ); + + //Destructor + ~JiveXMLServer(); + + /**@name IEventReceiver methods */ + //@{ + /** Put this event as new current event for stream given by name */ + StatusCode UpdateEventForStream( const EventStreamID evtStreamID, const std::string & event); + //@} + + /** @name IEventServer methods */ + //@{ + /** get the names of all the streams */ + std::vector<std::string> GetStreamNames() const ; + /** get the current EventStreamID for a particular stream */ + const EventStreamID GetEventStreamID(std::string streamName) const ; + /** get the current event for a particular stream */ + const std::string GetEvent( const EventStreamID evtStreamID ) const ; + /** get the Status of the application */ + int GetState() const ; + //@} + + //@{ + /** This function is exposed to allow using ERS messaging service from + * other threads */ + void Message( const MSG::Level level, const std::string msg ) const ; + /** Get the logging level **/ + MSG::Level LogLevel() const ; + //@} + + /**@name Event serving thread control */ + //@{ + /** Start the serving thread */ + StatusCode StartServingThread(); + /** Stop the serving thread */ + StatusCode StopServingThread(); + /** The server thread will stop once this flag is set to false */ + bool GetRunServerFlag () const { return m_runServerThread; }; + /** Callback whenever the server thread is stopped **/ + void ServerThreadStopped(); + /** Wait for the server finish */ + void Wait(); + //@} + + + private: + + //Signal handler must be static function + static void signalHandler (int signum); + + //Static semaphore used to suspend main thread while server thread is + //running. The semaphore will reach its post-condition either by + // a) receiving a signal through the signal handler + // b) the ServerThreadStopped callback being called + static OWLSemaphore lock; + + //Store the received signal in a static member + static int m_receivedSignal; + + //Port number property - defaults to zero in which case + //it is dynamically assigned + int portNumber; + + //A map of the streams and their current events + EventStreamMap m_eventStreamMap; + + //A mutex (mutual exclusive) lock for the data map + mutable pthread_mutex_t m_accessLock; + + //A handle to the server thread + pthread_t m_ServerThreadHandle; + + //Once this flag is set to false, the thread will stop after handling its + //last request + bool m_runServerThread; + }; +} + +#endif diff --git a/HLT/Services/JiveXMLServer/bin/jivexmlserver.cc b/HLT/Services/JiveXMLServer/bin/jivexmlserver.cc new file mode 100644 index 0000000000000000000000000000000000000000..46c14e2ccbc9f921c0424d79223af861ce492689 --- /dev/null +++ b/HLT/Services/JiveXMLServer/bin/jivexmlserver.cc @@ -0,0 +1,62 @@ +/* + Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +*/ + +/** @file + * @author Sebastian Boeser <sboeser --at-- hep.ucl.ac.uk> + * Standalone JiveXML server + * This application provides a standalone ONCRPC (SunRPC) server that receives + * events (in the JiveXML format) from an (athena) application and servers + * these same events to the AtlantisJava client. + * The actual serving thread is part of the JiveXML package, which also provides a + * serving thread controlled by Athena (JiveXML/ONCRCPServerSvc). + * This package is intended to be used only in the Atlas Online environment. + */ + +#include <JiveXMLServer/JiveXMLServer.h> + +//tdaq includes +#include <cmdl/cmdargs.h> +#include <ipc/core.h> +#include <ers/ers.h> + + +/** + * Main routine + */ +int main(int argc, char ** argv) +{ + + // Declare arguments + CmdArgStr partition_name('p', "partition", "partition-name", "The name of the partition in which to run the server"); + + // Initialise IPC + try { + IPCCore::init(argc,argv); + } catch (daq::ipc::Exception & ex) { + ers::fatal( ex ); + } + + // Declare command object and its argument-iterator + CmdLine cmd(*argv, &partition_name, NULL); + cmd.description("Standalone JiveXML server using ONCRPC (SunRPC)"); + CmdArgvIter arg_iter(--argc, ++argv); + + // Parse arguments + cmd.parse(arg_iter); + + //Say hello + ERS_INFO("Starting JiveXML server in partition: "<< partition_name ); + + //Create the object and start the server + JiveXML::JiveXMLServer server; + + //Now wait for the server to finish + server.Wait(); + + //Leaving the scope will call ~JiveXMLServer and destroy the server + //so no explicit shutdown needed + + return 0; +} + diff --git a/HLT/Services/JiveXMLServer/cmt/requirements b/HLT/Services/JiveXMLServer/cmt/requirements new file mode 100644 index 0000000000000000000000000000000000000000..4d5c5964fe50cf436aea6dbe951af78709c92bb2 --- /dev/null +++ b/HLT/Services/JiveXMLServer/cmt/requirements @@ -0,0 +1,25 @@ +package JiveXMLServer + +author Sebastian Boeser <sboeser@hep.ucl.ac.uk> + + +use AtlasPolicy AtlasPolicy-* +use HLTtdaq HLTtdaq-* HLT/HLTExternal +use GaudiInterface GaudiInterface-* External +use JiveXML JiveXML-* graphics + +private + +# Remove to avoid compiler warning +macro_remove_regexp AtlasCxxPolicy_pp_cppflags "'-DPACKAGE_VERSION=.*'" + +library JiveXMLServer -s=../src *.cxx +apply_pattern installed_library + +macro_append JiveXMLServer_linkopts " -lers -lipc -lcmdline " + +macro jivexmlserverlinkopts " -lJiveXMLServer " +macro jivexmlserver_dependencies " JiveXMLServer " +application jivexmlserver -s=../bin jivexmlserver.cc + + diff --git a/HLT/Services/JiveXMLServer/src/JiveXMLServer.cxx b/HLT/Services/JiveXMLServer/src/JiveXMLServer.cxx new file mode 100644 index 0000000000000000000000000000000000000000..0eed59a64eccf4f8277eb9e28d47c6d83cb44663 --- /dev/null +++ b/HLT/Services/JiveXMLServer/src/JiveXMLServer.cxx @@ -0,0 +1,385 @@ +/* + Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +*/ + +#include <JiveXMLServer/JiveXMLServer.h> + +//tdaq includes +#include <ers/ers.h> + +//JiveXML includes +#include <JiveXML/ONCRPCThreadCollection.h> +#include <JiveXML/ONCRPCServerThreads.h> +#include <JiveXML/ONCRPCXDRProcs.h> +#include <JiveXML/ONCRPCServer.h> + +#include <signal.h> + +//Define warning and error +#define ERS_WARNING( message ) \ +{ \ + ERS_REPORT_IMPL( ers::warning, ers::Message, message, ); \ +} + +#define ERS_ERROR( message ) \ +{ \ + ERS_REPORT_IMPL( ers::error, ers::Message, message, ); \ +} + +namespace JiveXML { + + //Initialize static members + int JiveXMLServer::m_receivedSignal = 0; + OWLSemaphore JiveXMLServer::lock; + + /** + * Constructor + */ + JiveXMLServer::JiveXMLServer( int port ): + portNumber(port){ + + //Make sure ServerThread does not start unexpectedly + m_runServerThread = false ; + + //And then start it + StartServingThread(); + + //Also register the signal handlers + signal( SIGINT , JiveXMLServer::signalHandler ); + signal( SIGTERM, JiveXMLServer::signalHandler ); + } + + /** + * Destructor + */ + JiveXMLServer::~JiveXMLServer(){ + + //Just stop the serving thread + StopServingThread(); + } + + /** + * Create the server by + * - passing a this-pointer as an argument + * - setting the thread-running flag + * - starting the server thread + */ + StatusCode JiveXMLServer::StartServingThread(){ + + ERS_DEBUG(MSG::VERBOSE,"StartServingThread()"); + + //Initialize access lock mechanism to ensure that the data map is never + //accessed by more than one thread at the same time. NULL means default + int retVal = pthread_mutex_init(&m_accessLock,NULL); + if (retVal != 0){ + ERS_WARNING("Unable to initialize access lock while starting server: " << strerror(retVal)); + return StatusCode::FAILURE; + } + + //The arguments passed on to the server - create new object on the heap that + //is persistent through the lifetime of the thread + ServerThreadArguments* args = new ServerThreadArguments(this,portNumber); + + //set runServer flag to true, so the thread will start + m_runServerThread = true ; + + //create thread itself + if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){ + + //Create thread failed + ERS_WARNING("Thread creation failed"); + return StatusCode::FAILURE; + } + + return StatusCode::SUCCESS; + + } + + /** + * Stop the server by + * - unsetting the thread-running flag + * - waiting till the server thread has finished + * - destroying the thread handle + */ + StatusCode JiveXMLServer::StopServingThread(){ + + ERS_DEBUG(MSG::VERBOSE,"StopServingThread()"); + + /** + * Ping the server which will cause another request + * Otherwise the server won't update its loop condition + */ + + //Create a client - this will already cause an update of the file + //descriptors on the sockets + CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp"); + if (!client){ + ERS_ERROR("Unable to create shutdown client for local server"); + return StatusCode::FAILURE; + } + + // Next unset the runServerThread flag, which will cause the loop to stop + // This needs to happen after client creation, otherwise the next call won't + // be serverd anymore + m_runServerThread = false ; + + // Now issue the call with a timeout + struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; + clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout); + + // A pointer to the return value of the thread + void* ret = NULL; + // wait till the server thread has finished + ERS_INFO("Waiting for server thread to terminate ..."); + pthread_join(m_ServerThreadHandle, &ret); + ERS_INFO(" ... finished server thread"); + + //check if there was a return value + if (ret){ + //Get the return value + unsigned long NRequests = *(unsigned long*)ret; + ERS_DEBUG(MSG::DEBUG,"Server thread stopped after handling " << NRequests << " requests"); + } else + ERS_WARNING("Server thread stopped unexpectedly"); + + //Destroy the access lock + int retVal = pthread_mutex_destroy(&m_accessLock); + if (retVal != 0){ + ERS_WARNING("Unable to destroy access lock after stopping server: " << strerror(retVal)); + return StatusCode::FAILURE; + } + + return StatusCode::SUCCESS; + } + + /** + * When the signal handler is called, switch the lock to the post condition + */ + void JiveXMLServer::signalHandler(int signal){ + //Store signal + m_receivedSignal=signal; + //finish semaphore lock + lock.post(); + } + + /** + * Wait for the server thread to finish. This can be because + * a) we received a SIGTERM or SIGINT signal + * b) the server thread stopped by itself + **/ + void JiveXMLServer::Wait(){ + //just wait for the lock + lock.wait(); + //Tell why the lock was released + ERS_INFO("Reached post-condition after received signal " << m_receivedSignal ); + } + + /** + * When the server thread stopped, we will also call the + * the signal handler with the special value -1 + **/ + void JiveXMLServer::ServerThreadStopped(){ + //call the signal handler, so we will also reach post condition + signalHandler(-1); + } + + /** + * Get one event and put it as the new event for the this stream, which is + * identified by EventStreamID + */ + StatusCode JiveXMLServer::UpdateEventForStream( const EventStreamID evtStreamID, const std::string & event) { + + ERS_DEBUG(MSG::VERBOSE,"UpdateEventForStream"); + + //Check that the event stream id is valid + if (!evtStreamID.isValid()){ + ERS_ERROR("Invalid event stream identifier - cannot add event"); + return StatusCode::FAILURE; + } + + //Make sure we don't have already exceeded the maximum number of streams + if (m_eventStreamMap.size() > NSTREAMMAX ){ + ERS_ERROR("Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event"); + return StatusCode::FAILURE; + } + + //Make sure the event is not larger than the allowed maximal size + if (event.length() > NBYTESMAX ){ + ERS_ERROR("Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event"); + return StatusCode::FAILURE; + } + + //Make sure we are the only one accessing the data right now, by trying to + //obtain a lock. If the lock can not be obtained after a certain time, an + //error is reported + + //Timeout of 30 second and 0 nanoseconds + struct timespec timeout = { 30, 0 }; + //Try to obtain the lock + int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout); + if ( retVal != 0 ){ + ERS_ERROR("Unable to obtain access lock to update event: " << strerror(retVal)); + return StatusCode::FAILURE; + } + + //Using try/catch to ensure the mutex gets unlocked in any case + try { + + //Using std::map::operator[] and std::map::insert() will create a new event + //if it did not exist, otherwise just replace the existing entry (making a + //copy of the std::string) but would not update the key which holds new + //event/run number. Therefore delete existing entry first. + + //Delete old entry if there is one + EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID); + if (OldEvtItr != m_eventStreamMap.end()) + m_eventStreamMap.erase(OldEvtItr); + + //Now add the new event + m_eventStreamMap.insert(EventStreamPair(evtStreamID,event)); + + } catch ( std::exception e ) { + ERS_ERROR("Exception caught while updating event for stream " + << evtStreamID.StreamName() << ": " << e.what()); + //Also release the lock in this case + pthread_mutex_unlock(&m_accessLock); + //before we return + return StatusCode::FAILURE; + } + + //Finally release the lock again + retVal = pthread_mutex_unlock(&m_accessLock); + if ( retVal != 0 ){ + ERS_ERROR("Unable to release access lock after updating event: " << strerror(retVal)); + return StatusCode::FAILURE; + } + + ERS_DEBUG(MSG::DEBUG, "Updated stream " << evtStreamID.StreamName() + << " with event Nr. " << evtStreamID.EventNumber() + << " from run Nr. " << evtStreamID.RunNumber()); + + return StatusCode::SUCCESS; + } + + + /** + * Return the current athena state + */ + int JiveXMLServer::GetState() const { + /** + * at the moement return a fixed value - will change this to some timeout + * condition soon. + **/ + return 3; + } + + /** + * Return an array with all the stream names + */ + std::vector<std::string> JiveXMLServer::GetStreamNames() const { + + //Create a vector that can be returned + std::vector<std::string> StreamNames; + + //Obtain an exclusive access lock + int retVal = pthread_mutex_lock(&m_accessLock); + if ( retVal != 0 ){ + ERS_ERROR("Unable to obtain access lock to get stream names: " << strerror(retVal)); + return StreamNames; + } + + // Iterate over map to get entries + EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin(); + for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){ + + //Get the EventStreamID object + EventStreamID EvtStrID = (*MapItr).first; + + //Add the name of this EventStreamID to the list of stream names + StreamNames.push_back(EvtStrID.StreamName()); + } + + //Release the lock + retVal = pthread_mutex_unlock(&m_accessLock); + if ( retVal != 0 ) + ERS_ERROR("Unable to release access lock after getting stream names: " << strerror(retVal)); + + //Return the list of names + return StreamNames; + } + + /** + * Return the EventStreamID for the last event of a given stream + */ + const EventStreamID JiveXMLServer::GetEventStreamID( const std::string StreamName) const { + + //Obtain an exclusive access lock + int retVal = pthread_mutex_lock(&m_accessLock); + if ( retVal != 0 ){ + ERS_ERROR("Unable to obtain access lock to get stream ID: " << strerror(retVal)); + return EventStreamID(""); + } + + // Search the entry in the map + EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName)); + + //Release the lock + retVal = pthread_mutex_unlock(&m_accessLock); + if ( retVal != 0 ) + ERS_ERROR("Unable to release access lock after getting stream ID: " << strerror(retVal)); + + //If the element was not found return an invalid ID + if ( MapItr == m_eventStreamMap.end()) return EventStreamID(""); + + //Return the found event stream identifier + return (*MapItr).first; + + } + + /** + * Return the event for a given stream + */ + const std::string JiveXMLServer::GetEvent( const EventStreamID evtStreamID ) const { + + //Obtain an exclusive access lock + int retVal = pthread_mutex_lock(&m_accessLock); + if ( retVal != 0 ){ + ERS_ERROR("Unable to obtain access lock to get event: " << strerror(retVal)); + return std::string(""); + } + + // Search the entry in the map + EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID); + + //Release the lock + retVal = pthread_mutex_unlock(&m_accessLock); + if ( retVal != 0 ) + ERS_ERROR("Unable to release access lock after getting stream event: " << strerror(retVal)); + + //If the element was not found return an empty string + if ( MapItr == m_eventStreamMap.end()) return std::string(""); + + //Return a copy of the found event string + return std::string((*MapItr).second); + } + + /** + * Deliver a message - possibly from another thread - to ERS; + */ + void JiveXMLServer::Message( const MSG::Level level, const std::string msg) const { + //Deliver message to the proper stream + if (level <= MSG::DEBUG) ERS_REPORT_IMPL( ers::debug, ers::Message, msg, level); + if (level == MSG::INFO) ERS_REPORT_IMPL( ers::info, ers::Message, msg, ); + if (level == MSG::WARNING) ERS_REPORT_IMPL( ers::warning, ers::Message, msg, ); + if (level == MSG::ERROR) ERS_REPORT_IMPL( ers::error, ers::Message, msg, ); + if (level >= MSG::FATAL) ERS_REPORT_IMPL( ers::fatal, ers::Message, msg, ); + } + + /** + * Currently only used to suppress client hostname lookup if not in debug mode + */ + MSG::Level JiveXMLServer::LogLevel() const { + //set to fixed value for now + return MSG::DEBUG; + } +}