// Reiner Hauser authored
// Code owners
// Assign users and groups as approvers for specific file changes. Learn more.
// NameService.cxx 11.22 KiB
#include "asyncmsg/NameService.h"
#include "transport/Interface.h"
#include "MsgInfo.h"
#include "is/infostream.h"
#include "is/infodictionary.h"
#include "is/serveriterator.h"
namespace daq {
namespace asyncmsg {
// Build a name service bound to 'partition'.
//
// partition     - IPC partition used for all IS operations.
// data_networks - list of "address[/netmask]" strings; each one is parsed
//                 with parse_address_network() and stored.
// is_server     - name of the IS server to use, unless the environment
//                 variable DF_CONFIG_IS_SERVER is set, which wins.
//
// NOTE(review): this constructor is declared noexcept but calls
// parse_address_network() and push_back(); if either of them throws the
// program terminates - confirm that this is intended.
NameService::NameService(IPCPartition partition, const std::vector<std::string>& data_networks, const std::string& is_server) noexcept
    : m_partition(partition),
      m_is_server(is_server)
{
    // The environment takes precedence over the user supplied argument.
    const char *env_server = getenv("DF_CONFIG_IS_SERVER");
    if(env_server != nullptr) {
        m_is_server = env_server;
    }

    // Keep the parsed (address, netmask) form of every configured network.
    for(auto spec = data_networks.begin(); spec != data_networks.end(); ++spec) {
        m_data_networks.push_back(parse_address_network(*spec));
    }
}
// No resources beyond the members themselves are held, so the compiler
// generated destruction is sufficient.
NameService::~NameService() noexcept = default;
// Publish an IS object with the host name, the port number and all valid
// local interface addresses that lie inside one of the configured data
// networks.
//
// name - appended to "<server>.MSG_" to form the IS object name; should
//        typically be the application name.
// port - port number stored in the published object.
//
// Throws CannotPublish if no local interface matches any data network.
// Throws InvalidISServer if no IS server matches m_is_server or if the
// checkin on one of the matching servers fails.
void NameService::publish(const std::string& name, uint16_t port)
{
    MsgInfo info;

    // Zero-initialise the buffer and never let gethostname() touch the last
    // byte, so the string is terminated even if the name was truncated.
    // On failure publish an empty host name rather than reading whatever
    // happens to be in the buffer.
    char host[1024] = {};
    if(gethostname(host, sizeof(host) - 1) != 0) {
        host[0] = '\0';
    }
    host[sizeof(host) - 1] = '\0';

    info.Hostname = host;
    info.Port = port;

    const daq::transport::Interface::InterfaceList& interfaces = daq::transport::Interface::interfaces();

    for(const auto& intf : interfaces) {

        // check for invalid interfaces
        // !UP, LOOPBACK, address == 0 (e.g. when bonded)
        if(!intf->has_flag(IFF_UP) || intf->has_flag(IFF_LOOPBACK)) {
            continue;
        }

        // An interface may be up, but have no IP address set, e.g. if it
        // is used in a bonded configuration.
        if(intf->family() == AF_INET && intf->address() == 0) {
            continue;
        }

        if(intf->family() == AF_INET6) {
            auto a = intf->address_v6();
            if(memcmp(&a, &in6addr_any, sizeof(in6addr_any)) == 0) {
                continue;
            }
        }

        // Add one entry for every configured network this interface is in.
        for(const auto& network : m_data_networks) {
            auto& addr = std::get<0>(network);
            auto& mask = std::get<1>(network);

            if(addr.is_v4() && intf->family() == AF_INET) {
                auto addr4(addr.to_v4());
                auto mask4(mask.to_v4());

                // The interface address is compared in network byte order,
                // hence the htonl() on the configured values.
                if(htonl(addr4.to_ulong() & mask4.to_ulong()) == (intf->address() & htonl(mask4.to_ulong()))) {
                    info.Addresses.push_back(intf->address_string() + '/' + mask4.to_string());
                }
            } else if(addr.is_v6() && intf->family() == AF_INET6) {
                auto addr6(addr.to_v6().to_bytes());
                auto mask6(mask.to_v6().to_bytes());
                auto local_network = intf->network_v6();

                // Byte-wise comparison of the masked configured address
                // with the network of the interface.
                bool same_network = true;
                for(size_t i = 0; i < addr6.size(); i++) {
                    if((addr6[i] & mask6[i]) != local_network.s6_addr[i]) {
                        same_network = false;
                        break;
                    }
                }
                if(same_network) {
                    info.Addresses.push_back(intf->address_string() + '/' + intf->netmask_string());
                }
            }
        }
    }

    if(info.Addresses.empty()) {
        throw CannotPublish(ERS_HERE);
    }

    ISInfoDictionary d(m_partition);

    // m_is_server is used as a prefix: publish to every matching server.
    ISServerIterator servers(m_partition, m_is_server + ".*");
    if(servers.entries() == 0) {
        throw InvalidISServer(ERS_HERE,m_is_server);
    }

    while(servers++) {
        // may throw
        try {
            d.checkin(std::string(servers.name()) + ".MSG_" + name, info);
        } catch(ers::Issue& reason) {
            throw InvalidISServer(ERS_HERE, servers.name(), reason);
        }
    }
}
// Check whether the network described by 'addr' and 'mask' is one that a
// usable local interface is attached to.
//
// addr - address published by a remote application.
// mask - netmask belonging to 'addr' (same address family).
//
// Returns true as soon as one local interface of the same address family is
// in that network, false if none is. Interfaces that are down, loopback or
// have no address are ignored.
bool NameService::matches(const boost::asio::ip::address& addr, const boost::asio::ip::address& mask) const
{
    for(const auto& intf : daq::transport::Interface::interfaces()) {

        // ignore these
        if(!intf->has_flag(IFF_UP) || intf->has_flag(IFF_LOOPBACK)) {
            continue;
        }

        if(addr.is_v4() && intf->family() == AF_INET) {

            // The "no address" test on address() is only meaningful for
            // IPv4 interfaces (same rule as in publish()).
            if(intf->address() == 0) {
                continue;
            }

            // compare IPv4 network, interface values are in network byte order
            if(htonl(addr.to_v4().to_ulong() & mask.to_v4().to_ulong()) == (intf->network() & htonl(mask.to_v4().to_ulong()))) {
                // found one, first match will be used
                return true;
            }

        } else if(addr.is_v6() && intf->family() == AF_INET6) {

            // Skip IPv6 interfaces without an address (same rule as in publish()).
            auto local_address = intf->address_v6();
            if(memcmp(&local_address, &in6addr_any, sizeof(in6addr_any)) == 0) {
                continue;
            }

            auto addr6(addr.to_v6().to_bytes());
            auto mask6(mask.to_v6().to_bytes());
            auto local_network = intf->network_v6();

            bool same_network = true;
            for(size_t i = 0; i < addr6.size(); i++) {
                if((addr6[i] & mask6[i]) != local_network.s6_addr[i]) {
                    same_network = false;
                    break;
                }
            }

            // Only a positive result ends the search; a mismatch on this
            // interface must not hide a match on a later one.
            if(same_network) {
                return true;
            }
        }
    }

    // no match found at all
    return false;
}
// Look up all published applications whose name starts with 'prefix' and
// return a list of endpoints.
//
// For every application the first published address that lies in a network
// of a local interface (see matches()) is used together with the published
// port; applications without such an address are left out.
std::vector<boost::asio::ip::tcp::endpoint>
NameService::lookup(const std::string& prefix) const
{
    std::vector<boost::asio::ip::tcp::endpoint> results;

    ISInfoStream it(m_partition, m_is_server, ISCriteria("MSG_" + prefix + ".*", MsgInfo::type()));

    while(!it.eof()) {
        MsgInfo info;
        it >> info;

        // match my local interfaces against the list
        for(const auto& addr_netmask : info.Addresses) {
            boost::asio::ip::address addr, mask;
            std::tie(addr, mask) = parse_address_network(addr_netmask);
            if(matches(addr, mask)) {
                results.push_back(boost::asio::ip::tcp::endpoint(addr, info.Port));
                // one endpoint per application is enough
                break;
            }
        }
    }

    return results;
}
// Look up all applications of given 'type' and return a map of names and endpoints
std::map<std::string,boost::asio::ip::tcp::endpoint>
NameService::lookup_names(const std::string& prefix) const
{
std::map<std::string,boost::asio::ip::tcp::endpoint> results;
ISInfoStream it(m_partition, m_is_server, ISCriteria("MSG_" + prefix + ".*", MsgInfo::type()));
while(!it.eof()) {
std::string name(it.name());
MsgInfo info;
it >> info;
// match my local interfaces against the list
for(auto& addr_netmask : info.Addresses) {
boost::asio::ip::address addr, mask;
std::tie(addr, mask) = parse_address_network(addr_netmask);
if(matches(addr, mask)) {
results[name.substr(m_is_server.size() + 5)] = boost::asio::ip::tcp::endpoint(addr, info.Port);
break;
}
}
}
return results;
}
// Find all published applications whose name starts with 'prefix' and
// return, per application name, every endpoint that is reachable.
//
// Unlike lookup_names() the search does not stop at the first address:
// each published address that passes matches() adds one endpoint.
std::map<std::string, std::vector<boost::asio::ip::tcp::endpoint>>
NameService::lookup_all(const std::string& prefix) const
{
    typedef boost::asio::ip::tcp::endpoint Endpoint;

    std::map<std::string,std::vector<Endpoint>> results;

    ISInfoStream it(m_partition, m_is_server, ISCriteria("MSG_" + prefix + ".*", MsgInfo::type()));

    // The map key is the IS object name with its leading server name and
    // the five characters following it (".MSG_") cut off.
    const std::string::size_type name_offset = m_is_server.size() + 5;

    while(!it.eof()) {
        const std::string full_name(it.name());
        MsgInfo info;
        it >> info;

        // collect every published address that is on one of my networks
        for(const auto& entry : info.Addresses) {
            const auto parsed = parse_address_network(entry);
            const boost::asio::ip::address& addr = std::get<0>(parsed);
            const boost::asio::ip::address& mask = std::get<1>(parsed);

            if(!matches(addr, mask)) {
                continue;
            }

            results[full_name.substr(name_offset)].push_back(Endpoint(addr, info.Port));
        }
    }

    return results;
}
// Resolve a single application name to an endpoint.
//
// Reads the IS object "<m_is_server>.MSG_<name>" and returns the first of
// its published addresses that passes matches(), combined with the
// published port.
//
// Throws CannotResolve if none of the published addresses matches.
boost::asio::ip::tcp::endpoint
NameService::resolve(const std::string& name) const
{
    ISInfoDictionary dictionary(m_partition);
    MsgInfo published;

    const std::string object_name = m_is_server + ".MSG_" + name;
    dictionary.getValue(object_name, published);

    for(const auto& entry : published.Addresses) {
        const auto parsed = parse_address_network(entry);
        const boost::asio::ip::address& addr = std::get<0>(parsed);
        const boost::asio::ip::address& mask = std::get<1>(parsed);

        if(!matches(addr, mask)) {
            continue;
        }

        // first reachable address wins
        return boost::asio::ip::tcp::endpoint(addr, published.Port);
    }

    throw CannotResolve(ERS_HERE,name);
}
// Split a string of the form "address[/netmask]" into its two parts and
// parse both as IP addresses.
//
// network - textual address, optionally followed by '/' and a netmask in
//           address notation.
//
// Returns the (address, netmask) pair. If no netmask is given, an all-ones
// host mask of the same address family as the address is used, so that the
// two elements can always be compared with each other.
//
// Errors from boost::asio::ip::address::from_string() are propagated to
// the caller.
NameService::Network
NameService::parse_address_network(const std::string& network)
{
    const size_t slash = network.find('/');

    const boost::asio::ip::address address =
        boost::asio::ip::address::from_string(network.substr(0, slash));

    if(slash != std::string::npos) {
        return std::make_tuple(address,
                               boost::asio::ip::address::from_string(network.substr(slash+1)));
    }

    // No explicit netmask: match the single host. The default must follow
    // the family of the address, an IPv4 mask cannot be applied to an
    // IPv6 address.
    const std::string host_mask = address.is_v6()
        ? "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
        : "255.255.255.255";

    return std::make_tuple(address,
                           boost::asio::ip::address::from_string(host_mask));
}
// Find the address of a local IPv4 interface inside a given network.
//
// netmask - string of the form "address[/mask]", both parts in IPv4
//           dotted notation; without "/mask" the mask 255.255.255.255 is
//           used, i.e. the interface address must be equal to 'address'.
//
// Returns the address of the first interface that is up, not loopback, of
// family AF_INET, has a non-zero address and lies in the requested network.
// Returns a default constructed address if there is no such interface.
//
// Errors from parsing the two parts, or from converting them to IPv4, are
// propagated to the caller.
boost::asio::ip::address NameService::find_interface(const std::string& netmask)
{
    std::string network_part(netmask);
    std::string mask_part("255.255.255.255");

    const auto slash = netmask.find('/');
    if(slash != std::string::npos) {
        network_part = netmask.substr(0, slash);
        mask_part = netmask.substr(slash + 1);
    }

    const auto addr = boost::asio::ip::address::from_string(network_part).to_v4();
    const auto mask = boost::asio::ip::address::from_string(mask_part).to_v4();

    // Interface addresses are compared in network byte order. The requested
    // address is masked as well (as publish() and matches() do), so that an
    // address with host bits set still selects its network.
    const auto wanted_network = htonl(addr.to_ulong() & mask.to_ulong());
    const auto network_mask = htonl(mask.to_ulong());

    const auto& interfaces = daq::transport::Interface::interfaces();

    for(const auto& intf : interfaces) {
        if(!intf->has_flag(IFF_UP) || intf->has_flag(IFF_LOOPBACK)) {
            continue;
        }

        if(intf->family() != AF_INET) {
            continue;
        }

        // up but without an address
        if(intf->address() == 0) {
            continue;
        }

        if((intf->address() & network_mask) == wanted_network) {
            return boost::asio::ip::address::from_string(intf->address_string());
        }
    }

    return boost::asio::ip::address();
}
}
}