Commit bf5b21d4 authored by Dainius Simelevicius's avatar Dainius Simelevicius
Browse files

references #263: refactoring dipbridge code

parent 5e2bf269
// $Id$
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: Zhen Xie, Luciano Orsini, Dainius Simelevicius *
* Authors: Zhen Xie, Luciano Orsini and Dainius Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
......@@ -49,6 +47,22 @@ namespace dipbridge
class PubErrorHandler;
class ServerErrorHandler;
enum Command{update, destroy, incUpdate, incPublish, dipPublish};
class DipCommand
{
public:
DipCommand(Command c, const std::string & topic);
DipCommand(Command c, const std::string & topic, xdata::Table::Reference & tabRef);
Command getCommand();
std::string getTopic();
xdata::Table::Reference getTable();
protected:
Command c_;
std::string topic_;
xdata::Table::Reference tabRef_;
};
/**
receive xdata::Table from eventing and publish it to dip
subscribe to dip topic and send it as xdata::Table to eventing
......@@ -64,55 +78,58 @@ namespace dipbridge
public:
XDAQ_INSTANTIATOR();
// constructor
Application(xdaq::ApplicationStub* s);
Application(xdaq::ApplicationStub * s);
// destructor
~Application();
// xgi(web) callback
void Default(xgi::Input * in, xgi::Output * out);
// infospace event callback
virtual void actionPerformed(xdata::Event& e);
virtual void actionPerformed(xdata::Event & e);
// toolbox event callback
virtual void actionPerformed(toolbox::Event& e);
virtual void actionPerformed(toolbox::Event & e);
// b2in callback
void onMessage(toolbox::mem::Reference * ref, xdata::Properties & plist);
// dip message handler
void handleMessage(DipSubscription* dipsub, DipData& message);
void handleMessage(DipSubscription * dipsub, DipData & message);
// DipSubscriptionListener methods
void connected(DipSubscription* dipsub);
void connected(DipSubscription * dipsub);
// DipSubscriptionListener methods
void disconnected(DipSubscription* subscription,char *reason);
void disconnected(DipSubscription * subscription, char * reason);
// DipSubscriptionListener methods
void handleException(DipSubscription* subscription, DipException& ex);
// workloop method
bool publishingToDip( toolbox::task::WorkLoop* wl );
void handleException(DipSubscription * subscription, DipException & ex);
private:
void publishDipMessageToEventing(const std::string& dipname, xdata::Table& dipmessage);
void publishDipMessageToEventing(const std::string & dipname, xdata::Table & dipmessage);
void process(std::string name);
void init(std::string name);
protected:
std::string m_processuuid;
toolbox::BSem m_applock;
bool m_busready;
DipFactory* pm_dip;
toolbox::mem::Pool* pm_memPool;
PubErrorHandler* pm_puberrorhandler;
ServerErrorHandler* pm_servererrorhandler;
std::string processuuid_;
toolbox::BSem applock_;
bool busready_;
DipFactory * dip_;
toolbox::mem::Pool * memPool_;
PubErrorHandler * puberrorhandler_;
ServerErrorHandler * servererrorhandler_;
// config parameters
xdata::String m_bus;
xdata::Vector< xdata::String > m_dipSubTopics;
xdata::String m_dipDataTopic;
xdata::Integer32 m_throttle_threshold_millisec;
xdata::String bus_;
xdata::Vector< xdata::String > dipSubTopics_;
std::string dipDataTopic_;
xdata::Integer32 throttleThresholdMillisec_;
// registries
std::map< std::string,DipSubscription* > m_dipsubs;
std::map< std::string,dipDataInsert* > m_dippubs;
std::map< std::string,long long > m_millisecsincelast;
xdata::exdr::Serializer m_serializer;
// outdata queue
std::map< dipDataInsert*, toolbox::squeue< xdata::Table::Reference >* > m_dippubqueuestore;
// workloops
toolbox::task::WorkLoop* pm_pubtodip_wl;
toolbox::task::ActionSignature* pm_pubtodip_as;
std::map< std::string, DipSubscription* > dipsubs_;
std::map< std::string, dipDataInsert* > dippubs_;
std::map< std::string, long long > millisecSinceLast_;
xdata::exdr::Serializer serializer_;
// statistics
typedef struct stat
{
size_t requestDipUpdateCounter;
size_t updateDipCounter;
size_t publishToEventingCounter;
} Stat;
std::map< std::string, Stat > statistics_;
toolbox::squeue < DipCommand* > commandQueue_;
};
}
#endif
// $Id$
/*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2021, CERN. *
......@@ -17,7 +15,7 @@
// !!! Edit this line to reflect the latest package version !!!
#define WORKSUITE_DIPBRIDGE_VERSION_MAJOR 3
#define WORKSUITE_DIPBRIDGE_VERSION_MINOR 2
#define WORKSUITE_DIPBRIDGE_VERSION_MINOR 3
#define WORKSUITE_DIPBRIDGE_VERSION_PATCH 0
// If any previous versions available E.g. #define ESOURCE_PREVIOUS_VERSIONS "3.8.0,3.8.1"
#define DIPBRIDGE_PREVIOUS_VERSIONS
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment