Commit 9355d96a authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #229: support for local pipes and remote pipes automatically

parent 523eb1df
......@@ -37,6 +37,7 @@ Sources= \
Output.cc \
ConnectionRequest.cc \
EstablishedConnection.cc \
Support.cc \
version.cc
......@@ -74,7 +75,7 @@ DependentLibraryDirs = \
UserSourcePath =
UserCFlags =
UserCCFlags =
UserCCFlags = -frtti
UserDynamicLinkFlags =
UserStaticLinkFlags =
UserExecutableLinkFlags =
......
......@@ -50,52 +50,54 @@
namespace pt
{
namespace vpi
{
class Application : public xdaq::Application, public xgi::framework::UIManager, public xdata::ActionListener
{
public:
XDAQ_INSTANTIATOR();
Application (xdaq::ApplicationStub* stub) ;
void actionPerformed (xdata::Event& e);
void Default (xgi::Input * in, xgi::Output * out);
void SenderTabPage (xgi::Output * sout);
void ReceiverTabPage (xgi::Output * sout);
void SettingsTabPage (xgi::Output * sout);
void EventsTabPage (xgi::Output * sout);
void EndpointsTabPage (xgi::Output * sout);
ibvla::EventWorkLoop * ewl_;
pt::vpi::PeerTransport * pt_;
xdata::String iaName_;
xdata::String networkInterface_;
xdata::UnsignedInteger SGIDIndex_;
xdata::UnsignedInteger portNumber_;
xdata::UnsignedLong poolSize_;
xdata::Double poolHighThreshold_;
xdata::Double poolLowThreshold_;
// IBVLA specific configurable variables
xdata::UnsignedInteger completionQueueSize_;
xdata::UnsignedInteger sendQueuePairSize_;
xdata::UnsignedInteger recvQueuePairSize_;
xdata::UnsignedInteger maxMessageSize_; // max size that is to be sent over wire
xdata::Boolean maxMessageSizeCheck_;
xdata::UnsignedInteger deviceMTU_; // can be one of 256, 512, 1024, 2048, 4096, used for ib device configuration
xdata::Boolean sendWithTimeout_;
protected:
};
}
namespace vpi
{
class Application : public xdaq::Application, public xgi::framework::UIManager, public xdata::ActionListener
{
public:
XDAQ_INSTANTIATOR();
Application (xdaq::ApplicationStub* stub) ;
void actionPerformed (xdata::Event& e);
void Default (xgi::Input * in, xgi::Output * out);
void SenderTabPage (xgi::Output * sout);
void ReceiverTabPage (xgi::Output * sout);
void SettingsTabPage (xgi::Output * sout);
void EventsTabPage (xgi::Output * sout);
void EndpointsTabPage (xgi::Output * sout);
ibvla::EventWorkLoop * ewl_;
pt::vpi::PeerTransport * pt_;
xdata::String iaName_;
xdata::String networkInterface_;
xdata::UnsignedInteger SGIDIndex_;
xdata::UnsignedInteger portNumber_;
xdata::String GIDType_; // Use to auto detect index "ib", "v1" or "v2". If empty string use of SGIDIndex_
xdata::UnsignedLong poolSize_;
xdata::Double poolHighThreshold_;
xdata::Double poolLowThreshold_;
// IBVLA specific configurable variables
xdata::UnsignedInteger completionQueueSize_;
xdata::UnsignedInteger sendQueuePairSize_;
xdata::UnsignedInteger recvQueuePairSize_;
//xdata::UnsignedInteger maxMessageSize_; // max size that is to be sent over wire
xdata::Boolean maxMessageSizeCheck_;
xdata::UnsignedInteger deviceMTU_; // can be one of 256, 512, 1024, 2048, 4096, used for ib device configuration
xdata::Boolean sendWithTimeout_;
xdata::Boolean allowLocalPipeSupport_; // if true , for connection within the same executive uses local pipe, otherwise loopback
protected:
};
}
}
#endif
......@@ -16,34 +16,27 @@
namespace pt
{
namespace vpi
{
class ConnectionRequest: public pt::pipe::ConnectionRequest
{
public:
ConnectionRequest(pt::PeerTransport * pt, std::shared_ptr<void> & ch, const std::string & network);
virtual ~ConnectionRequest();
std::shared_ptr<void> getConnectionHandle();
std::string getNetwork();
pt::PeerTransport * getPeerTransport();
protected:
pt::PeerTransport *pt_;
std::shared_ptr<void> ch_;
std::string network_;
};
}
namespace vpi
{
class ConnectionRequest: public pt::pipe::ConnectionRequest
{
public:
//ConnectionRequest(pt::PeerTransport * pt, std::shared_ptr<void> & ch, const std::string & network, bool local);
ConnectionRequest(pt::PeerTransport * pt, std::shared_ptr<void> & ch, const std::string & network);
virtual ~ConnectionRequest();
std::shared_ptr<void> getConnectionHandle();
std::string getNetwork();
pt::PeerTransport * getPeerTransport();
//bool isLocal();
protected:
pt::PeerTransport *pt_;
std::shared_ptr<void> ch_;
std::string network_;
//bool local_;
};
}
}
#endif
// $Id$
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, A.Petrucci, d. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
#ifndef _pt_vpi_Connector_h
#define _pt_vpi_Connector_h
#include "ibvla/exception/Exception.h"
#include "ibvla/QueuePair.h"
#include <infiniband/verbs.h>
namespace pt
{
namespace vpi
{
class QueuePair;
class Connector
{
public:
Connector (const std::string & destname, const std::string & destport, size_t mtu, bool sendWithTimeout, int sgid_index, int is_global, size_t path);
void connect();
void exchangeWithRemote(ibvla::QueuePair &qp);
const std::string & destname_;
const std::string & destport_;
ibv_mtu mtu_;
bool sendWithTimeout_;
int sgid_index_;
int is_global_;
size_t path_;
int sockfd_;
};
}
}
#endif
......@@ -16,34 +16,29 @@
namespace pt
{
namespace vpi
{
class EstablishedConnection: public pt::pipe::EstablishedConnection
{
public:
EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context);
virtual ~EstablishedConnection();
std::shared_ptr<void> getConnectionHandle();
void * getContext();
pt::PeerTransport * getPeerTransport();
protected:
pt::PeerTransport * pt_;
std::shared_ptr<void> ch_;
std::shared_ptr<void> context_;
};
namespace vpi
{
}
class EstablishedConnection: public pt::pipe::EstablishedConnection
{
public:
//EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context, bool local);
EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context);
virtual ~EstablishedConnection();
std::shared_ptr<void> getConnectionHandle();
void * getContext();
pt::PeerTransport * getPeerTransport();
//bool isLocal();
protected:
pt::PeerTransport * pt_;
std::shared_ptr<void> ch_;
std::shared_ptr<void> context_;
//bool local_;
};
}
}
#endif
......@@ -27,43 +27,69 @@
namespace pt
{
namespace vpi
{
class PeerTransport;
class Input: public pt::pipe::Input
{
public:
Input(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
virtual ~Input();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
pt::pipe::InputListener * getListener();
protected:
std::shared_ptr<pt::pipe::ConnectionRequest> pcr_;
ibvla::QueuePair qp_;
ibvla::CompletionQueue cqs_;
ibvla::CompletionQueue cqr_;
pt::pipe::InputListener * listener_;
toolbox::rlist<toolbox::mem::Reference*>* wclist_;
pt::vpi::PeerTransport * pt_;
};
namespace vpi
{
class PeerTransport;
}
class Input: public pt::pipe::Input
{
friend class PeerTransport;
public:
Input(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
virtual ~Input() {};
//void postFrame(toolbox::mem::Reference * ref );
//bool empty();
//toolbox::mem::Reference * completed();
pt::pipe::InputListener * getListener();
//virtual bool isLocal() = 0;
protected:
std::shared_ptr<pt::pipe::ConnectionRequest> pcr_;
pt::pipe::InputListener * listener_;
pt::vpi::PeerTransport * pt_;
};
class RInput: public pt::vpi::Input
{
friend class PeerTransport;
public:
RInput(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
virtual ~RInput();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
//bool isLocal();
//pt::pipe::InputListener * getListener();
protected:
ibvla::QueuePair qp_;
ibvla::CompletionQueue cqs_;
ibvla::CompletionQueue cqr_;
toolbox::rlist<toolbox::mem::Reference*>* wclist_;
};
class LInput: public pt::vpi::Input
{
friend class PeerTransport;
public:
LInput(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
virtual ~LInput();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
//bool isLocal();
//pt::pipe::InputListener * getListener();
protected:
toolbox::rlist<toolbox::mem::Reference*> * io_;
toolbox::rlist<toolbox::mem::Reference*> * cq_;
};
}
}
#endif
......@@ -28,75 +28,76 @@
namespace pt
{
namespace vpi
{
class PeerTransport;
class Output: public pt::pipe::Output
{
friend class PeerTransport;
public:
Output(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt);
virtual ~Output();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
pt::pipe::OutputListener * getOutputListener();
protected:
std::shared_ptr<pt::pipe::EstablishedConnection> pec_;
ibvla::QueuePair qp_;
ibvla::CompletionQueue cqs_;
ibvla::CompletionQueue cqr_;
pt::pipe::OutputListener * listener_;
size_t outstandingRequests_;
toolbox::rlist<toolbox::mem::Reference*>* wclist_;
pt::vpi::PeerTransport * pt_;
namespace vpi
{
class PeerTransport;
};
class Output: public pt::pipe::Output
{
friend class PeerTransport;
public:
Output(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt);
virtual ~Output() {};
//void postFrame(toolbox::mem::Reference * ref );
//bool empty();
//toolbox::mem::Reference * completed();
pt::pipe::OutputListener * getOutputListener();
//virtual bool isLocal() = 0;
protected:
std::shared_ptr<pt::pipe::EstablishedConnection> pec_;
pt::pipe::OutputListener * listener_;
pt::vpi::PeerTransport * pt_;
};
class ROutput: public pt::vpi::Output
{
friend class PeerTransport;
public:
ROutput(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt);
virtual ~ROutput();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
//pt::pipe::OutputListener * getOutputListener();
//bool isLocal();
protected:
ibvla::QueuePair qp_;
ibvla::CompletionQueue cqs_;
ibvla::CompletionQueue cqr_;
size_t outstandingRequests_;
toolbox::rlist<toolbox::mem::Reference*>* wclist_;
};
class LOutput: public pt::vpi::Output
{
friend class PeerTransport;
public:
LOutput(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt);
virtual ~LOutput();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
//pt::pipe::OutputListener * getOutputListener();
//bool isLocal();
protected:
toolbox::rlist<toolbox::mem::Reference*> * io_;
toolbox::rlist<toolbox::mem::Reference*> * cq_;
};
}
}
}
#endif
/*
struct ibv_wc wc;
int ret;
while (1)
{
try
{
ret = cq_.poll(1, &wc);
}
catch (ibvla::exception::Exception & e)
{
std::stringstream ss;
ss << "Failed process event queue '";
ss << e.what() << "'";
std::cout << ss.str() << std::endl;
return false;
}
if (ret == 0)
{
emptyDispatcherQueueCounter_++;
continue;
}
receivedEventCounter_++;
handler_->handleWorkRequest(&wc);
}
return true;
*/
......@@ -49,129 +49,138 @@
namespace pt
{
namespace vpi
{
const size_t MAX_ASYNC_EVENT = 19 + 1;
const size_t MAX_WC_EVENT = 22 + 1; // number of possible errors + 2
class PeerTransport : public pt::pipe::Service, public pt::PeerTransportSender, public pt::PeerTransportReceiver, public xdaq::Object, public ibvla::EventHandler, public ibvla::AcceptorListener
{
friend class Input;
friend class Output;
friend class ConnectionRequest;
friend class EstablishedConnection;
public:
PeerTransport (xdaq::Application * parent) ;
~PeerTransport ();
//! Retrieve the type of peer transport (Sender or Receiver or both)
//
pt::TransportType getType ();
pt::Address::Reference createAddress (const std::string& url, const std::string& service) ;
pt::Address::Reference createAddress (std::map<std::string, std::string, std::less<std::string> >& address) ;
//! Returns the implemented protocol ("loopback" in this version)
//
std::string getProtocol ();
//! Retrieve a list of supported services ("i2o" only in this version)
//
std::vector<std::string> getSupportedServices ();
//! Returns true if a service is supported ("i2o" only in this version), false otherwise
//
bool isServiceSupported (const std::string& service);
//! Retrieve a loopback messenger for the fifo peer transport that allows context internal application communication
//
pt::Messenger::Reference getMessenger (pt::Address::Reference destination, pt::Address::Reference local) ;
//! Internal function to add a message processing listener for this peer transport
//
void addServiceListener (pt::Listener* listener) ;
//! Internal function to remove a message processing listener for this peer transport
//
void removeServiceListener (pt::Listener* listener) ;
//! Internal function to remove all message processing listeners for this peer transport
//
void removeAllServiceListeners ();
//! Function to configure this peer transport with a loopback address
//
void config (pt::Address::Reference address) ;
std::list<ibvla::QueuePair> getAcceptedQueuePairs ();
std::list<ibvla::QueuePair> getConnectedQueuePairs ();
// Pipes service support
void connect(pt::Address::Reference source, pt::Address::Reference destination, pt::pipe::ServiceListener * l, std::shared_ptr<void> & user_data );
pt::pipe::Output * createOutputPipe( std::shared_ptr<pt::pipe::EstablishedConnection> & ec, pt::pipe::OutputListener * listener );
pt::pipe::Input * createInputPipe( std::shared_ptr<pt::pipe::ConnectionRequest> & ec, pt::pipe::InputListener * listener );
void destroyInputPipe(pt::pipe::Input * ip);
void destroyOutputPipe(pt::pipe::Output * op);
protected:
void connect (ibvla::QueuePair & qp, const std::string & host, size_t port, size_t path, int sgid_index, int is_global) ;
std::list<ibvla::QueuePair> acceptedQP_;