Commit 099554af authored by Luciano Orsini's avatar Luciano Orsini Committed by Dainius Simelevicius
Browse files

references #183: support for automatic device discovery according to linux interface

parent 519d7643
......@@ -18,9 +18,9 @@
#define WORKSUITE_IBVLA_VERSION_MAJOR 2
#define WORKSUITE_IBVLA_VERSION_MINOR 1
#define WORKSUITE_IBVLA_VERSION_PATCH 0
#define WORKSUITE_IBVLA_VERSION_PATCH 1
// If any previous versions available E.g. #define WORKSUITE_ESOURCE_PREVIOUS_VERSIONS "3.8.0,3.8.1"
#define WORKSUITE_IBVLA_PREVIOUS_VERSIONS "2.0.0"
#define WORKSUITE_IBVLA_PREVIOUS_VERSIONS "2.0.0,2.1.0"
//
// Template macros
......
......@@ -115,7 +115,7 @@ ibvla::ProtectionDomain ibvla::Context::allocateProtectionDomain ()
XCEPT_RAISE(ibvla::exception::Exception, ss.str());
}
std::cout << "ibv_alloc_pd for context device '" << getDeviceName() << "'" << std::endl;
//std::cout << "ibv_alloc_pd for context device '" << getDeviceName() << "'" << std::endl;
return ibvla::ProtectionDomain(*this, pd);
}
......
......@@ -112,6 +112,10 @@ namespace pt
xdata::String iaName_;
xdata::String recvPoolName_;
xdata::String sendPoolName_;
//xdata::Boolean useNetworkInterface_;
xdata::String networkInterface_;
xdata::UnsignedInteger SGIDIndex_;
xdata::UnsignedInteger portNumber_;
xdata::String memAllocTimeout_;
......
......@@ -192,6 +192,8 @@ namespace pt
//xdata::UnsignedInteger sendQPSize_;
//xdata::UnsignedInteger recvQPSize_;
bool isGlobal_; // true if RoCE false otherwise
public:
//xdata::UnsignedInteger maxMessageSize_;
......
......@@ -18,8 +18,8 @@
#include "config/PackageInfo.h"
// !!! Edit this line to reflect the latest package version !!!
#define WORKSUITE_PTIBV_VERSION_MAJOR 2
#define WORKSUITE_PTIBV_VERSION_MINOR 5
#define WORKSUITE_PTIBV_VERSION_MAJOR 3
#define WORKSUITE_PTIBV_VERSION_MINOR 0
#define WORKSUITE_PTIBV_VERSION_PATCH 0
// If any previous versions available E.g. #define WORKSUITE_PTIBV_PREVIOUS_VERSIONS "3.8.0,3.8.1"
#undef WORKSUITE_PTIBV_PREVIOUS_VERSIONS
......
......@@ -53,6 +53,10 @@ pt::ibv::Application::Application (xdaq::ApplicationStub* stub)
{
retrivedRcmsStateListener_ = false;
this->iaName_ = "unknown";
//this->useNetworkInterface_ = false;
this->networkInterface_ = "unknown";
this->portNumber_ = 1;
this->SGIDIndex_ = 0;
this->senderPoolSize_ = 0x100000;
this->receiverPoolSize_ = 0x100000;
......@@ -120,7 +124,11 @@ pt::ibv::Application::Application (xdaq::ApplicationStub* stub)
xoap::bind(this, &pt::ibv::Application::fireEvent, "Fail", XDAQ_NS_URI );
xoap::bind(this, &pt::ibv::Application::fireEvent, "fail", XDAQ_NS_URI );
this->getApplicationInfoSpace()->fireItemAvailable("iaName", &iaName_);
//this->getApplicationInfoSpace()->fireItemAvailable("iaName", &iaName_);
//this->getApplicationInfoSpace()->fireItemAvailable("useNetworkInterface", &useNetworkInterface_);
this->getApplicationInfoSpace()->fireItemAvailable("networkInterface", &networkInterface_);
this->getApplicationInfoSpace()->fireItemAvailable("SGIDIndex", &SGIDIndex_);
this->getApplicationInfoSpace()->fireItemAvailable("portNumber", &portNumber_);
this->getApplicationInfoSpace()->fireItemAvailable("memAllocTimeout", &memAllocTimeout_);
this->getApplicationInfoSpace()->fireItemAvailable("receiverPoolSize", &receiverPoolSize_);
this->getApplicationInfoSpace()->fireItemAvailable("senderPoolSize", &senderPoolSize_);
......
......@@ -8,6 +8,26 @@
* For the licensing terms see LICENSE. *
* For the list of contributors see CREDITS. *
*************************************************************************/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <netinet/ip.h>
#include <linux/ethtool.h>
#include <linux/sockios.h>
#include <errno.h>
#include <net/if_arp.h>
#include <ifaddrs.h>
#include <netpacket/packet.h>
#include <sstream>
#include "pt/ibv/PeerTransport.h"
#include "ibvla/Buffer.h"
......@@ -30,12 +50,139 @@
#include "toolbox/TimeInterval.h"
#include "toolbox/TimeVal.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "pt/ibv/Application.h"
/**
 * Look up the IPv4 address currently assigned to a network interface.
 *
 * The address is obtained with the SIOCGIFADDR ioctl on a throw-away
 * datagram socket and returned in dotted-decimal notation.
 *
 * @param ifname the network interface name (must be shorter than IFNAMSIZ)
 * @return the IPv4 address of the interface as a dotted-decimal string
 * @throws pt::ibv::exception::Exception if the name is too long, or if the
 *         socket, ioctl or address conversion call fails
 */
static std::string getIPv4(const std::string & ifname)
{
	// ifr_name is a fixed array of IFNAMSIZ bytes (terminator included):
	// refuse names that do not fit instead of overflowing the stack.
	if (ifname.size() >= IFNAMSIZ)
	{
		XCEPT_RAISE(pt::ibv::exception::Exception, "failed to convert IPv4, interface name '" + ifname + "' is too long");
	}

	struct ifreq ifc;
	memset(&ifc, 0, sizeof(ifc));
	strncpy(ifc.ifr_name, ifname.c_str(), IFNAMSIZ - 1);

	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd < 0)
	{
		XCEPT_RAISE(pt::ibv::exception::Exception,"failed called to socket for conversion IPv4 from " + ifname);
	}

	int res = ioctl(sockfd, SIOCGIFADDR, &ifc);
	// The socket is only needed for the ioctl; release it before any raise.
	close(sockfd);
	if (res < 0)
	{
		XCEPT_RAISE(pt::ibv::exception::Exception,"failed called to ioctl for conversion IPv4 from " + ifname);
	}

	// inet_ntop writes into a caller-provided buffer, unlike inet_ntoa which
	// returns a pointer to shared static storage.
	char ipv4[INET_ADDRSTRLEN];
	const struct sockaddr_in * sin = reinterpret_cast<const struct sockaddr_in *>(&ifc.ifr_addr);
	if (inet_ntop(AF_INET, &(sin->sin_addr), ipv4, sizeof(ipv4)) == NULL)
	{
		XCEPT_RAISE(pt::ibv::exception::Exception,"failed called to inet_ntop for conversion IPv4 from " + ifname);
	}
	return std::string(ipv4);
}
/**
 * Get the link type of a network interface.
 *
 * The type is read from the address family of the hardware address
 * returned by the SIOCGIFHWADDR ioctl.
 *
 * @param name the network interface name (must be shorter than IFNAMSIZ)
 * @return the ARPHRD_ link type, or the negative value returned by socket()
 *         if the helper socket could not be created
 * @throws pt::ibv::exception::Exception if the name is too long or the
 *         ioctl fails
 */
static int getlinktype(const std::string & name)
{
	if (name.size() >= IFNAMSIZ)
	{
		std::stringstream msg;
		msg << "failed to retrieve link type for Interface name '" << name << "', is too long";
		XCEPT_RAISE(pt::ibv::exception::Exception, msg.str());
	}

	struct ifreq ifr;
	memset(&ifr, 0, sizeof(ifr));
	// The length check above guarantees the name and its terminator fit.
	strncpy(ifr.ifr_name, name.c_str(), IFNAMSIZ - 1);

	int fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP);
	if (fd < 0)
	{
		// Kept as a return value: callers treat any unrecognised link type
		// (including a negative one) as unsupported.
		return fd;
	}

	int rv = ioctl(fd, SIOCGIFHWADDR, &ifr);
	// Capture errno before close() gets a chance to overwrite it.
	const int ioctlErrno = errno;
	close(fd);

	if (rv < 0)
	{
		// rv itself is always -1 on failure; errno carries the actual cause.
		std::stringstream msg;
		msg << "failed to retrieve link type for Interface name '" << name << "', ioctl error " << ioctlErrno << " (" << strerror(ioctlErrno) << ")";
		XCEPT_RAISE(pt::ibv::exception::Exception, msg.str());
	}

	return ifr.ifr_hwaddr.sa_family;
}
/**
 * Translate an ARPHRD_ link type into a printable name.
 *
 * @param ltype the link type, as returned by getlinktype()
 * @return "ether", "ieee802.11" or "infiniband" for the recognised types,
 *         "unknown" for anything else
 */
static std::string linktype2string(int ltype )
{
	if (ltype == ARPHRD_ETHER)
	{
		return "ether";
	}
	if (ltype == ARPHRD_IEEE80211)
	{
		return "ieee802.11";
	}
	if (ltype == ARPHRD_INFINIBAND)
	{
		return "infiniband";
	}
	// Every other value, including negative error codes, has no known name.
	return "unknown";
}
/**
 * Derive a GID from the hardware address of a network interface.
 *
 * The AF_PACKET entries reported by getifaddrs() are scanned for the given
 * interface name with a 20-byte hardware address; the last 16 bytes of that
 * address are copied into the raw bytes of the returned GID.
 *
 * @param ifname the network interface name
 * @return the GID built from bytes 4..19 of the interface hardware address
 * @throws pt::ibv::exception::Exception if getifaddrs() fails or no matching
 *         interface with a 20-byte hardware address is found
 */
union ibv_gid mac2gid (const std::string & ifname)
{
	// Only 20-byte hardware addresses are accepted (the caller uses this for
	// ARPHRD_INFINIBAND links); the leading 4 bytes are skipped.
	// NOTE(review): presumably those 4 bytes are the flags/QPN part of an
	// IPoIB hardware address - confirm.
	const unsigned char expectedHwAddrLen = 20;
	const int gidOffset = 4;
	const int gidLen = 16; // size of ibv_gid::raw

	struct ifaddrs *ifaddr = NULL;

	if (getifaddrs(&ifaddr) == -1)
	{
		perror("getifaddrs");
	}
	else
	{
		union ibv_gid gid;
		memset(&gid, 0, sizeof(gid));
		bool found = false;

		for (struct ifaddrs *ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
		{
			if ( (ifa->ifa_addr == NULL) || (ifa->ifa_addr->sa_family != AF_PACKET) )
			{
				continue;
			}

			const struct sockaddr_ll *s = reinterpret_cast<const struct sockaddr_ll *>(ifa->ifa_addr);
			if ( (ifname == ifa->ifa_name) && (s->sll_halen == expectedHwAddrLen) )
			{
				// raw[0..7] is the subnet prefix, raw[8..15] the interface id
				for (int i = 0; i < gidLen; ++i)
				{
					gid.raw[i] = s->sll_addr[i + gidOffset];
				}
				found = true;
				break;
			}
		}

		// Release the list on every path; returning from inside the loop
		// used to leak it on each successful lookup.
		freeifaddrs(ifaddr);

		if (found)
		{
			return gid;
		}
	}

	XCEPT_RAISE(pt::ibv::exception::Exception, "cannot find mac address for interface name " + ifname);
}
//pt::ibv::PeerTransport::PeerTransport (xdaq::Application * parent, ibvla::Context & context, ibvla::ProtectionDomain & pd, toolbox::mem::Pool * spool, toolbox::mem::Pool * rpool, xdata::UnsignedInteger cqSize, xdata::UnsignedInteger sendQPSize, xdata::UnsignedInteger recvQPSize, xdata::UnsignedInteger maxMessageSize, xdata::UnsignedInteger deviceMTU, xdata::Boolean sendWithTimeout, xdata::Boolean useRelay)
// : xdaq::Object(parent), mutex_(toolbox::BSem::FULL, true), context_(context), pd_(pd), spool_(spool), rpool_(rpool), completionQueueSize_(cqSize), sendQPSize_(sendQPSize), recvQPSize_(recvQPSize), maxMessageSize_(maxMessageSize), deviceMTU_(deviceMTU), sendWithTimeout_(sendWithTimeout), useRelay_(useRelay)
pt::ibv::PeerTransport::PeerTransport (xdaq::Application * parent)
......@@ -63,35 +210,77 @@ pt::ibv::PeerTransport::PeerTransport (xdaq::Application * parent)
LOG4CPLUS_INFO(app->getApplicationLogger(), "Found " << devices.size() << " devices");
int ltype = getlinktype(app->networkInterface_);
LOG4CPLUS_INFO(app->getApplicationLogger(), "Link type detected: " << linktype2string(ltype) );
union ibv_gid mgid; // GID derived from linux interface name
if ( ltype == ARPHRD_INFINIBAND )
{
isGlobal_ = false;
mgid = mac2gid(app->networkInterface_); // infiniban only
//std::cout << "Hardware address to GID is " << std::hex << bswap_64(mgid.global.subnet_prefix) << "-" << bswap_64(mgid.global.interface_id) << std::dec << std::endl;
}
else if ( ltype == ARPHRD_ETHER )
{
isGlobal_ = true;
std::string ipv4 = getIPv4(app->networkInterface_.c_str());
uint64_t iaddr = inet_addr(ipv4.c_str()) & 0x00000000ffffffff;
mgid.global.interface_id = (iaddr << 32) | 0xffff0000;
mgid.global.subnet_prefix = 0x0;
//std::cout << "Original IPV5 adress binary " << std::hex << bswap_64(addr) << std::dec << std::endl;
//std::cout << "Expected GID binary " << std::hex << bswap_64(egid) << std::dec << std::endl;
//std::cout << "IP address to GID is " << std::hex << bswap_64(mgid.global.subnet_prefix) << "-" << bswap_64(mgid.global.interface_id) << std::dec << std::endl;
}
else
{
XCEPT_RAISE(pt::exception::Exception, "link type not supported for " + linktype2string(ltype));
}
ibvla::Device device;
std::list<ibvla::Device>::iterator i;
for (i = devices.begin(); i != devices.end(); i++)
{
if ((*i).getName() == app->iaName_.toString())
for (i = devices.begin(); i != devices.end(); i++)
{
//LOG4CPLUS_DEBUG(app->getApplicationLogger(), "Device name = '" << (*i).getName() << "', guid = ' 0x" << std::hex << bswap_64((*i).getGUID()) << std::dec << "'");
//printf("RDMA device[]: name=%s, GUID=0x%016Lx\n", (*i).getName().c_str(), (unsigned long long)ntohll((*i).getGUID()));
ibvla::Context context = ibvla::createContext((*i));
try
{
LOG4CPLUS_INFO(app->getApplicationLogger(), "Found device name = '" << (*i).getName() << "', guid = ' 0x" << std::hex << bswap_64((*i).getGUID()) << std::dec << "'");
break;
LOG4CPLUS_DEBUG(app->getApplicationLogger(), "going to query GID for " << (*i).getName() << "index " << app->SGIDIndex_ << " port " << app->portNumber_ );
union ibv_gid gid = context.queryGID(app->portNumber_, app->SGIDIndex_);
/*enum ibv_gid_type_sysfs sgid_type = 0;
int ret = ibv_query_gid_type(context_.context_, 1, sgid_index, &sgid_type);
*/
if ( mgid.global.interface_id == gid.global.interface_id )
{
//std::cout << "compare (not swapped)" << std::hex << mgid.global.interface_id << " with " << gid.global.interface_id << std::dec << std::endl;
//std::cout << "found matching IP for device " << (*i).getName() << std::endl;
//printf("index %d port %d %016lx:%016lx\n", (int)app->SGIDIndex_, (int)app->portNumber_, bswap_64(gid.global.subnet_prefix), bswap_64(gid.global.interface_id));
context_ = context;
app->iaName_ = (*i).getName();
LOG4CPLUS_INFO(app->getApplicationLogger(), "found matching interface " << app->networkInterface_.toString() << " for device " << (*i).getName() << " at index " << app->SGIDIndex_ << " port " << app->portNumber_ << " GID " << std::hex << bswap_64(gid.global.subnet_prefix) << ":" << bswap_64(gid.global.interface_id) );
break;
}
}
}
catch( ibvla::exception::Exception & e )
{
XCEPT_RETHROW(pt::ibv::exception::Exception, "failed to create IB context ", e);
}
ibvla::destroyContext(context);
}
if (i == devices.end())
{
ibvla::freeDeviceList (devices);
std::stringstream msg;
msg << "Could not find device '" << app->iaName_.toString() << "', peer transport configuration failure" << std::endl;
msg << "Available devices :" << std::endl;
for (i = devices.begin(); i != devices.end(); i++)
{
msg << (*i).getName() << std::endl;
}
msg << "Could not find device for interface '" << app->networkInterface_.toString() << "' index " << app->SGIDIndex_ << " port " << app->portNumber_ << std::endl;
XCEPT_RAISE(pt::exception::Exception, msg.str());
}
context_ = ibvla::createContext((*i));
pd_ = context_.allocateProtectionDomain();
toolbox::net::URN urn("toolbox-mem-pool", app->sendPoolName_);
......@@ -157,7 +346,7 @@ pt::ibv::PeerTransport::PeerTransport (xdaq::Application * parent)
*/
// (size, user context (void*), comp_vector)
std::cout << "cqSize == " << app->completionQueueSize_ << ", sendQPSize == " << app->sendQueuePairSize_ << ", recvQPSize == " << app->recvQueuePairSize_ << ", maxMessageSize == " << app->maxMessageSize_ << ", MTU == " << app->deviceMTU_ << std::endl;
//std::cout << "cqSize == " << app->completionQueueSize_ << ", sendQPSize == " << app->sendQueuePairSize_ << ", recvQPSize == " << app->recvQueuePairSize_ << ", maxMessageSize == " << app->maxMessageSize_ << ", MTU == " << app->deviceMTU_ << std::endl;
cqs_ = context_.createCompletionQueue(app->completionQueueSize_, 0, 0);
cqr_ = context_.createCompletionQueue(1, 0, 0);
......@@ -276,26 +465,13 @@ pt::Address::Reference pt::ibv::PeerTransport::createAddress (std::map<std::stri
{
std::string protocol = address["protocol"];
std::string ibport = address["ibport"];
if (ibport == "")
{
ibport = "1";
}
std::string path = address["path"];
if (path == "")
{
path = "0";
}
int isglobal = 0;
std::string ssgidindex = "0";
if (address.find("sgidindex") != address.end())
{
isglobal = 1;
ssgidindex = address["sgidindex"];
}
bool isglobal = isGlobal_;
if (protocol == "ibv")
{
......@@ -323,9 +499,10 @@ pt::Address::Reference pt::ibv::PeerTransport::createAddress (std::map<std::stri
}
//return this->createAddress(url, service, ibport);
size_t ibPortNum = toolbox::toUnsignedLong(ibport);
pt::ibv::Application * app = dynamic_cast<pt::ibv::Application *>(this->getOwnerApplication());
size_t ibPortNum = app->portNumber_;
size_t ibPath = toolbox::toUnsignedLong(path);
int sgidindex = toolbox::toLong(ssgidindex);
int sgidindex = app->SGIDIndex_;
return pt::Address::Reference(new pt::ibv::Address(url, service, ibPortNum, ibPath, sgidindex, isglobal));
}
else
......
<xc:Partition xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xc="http://xdaq.web.cern.ch/xdaq/xsd/2004/XMLConfiguration-30" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<i2o:protocol xmlns:i2o="http://xdaq.web.cern.ch/xdaq/xsd/2004/I2OConfiguration-30">
<i2o:target class="Client" instance="0" tid="23"/>
<i2o:target class="Server" instance="0" tid="24"/>
</i2o:protocol>
<xc:Context url="http://d3vrubu-c2e34-41-01.cms:1972">
<xc:Endpoint protocol="ibv" service="i2o" hostname="d3vrubu-c2e34-41-01.cms" port="50000" network="fdr"/>
<xc:Application class="pt::ibv::Application" id="11" instance="0" network="local">
<properties xmlns="urn:xdaq-application:pt::ibv::Application" xsi:type="soapenc:Struct">
<!-- networkInterface xsi:type="xsd:string">p4p2</networkInterface -->
<networkInterface xsi:type="xsd:string">p4p2</networkInterface>
<SGIDIndex xsi:type="xsd:unsignedInt">3</SGIDIndex>
<portNumber xsi:type="xsd:unsignedInt">1</portNumber>
<receiverPoolSize xsi:type="xsd:unsignedLong">0x100000</receiverPoolSize>
<senderPoolSize xsi:type="xsd:unsignedLong">0x2000000</senderPoolSize>
<completionQueueSize xsi:type="xsd:unsignedInt">8192</completionQueueSize>
<sendQueuePairSize xsi:type="xsd:unsignedInt">8192</sendQueuePairSize>
<recvQueuePairSize xsi:type="xsd:unsignedInt">0</recvQueuePairSize>
<maxMessageSize xsi:type="xsd:unsignedInt">257000</maxMessageSize>
<deviceMTU xsi:type="xsd:unsignedInt">4096</deviceMTU>
</properties>
</xc:Application>
<!-- Declare an application -->
<xc:Application class="Client" id="10" instance="0" network="fdr">
<properties xmlns="urn:xdaq-application:Client" xsi:type="soapenc:Struct">
<maxFrameSize xsi:type="xsd:unsignedLong">0x40000</maxFrameSize>
<committedPoolSize xsi:type="xsd:unsignedLong">84000000</committedPoolSize>
<numberOfSamples xsi:type="xsd:unsignedLong">10</numberOfSamples>
<sampleTime xsi:type="xsd:string">PT5S</sampleTime>
<StdDev xsi:type="xsd:unsignedLong">512</StdDev>
<currentSize xsi:type="xsd:unsignedLong">128</currentSize>
<MinFragmentSize xsi:type="xsd:unsignedLong">32</MinFragmentSize>
<MaxFragmentSize xsi:type="xsd:unsignedLong">262000</MaxFragmentSize>
<fixedSize xsi:type="xsd:boolean">true</fixedSize>
<createPool xsi:type="xsd:boolean">false</createPool>
<poolName xsi:type="xsd:string">sibv</poolName>
</properties>
</xc:Application>
<!-- Shared object library that contains the implementation -->
<xc:Module>/opt/xdaq/lib/libxdaq2rc.so</xc:Module>
<xc:Module>/nfshome0/lorsini/devel/localbus/worksuite/pt/ibv/lib/linux/x86_64_centos7/libptibv.so</xc:Module>
<xc:Module>/opt/xdaq/lib/libmstreamio2g.so</xc:Module>
<xc:Application class="xmem::probe::Application" id="21" instance="0" network="local">
</xc:Application>
<xc:Module>/opt/xdaq/lib/libxmemprobe.so</xc:Module>
</xc:Context>
<xc:Context url="http://d3vrubu-c2e34-06-01.cms:1972">
<xc:Endpoint protocol="ibv" service="i2o" hostname="d3vrubu-c2e34-06-01.cms" port="30010" network="fdr" />
<xc:Application class="pt::ibv::Application" id="12" instance="0" network="local">
<properties xmlns="urn:xdaq-application:pt::ibv::Application" xsi:type="soapenc:Struct">
<networkInterface xsi:type="xsd:string">p4p2</networkInterface>
<SGIDIndex xsi:type="xsd:unsignedInt">3</SGIDIndex>
<portNumber xsi:type="xsd:unsignedInt">1</portNumber>
<receiverPoolSize xsi:type="xsd:unsignedLong">0x40000000</receiverPoolSize>
<senderPoolSize xsi:type="xsd:unsignedLong">0x100000</senderPoolSize>
<completionQueueSize xsi:type="xsd:unsignedInt">8192</completionQueueSize>
<sendQueuePairSize xsi:type="xsd:unsignedInt">0</sendQueuePairSize>
<recvQueuePairSize xsi:type="xsd:unsignedInt">1024</recvQueuePairSize>
<maxMessageSize xsi:type="xsd:unsignedInt">257000</maxMessageSize>
<deviceMTU xsi:type="xsd:unsignedInt">4096</deviceMTU>
</properties>
</xc:Application>
<!-- Declare an application -->
<xc:Application class="Server" id="11" instance="0" network="fdr">
<properties xmlns="urn:xdaq=application:Server" xsi:type="soapenc:Struct">
<currentSize xsi:type="xsd:unsignedLong">0</currentSize>
<numberOfSamples xsi:type="xsd:unsignedLong">10</numberOfSamples>
<sampleTime xsi:type="xsd:string">PT5S</sampleTime>
</properties>
</xc:Application>
<!-- Shared object library that contains the implementation -->
<xc:Module>/opt/xdaq/lib/libxdaq2rc.so</xc:Module>
<xc:Module>/nfshome0/lorsini/devel/localbus/worksuite/pt/ibv/lib/linux/x86_64_centos7/libptibv.so</xc:Module>
<xc:Module>/opt/xdaq/lib/libmstreamio2g.so</xc:Module>
<xc:Application class="xmem::probe::Application" id="21" instance="0" network="local">
</xc:Application>
<xc:Module>/opt/xdaq/lib/libxmemprobe.so</xc:Module>
</xc:Context>
</xc:Partition>
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment