Commit e087ba35 authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #240: re-designed connection protocol to support pipe creation with size

parent 4ba05bea
......@@ -38,6 +38,7 @@ Sources= \
ConnectionRequest.cc \
EstablishedConnection.cc \
Support.cc \
Protocol.cc \
version.cc
......@@ -94,8 +95,31 @@ DependentLibraries = ibvla
DynamicLibrary=ptvpi
StaticLibrary=
Libraries = crypt \
config \
peer \
toolbox \
asyncresolv \
log4cplus \
xerces-c \
cgicc \
xcept \
xoap \
xdata \
xgi \
logudpappender \
logxmlappender \
mimetic \
uuid \
tirpc \
ibvla \
xdaq \
b2innub \
elf
TestLibraries=
TestExecutables=
Executables= client.cc server.cc
include $(XDAQ_ROOT)/$(BUILD_SUPPORT)/Makefile.rules
include $(XDAQ_ROOT)/$(BUILD_SUPPORT)/mfRPM.rules
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini,D. Siemelevicius *
* Authors: L.Orsini, A. Petrucci, D. Siemelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_Address_h
#define _pt_vpi_Address_h
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. PEtrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_Allocator_h_
#define _pt_vpi_Allocator_h_
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_Application_h_
#define _pt_vpi_Application_h_
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_ConnectionRequest_h_
#define _pt_vpi_ConnectionRequest_h_
......@@ -21,7 +23,6 @@ namespace vpi
class ConnectionRequest: public pt::pipe::ConnectionRequest
{
public:
//ConnectionRequest(pt::PeerTransport * pt, std::shared_ptr<void> & ch, const std::string & network, bool local);
ConnectionRequest(pt::PeerTransport * pt, std::shared_ptr<void> & ch, const std::string & network);
virtual ~ConnectionRequest();
std::shared_ptr<void> getConnectionHandle();
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_EstablishedConnection_h_
#define _pt_vpi_EstablishedConnection_h_
......@@ -23,7 +25,6 @@ class EstablishedConnection: public pt::pipe::EstablishedConnection
{
public:
//EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context, bool local);
EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context, const std::string & network);
virtual ~EstablishedConnection();
std::shared_ptr<void> getConnectionHandle();
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_header_info_h_
#define _pt_vpi_header_info_h_
......@@ -36,8 +38,8 @@ namespace pt
static bool insertHeader()
{
return true;
}
return true;
}
static void writeLength (char * bufferPtr, size_t len)
{
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_Input_h_
#define _pt_vpi_Input_h_
......@@ -34,10 +36,11 @@ class PeerTransport;
class Input: public pt::pipe::Input
{
friend class PeerTransport;
public:
Input(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
virtual ~Input() {};
pt::pipe::InputListener * getListener();
protected:
......@@ -50,15 +53,15 @@ protected:
class RInput: public pt::vpi::Input
{
friend class PeerTransport;
public:
RInput(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
RInput(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt,size_t n);
virtual ~RInput();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
protected:
ibvla::QueuePair qp_;
......@@ -70,6 +73,7 @@ protected:
class LInput: public pt::vpi::Input
{
friend class PeerTransport;
public:
LInput(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
......@@ -78,7 +82,6 @@ public:
bool empty();
toolbox::mem::Reference * completed();
protected:
toolbox::rlist<toolbox::mem::Reference*> * io_;
......
......@@ -5,7 +5,7 @@
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************
......@@ -37,17 +37,13 @@ class PeerTransport;
class Output: public pt::pipe::Output
{
friend class PeerTransport;
public:
Output(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt);
virtual ~Output() {};
//void postFrame(toolbox::mem::Reference * ref );
//bool empty();
//toolbox::mem::Reference * completed();
pt::pipe::OutputListener * getOutputListener();
//virtual bool isLocal() = 0;
virtual const std::type_info& getType() const = 0;
protected:
......@@ -62,7 +58,7 @@ class ROutput: public pt::vpi::Output
friend class PeerTransport;
public:
ROutput(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt);
ROutput(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt, size_t n);
virtual ~ROutput();
void postFrame(toolbox::mem::Reference * ref );
bool empty();
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_PeerTransport_h
#define _pt_vpi_PeerTransport_h
......@@ -46,6 +48,7 @@
#include "pt/pipe/EstablishedConnection.h"
#include "pt/pipe/ConnectionRequest.h"
#include "pt/pipe/Service.h"
#include "pt/vpi/Protocol.h"
namespace pt
{
......@@ -54,9 +57,9 @@ namespace vpi
const size_t MAX_ASYNC_EVENT = 19 + 1;
const size_t MAX_WC_EVENT = 22 + 1; // number of possible errors + 2
typedef std::tuple<size_t,size_t,std::string> LocalPipeInfo;
typedef std::tuple<size_t,size_t,size_t,size_t,std::string> LocalPipeInfo;
class PeerTransport : public pt::pipe::Service, public pt::PeerTransportSender, public pt::PeerTransportReceiver, public xdaq::Object, public ibvla::EventHandler, public ibvla::AcceptorListener
class PeerTransport : public pt::pipe::Service, public pt::PeerTransportSender, public pt::PeerTransportReceiver, public xdaq::Object, public ibvla::EventHandler, public pt::vpi::ActionListener
{
friend class RInput;
friend class ROutput;
......@@ -121,19 +124,18 @@ public:
// Pipes service support
void connect(pt::Address::Reference source, pt::Address::Reference destination, pt::pipe::ServiceListener * l, std::shared_ptr<void> & user_data );
pt::pipe::Output * createOutputPipe( std::shared_ptr<pt::pipe::EstablishedConnection> & ec, pt::pipe::OutputListener * listener );
pt::pipe::Input * createInputPipe( std::shared_ptr<pt::pipe::ConnectionRequest> & ec, pt::pipe::InputListener * listener );
pt::pipe::Input * createInputPipe( std::shared_ptr<pt::pipe::ConnectionRequest> & cr, pt::pipe::InputListener * listener );
pt::pipe::Output * createOutputPipe( std::shared_ptr<pt::pipe::EstablishedConnection> & ec, pt::pipe::OutputListener * listener, size_t n);
pt::pipe::Input * createInputPipe( std::shared_ptr<pt::pipe::ConnectionRequest> & cr, pt::pipe::InputListener * listener, size_t n);
void destroyInputPipe(pt::pipe::Input * ip);
void destroyOutputPipe(pt::pipe::Output * op);
protected:
void connect (ibvla::QueuePair & qp, const std::string & host, size_t port, size_t path, int sgid_index, int is_global) ;
//void remoteconnect (ibvla::QueuePair & qp, const std::string & host, size_t port, size_t path, int sgid_index, int is_global, int sl, int traffic_class, const std::string & network);
void localconnect( size_t qpsize, size_t cqsize, const std::string network, pt::pipe::ServiceListener * listener, std::shared_ptr<void> user_context);
//std::list<ibvla::QueuePair> acceptedQP_;
// ibvla callbacks
void handleEvent (struct ibv_async_event * event);
......@@ -142,21 +144,21 @@ protected:
void destroyCompletionQueue(ibvla::CompletionQueue & cq);
// Acceptor listener
void connectionRequest (ibvla::ConnectionRequest id);
void actionPerformed (std::shared_ptr<Channel> channel);
void moveQueuePairIntoError (ibvla::QueuePair & qp);
void resetQueuePair (ibvla::QueuePair & qp);
void cleanUpWorkRequestError (struct ibv_wc * wc) ;
//toolbox::BSem mutex_;
ibvla::EventWorkLoop * ewl_;
pt::Listener * listener_;
ibvla::Context context_;
ibvla::ProtectionDomain pd_;
ibvla::Acceptor * acceptor_;
pt::vpi::Acceptor * acceptor_;
toolbox::mem::Pool * pool_;
toolbox::mem::MemoryPoolFactory * factory_;
......@@ -176,11 +178,6 @@ public:
xdata::UnsignedInteger errors_;
xdata::UnsignedInteger events_;
//std::list<pt::vpi::Input*> ipipes_;
//std::list<pt::vpi::Output*> opipes_;
//std::list<pt::vpi::LocalInput*> lipipes_;
//std::list<pt::vpi::LocalOutput*> lopipes_;
std::list<pt::vpi::Input*> ipipes_;
std::list<pt::vpi::Output*> opipes_;
......
// $Id$
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, A.Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************
*/
#ifndef _pt_vpi_Protocol_h
#define _pt_vpi_Protocol_h
#include <memory>
#include <string>
#include <nlohmann/json.hpp>
#include "ibvla/QueuePair.h"
namespace pt
{
namespace vpi
{
class Channel
{
public:
Channel (int connfd, nlohmann::json & cookie, const std::string & originatorhost, const std::string & destinationhost, unsigned int destinationport, const std::string & ip);
~Channel();
nlohmann::json receive();
void send(nlohmann::json & document);
std::string getDestinationIP();
std::string getDestinationHost();
std::string getOriginatorHost();
unsigned int getDestinationPort();
void exchange_with_server(ibvla::QueuePair &qp, enum ibv_mtu mtu, int sgid_index, size_t ibPath, int is_global, int sl, int traffic_class, const std::string & network, bool sendWithTimeout);
void exchange_with_client(ibvla::QueuePair &qp, enum ibv_mtu mtu, int sgid_index, size_t ibPath, int is_global, int sl, int traffic_class, const std::string & network);
nlohmann::json getCookie();
protected:
size_t nreceive (char* buf, size_t len);
size_t receive(char * buf, size_t len );
void close ();
int sockfd_;
nlohmann::json cookie_;
std::string originatorhost_;
std::string destinationhost_;
unsigned int destinationport_;
std::string ip_;
};
class ActionListener
{
public:
virtual void actionPerformed (std::shared_ptr<Channel> channel) = 0;
};
class Connector
{
public:
Connector (nlohmann::json & cookie);
~Connector();
std::shared_ptr<Channel> connect ( const std::string & hostname, unsigned int port);
protected:
nlohmann::json cookie_;
};
class Acceptor
{
public:
Acceptor ( ActionListener * listener, nlohmann::json & cookie);
~Acceptor();
void listen (const std::string & hostname, unsigned int port, int num);
void accept ();
protected:
int listenfd_;
ActionListener * listener_;
std::string originatorhost_;
unsigned int destinationport_;
nlohmann::json cookie_;
};
}
}
#endif
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A.Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_Local_h_
#define _pt_vpi_Local_h_
......@@ -16,6 +18,7 @@
#include "toolbox/rlist.h"
#include "pt/vpi/PeerTransport.h"
#include "pt/vpi/ConnectionRequest.h"
#include "pt/vpi/Protocol.h"
#include "ibvla/ConnectionRequest.h"
#include "ibvla/CompletionQueue.h"
......@@ -43,7 +46,8 @@ struct LSupport: public Support
struct RSupport: public Support
{
RSupport(pt::PeerTransport * pt, ibvla::QueuePair & qp, ibvla::CompletionQueue & cs, ibvla::CompletionQueue &cr);
RSupport(pt::PeerTransport * pt, std::shared_ptr<pt::vpi::Channel> & channel, ibvla::Context & context, ibvla::ProtectionDomain & pd);
~RSupport();
const std::type_info& getType() const
{
......@@ -51,9 +55,9 @@ struct RSupport: public Support
}
pt::PeerTransport * pt;
ibvla::QueuePair io;
ibvla::CompletionQueue cqs;
ibvla::CompletionQueue cqr;
std::shared_ptr<pt::vpi::Channel> channel;
ibvla::Context context;
ibvla::ProtectionDomain pd;
};
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. PEtrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_qpInfo_h
#define _pt_vpi_qpInfo_h
......@@ -21,7 +23,7 @@ namespace pt
{
typedef struct qpInfo
{
qpInfo (std::string ip, std::string host, std::string port, std::string network)
qpInfo (std::string ip, std::string host, std::string port, std::string network,size_t qp_len, size_t scq_len, size_t rcq_len)
{
this->ip = ip;
this->host = host;
......@@ -33,6 +35,9 @@ namespace pt
remote_qpn = 0;
startConnectionTime = toolbox::TimeVal::zero();
endConnectionTime = toolbox::TimeVal::zero();
this->qp_len = qp_len;
this->scq_len = scq_len;
this->rcq_len = rcq_len;
}
......@@ -46,6 +51,9 @@ namespace pt
size_t remote_qpn;
toolbox::TimeVal startConnectionTime;
toolbox::TimeVal endConnectionTime;
size_t qp_len;
size_t scq_len;
size_t rcq_len;
} QP_INFO;
}
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* Authors: L.Orsini, A. Petrucci, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/