Commit 7f3b34de authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #238: display qp and local pipes info

parent 0cb7a5be
......@@ -27,14 +27,12 @@ public:
std::shared_ptr<void> getConnectionHandle();
std::string getNetwork();
pt::PeerTransport * getPeerTransport();
//bool isLocal();
protected:
pt::PeerTransport *pt_;
std::shared_ptr<void> ch_;
std::string network_;
//bool local_;
};
}
}
......
......@@ -24,19 +24,19 @@ class EstablishedConnection: public pt::pipe::EstablishedConnection
public:
//EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context, bool local);
EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context);
EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context, const std::string & network);
virtual ~EstablishedConnection();
std::shared_ptr<void> getConnectionHandle();
void * getContext();
pt::PeerTransport * getPeerTransport();
//bool isLocal();
std::string getNetwork();
protected:
pt::PeerTransport * pt_;
std::shared_ptr<void> ch_;
std::shared_ptr<void> context_;
//bool local_;
std::string network_;
};
}
}
......
......@@ -37,11 +37,8 @@ class Input: public pt::pipe::Input
public:
Input(std::shared_ptr<pt::pipe::ConnectionRequest> & pcr, pt::pipe::InputListener * listener , pt::vpi::PeerTransport * pt);
virtual ~Input() {};
//void postFrame(toolbox::mem::Reference * ref );
//bool empty();
//toolbox::mem::Reference * completed();
pt::pipe::InputListener * getListener();
//virtual bool isLocal() = 0;
protected:
......@@ -60,8 +57,7 @@ public:
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
//bool isLocal();
//pt::pipe::InputListener * getListener();
protected:
......@@ -81,8 +77,7 @@ public:
void postFrame(toolbox::mem::Reference * ref );
bool empty();
toolbox::mem::Reference * completed();
//bool isLocal();
//pt::pipe::InputListener * getListener();
protected:
......
// $Id$
/*************************************************************************
/*
*************************************************************************
* XDAQ Components for Distributed Data Acquisition *
* Copyright (C) 2000-2020, CERN. *
* Copyright (C) 2000-2021, CERN. *
* All rights reserved. *
* Authors: L.Orsini, D. Simelevicius *
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
*************************************************************************
*/
#ifndef _pt_vpi_Output_h_
#define _pt_vpi_Output_h_
......@@ -44,6 +46,7 @@ public:
//toolbox::mem::Reference * completed();
pt::pipe::OutputListener * getOutputListener();
//virtual bool isLocal() = 0;
virtual const std::type_info& getType() const = 0;
protected:
......@@ -65,7 +68,10 @@ public:
bool empty();
toolbox::mem::Reference * completed();
//pt::pipe::OutputListener * getOutputListener();
//bool isLocal();
const std::type_info& getType() const
{
return typeid(ROutput);
}
protected:
......@@ -88,6 +94,10 @@ public:
toolbox::mem::Reference * completed();
//pt::pipe::OutputListener * getOutputListener();
//bool isLocal();
const std::type_info& getType() const
{
return typeid(LOutput);
}
protected:
......
......@@ -54,6 +54,8 @@ namespace vpi
const size_t MAX_ASYNC_EVENT = 19 + 1;
const size_t MAX_WC_EVENT = 22 + 1; // number of possible errors + 2
typedef std::tuple<size_t,size_t,std::string> LocalPipeInfo;
class PeerTransport : public pt::pipe::Service, public pt::PeerTransportSender, public pt::PeerTransportReceiver, public xdaq::Object, public ibvla::EventHandler, public ibvla::AcceptorListener
{
friend class RInput;
......@@ -112,6 +114,10 @@ public:
std::list<ibvla::QueuePair> getAcceptedQueuePairs ();
std::list<ibvla::QueuePair> getConnectedQueuePairs ();
std::list<pt::vpi::LocalPipeInfo> getAcceptedLocalPipes ();
std::list<pt::vpi::LocalPipeInfo> getConnectedLocalPipes ();
// Pipes service support
void connect(pt::Address::Reference source, pt::Address::Reference destination, pt::pipe::ServiceListener * l, std::shared_ptr<void> & user_data );
pt::pipe::Output * createOutputPipe( std::shared_ptr<pt::pipe::EstablishedConnection> & ec, pt::pipe::OutputListener * listener );
......@@ -127,7 +133,7 @@ protected:
std::list<ibvla::QueuePair> acceptedQP_;
//std::list<ibvla::QueuePair> acceptedQP_;
// ibvla callbacks
void handleEvent (struct ibv_async_event * event);
......
......@@ -21,11 +21,12 @@ namespace pt
{
typedef struct qpInfo
{
qpInfo (std::string ip, std::string host, std::string port)
qpInfo (std::string ip, std::string host, std::string port, std::string network)
{
this->ip = ip;
this->host = host;
this->port = port;
this->network = network;
send_count = 0;
recv_count = 0;
error_count = 0;
......@@ -37,6 +38,7 @@ namespace pt
std::string host;
std::string port;
std::string network;
std::string ip;
size_t send_count;
size_t recv_count;
......
......@@ -9,6 +9,9 @@
* For the list of contributors see CREDITS. *
*************************************************************************/
#include <iomanip> // std::setfill, std::setw
#include "pt/vpi/Application.h"
#include "pt/vpi/Address.h"
......@@ -128,99 +131,6 @@ void pt::vpi::Application::actionPerformed (xdata::Event& e)
}
}
/*
void pt::vpi::Application::autoConnect ()
{
std::vector<const xdaq::Network*> networks = this->getApplicationContext()->getNetGroup()->getNetworks();
for (std::vector<const xdaq::Network*>::iterator n = networks.begin(); n != networks.end(); n++)
{
if ((*n)->getProtocol() == "vpi")
{
if ((*n)->isEndpointExisting(this->getApplicationDescriptor()->getContextDescriptor()))
{
pt::Address::Reference local = (*n)->getAddress(this->getApplicationDescriptor()->getContextDescriptor());
std::stringstream ss;
ss << "Found local network : " << (*n)->getName() << " - " << (*n)->getProtocol() << " - " << local->toString();
LOG4CPLUS_DEBUG(this->getApplicationLogger(), ss.str());
std::vector<xdaq::ContextDescriptor*> contexts = this->getApplicationContext()->getContextTable()->getContextDescriptors();
for (std::vector<xdaq::ContextDescriptor*>::iterator c = contexts.begin(); c != contexts.end(); c++)
{
if ((*c)->getURL() != this->getApplicationDescriptor()->getContextDescriptor()->getURL())
{
if ((*n)->isEndpointExisting((*c)))
{
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Found Endpoint matching in network " << (*n)->getName() << " for remote context " << (*c)->getURL());
pt::Address::Reference remote;
try
{
remote = (*n)->getAddress(*c);
}
catch (...)
{
// there is no matching address for this network
continue;
}
if (!remote->equals(local))
{
pt::vpi::Address & rAddr = dynamic_cast<pt::vpi::Address&>(*remote);
std::stringstream ss;
ss << "Found remote network (cacheing messenger) : " << (*n)->getName() << " - " << (*n)->getProtocol() << " url: " << rAddr.toString();
LOG4CPLUS_DEBUG(this->getApplicationLogger(), ss.str());
pt::Messenger::Reference mr = pt::getPeerTransportAgent()->getMessenger(remote, local);
pt::vpi::I2OMessenger & m = dynamic_cast<pt::vpi::I2OMessenger&>(*mr);
try
{
m.postConnect();
}
catch (pt::vpi::exception::Exception & e)
{
LOG4CPLUS_ERROR(this->getApplicationLogger(), "failed to connect cached messenger");
this->notifyQualified("error", e);
try
{
toolbox::Event::Reference input(new toolbox::Event("fail", this));
fsm_.fireEvent(input);
}
catch (toolbox::fsm::exception::Exception & exception)
{
XCEPT_DECLARE_NESTED(pt::exception::Exception, ex, "Invalid command", exception);
this->notifyQualified("fatal", ex);
}
}
}
std::string enabled("Enabled");
if (fsm_.getStateName(fsm_.getCurrentState()).compare(enabled) != 0)
{
try
{
toolbox::Event::Reference input(new toolbox::Event("enable", this));
fsm_.fireEvent(input);
}
catch (toolbox::fsm::exception::Exception & exception)
{
XCEPT_DECLARE_NESTED(pt::exception::Exception, ex, "Invalid command", exception);
this->notifyQualified("fatal", ex);
}
}
}
else
{
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "No Endpoint matching in network " << (*n)->getName() << " for remote context " << (*c)->getURL());
}
}
}
}
}
}
}
*/
void pt::vpi::Application::Default (xgi::Input * in, xgi::Output * out)
{
......@@ -261,10 +171,13 @@ void pt::vpi::Application::SenderTabPage (xgi::Output * sout)
*sout << cgicc::tr();
*sout << cgicc::th("IP");
*sout << cgicc::th("Hostname");
*sout << cgicc::th("Recv count");
*sout << cgicc::th("Port");
*sout << cgicc::th("Send count");
*sout << cgicc::th("Local QPN");
*sout << cgicc::th("Remote QPN");
*sout << cgicc::th("QP State");
*sout << cgicc::th("Connection Time");
*sout << cgicc::th("Network");
//*sout << cgicc::th("");
*sout << cgicc::tr() << std::endl;
......@@ -272,20 +185,14 @@ void pt::vpi::Application::SenderTabPage (xgi::Output * sout)
*sout << cgicc::tbody() << std::endl;
/*
for (std::list<pt::Messenger::Reference>::iterator i = messengers.begin(); i != messengers.end(); i++)
{
pt::vpi::I2OMessenger * m = dynamic_cast<pt::vpi::I2OMessenger*>(&(*(*i)));
*sout << *m << std::endl;
}
*/
for (std::list<ibvla::QueuePair>::iterator i = qps.begin(); i != qps.end(); i++)
{
struct pt::vpi::qpInfo * info = ((struct pt::vpi::qpInfo *) (*i).getContext());
*sout << cgicc::tr();
*sout << "<td>" << info->ip << "</td>"; // ip
*sout << "<td>" << info->host << "</td>"; // hostname
*sout << "<td>" << info->recv_count << "</td>"; //
*sout << "<td>" << info->port << "</td>"; // port
*sout << "<td>" << info->send_count << "</td>"; //
*sout << "<td>";
*sout << (*i).qp_->qp_num << std::endl; // qp
*sout << "</td>";
......@@ -308,6 +215,10 @@ void pt::vpi::Application::SenderTabPage (xgi::Output * sout)
*sout << ibvla::stateToString(*i) << std::endl; // qp
*sout << "</td>";
toolbox::TimeInterval connectionTime = info->endConnectionTime - info->startConnectionTime;
*sout << "<td>" << connectionTime.toString() << "." << std::setfill('0') << std::setw(6) << connectionTime.usec() << "</td>";
*sout << "<td>" << info->network << "</td>"; // network
//*sout << "<td><a href=\"destroyAcceptedQueuePair?qpn=" << (*i).qp_->qp_num << "\">Destroy</a></td>";
//*sout << "<td><input type=\"button\" class=\"red-button\" onClick=\"parent.location='destroyAcceptedQueuePair?qpn=" << (*i).qp_->qp_num << "'\" value=\"Destroy\"></td>" << std::endl;
......@@ -316,32 +227,40 @@ void pt::vpi::Application::SenderTabPage (xgi::Output * sout)
*sout << cgicc::tbody();
*sout << cgicc::table();
}
void pt::vpi::Application::ReceiverTabPage (xgi::Output * sout)
{
*sout << cgicc::table().set("class", "xdaq-table") << std::endl;
*sout << cgicc::caption(iaName_) << "-" << " receive completion queue " << std::endl;
auto tuples = pt_->getConnectedLocalPipes();
*sout << cgicc::thead() << std::endl;
*sout << cgicc::table().set("class", "xdaq-table") << std::endl;
*sout << cgicc::tr();
*sout << cgicc::th("Idle");
*sout << cgicc::th("Actual receive");
*sout << cgicc::tr() << std::endl;
*sout << cgicc::thead() << std::endl;
*sout << cgicc::caption("Local") << std::endl;
*sout << cgicc::thead() << std::endl;
*sout << cgicc::tbody() << std::endl;
*sout << cgicc::tr();
//*sout << "<td>" << pt_->getCompletionWorLoopReceiver()->emptyDispatcherQueueCounter_ << "</td>";
//*sout << "<td>" << pt_->getCompletionWorLoopReceiver()->receivedEventCounter_ << "</td>";
*sout << cgicc::tr();
*sout << cgicc::tbody();
*sout << cgicc::table();
*sout << cgicc::tr();
*sout << cgicc::th("Output requests");
*sout << cgicc::th("Completed requests");
*sout << cgicc::th("network");
*sout << cgicc::tr() << std::endl;
*sout << cgicc::thead() << std::endl;
*sout << cgicc::tbody() << std::endl;
for (auto const& [ongoing, completed, network] : tuples)
{
*sout << cgicc::tr();
*sout << "<td>" << ongoing << "</td>"; // ip
*sout << "<td>" << completed << "</td>"; // hostname
*sout << "<td>" << network << "</td>"; // port
*sout << cgicc::tr() << std::endl;
}
*sout << cgicc::tbody();
*sout << cgicc::table();
}
void pt::vpi::Application::ReceiverTabPage (xgi::Output * sout)
{
// queue pair statistics
......@@ -356,10 +275,12 @@ void pt::vpi::Application::ReceiverTabPage (xgi::Output * sout)
*sout << cgicc::tr();
*sout << cgicc::th("IP");
*sout << cgicc::th("Hostname");
*sout << cgicc::th("Port");
*sout << cgicc::th("Recv count");
*sout << cgicc::th("Local QPN");
*sout << cgicc::th("Remote QPN");
*sout << cgicc::th("QP State");
*sout << cgicc::th("Network");
//*sout << cgicc::th("");
*sout << cgicc::tr() << std::endl;
......@@ -373,6 +294,7 @@ void pt::vpi::Application::ReceiverTabPage (xgi::Output * sout)
*sout << cgicc::tr();
*sout << "<td>" << info->ip << "</td>"; // ip
*sout << "<td>" << info->host << "</td>"; // hostname
*sout << "<td>" << info->port << "</td>"; // port
*sout << "<td>" << info->recv_count << "</td>"; //
*sout << "<td>";
*sout << (*i).qp_->qp_num << std::endl; // qp
......@@ -396,7 +318,7 @@ void pt::vpi::Application::ReceiverTabPage (xgi::Output * sout)
*sout << ibvla::stateToString(*i) << std::endl; // qp
*sout << "</td>";
*sout << "<td>" << info->network << "</td>"; // network
//*sout << "<td><a href=\"destroyAcceptedQueuePair?qpn=" << (*i).qp_->qp_num << "\">Destroy</a></td>";
//*sout << "<td><input type=\"button\" class=\"red-button\" onClick=\"parent.location='destroyAcceptedQueuePair?qpn=" << (*i).qp_->qp_num << "'\" value=\"Destroy\"></td>" << std::endl;
*sout << cgicc::tr() << std::endl;
......@@ -404,6 +326,39 @@ void pt::vpi::Application::ReceiverTabPage (xgi::Output * sout)
*sout << cgicc::tbody();
*sout << cgicc::table();
auto tuples = pt_->getAcceptedLocalPipes();
*sout << cgicc::table().set("class", "xdaq-table") << std::endl;
*sout << cgicc::caption("Local") << std::endl;
*sout << cgicc::thead() << std::endl;
*sout << cgicc::tr();
*sout << cgicc::th("Output requests");
*sout << cgicc::th("Completed requests");
*sout << cgicc::th("network");
*sout << cgicc::tr() << std::endl;
*sout << cgicc::thead() << std::endl;
*sout << cgicc::tbody() << std::endl;
for (auto const& [ongoing, completed, network] : tuples)
{
*sout << cgicc::tr();
*sout << "<td>" << ongoing << "</td>"; // ip
*sout << "<td>" << completed << "</td>"; // hostname
*sout << "<td>" << network << "</td>"; // port
*sout << cgicc::tr() << std::endl;
}
*sout << cgicc::tbody();
*sout << cgicc::table();
}
void pt::vpi::Application::SettingsTabPage (xgi::Output * sout)
......
......@@ -13,34 +13,12 @@
#include "pt/vpi/PeerTransport.h"
#include "toolbox/rlist.h"
//pt::vpi::EstablishedConnection::EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context, bool local): pt_(pt), ch_(ch), context_(context),local_(local)
pt::vpi::EstablishedConnection::EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context): pt_(pt), ch_(ch), context_(context)
pt::vpi::EstablishedConnection::EstablishedConnection(pt::PeerTransport * pt, std::shared_ptr<void> & ch, std::shared_ptr<void> & context, const std::string & network): pt_(pt), ch_(ch), context_(context), network_(network)
{
}
pt::vpi::EstablishedConnection::~EstablishedConnection()
{
#ifdef CUT
pt::vpi::PeerTransport * pt = dynamic_cast<pt::vpi::PeerTransport*>(pt_);
if (this->isLocal())
{
// destroy completion queues and qp
/* using PipeConnectionHandle = std::tuple<toolbox::rlist<toolbox::mem::Reference*> *,toolbox::rlist<toolbox::mem::Reference*> *>;
PipeConnectionHandle * ch = (PipeConnectionHandle*)ch_.get();
toolbox::rlist<toolbox::mem::Reference*>::destroy(std::get<0>(*ch));
toolbox::rlist<toolbox::mem::Reference*>::destroy(std::get<1>(*ch));
*/
}
else
{
// destroy completion queues and qp
using PipeConnectionHandle = std::tuple<ibvla::QueuePair,ibvla::CompletionQueue,ibvla::CompletionQueue>;
PipeConnectionHandle * ch = (PipeConnectionHandle*)ch_.get();
pt->destroyQueuePair(std::get<0>(*ch));
pt->destroyCompletionQueue(std::get<1>(*ch));
pt->destroyCompletionQueue(std::get<2>(*ch));
}
#endif
}
std::shared_ptr<void> pt::vpi::EstablishedConnection::getConnectionHandle()
......@@ -57,11 +35,10 @@ pt::PeerTransport * pt::vpi::EstablishedConnection::getPeerTransport()
{
return pt_;
}
/*
bool pt::vpi::EstablishedConnection::isLocal()
std::string pt::vpi::EstablishedConnection::getNetwork()
{
return local_;
return network_;
}
*/
......@@ -40,15 +40,7 @@ pt::pipe::OutputListener * pt::vpi::Output::getOutputListener()
pt::vpi::ROutput::ROutput(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt): pt::vpi::Output::Output(pec,listener,pt)
{
/*
using PipeConnectionHandle = std::tuple<ibvla::QueuePair, ibvla::CompletionQueue, ibvla::CompletionQueue>;
PipeConnectionHandle * ch = (PipeConnectionHandle*)pec_->getConnectionHandle().get();
qp_ = std::get<0>(*ch);
cqs_ = std::get<1>(*ch);
cqr_ = std::get<2>(*ch);
*/
pt::vpi::RSupport * ch = (pt::vpi::RSupport *)pec_->getConnectionHandle().get();
qp_ = ch->io;
cqs_ = ch->cqs;
......@@ -214,26 +206,10 @@ toolbox::mem::Reference * pt::vpi::ROutput::completed()
return ref;
}
/*
bool pt::vpi::ROutput::isLocal()
{
return false;
}
*/
// Local Output Support
pt::vpi::LOutput::LOutput(std::shared_ptr<pt::pipe::EstablishedConnection> & pec, pt::pipe::OutputListener * listener, pt::vpi::PeerTransport * pt): pt::vpi::Output::Output(pec,listener,pt)
{
/*
using PipeConnectionHandle = std::tuple<toolbox::rlist<toolbox::mem::Reference*> *, toolbox::rlist<toolbox::mem::Reference*> *>;
PipeConnectionHandle * ch = (PipeConnectionHandle*)pec_->getConnectionHandle().get();
io_ = std::get<0>(*ch);
cq_ = std::get<1>(*ch);
*/
pt::vpi::LSupport * ch = (pt::vpi::LSupport *)pec_->getConnectionHandle().get();
io_ = ch->io;
cq_ = ch->cq;
......@@ -261,13 +237,4 @@ toolbox::mem::Reference * pt::vpi::LOutput::completed()
return ref;
}
/*
bool pt::vpi::LOutput::isLocal()
{
return true;
}
*/
......@@ -653,7 +653,7 @@ void pt::vpi::PeerTransport::connectionRequest (ibvla::ConnectionRequest id)
//qp = pd_.createQueuePair(cqs, id.acceptor_->cqr_, 0, app->recvQueuePairSize_, new struct qpInfo(id.ip_, id.hostname_, ""));
// use a private completion queue, since pipes are standalone objects
qp = pd_.createQueuePair(cqs, cqr, 0, app->recvQueuePairSize_, new struct qpInfo(id.ip_, id.hostname_, ""));
qp = pd_.createQueuePair(cqs, cqr, 0, app->recvQueuePairSize_, new struct qpInfo(id.ip_, id.hostname_, "", id.network_));
// init qp
......@@ -714,7 +714,7 @@ void pt::vpi::PeerTransport::connectionRequest (ibvla::ConnectionRequest id)
return;
}
acceptedQP_.push_back(qp);
//acceptedQP_.push_back(qp);
// invoke listener here asynchronously to prevent blocking in pipe member application
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Connection request for network " << id.network_);
......@@ -1412,6 +1412,51 @@ void pt::vpi::PeerTransport::resetQueuePair (ibvla::QueuePair & qp)
}
}
std::list<pt::vpi::LocalPipeInfo> pt::vpi::PeerTransport::getConnectedLocalPipes ()
{
std::lock_guard<std::mutex> guard(lock_);
std::list<pt::vpi::LocalPipeInfo> tuples;
for ( auto o = opipes_.begin(); o != opipes_.end(); o++)
{
pt::vpi::Output * opipe = *o;
auto ec = static_cast<pt::vpi::EstablishedConnection*>(opipe->pec_.get());
auto ch = static_cast<pt::vpi::LSupport *>(ec->getConnectionHandle().get());
std::string network = ec->getNetwork();
if ( typeid(*opipe) == typeid(pt::vpi::LOutput) )
{
pt::vpi::LOutput * lo = dynamic_cast<pt::vpi::LOutput*>(*o);
tuples.push_back({ch->io->elements(),ch->cq->elements(), network});
}
}
return tuples;
}