Commit 6d72909b authored by Andrea Petrucci's avatar Andrea Petrucci
Browse files

references #251: first debugging version of DIP datapoints using the DIPBridge

parent 2e875d47
......@@ -55,6 +55,7 @@ namespace evb {
inline void I2O_BU_CACHE_Callback(toolbox::mem::Reference*);
inline void I2O_LUMISECTION_INFO_Callback(toolbox::mem::Reference*);
virtual void timeExpired (toolbox::task::TimerEvent&);
virtual void do_appendApplicationInfoSpaceItems(InfoSpaceItems&);
virtual void do_appendMonitoringInfoSpaceItems(InfoSpaceItems&);
virtual void do_updateMonitoringInfo();
......
......@@ -66,7 +66,7 @@ namespace evb {
private:
virtual void timeExpired (toolbox::task::TimerEvent&);
virtual void do_appendApplicationInfoSpaceItems(InfoSpaceItems&);
virtual void do_appendMonitoringInfoSpaceItems(InfoSpaceItems&);
virtual void do_updateMonitoringInfo();
......
......@@ -12,6 +12,7 @@
#include "xdaq/ApplicationStub.h"
namespace evb {
/**
......@@ -34,7 +35,7 @@ namespace evb {
{ return ruProxy_->getRUtids(); }
private:
virtual void timeExpired (toolbox::task::TimerEvent&);
virtual void do_appendMonitoringInfoSpaceItems(InfoSpaceItems&);
virtual void do_updateMonitoringInfo();
virtual void do_handleItemChangedEvent(const std::string& item);
......
......@@ -90,7 +90,8 @@ namespace evb {
xdata::UnsignedInteger32 dummyFedSizeMax; // Maximum size of the FED data when using the log-normal distrubution
xdata::String dipNodes; // Comma-separated list of DIP nodes
xdata::String maskedDipTopics; // DIP topics which will not be considered
xdata::String dipEventingBus; // Name of Eventing BUS for DIP Bridge
xdata::String dipEventingBus; // Name of Eventing BUS for DIP Bridge
xdata::UnsignedInteger32 dipBridgeRegisterTimeout; // Time in second to wait between the DIP datapoints eventing subscription and the register message to DIPBrige
xdata::UnsignedInteger32 fragmentPoolSize; // Size of the toolbox::mem::Pool in Bytes used for dummy events
xdata::Vector<xdata::UnsignedInteger32> fedSourceIds; // Vector of activ FED ids
FerolSources ferolSources; // Vector of FEROL sources
......@@ -128,6 +129,7 @@ namespace evb {
dipNodes("cmsdimns1.cern.ch,cmsdimns2.cern.ch"),
maskedDipTopics(""),
dipEventingBus("slimbus"),
dipBridgeRegisterTimeout(15),
fragmentPoolSize(200000000),
createSoftFed1022(false),
checkBxId(true),
......@@ -168,6 +170,7 @@ namespace evb {
params.add("dipNodes", &dipNodes);
params.add("maskedDipTopics", &maskedDipTopics);
params.add("dipEventingBus", &dipEventingBus);
params.add("dipBridgeRegisterTimeout", &dipBridgeRegisterTimeout);
params.add("fragmentPoolSize", &fragmentPoolSize);
params.add("fedSourceIds", &fedSourceIds);
params.add("ferolSources", &ferolSources);
......
......@@ -177,6 +177,15 @@ namespace evb {
ReadoutUnit* getReadoutUnit() const
{ return readoutUnit_; }
MetaDataRetrieverDIPBridgePtr getMetaDataRetrieverDIPBridgePtr() const
{return metaDataRetrieverDIPBridge_;}
/**
* Return the metaData Retriever DIPBridge
*/
//MetaDataRetrieverDIPBridge& getMetaDataRetrieverDIPBridge()
//{ return metaDataRetrieverDIPBridge_; }
using FerolStreamPtr = std::shared_ptr<FerolStream<ReadoutUnit,Configuration>>; //SocketStreams is shared with PipeHandler
using FerolStreams = std::map<uint16_t,FerolStreamPtr>;
......@@ -194,7 +203,7 @@ namespace evb {
uint32_t getLumiSectionFromGTPe(const DataLocations&) const;
ReadoutUnit* readoutUnit_;
MetaDataRetrieverDIPBridgePtr metaDataRetrieverDIPBridge_;
FerolStreams ferolStreams_;
mutable boost::shared_mutex ferolStreamsMutex_;
......@@ -211,6 +220,7 @@ namespace evb {
toolbox::task::ActionSignature* dummySuperFragmentAction_;
volatile std::atomic<bool> buildDummySuperFragmentActive_;
MetaDataRetrieverDIPBridgePtr metaDataRetrieverDIPBridge_;
InputMonitor superFragmentMonitor_;
mutable std::mutex superFragmentMonitorMutex_;
uint32_t incompleteEvents_;
......@@ -692,7 +702,7 @@ void evb::readoutunit::Input<ReadoutUnit,Configuration>::configure()
|| configuration->createSoftFed1022 )
{
if ( ! metaDataRetrieverDIPBridge_ )
metaDataRetrieverDIPBridge_ = std::make_shared<MetaDataRetrieverDIPBridge>(readoutUnit_,configuration->dipEventingBus,readoutUnit_->getApplicationLogger());
metaDataRetrieverDIPBridge_ = std::make_shared<MetaDataRetrieverDIPBridge>(readoutUnit_,readoutUnit_,configuration->dipEventingBus, configuration->dipBridgeRegisterTimeout,readoutUnit_->getApplicationLogger());
metaDataRetrieverDIPBridge_->subscribeToDip( configuration->maskedDipTopics );
}
......
......@@ -9,6 +9,7 @@
#include <utility>
#include <vector>
#include "toolbox/task/TimerListener.h"
#include "cgicc/HTMLClasses.h"
#include "evb/readoutunit/MetaData.h"
......@@ -18,6 +19,7 @@
#include "toolbox/mem/Reference.h"
#include "xdata/Properties.h"
#include "xdata/String.h"
#include "xdata/UnsignedInteger32.h"
#include "eventing/api/Bus.h"
#include "eventing/api/Member.h"
#include "xdaq/Object.h"
......@@ -35,7 +37,7 @@ namespace evb {
{
public:
MetaDataRetrieverDIPBridge(xdaq::Application* owner,const std::string&, log4cplus::Logger&);
MetaDataRetrieverDIPBridge(xdaq::Application* owner, toolbox::task::TimerListener* timerListener, const std::string&, xdata::UnsignedInteger32&, log4cplus::Logger&);
~MetaDataRetrieverDIPBridge();
bool fillData(unsigned char*);
......@@ -47,6 +49,8 @@ namespace evb {
//void handleException(DipSubscription*, DipException&);
void subscribeToDip(const std::string& maskedDipTopics);
void timerToRegisterToDipBridge();
void registerToDipBridge(const std::string& maskedDipTopics);
bool missingSubscriptions() const;
void addListOfSubscriptions(std::ostringstream&, const bool missingOnly = false);
......@@ -63,6 +67,8 @@ namespace evb {
log4cplus::Logger& logger_;
std::string eventingBusName_;
toolbox::task::TimerListener* timerListener_;
bool dipBridgeSubscriptions_;
enum DipStatus { unavailable,okay,masked };
using DipTopic = std::pair<std::string,DipStatus>;
......@@ -77,6 +83,7 @@ namespace evb {
};
using DipTopics = std::vector<DipTopic>;
DipTopics dipTopics_;
xdata::UnsignedInteger32 dipBridgeRegisterTimeout_;
MetaData::Luminosity lastLuminosity_;
mutable std::mutex luminosityMutex_;
......
......@@ -65,6 +65,7 @@ namespace evb {
protected:
virtual void timeExpired (toolbox::task::TimerEvent&);
virtual void do_appendApplicationInfoSpaceItems(InfoSpaceItems&);
virtual void do_appendMonitoringInfoSpaceItems(InfoSpaceItems&);
virtual void do_updateMonitoringInfo();
......@@ -130,6 +131,15 @@ evb::readoutunit::ReadoutUnit<Unit,Configuration,StateMachine>::ReadoutUnit
{}
template<class Unit,class Configuration,class StateMachine>
void evb::readoutunit::ReadoutUnit<Unit,Configuration,StateMachine>:: timeExpired
(
toolbox::task::TimerEvent& e
)
{
LOG4CPLUS_ERROR(this->getApplicationLogger(), "The timeExperied should be called only for the EVM application.");
}
template<class Unit,class Configuration,class StateMachine>
void evb::readoutunit::ReadoutUnit<Unit,Configuration,StateMachine>::do_appendApplicationInfoSpaceItems
(
......
......@@ -65,6 +65,11 @@ void evb::BU::do_appendApplicationInfoSpaceItems
}
void evb::BU::timeExpired (toolbox::task::TimerEvent& e) {
LOG4CPLUS_ERROR(getApplicationLogger(), "The timeExperied should be called only for the EVM application.");
}
void evb::BU::do_appendMonitoringInfoSpaceItems
(
InfoSpaceItems& monitoringParams
......
......@@ -49,6 +49,9 @@ evb::test::DummyFEROL::~DummyFEROL()
workLoop_->cancel();
}
void evb::test::DummyFEROL::timeExpired (toolbox::task::TimerEvent& e) {
LOG4CPLUS_ERROR(getApplicationLogger(), "The timeExperied should be called only for the EVM application.");
}
void evb::test::DummyFEROL::do_appendApplicationInfoSpaceItems
(
......
......@@ -7,6 +7,8 @@
#include "evb/readoutunit/BUproxy.h"
#include "evb/readoutunit/FedFragment.h"
#include "evb/readoutunit/FerolConnectionManager.h"
#include "evb/readoutunit/MetaDataRetrieverDIPBridge.h"
#include "evb/readoutunit/Input.h"
#include "evb/readoutunit/States.h"
#include "interface/shared/GlobalEventNumber.h"
......@@ -27,6 +29,14 @@ evb::EVM::EVM(xdaq::ApplicationStub* app) :
LOG4CPLUS_INFO(this->getApplicationLogger(), "End of constructor");
}
void evb::EVM::timeExpired (toolbox::task::TimerEvent& e) {
std::cout<< "Called evb::EVM::timeExpired "<< e.type()<<std::endl;
xdata::String dipNodes = this->configuration_->dipNodes;
evb::readoutunit::MetaDataRetrieverDIPBridgePtr dipRetriver = this->input_.get()->getMetaDataRetrieverDIPBridgePtr();
dipRetriver->registerToDipBridge(dipNodes);
}
void evb::EVM::do_appendMonitoringInfoSpaceItems(InfoSpaceItems& items)
{
......
......@@ -8,26 +8,35 @@
#include "xcept/tools.h"
#include "xcept/Exception.h"
#include "toolbox/mem/AutoReference.h"
#include "toolbox/task/Timer.h"
#include "toolbox/task/TimerFactory.h"
#include "toolbox/TimeInterval.h"
#include "xdata/Table.h"
#include "xdata/exdr/Serializer.h"
#include "xdata/exdr/FixedSizeInputStreamBuffer.h"
#include "xdata/Integer32.h"
#include "xdata/Integer64.h"
#include "xdata/Integer64.h"
#include "xdata/Float.h"
#include <unistd.h>
evb::readoutunit::MetaDataRetrieverDIPBridge::MetaDataRetrieverDIPBridge(
xdaq::Application* owner,
toolbox::task::TimerListener* timerListener,
const std::string& eventBusName,
xdata::UnsignedInteger32& dipBridgeTimeout,
log4cplus::Logger& logger
):
eventing::api::Member(owner),
logger_(logger)
{
timerListener_= timerListener;
eventingBusName_=eventBusName.c_str();
dipBridgeRegisterTimeout_ = dipBridgeTimeout;
// Attention: DCS HV values have to come first and in the order of the HV bits
// defined in DataFormats::OnlineMetaData::DCSRecord::Partition in CMSSW
dipTopics_.push_back( DipTopic("dip/CMS/DCS/CMS_ECAL/CMS_ECAL_BP/state",unavailable) );
......@@ -61,6 +70,7 @@ evb::readoutunit::MetaDataRetrieverDIPBridge::MetaDataRetrieverDIPBridge(
dipTopics_.push_back( DipTopic("dip/CMS/CTPPS/detectorFSM",unavailable) );
b2in::nub::deferredbind(this->getOwnerApplication(),this, &evb::readoutunit::MetaDataRetrieverDIPBridge::handleMessage);
dipBridgeSubscriptions_ = false;
}
......@@ -81,6 +91,7 @@ evb::readoutunit::MetaDataRetrieverDIPBridge::~MetaDataRetrieverDIPBridge()
plist.setProperty("urn:dip:topicname", topic.first.c_str());
this->getEventingBus(eventingBusName_).publish("urn:dip:metacontrol", 0, plist);
LOG4CPLUS_WARN(logger_,toolbox::toString("urn:dip:action=unregister, urn:dip:topicname=%s",topic.first.c_str()));
std::cout<< "urn:dip:action=unregister topicname='"<< topic.first.c_str() <<std::endl;
}
} catch(eventing::api::exception::Exception& e)
{
......@@ -91,31 +102,90 @@ evb::readoutunit::MetaDataRetrieverDIPBridge::~MetaDataRetrieverDIPBridge()
}
void evb::readoutunit::MetaDataRetrieverDIPBridge::timerToRegisterToDipBridge() {
/*
toolbox::task::Timer * timer = toolbox::task::getTimerFactory()->createTimer("PeriodicDiagnostic");
toolbox::TimeInterval interval(8,0); // period of 8 secs
toolbox::TimeVal start;
start = toolbox::TimeVal::gettimeofday();
timer->scheduleAtFixedRate( start, timerListener_, interval, 0, "" );
std::cout<< "Started timerToRegisterToDipBridge TimeInterval="<< interval <<std::endl;
*/
void evb::readoutunit::MetaDataRetrieverDIPBridge::subscribeToDip(const std::string& maskedDipTopics)
{
maskedDipTopics_ = maskedDipTopics;
for (auto& topic : dipTopics_)
{
std::string appUUID= this->getOwnerApplication()->getApplicationDescriptor()->getUUID().toString();
std::string timerName_= "timer_" + appUUID + toolbox::toString(".%d", rand());
toolbox::task::Timer * timer_ = toolbox::task::getTimerFactory()->createTimer(timerName_);
try
{
if ( maskedDipTopics.find(topic.first) != std::string::npos )
topic.second = masked;
else if (topic.second != okay) {
xdata::Properties plist;
plist.setProperty("urn:dip:action", "register");
plist.setProperty("urn:dip:topicname", topic.first.c_str());
this->getEventingBus(eventingBusName_).publish("urn:dip:metacontrol", 0, plist);
LOG4CPLUS_WARN(logger_,toolbox::toString("urn:dip:action=register, urn:dip:topicname=%s",topic.first.c_str()));
// current time, scheduled start time, and xmas-ready time
double now = toolbox::TimeVal::gettimeofday();
double startTimeSecs_ = now + dipBridgeRegisterTimeout_;
// schedule start of monitoring
toolbox::TimeVal startTime(startTimeSecs_);
timer_->schedule(timerListener_, startTime, 0, "");
std::cout<< "Started timerToRegisterToDipBridge TimeInterval="<< startTime <<std::endl;
}
void evb::readoutunit::MetaDataRetrieverDIPBridge::registerToDipBridge(
const std::string &maskedDipTopics) {
for (auto &topic : dipTopics_) {
try {
if (maskedDipTopics.find(topic.first) != std::string::npos)
topic.second = masked;
else if (topic.second != okay) {
xdata::Properties plist;
plist.setProperty("urn:dip:action", "register");
plist.setProperty("urn:dip:topicname", topic.first.c_str());
this->getEventingBus(eventingBusName_).publish(
"urn:dip:metacontrol", 0, plist);
LOG4CPLUS_WARN(logger_,
toolbox::toString(
"urn:dip:action=register, urn:dip:topicname=%s",
topic.first.c_str()));
std::cout<< "urn:dip:action=register topicname='"<< topic.first.c_str() <<std::endl;
usleep(200);
}
} catch (eventing::api::exception::Exception &e) {
std::string msg(
"Failed to subscribe to eventing bus " + eventingBusName_);
LOG4CPLUS_ERROR(logger_, msg + stdformat_exception_history(e));
}
} catch(eventing::api::exception::Exception& e)
{
std::string msg("Failed to subscribe to eventing bus "+eventingBusName_);
LOG4CPLUS_ERROR(logger_,msg+stdformat_exception_history(e));
}
}
}
void evb::readoutunit::MetaDataRetrieverDIPBridge::subscribeToDip(
const std::string &maskedDipTopics) {
maskedDipTopics_ = maskedDipTopics;
if (!dipBridgeSubscriptions_) {
for (auto &topic : dipTopics_) {
try {
if (maskedDipTopics.find(topic.first) != std::string::npos)
topic.second = masked;
else if (topic.second != okay) {
this->getEventingBus(eventingBusName_).subscribe(
topic.first.c_str());
LOG4CPLUS_WARN(logger_,
toolbox::toString("subsribe, topicname=%s",
topic.first.c_str()));
std::cout << "Subscribe topicname='" << topic.first.c_str()
<< std::endl;
}
} catch (eventing::api::exception::Exception &e) {
std::string msg(
"Failed to subscribe to eventing bus "
+ eventingBusName_);
LOG4CPLUS_ERROR(logger_, msg + stdformat_exception_history(e));
}
}
dipBridgeSubscriptions_ = true;
}
// Call the timer to register to DIPBridge
this->timerToRegisterToDipBridge();
std::cout<< "Called timerToRegisterToDipBridge"<<std::endl;
}
/*
......@@ -165,7 +235,7 @@ void evb::readoutunit::MetaDataRetrieverDIPBridge::handleMessage(toolbox::mem::R
LOG4CPLUS_WARN(logger_,"received topic "+topicName);
//parse table message
xdata::Table table;
std::cout<<"Message buffer size "<<ref->getDataSize()<<std::endl;
std::cout<< "received topic='"<< topicName << "' Message buffer size "<<ref->getDataSize()<<std::endl;
xdata::exdr::FixedSizeInputStreamBuffer inBuffer(static_cast<char*>(ref->getDataLocation()),ref->getDataSize());
xdata::exdr::Serializer serializer;
try
......@@ -292,6 +362,7 @@ void evb::readoutunit::MetaDataRetrieverDIPBridge::handleMessage(toolbox::mem::R
lastBeamSpot_.errDydz = dynamic_cast<xdata::Float*>(p)->value_;
msg << ", dxdz=" << lastBeamSpot_.dxdz << ", dydz=" << lastBeamSpot_.dydz << ", err_dxdz="<< lastBeamSpot_.errDxdz << ", err_dydz="<< lastBeamSpot_.errDydz;
std::cout<< msg.str()<<std::endl;
LOG4CPLUS_WARN(logger_,msg.str());
}
else if ( topicName == "dip/CMS/CTPPS/detectorFSM" )
......@@ -348,6 +419,7 @@ void evb::readoutunit::MetaDataRetrieverDIPBridge::handleMessage(toolbox::mem::R
//const uint64_t dipTime = dipData.extractDipTime().getAsMillis();
lastDCS_.timeStamp = std::max(lastDCS_.timeStamp,dipTime);
std::cout<< "diptime='"<< dipTime << "' state='"<<state<<"'"<<std::endl;
LOG4CPLUS_WARN(logger_, toolbox::toString(
"Got a %s notification: diptime=%, status=%f",
topicName, dipTime, lastDCS_.highVoltageReady));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment