Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
NameService.cxx 11.22 KiB

#include "asyncmsg/NameService.h"
#include "transport/Interface.h"

#include "MsgInfo.h"

#include "is/infostream.h"
#include "is/infodictionary.h"
#include "is/serveriterator.h"

namespace daq {

    namespace asyncmsg {

        NameService::NameService(IPCPartition partition, const std::vector<std::string>& data_networks, const std::string& is_server) noexcept
            : m_partition(partition),
              m_is_server(is_server)
        {

            if(const char *server = getenv("DF_CONFIG_IS_SERVER")) {
                // override user argument
                m_is_server = server;
            }

            for(const auto & network : data_networks) {
                m_data_networks.push_back(parse_address_network(network));
            }
        }

        NameService::~NameService() noexcept
        {
            // nothing to do
        }

        // Publish IS object with all valid local interfaces and port number.
        // The 'name' parameter should typically be the application name.
        void NameService::publish(const std::string& name, uint16_t port)
        {
            MsgInfo info; 

            char host[1024];
            gethostname(host, sizeof(host));

            info.Hostname = host;
            info.Port     = port;

            const daq::transport::Interface::InterfaceList& interfaces = daq::transport::Interface::interfaces();

            for(auto intf : interfaces) {

                // check for invalid interfaces
                // !UP, LOOPBACK, address == 0 (e.g. when bonded)

                if(!intf->has_flag(IFF_UP) || intf->has_flag(IFF_LOOPBACK)) {
                    continue;
                }

                // An interface maybe up, but no IP address set, e.g. if it
                // is used in a bonded configuration.
                if(intf->family() == AF_INET && intf->address() == 0) {
                    continue;
                }

                if(intf->family() == AF_INET6) {
                    auto a = intf->address_v6();
                    if(memcmp(&a,&in6addr_any, sizeof(in6addr_any)) == 0) {
                        continue;
                    }
                }
                for(const auto& network : m_data_networks) {

                    auto& addr = std::get<0>(network);
                    auto& mask = std::get<1>(network);

                    if(addr.is_v4() && intf->family() == AF_INET) {
		        auto addr4(addr.to_v4());
			auto mask4(mask.to_v4());

                        if(htonl(addr4.to_ulong() & mask4.to_ulong()) == (intf->address() & htonl(mask4.to_ulong()))) {
			    info.Addresses.push_back(intf->address_string() + '/' +  mask4.to_string());
                        }

                    } else if(addr.is_v6() && intf->family() == AF_INET6) {
                        auto addr6(addr.to_v6().to_bytes());
                        auto mask6(mask.to_v6().to_bytes());

                        auto local_network = intf->network_v6();

                        bool matches = true;
                        for(size_t i = 0; i < addr6.size(); i++) {
                            if((addr6[i] & mask6[i]) != local_network.s6_addr[i]) {
                                matches = false;
                                break;
                            }
                        }
                        
                        if(matches) {
                            info.Addresses.push_back(intf->address_string() + '/' + intf->netmask_string());
                        }
                    } 
                }
            }

            if(info.Addresses.empty()) {
                throw CannotPublish(ERS_HERE);
            }

            ISInfoDictionary d(m_partition);

            ISServerIterator servers(m_partition, m_is_server + ".*");
            if(servers.entries() == 0) {
                throw InvalidISServer(ERS_HERE,m_is_server);
            }
            while(servers++) {
                // may throw
                try {
                   d.checkin(std::string(servers.name()) + ".MSG_" + name, info);
                } catch(ers::Issue& reason) {
                   throw InvalidISServer(ERS_HERE, servers.name(), reason);
                }
            }

        }

        bool NameService::matches(const boost::asio::ip::address& addr, const boost::asio::ip::address& mask) const
        {
            for(const auto intf : daq::transport::Interface::interfaces()) {

                // ignore these
                if(!intf->has_flag(IFF_UP) || intf->has_flag(IFF_LOOPBACK) || intf->address() == 0) {
                    continue;
                }
                
                // compare IPV4 address
                if(addr.is_v4() && intf->family() == AF_INET) {
		    if(htonl(addr.to_v4().to_ulong() & mask.to_v4().to_ulong()) == (intf->network() & htonl(mask.to_v4().to_ulong()))) {
                        // found one, first match will be used
                        return true;
                    }
                } else if (addr.is_v6() && intf->family() == AF_INET6) {
                    auto addr6(addr.to_v6().to_bytes());
                    auto mask6(mask.to_v6().to_bytes());

                    auto local_network = intf->network_v6();

                    bool matches = true;
                    for(size_t i = 0; i < addr6.size(); i++) {
                        if((addr6[i] & mask6[i]) != local_network.s6_addr[i]) {
                            matches = false;
                            break;
                        }
                    }

                    return matches;
                }
            }
            
            // no match found at all
            return false;
        }
                
        // Look up all applications of given 'type' and return a list of endpoints
        std::vector<boost::asio::ip::tcp::endpoint> 
        NameService::lookup(const std::string& prefix) const
        {

            std::vector<boost::asio::ip::tcp::endpoint> results;

            ISInfoStream it(m_partition, m_is_server, ISCriteria("MSG_" + prefix + ".*", MsgInfo::type()));

            while(!it.eof()) {
                std::string name(it.name());
                MsgInfo     info;

                it >> info;

                // match my local interfaces against the list
                for(auto& addr_netmask : info.Addresses) {
                    boost::asio::ip::address addr, mask;
                    std::tie(addr, mask) = parse_address_network(addr_netmask);

                    if(matches(addr, mask)) {
                        results.push_back(boost::asio::ip::tcp::endpoint(addr, info.Port));
                        break;
                    }
                }
            }

            return results;
        }

        // Look up all applications of given 'type' and return a map of names and endpoints
        std::map<std::string,boost::asio::ip::tcp::endpoint> 
        NameService::lookup_names(const std::string& prefix) const
        {

            std::map<std::string,boost::asio::ip::tcp::endpoint> results;

            ISInfoStream it(m_partition, m_is_server, ISCriteria("MSG_" + prefix + ".*", MsgInfo::type()));

            while(!it.eof()) {
                std::string name(it.name());
                MsgInfo     info;

                it >> info;

                // match my local interfaces against the list
                for(auto& addr_netmask : info.Addresses) {
                    boost::asio::ip::address addr, mask;
                    std::tie(addr, mask) = parse_address_network(addr_netmask);

                    if(matches(addr, mask)) {
                        results[name.substr(m_is_server.size() + 5)] = boost::asio::ip::tcp::endpoint(addr, info.Port);
                        break;
                    }
                }
            }

            return results;
        }

        // Look up all applications of given 'type' and return a map of all names and endpoints that match
        std::map<std::string, std::vector<boost::asio::ip::tcp::endpoint>>
        NameService::lookup_all(const std::string& prefix) const
        {

            std::map<std::string,std::vector<boost::asio::ip::tcp::endpoint>> results;

            ISInfoStream it(m_partition, m_is_server, ISCriteria("MSG_" + prefix + ".*", MsgInfo::type()));

            while(!it.eof()) {
                std::string name(it.name());
                MsgInfo     info;

                it >> info;

                // match my local interfaces against the list
                for(auto& addr_netmask : info.Addresses) {
                    boost::asio::ip::address addr, mask;
                    std::tie(addr, mask) = parse_address_network(addr_netmask);

                    if(matches(addr, mask)) {
                        results[name.substr(m_is_server.size() + 5)].push_back(boost::asio::ip::tcp::endpoint(addr, info.Port));
                    }
                }
            }

            return results;
        }


        boost::asio::ip::tcp::endpoint 
        NameService::resolve(const std::string& name) const
        {
            ISInfoDictionary d(m_partition);

            MsgInfo info;

            d.getValue(m_is_server + ".MSG_" + name, info);

            for(auto& addr_netmask : info.Addresses) {
                boost::asio::ip::address addr, mask;
                std::tie(addr, mask) = parse_address_network(addr_netmask);
                
                if(matches(addr, mask)) {
                    return boost::asio::ip::tcp::endpoint(addr, info.Port);                    
                }
            }

            throw CannotResolve(ERS_HERE,name);

        }

        NameService::Network 
        NameService::parse_address_network(const std::string& network)
        {
            size_t slash = network.find('/');
            std::string address = network.substr(0, slash);
            std::string netmask = "255.255.255.255";
            if(slash != std::string::npos) {
                netmask = network.substr(slash+1);
            }

            return std::make_tuple(boost::asio::ip::address::from_string(address),
                                   boost::asio::ip::address::from_string(netmask));
        }

        boost::asio::ip::address NameService::find_interface(const std::string& netmask)
        {
            std::string local_mask(netmask);
	    std::string mc_mask("255.255.255.255");

            auto n = netmask.find('/');
            if (n != std::string::npos) {
                local_mask = netmask.substr(0,n);
                mc_mask = netmask.substr(n+1);
            }

            auto addr = boost::asio::ip::address::from_string(local_mask).to_v4();
	    auto mask = boost::asio::ip::address::from_string(mc_mask).to_v4();

            auto interfaces = daq::transport::Interface::interfaces();
            for(auto intf : interfaces) {
                if(!intf->has_flag(IFF_UP) || intf->has_flag(IFF_LOOPBACK)) {
                    continue;
                }

                if(intf->family() != AF_INET) {
                    continue;
                }

                if(intf->address() == 0) {
                    continue;
                }

                if((intf->address() & htonl(mask.to_ulong())) == htonl(addr.to_ulong())) {
                    return boost::asio::ip::address::from_string(intf->address_string());
                }
            }

            return boost::asio::ip::address();
        }

    }
}