Commit 562a947d authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #249: improved thread invokation and clean up

parent 0da6a26e
......@@ -18,7 +18,7 @@
// !!! Edit this line to reflect the latest package version !!!
#define CORE_PTVPI_VERSION_MAJOR 1
#define CORE_PTVPI_VERSION_MINOR 3
#define CORE_PTVPI_VERSION_MINOR 4
#define CORE_PTVPI_VERSION_PATCH 0
// If any previous versions available E.g. #define CORE_PTVPI_PREVIOUS_VERSIONS "3.8.0,3.8.1"
#undef CORE_PTVPI_PREVIOUS_VERSIONS
......
......@@ -544,13 +544,7 @@ void pt::vpi::PeerTransport::config (pt::Address::Reference address)
try
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "going to listen on host:port " << a.getHost() << ":" << a.getPortNum() << " for network " << a.getNetwork());
// this completion quue is not used, completion queue is created for each pipe
//ibvla::CompletionQueue cqr = context_.createCompletionQueue(1, 0, 0);
//ibvla::Acceptor * acceptor = new ibvla::Acceptor(a.getNetwork(), this, app->deviceMTU_, cqr, 0, a.getIBPort(), a.getIBPath(), a.getSGIDIndex(), a.getIsGlobal(), a.getSL(), a.getTrafficClass());
//acceptor->listen(a.getHost(), a.getPort());
nlohmann::json cookie;
cookie["mtu"] = (xdata::UnsignedIntegerT)app->deviceMTU_;
cookie["ibport"] = a.getIBPort();
......@@ -643,49 +637,6 @@ void pt::vpi::PeerTransport::removeAllServiceListeners ()
listener_ = 0;
}
/*
void pt::vpi::PeerTransport::remoteconnect (ibvla::QueuePair & qp, const std::string & host, size_t port, size_t path, int sgid_index, int is_global, int sl, int traffic_class, const std::string & network)
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "schedule remote connection");
pt::vpi::Application * app = dynamic_cast<pt::vpi::Application *>(this->getOwnerApplication());
try
{
std::stringstream p;
p << port;
//ibvla::Connector conn(app->deviceMTU_, app->sendWithTimeout_, sgid_index, is_global);
//conn.connect(qp, host, p.str(), path);
nlohmann::json cookie = {};
pt::vpi::Connector connector(cookie);
auto channel = connector.connect(host,port);
channel->exchange_with_server(qp, ibvla::mtu_to_enum(app->deviceMTU_), sgid_index, path, is_global, sl, traffic_class, network, app->sendWithTimeout_);
channel.reset();
}
catch (ibvla::exception::Exception & e)
{
this->moveQueuePairIntoError(qp);
XCEPT_RETHROW(pt::exception::Exception, "Failed to submit connect", e);
}
try
{
struct ibv_qp_attr attr;
struct ibv_qp_init_attr init_attr;
qp.query(0, attr, init_attr);
((struct qpInfo *) qp.getContext())->remote_qpn = attr.dest_qp_num;
}
catch (ibvla::exception::Exception & e)
{
this->moveQueuePairIntoError(qp);
XCEPT_RETHROW(pt::exception::Exception, "Failed to query QP", e);
}
}
*/
void pt::vpi::PeerTransport::handleEvent (struct ibv_async_event * event)
{
......@@ -1377,14 +1328,12 @@ std::list<pt::vpi::LocalPipeInfo> pt::vpi::PeerTransport::getConnectedLocalPipes
std::string network = ec->getNetwork();
if ( typeid(*opipe) == typeid(pt::vpi::LOutput) )
{
pt::vpi::LOutput * lo = dynamic_cast<pt::vpi::LOutput*>(*o);
#warning "TBD actual rlist size"
// tuples.push_back({ch->io->elements(),ch->cq->elements(), ch->io->size(), ch->cq->size(), network});
//pt::vpi::LOutput * lo = dynamic_cast<pt::vpi::LOutput*>(*o);
// tuples.push_back({ch->io->elements(),ch->cq->elements(), ch->io->size(), ch->cq->size(), network});
tuples.push_back({ch->io->elements(),ch->cq->elements(), app->sendQueuePairSize_, app->completionQueueSize_, network});
}
}
return tuples;
}
......@@ -1403,19 +1352,16 @@ std::list<pt::vpi::LocalPipeInfo> pt::vpi::PeerTransport::getAcceptedLocalPipes
std::string network = cr->getNetwork();
if ( typeid(*ipipe) == typeid(pt::vpi::LInput) )
{
pt::vpi::LInput * li = dynamic_cast<pt::vpi::LInput*>(*i);
#warning "TBD actual rlist size"
//pt::vpi::LInput * li = dynamic_cast<pt::vpi::LInput*>(*i);
//tuples.push_back({ch->io->elements(),ch->cq->elements(), ch->io->size(), ch->cq->size(), network});
tuples.push_back({ch->io->elements(),ch->cq->elements(), app->recvQueuePairSize_, app->completionQueueSize_, network});
}
}
return tuples;
}
std::list<ibvla::QueuePair> pt::vpi::PeerTransport::getConnectedQueuePairs ()
{
std::lock_guard<std::mutex> guard(olock_);
......@@ -1426,16 +1372,14 @@ std::list<ibvla::QueuePair> pt::vpi::PeerTransport::getConnectedQueuePairs ()
//std::cout << (*o)->qp_ << std::endl;
//if ( ! (*o)->isLocal())
//std::cout << "- DEBUG - outpipes " << typeid(*o).name() << " compares to " << typeid(pt::vpi::ROutput).name() << std::endl;
pt::vpi::Output * opipe = *o;
if ( typeid(*opipe) == typeid(pt::vpi::ROutput) )
{
pt::vpi::Output * opipe = *o;
if ( typeid(*opipe) == typeid(pt::vpi::ROutput) )
{
//std::cout << "- DEBUG - retrived typeid " << typeid(*o).name() << std::endl;
pt::vpi::ROutput * ro = dynamic_cast<pt::vpi::ROutput*>(*o);
connectedqps.push_back(ro->qp_);
}
}
return connectedqps;
}
std::list<ibvla::QueuePair> pt::vpi::PeerTransport::getAcceptedQueuePairs ()
......@@ -1447,13 +1391,12 @@ std::list<ibvla::QueuePair> pt::vpi::PeerTransport::getAcceptedQueuePairs ()
//std::cout << (*o)->qp_ << std::endl;
//if ( ! (*o)->isLocal())
//std::cout << "- DEBUG - outpipes " << typeid(*o).name() << " compares to " << typeid(pt::vpi::ROutput).name() << std::endl;
pt::vpi::Input * ipipe = *i;
if ( typeid(*ipipe) == typeid(pt::vpi::RInput) )
{
pt::vpi::Input * ipipe = *i;
if ( typeid(*ipipe) == typeid(pt::vpi::RInput) )
{
//std::cout << "- DEBUG - retrived typeid " << typeid(*o).name() << std::endl;
pt::vpi::RInput * ri = dynamic_cast<pt::vpi::RInput*>(*i);
connectedqps.push_back(ri->qp_);
}
}
return connectedqps;
......@@ -1475,7 +1418,6 @@ void pt::vpi::PeerTransport::localconnect( size_t qpsize, size_t cqsize, const s
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Connection establishment for network (as local):" << network);
std::shared_ptr<pt::vpi::EstablishedConnection> pec = std::make_shared<pt::vpi::EstablishedConnection>(this,pch,user_context,network);
//std::thread([listener,pec](){ listener->actionPerformed(pec); }).detach();
listener->actionPerformed(pec);
break;
}
......@@ -1505,7 +1447,6 @@ void pt::vpi::PeerTransport::connect(pt::Address::Reference from, pt::Address::
LOG4CPLUS_INFO(this->getOwnerApplication()->getApplicationLogger(), "Connection request for network (as local):" << network << " Local IP " << localIPNumber << " Remote IP " << remoteIPNumber);
std::thread lc(&pt::vpi::PeerTransport::localconnect, this, app->sendQueuePairSize_, app->completionQueueSize_, network, listener, user_context);
lc.detach();
}
else
{
......@@ -1533,20 +1474,18 @@ void pt::vpi::PeerTransport::connect(pt::Address::Reference from, pt::Address::
}
catch (pt::exception::Exception & e)
{
std::stringstream ss;
std::stringstream ss;
ss << "fail to connect to host " << daddress.getHost() << " port " << daddress.getPortNum() << " on network" << laddress.getNetwork() << std::endl;
XCEPT_DECLARE_NESTED(pt::vpi::exception::Exception, ex, ss.str(), e);
std::shared_ptr<pt::pipe::ConnectionError> pce = std::make_shared<pt::pipe::ConnectionError>(user_context, ex);
std::thread([listener,pce](){ listener->actionPerformed(pce); }).detach();
std::thread([](pt::pipe::ServiceListener * listener,std::shared_ptr<pt::pipe::ConnectionError> pce){ listener->actionPerformed(pce);},listener,pce).detach();
return;
}
// invoke listener here asynchronously to prevent blocking in pipe member application
std::shared_ptr<void> pch = std::make_shared<pt::vpi::RSupport>(this,channel,context_,pd_);
std::shared_ptr<pt::vpi::EstablishedConnection> pec = std::make_shared<pt::vpi::EstablishedConnection>(this, pch,user_context,network);
std::thread([listener,pec](){ listener->actionPerformed(pec); }).detach();
std::thread([](pt::pipe::ServiceListener * listener, std::shared_ptr<pt::vpi::EstablishedConnection> pec){ listener->actionPerformed(pec);},listener,pec).detach();
}
}
......@@ -1555,63 +1494,19 @@ void pt::vpi::PeerTransport::connect(pt::Address::Reference from, pt::Address::
pt::pipe::Input * pt::vpi::PeerTransport::createInputPipe( std::shared_ptr<pt::pipe::ConnectionRequest> & cr , pt::pipe::InputListener * listener)
{
pt::vpi::Application * app = dynamic_cast<pt::vpi::Application *>(this->getOwnerApplication());
std::lock_guard<std::mutex> guard(ilock_);
pt::vpi::ConnectionRequest * crp = dynamic_cast<pt::vpi::ConnectionRequest*>(cr.get());
pt::vpi::Support* support = static_cast<pt::vpi::Support*>(crp->getConnectionHandle().get());
if ( typeid(*support) == typeid(pt::vpi::LSupport) )
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create local input pipe for network :" << cr->getNetwork());
pt::vpi::Input * ipipe = new pt::vpi::LInput (cr, listener, this);
ipipes_.push_back(ipipe);
return ipipe;
}
else
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create remote input pipe for network :" << cr->getNetwork());
pt::vpi::Input * ipipe = new pt::vpi::RInput (cr, listener, this, app->recvQueuePairSize_);
ipipes_.push_back(ipipe);
return ipipe;
}
return this->createInputPipe( cr, listener, app->recvQueuePairSize_);
}
pt::pipe::Output * pt::vpi::PeerTransport::createOutputPipe( std::shared_ptr<pt::pipe::EstablishedConnection> & ec , pt::pipe::OutputListener * listener)
{
pt::vpi::Application * app = dynamic_cast<pt::vpi::Application *>(this->getOwnerApplication());
std::lock_guard<std::mutex> guard(olock_);
pt::vpi::EstablishedConnection * ecp = dynamic_cast<pt::vpi::EstablishedConnection*>(ec.get());
//if ( ecp->isLocal())
pt::vpi::Support* support = static_cast<pt::vpi::Support*>(ecp->getConnectionHandle().get());
if ( typeid(*support) == typeid(pt::vpi::LSupport) )
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create local output pipe");
pt::vpi::Output * opipe = new pt::vpi::LOutput (ec, listener, this);
opipes_.push_back(opipe);
return opipe;
}
else
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create remote output pipe");
pt::vpi::Output * opipe = new pt::vpi::ROutput (ec, listener, this, app->sendQueuePairSize_);
opipes_.push_back(opipe);
return opipe;
}
return this->createOutputPipe( ec, listener , app->sendQueuePairSize_);
}
pt::pipe::Output * pt::vpi::PeerTransport::createOutputPipe( std::shared_ptr<pt::pipe::EstablishedConnection> & ec, pt::pipe::OutputListener * listener , size_t n)
{
//pt::vpi::Application * app = dynamic_cast<pt::vpi::Application *>(this->getOwnerApplication());
std::lock_guard<std::mutex> guard(olock_);
......@@ -1620,7 +1515,7 @@ pt::pipe::Output * pt::vpi::PeerTransport::createOutputPipe( std::shared_ptr<pt:
pt::vpi::Support* support = static_cast<pt::vpi::Support*>(ecp->getConnectionHandle().get());
if ( typeid(*support) == typeid(pt::vpi::LSupport) )
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create local output pipe");
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create local output pipe");
pt::vpi::Output * opipe = new pt::vpi::LOutput (ec, listener, this);
opipes_.push_back(opipe);
......@@ -1628,7 +1523,7 @@ pt::pipe::Output * pt::vpi::PeerTransport::createOutputPipe( std::shared_ptr<pt:
}
else
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create remote output pipe");
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create remote output pipe");
pt::vpi::Output * opipe = new pt::vpi::ROutput (ec, listener, this, n);
opipes_.push_back(opipe);
......@@ -1638,7 +1533,6 @@ pt::pipe::Output * pt::vpi::PeerTransport::createOutputPipe( std::shared_ptr<pt:
pt::pipe::Input * pt::vpi::PeerTransport::createInputPipe( std::shared_ptr<pt::pipe::ConnectionRequest> & cr , pt::pipe::InputListener * listener, size_t n)
{
pt::vpi::Application * app = dynamic_cast<pt::vpi::Application *>(this->getOwnerApplication());
std::lock_guard<std::mutex> guard(ilock_);
......@@ -1647,7 +1541,7 @@ pt::pipe::Input * pt::vpi::PeerTransport::createInputPipe( std::shared_ptr<pt::
pt::vpi::Support* support = static_cast<pt::vpi::Support*>(crp->getConnectionHandle().get());
if ( typeid(*support) == typeid(pt::vpi::LSupport) )
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create local input pipe for network :" << cr->getNetwork());
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create local input pipe for network :" << cr->getNetwork());
pt::vpi::Input * ipipe = new pt::vpi::LInput (cr, listener, this);
ipipes_.push_back(ipipe);
......@@ -1655,7 +1549,7 @@ pt::pipe::Input * pt::vpi::PeerTransport::createInputPipe( std::shared_ptr<pt::
}
else
{
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create remote input pipe for network :" << cr->getNetwork());
LOG4CPLUS_DEBUG(this->getOwnerApplication()->getApplicationLogger(), "Create remote input pipe for network :" << cr->getNetwork());
pt::vpi::Input * ipipe = new pt::vpi::RInput (cr, listener, this, n);
ipipes_.push_back(ipipe);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment