Commit 661cf9f6 authored by Dainius Simelevicius's avatar Dainius Simelevicius
Browse files

references #218: extended dipbridge with metacontrol

parent 6ed9f78b
......@@ -58,8 +58,7 @@ namespace dipbridge
public eventing::api::Member,
public xdata::ActionListener,
public toolbox::ActionListener,
public DipSubscriptionListener,
public toolbox::task::TimerListener
public DipSubscriptionListener
{
public:
......@@ -76,8 +75,6 @@ namespace dipbridge
virtual void actionPerformed(toolbox::Event& e);
// b2in callback
void onMessage(toolbox::mem::Reference * ref, xdata::Properties & plist);
// timer callback
void timeExpired(toolbox::task::TimerEvent& e);
// dip message handler
void handleMessage(DipSubscription* dipsub, DipData& message);
// DipSubscriptionListener methods
......@@ -90,11 +87,11 @@ namespace dipbridge
bool publishingToDip( toolbox::task::WorkLoop* wl );
private:
void publishDipMessageToEventing( const std::string& dipname, xdata::Table& dipmessage);
void publishDipMessageToEventing(const std::string& dipname, xdata::Table& dipmessage);
void init(std::string name);
protected:
std::string m_processuuid;
std::string m_startsubtimer_name;
toolbox::BSem m_applock;
bool m_busready;
DipFactory* pm_dip;
......
......@@ -22,11 +22,12 @@
#include "xgi/exception/Exception.h"
#include "xdata/String.h"
#include "xdata/Table.h"
#include "toolbox/ActionListener.h"
#include <list>
namespace dipbridge
{
class subscriber : public xdaq::Application, public xgi::framework::UIManager, public eventing::api::Member,public xdata::ActionListener
class subscriber : public xdaq::Application, public xgi::framework::UIManager, public eventing::api::Member,public xdata::ActionListener, public toolbox::ActionListener
{
public:
XDAQ_INSTANTIATOR();
......@@ -36,10 +37,14 @@ namespace dipbridge
~subscriber();
// xgi(web) callback
void Default(xgi::Input * in, xgi::Output * out);
void metaregister(xgi::Input * in, xgi::Output * out);
void metaunregister(xgi::Input * in, xgi::Output * out);
// infospace callback
virtual void actionPerformed(xdata::Event& e);
// b2in message callback
void onMessage(toolbox::mem::Reference * ref, xdata::Properties & plist);
void actionPerformed (toolbox::Event& e);
private:
xdata::String m_bus;
xdata::String m_topics;
......
......@@ -2,9 +2,9 @@
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: Zhen Xie, Luciano Orsini, Dainius Simelevicius *
* Authors: Zhen Xie, Luciano Orsini, Dainius Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
......@@ -17,7 +17,7 @@
// !!! Edit this line to reflect the latest package version !!!
#define WORKSUITE_DIPBRIDGE_VERSION_MAJOR 3
#define WORKSUITE_DIPBRIDGE_VERSION_MINOR 0
#define WORKSUITE_DIPBRIDGE_VERSION_MINOR 1
#define WORKSUITE_DIPBRIDGE_VERSION_PATCH 0
// If any previous versions available E.g. #define ESOURCE_PREVIOUS_VERSIONS "3.8.0,3.8.1"
#define DIPBRIDGE_PREVIOUS_VERSIONS
......
......@@ -2,13 +2,16 @@
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: Zhen Xie, Luciano Orsini, Dainius Simelevicius *
* Authors: Zhen Xie, Luciano Orsini and Dainius Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
#include <iostream>
#include <chrono>
#include <thread>
#include "cgicc/CgiDefs.h"
#include "cgicc/Cgicc.h"
#include "cgicc/HTTPHTMLHeader.h"
......@@ -25,9 +28,7 @@
#include "toolbox/mem/AutoReference.h"
#include "b2in/nub/Method.h"
#include "toolbox/net/UUID.h"
#include "toolbox/task/TimerFactory.h"
#include "toolbox/TimeInterval.h"
#include "toolbox/TimeVal.h"
#include "toolbox/task/Guard.h"
#include "xdata/exdr/Serializer.h"
#include "xdata/exdr/AutoSizeOutputStreamBuffer.h"
#include "xdata/exdr/FixedSizeInputStreamBuffer.h"
......@@ -47,10 +48,9 @@ dipbridge::Application::Application(xdaq::ApplicationStub* s) : xdaq::Applicatio
b2in::nub::bind(this, &dipbridge::Application::onMessage);
toolbox::net::UUID uuid;
m_processuuid = uuid.toString();
m_startsubtimer_name = "dipbridgeApplication_timer_"+m_processuuid;
toolbox::net::URN memurn("toolbox-mem-pool","dipbridgeApplication_mem"+m_processuuid);
toolbox::mem::HeapAllocator* allocator = new toolbox::mem::HeapAllocator();
pm_memPool = toolbox::mem::getMemoryPoolFactory()->createPool(memurn,allocator);
pm_memPool = toolbox::mem::getMemoryPoolFactory()->createPool(memurn,allocator);
m_bus.fromString("dip");
m_dipDataTopic.fromString("dipdata");
m_throttle_threshold_millisec = 900;
......@@ -117,16 +117,12 @@ void dipbridge::Application::actionPerformed(xdata::Event& e)
pm_servererrorhandler=new dipbridge::ServerErrorHandler(getApplicationLogger());
Dip::addDipDimErrorHandler(pm_servererrorhandler);
pm_puberrorhandler = new PubErrorHandler(getApplicationLogger());
toolbox::TimeVal now = toolbox::TimeVal::gettimeofday();
toolbox::TimeInterval vsec(5,0);//5 sec
toolbox::TimeVal startT = now + vsec;
toolbox::task::Timer* t = toolbox::task::TimerFactory::getInstance()->createTimer( m_startsubtimer_name );
t->schedule( this, startT, (void*)0, "start_dipsubscription" );
try
{
this->getEventingBus(m_bus.value_).addActionListener(this);
this->getEventingBus(m_bus.value_).subscribe(m_dipDataTopic.value_);
this->getEventingBus(m_bus.value_).subscribe("urn:dip:metacontrol");
}
catch(eventing::api::exception::Exception& e)
{
......@@ -134,7 +130,43 @@ void dipbridge::Application::actionPerformed(xdata::Event& e)
LOG4CPLUS_ERROR(getApplicationLogger(),msg+stdformat_exception_history(e));
XCEPT_RETHROW(exception::Exception,msg,e);
}
std::thread thi(&dipbridge::Application::init, this, "dip subscriptions task");
thi.detach();
}
}
void dipbridge::Application::init(std::string name)
{
int timeout = 60;
while ( ! this->getEventingBus(m_bus.value_).canPublish() )
{
std::this_thread::sleep_for(std::chrono::seconds(1));
timeout--;
if ( timeout == 0 )
{
XCEPT_DECLARE(dipbridge::exception::Exception, err, "Timeout reached while waiting for eventing bus");
this->notifyQualified("warning", err);
timeout = 60;
}
}
{
toolbox::task::Guard < toolbox::BSem > guard(m_applock);
//start subscribe to dip
size_t nsubs = m_dipSubTopics.elements();
for(size_t i = 0; i < nsubs; ++i)
{
xdata::String topicname = m_dipSubTopics.elementAt(i)->toString();
DipSubscription* s = pm_dip->createDipSubscription(topicname.value_.c_str(), this);
m_dipsubs.insert(std::make_pair(topicname.value_, s));
m_millisecsincelast.insert(std::make_pair(topicname.value_, 0));
}
}
//start publishing workloop
pm_pubtodip_wl->activate();
pm_pubtodip_wl->submit(pm_pubtodip_as);
}
void dipbridge::Application::actionPerformed(toolbox::Event& e)
......@@ -144,8 +176,8 @@ void dipbridge::Application::actionPerformed(toolbox::Event& e)
if ( e.type() == "eventing::api::BusReadyToPublish" )
{
std::stringstream msg;
msg<< "Eventing bus '" << m_bus.value_ << "' is ready to publish";
LOG4CPLUS_DEBUG(getApplicationLogger(),msg.str());
msg << "Eventing bus '" << m_bus.value_ << "' is ready to publish";
LOG4CPLUS_DEBUG(getApplicationLogger(), msg.str());
m_busready = true;
}
}
......@@ -155,11 +187,11 @@ void dipbridge::Application::onMessage(toolbox::mem::Reference * ref, xdata::Pro
toolbox::mem::AutoReference refguard(ref); //guarantee ref is released when refguard is out of scope
std::string action = plist.getProperty("urn:b2in-eventing:action");
if (action == "notify" && ref!=0 )
if ( action == "notify" )
{
std::stringstream ss;
std::string topic = plist.getProperty("urn:b2in-eventing:topic");
if( topic==m_dipDataTopic.value_ )
if( (topic == m_dipDataTopic.value_) && (ref != 0) )
{
std::string dipname = plist.getProperty("urn:dipbridge:dipname");
if( !dipname.empty() )
......@@ -213,7 +245,40 @@ void dipbridge::Application::onMessage(toolbox::mem::Reference * ref, xdata::Pro
}
}//end if not dipname empty
}//end if dipDataTopic
}//end if action notify && ref!=0
else if ( topic == "urn:dip:metacontrol" )
{
std::string action = plist.getProperty("urn:dip:action");
std::string topicName = plist.getProperty("urn:dip:topicname");
if ( action == "register" )
{
toolbox::task::Guard < toolbox::BSem > guard(m_applock);
//If already exists then request for update
auto subscription = m_dipsubs.find(topicName);
if ( subscription != m_dipsubs.end() )
{
m_millisecsincelast[topicName] = 0;
subscription->second->requestUpdate();
}
else //If doesn't exist then subscribe
{
DipSubscription* s = pm_dip->createDipSubscription(topicName.c_str(), this);
m_dipsubs.insert(std::make_pair(topicName, s));
m_millisecsincelast.insert(std::make_pair(topicName, 0));
}
}
else if ( action == "unregister" )
{
toolbox::task::Guard < toolbox::BSem > guard(m_applock);
auto subscription = m_dipsubs.find(topicName);
if ( subscription != m_dipsubs.end() )
{
pm_dip->destroyDipSubscription(subscription->second);
m_dipsubs.erase(subscription);
m_millisecsincelast.erase(topicName);
}
}
}
}//end if action notify
}//end scope of refguard
void dipbridge::Application::handleMessage(DipSubscription* dipsub, DipData& message)
......@@ -229,7 +294,7 @@ void dipbridge::Application::handleMessage(DipSubscription* dipsub, DipData& mes
}
DipQuality dipq = message.extractDataQuality();
if( dipq!=1 )
if( dipq != 1 )
{
LOG4CPLUS_INFO(getApplicationLogger(),subname+" : No good quality data, skip");
return;
......@@ -237,20 +302,24 @@ void dipbridge::Application::handleMessage(DipSubscription* dipsub, DipData& mes
const DipTimestamp dipt = message.extractDipTime();
long long diptmillisec = dipt.getAsNanos()/1000000;
toolbox::task::Guard < toolbox::BSem > guard(m_applock);
std::map< std::string,long long >::iterator mit = m_millisecsincelast.find(subname);
if(mit==m_millisecsincelast.end())
if( mit == m_millisecsincelast.end() )
{
return;
}
long long deltasincelast = diptmillisec-mit->second;
ss<<"DeltaT "<<subname<<" "<<deltasincelast<<" : "<<diptmillisec<<" vs "<<mit->second;
long long deltasincelast = diptmillisec - mit->second;
ss << "DeltaT " << subname << " " << deltasincelast << " : " << diptmillisec << " vs " << mit->second;
LOG4CPLUS_DEBUG(getApplicationLogger(),ss.str());
ss.str("");ss.clear();
ss.str("");
ss.clear();
if( mit->second!=0. && int(deltasincelast)<m_throttle_threshold_millisec.value_)
if( (mit->second != 0) && (deltasincelast < m_throttle_threshold_millisec.value_) )
{
ss<<"Throttle "<<subname;
LOG4CPLUS_DEBUG(getApplicationLogger(),ss.str());
ss << "Throttle " << subname;
LOG4CPLUS_DEBUG(getApplicationLogger(), ss.str());
return;
}
mit->second = diptmillisec;
......@@ -272,7 +341,7 @@ void dipbridge::Application::handleMessage(DipSubscription* dipsub, DipData& mes
if(m_busready)
{
publishDipMessageToEventing( subname, *dipmessage );
publishDipMessageToEventing(subname, *dipmessage);
}
}
......@@ -300,9 +369,10 @@ void dipbridge::Application::publishDipMessageToEventing( const std::string& dip
char* databuf = outBuffer.getBuffer();
size_t bufsize = outBuffer.tellp();
ss<<"serialized outBuffer size "<<bufsize;
ss << "serialized outBuffer size " << bufsize;
LOG4CPLUS_DEBUG(getApplicationLogger(),ss.str());
ss.str("");ss.clear();
ss.str("");
ss.clear();
//send xdata::Table dipmessage to eventing
toolbox::mem::Reference* bufRef=0;
......@@ -312,8 +382,8 @@ void dipbridge::Application::publishDipMessageToEventing( const std::string& dip
{
bufRef = toolbox::mem::getMemoryPoolFactory()->getFrame(pm_memPool,bufsize);
bufRef->setDataSize(bufsize);
memcpy( bufRef->getDataLocation(), databuf, bufsize );
this->getEventingBus(m_bus.value_).publish(dipname,bufRef,plist);
memcpy(bufRef->getDataLocation(), databuf, bufsize);
this->getEventingBus(m_bus.value_).publish(dipname, bufRef, plist);
}
catch(xcept::Exception& e)
{
......@@ -325,26 +395,6 @@ void dipbridge::Application::publishDipMessageToEventing( const std::string& dip
}
}
void dipbridge::Application::timeExpired( toolbox::task::TimerEvent& e )
{
if( e.getTimerTask()->name=="start_dipsubscription" )
{
//start subscribe to dip
size_t nsubs = m_dipSubTopics.elements();
for(size_t i=0; i<nsubs; ++i)
{
xdata::String topicname = m_dipSubTopics.elementAt(i)->toString();
DipSubscription* s=pm_dip->createDipSubscription(topicname.value_.c_str(),this);
m_dipsubs.insert( std::make_pair( topicname.value_,s) );
m_millisecsincelast.insert( std::make_pair( topicname.value_,0.) );
}
//start publishing workloop
pm_pubtodip_wl->activate();
pm_pubtodip_wl->submit(pm_pubtodip_as);
}
}
bool dipbridge::Application::publishingToDip( toolbox::task::WorkLoop* wl )
{
LOG4CPLUS_DEBUG(getApplicationLogger(), "Entering publishingToDip");
......
......@@ -40,6 +40,13 @@ xdata::Table::Reference dipbridge::dipDataExtract::getAll( const DipData& dipdat
std::stringstream ss;
int nfields;
xdata::Table* data = new xdata::Table;
//Adding timestamp
const DipTimestamp dipt = dipdata.extractDipTime();
data->addColumn("DipTimestamp", "int 64");
xdata::Integer64 fieldvalue(dipt.getAsNanos());
data->setValueAt(0, "DipTimestamp", fieldvalue);
const char** allfields = dipdata.getTags(nfields);
for(int i=0; i<nfields; ++i)
{
......
......@@ -32,7 +32,10 @@ XDAQ_INSTANTIATOR_IMPL (dipbridge::subscriber)
dipbridge::subscriber::subscriber(xdaq::ApplicationStub* s) : xdaq::Application(s),xgi::framework::UIManager(this),eventing::api::Member(this)
{
xgi::framework::deferredbind(this,this,&dipbridge::subscriber::Default,"Default");
xgi::framework::deferredbind(this,this,&dipbridge::subscriber::Default,"Default");
xgi::bind(this, &dipbridge::subscriber::metaregister, "register");
xgi::bind(this, &dipbridge::subscriber::metaunregister, "unregister");
b2in::nub::bind(this, &dipbridge::subscriber::onMessage);
m_bus.fromString("dip");
getApplicationInfoSpace()->fireItemAvailable("bus",&m_bus);
......@@ -124,6 +127,14 @@ void dipbridge::subscriber::actionPerformed(xdata::Event& e)
if( e.type() == "urn:xdaq-event:setDefaultValues" )
{
// User should listen for publsh readiness, but should also check immediately
this->getEventingBus(m_bus.value_).addActionListener(this);
if (this->getEventingBus(m_bus.value_).canPublish())
{
// ready now
std::cout << "Eventing bus is ready at setDefaultValues" << std::endl;
}
m_intopics = toolbox::parseTokenList(m_topics.value_,",");
try
{
......@@ -141,4 +152,38 @@ void dipbridge::subscriber::actionPerformed(xdata::Event& e)
}
}
void dipbridge::subscriber::actionPerformed (toolbox::Event& event)
{
if (event.type() == "eventing::api::BusReadyToPublish")
{
std::string busname = (static_cast<eventing::api::Bus*>(event.originator()))->getBusName();
std::cout << "event Bus '" << busname << "' is ready to publish" << std::endl;
}
}
void dipbridge::subscriber::metaregister (xgi::Input * in, xgi::Output * out)
{
// create fake message with fake property
xdata::Properties plist;
plist.setProperty("urn:dip:action", "register");
plist.setProperty("urn:dip:topicname", "dip/CMS/MCS/Current");
this->getEventingBus(m_bus.toString()).publish("urn:dip:metacontrol", 0, plist);
std::cout << "message sent" << std::endl;
out->getHTTPResponseHeader().addHeader("Content-Type", "text/plain");
}
void dipbridge::subscriber::metaunregister (xgi::Input * in, xgi::Output * out)
{
// create fake message with fake property
xdata::Properties plist;
plist.setProperty("urn:dip:action", "unregister");
plist.setProperty("urn:dip:topicname", "dip/CMS/MCS/Current");
this->getEventingBus(m_bus.toString()).publish("urn:dip:metacontrol", 0, plist);
std::cout << "message sent" << std::endl;
out->getHTTPResponseHeader().addHeader("Content-Type", "text/plain");
}
<?xml version="1.0" encoding="UTF-8"?>
<xc:Partition xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xc="http://xdaq.web.cern.ch/xdaq/xsd/2004/XMLConfiguration-30" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<xc:Context url="http://kvm-s3562-1-ip151-69.cms:9868">
<xc:Context url="http://kvm-s3562-1-ip151-69.cms:1972">
<xc:Endpoint protocol="utcp" service="b2in" rmode="select" hostname="kvm-s3562-1-ip151-69.cms" port="9869" network="slimnet" sndTimeout="2000" rcvTimeout="2000" affinity="RCV:P,SND:W,DSR:W,DSS:W" singleThread="true" publish="true"/>
<xc:Application class="eventing::core::Subscriber" id="11" instance="0" network="slimnet" group="b2in" service="eventing-subscriber" bus="slimbus">
......@@ -14,15 +14,25 @@
<xc:Module>${XDAQ_ROOT}/lib/libb2inutils.so</xc:Module>
<xc:Module>${XDAQ_ROOT}/lib/libeventingapi.so</xc:Module>
<xc:Module>${XDAQ_ROOT}/lib/libeventingcore.so</xc:Module>
<xc:Application class="eventing::core::Publisher" id="63" instance="0" network="slimnet" bus="slimbus" service="eventing-publisher" logpolicy="inherit">
<properties xmlns="urn:xdaq-application:eventing::core::Publisher" xsi:type="soapenc:Struct">
<eventings xsi:type="soapenc:Array" soapenc:arrayType="xsd:ur-type[1]">
<item xsi:type="xsd:string" soapenc:position="[0]">utcp://kvm-s3562-1-ip151-131.cms:1977</item>
</eventings>
</properties>
</xc:Application>
<xc:Module>${XDAQ_ROOT}/lib/libeventingapi.so</xc:Module>
<xc:Module>${XDAQ_ROOT}/lib/libeventingcore.so</xc:Module>
<xc:Application class="dipbridge::subscriber" id="101" instance="0" network="local" logpolicy="inherit">
<properties xmlns="urn:xdaq-dipbridge::subscriber" xsi:type="soapenc:Struct">
<bus xsi:type="xsd:string">slimbus</bus>
<!-- topics xsi:type="xsd:string">dip/acc/LHC/Beam/Energy,dip/acc/LHC/Beam/IntensityPerBunch/Beam1,dip/acc/LHC/Beam/IntensityPerBunch/Beam2</topics -->
<topics xsi:type="xsd:string">dip/CMS/MCS/Current,dip/CMS/DCS/CMS_CSC/CMS_CSCM/state,dip/CMS/DCS/CMS_CSC/CMS_CSCP/state,dip/CMS/DCS/CMS_DT/CMS_DT_DT0/state,dip/CMS/DCS/CMS_DT/CMS_DT_DTM/state,dip/CMS/DCS/CMS_DT/CMS_DT_DTP/state,dip/CMS/DCS/CMS_ECAL/CMS_ECAL_BM/state,dip/CMS/DCS/CMS_ECAL/CMS_ECAL_BP/state,dip/CMS/DCS/CMS_ECAL/CMS_ECAL_EM/state,dip/CMS/DCS/CMS_ECAL/CMS_ECAL_EP/state,dip/CMS/DCS/CMS_ECAL/CMS_ECAL_ESM/state,dip/CMS/DCS/CMS_ECAL/CMS_ECAL_ESP/state,dip/CMS/DCS/CMS_HCAL/CMS_HCAL_HEHBa/state,dip/CMS/DCS/CMS_HCAL/CMS_HCAL_HEHBb/state,dip/CMS/DCS/CMS_HCAL/CMS_HCAL_HEHBc/state,dip/CMS/DCS/CMS_HCAL/CMS_HCAL_HF/state,dip/CMS/DCS/CMS_HCAL/CMS_HCAL_HO/state,dip/CMS/DCS/CMS_PIXEL/CMS_PIXEL_BPIX/state,dip/CMS/DCS/CMS_PIXEL/CMS_PIXEL_FPIX/state,dip/CMS/DCS/CMS_RPC/state,dip/CMS/DCS/CMS_TRACKER/CMS_TRACKER_TECM/state,dip/CMS/DCS/CMS_TRACKER/CMS_TRACKER_TECP/state,dip/CMS/DCS/CMS_TRACKER/CMS_TRACKER_TIB_TID/state,dip/CMS/DCS/CMS_TRACKER/CMS_TRACKER_TOB/state</topics>
</properties>
<topics xsi:type="xsd:string">dip/CMS/MCS/Current</topics>
</properties>
</xc:Application>
<xc:Module>/nfshome0/lorsini/devel/localbus/worksuite/dipbridge/lib/linux/x86_64_centos7/libdipbridge.so</xc:Module>
<xc:Module>/usr/repos/worksuite/dipbridge/lib/linux/x86_64_centos7/libdipbridge.so</xc:Module>
</xc:Context>
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment