Commit aece2144 authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #252: asyncronous connect call

parent 562a947d
......@@ -136,7 +136,8 @@ protected:
//void remoteconnect (ibvla::QueuePair & qp, const std::string & host, size_t port, size_t path, int sgid_index, int is_global, int sl, int traffic_class, const std::string & network);
void localconnect( size_t qpsize, size_t cqsize, const std::string network, pt::pipe::ServiceListener * listener, std::shared_ptr<void> user_context);
void remoteconnect(nlohmann::json cookie, const std::string hostname, size_t port, const std::string network, pt::pipe::ServiceListener * listener, std::shared_ptr<void> user_context);
// ibvla callbacks
void handleEvent (struct ibv_async_event * event);
void handleWorkRequestError (struct ibv_wc * wc);
......
......@@ -18,7 +18,7 @@
// !!! Edit this line to reflect the latest package version !!!
#define CORE_PTVPI_VERSION_MAJOR 1
#define CORE_PTVPI_VERSION_MINOR 4
#define CORE_PTVPI_VERSION_MINOR 5
#define CORE_PTVPI_VERSION_PATCH 0
// If any previous versions available E.g. #define CORE_PTVPI_PREVIOUS_VERSIONS "3.8.0,3.8.1"
#undef CORE_PTVPI_PREVIOUS_VERSIONS
......
......@@ -1423,6 +1423,33 @@ void pt::vpi::PeerTransport::localconnect( size_t qpsize, size_t cqsize, const s
}
}
}
void pt::vpi::PeerTransport::remoteconnect(nlohmann::json cookie, const std::string hostname, size_t port, const std::string network, pt::pipe::ServiceListener * listener, std::shared_ptr<void> user_context)
{
std::shared_ptr<pt::vpi::Channel> channel;
try
{
pt::vpi::Connector connector(cookie);
channel = connector.connect(hostname, port);
}
catch (pt::exception::Exception & e)
{
std::stringstream ss;
ss << "fail to connect to host " << hostname << " port " << port << " on network" << cookie["network"] << std::endl;
XCEPT_DECLARE_NESTED(pt::vpi::exception::Exception, ex, ss.str(), e);
std::shared_ptr<pt::pipe::ConnectionError> pce = std::make_shared<pt::pipe::ConnectionError>(user_context, ex);
//std::thread([](pt::pipe::ServiceListener * listener,std::shared_ptr<pt::pipe::ConnectionError> pce){ listener->actionPerformed(pce);},listener,pce).detach();
listener->actionPerformed(pce);
return;
}
// invoke listener here asynchronously to prevent blocking in pipe member application
std::shared_ptr<void> pch = std::make_shared<pt::vpi::RSupport>(this,channel,context_,pd_);
std::shared_ptr<pt::vpi::EstablishedConnection> pec = std::make_shared<pt::vpi::EstablishedConnection>(this, pch,user_context,network);
//std::thread([](pt::pipe::ServiceListener * listener, std::shared_ptr<pt::vpi::EstablishedConnection> pec){ listener->actionPerformed(pec);},listener,pec).detach();
listener->actionPerformed(pec);
}
void pt::vpi::PeerTransport::connect(pt::Address::Reference from, pt::Address::Reference to, pt::pipe::ServiceListener * listener, std::shared_ptr<void> & user_context)
{
......@@ -1433,6 +1460,7 @@ void pt::vpi::PeerTransport::connect(pt::Address::Reference from, pt::Address::
pt::vpi::Address & daddress = dynamic_cast<pt::vpi::Address &>(*(to));
std::string rport = daddress.getPort();
size_t rportnum = daddress.getPortNum();
std::string rhostname = daddress.getHost();
std::string network = daddress.getNetwork();
......@@ -1464,28 +1492,9 @@ void pt::vpi::PeerTransport::connect(pt::Address::Reference from, pt::Address::
cookie["sendwithtimeout"] = (xdata::BooleanT)app->sendWithTimeout_;
cookie["remoteip"] = remoteIPNumber; // to be used in ROutput for qpinfo
std::shared_ptr<pt::vpi::Channel> channel;
std::thread rc(&pt::vpi::PeerTransport::remoteconnect, this, cookie, rhostname, rportnum, network, listener, user_context);
rc.detach();
try
{
pt::vpi::Connector connector(cookie);
channel = connector.connect(daddress.getHost(),daddress.getPortNum());
}
catch (pt::exception::Exception & e)
{
std::stringstream ss;
ss << "fail to connect to host " << daddress.getHost() << " port " << daddress.getPortNum() << " on network" << laddress.getNetwork() << std::endl;
XCEPT_DECLARE_NESTED(pt::vpi::exception::Exception, ex, ss.str(), e);
std::shared_ptr<pt::pipe::ConnectionError> pce = std::make_shared<pt::pipe::ConnectionError>(user_context, ex);
std::thread([](pt::pipe::ServiceListener * listener,std::shared_ptr<pt::pipe::ConnectionError> pce){ listener->actionPerformed(pce);},listener,pce).detach();
return;
}
// invoke listener here asynchronously to prevent blocking in pipe member application
std::shared_ptr<void> pch = std::make_shared<pt::vpi::RSupport>(this,channel,context_,pd_);
std::shared_ptr<pt::vpi::EstablishedConnection> pec = std::make_shared<pt::vpi::EstablishedConnection>(this, pch,user_context,network);
std::thread([](pt::pipe::ServiceListener * listener, std::shared_ptr<pt::vpi::EstablishedConnection> pec){ listener->actionPerformed(pec);},listener,pec).detach();
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment