aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/messagebus/locator.cpp
blob: e38a35c426f5a778b50c1bcf1b71ee00f2d56e82 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                                                            
         
                                            
                    
 
                 
                                  
                                 
                
                          
 



                                                                          
 






                                                                            
         
     
 







                                                                                                
         



                                                         
         
                     
     









                                                                                                                                    
         








                                                                                                    
         
                          
     
                                                                  
     
                                                                   
     





                                                                                                                 
         
                     
     
 


                                              
 
                                                              
 
                                                                                              
         
                                         
     

                                                                                              
 




                                                                                                     
     
 




                                                             
 
                     
 

                                                                         
     
 
                                                                                                               
 



                                                      
     

                                                                                                     
                                                                  
 














                                                                                                                                     
         


                                   
     
                                                                                 
 
                                                     
     
 












                                                                                  
         
 



















                                                                 
 

                                                                              
         
 
                                   

                       

                                                                                                                 
 


                                                                                                     
 

                                                   
 
                                                    
         
     

                                                                                            
 
                                                     
 




                                                                                                   
 


                                 
     
 
                                                                               
 
                                                     
 
                                                                     
 


                                                                                     
 
                         
         
                  
     



                                                        
 



                                                                                                   
 





                                                                     
         
 
                    
     
 


                                                                                       
 



                                                                                                   
 

                                           
         
 
                                
         
                                 
     





                                                                                                            
 












                                                                     
     
                                                                                        
                               











                                                                 
         
                                
     






                                                                                                                                                                                                                                                   
     
 


                                                                                                                                                                                                          
     
                                                                                                      
                               
 
                                                     
 










                                                                                                   
 

                                
 



                                                                                  
     
 
////////////////////////////////////////////////////////////////////////////
/// \file
/// \brief Implementation of locator service

#include "locator.h"

#include "ybus.h"

#include <util/generic/hash_set.h>
#include <util/system/hostname.h>

namespace NBus {
    using namespace NAddr;

    static TIpPort GetAddrPort(const IRemoteAddr& addr) {
        switch (addr.Addr()->sa_family) {
            case AF_INET: {
                return ntohs(((const sockaddr_in*)addr.Addr())->sin_port);
            }

            case AF_INET6: {
                return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port);
            }

            default: {
                ythrow yexception() << "not implemented";
                break;
            }
        }
    }

    static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) {
        for (size_t i = 1; i < addrs.size(); ++i) {
            if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) {
                return false;
            }

            if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) {
                return false;
            }
        }

        for (size_t i = 0; i < addrs.size(); ++i) {
            if (addrs[i].Addr()->sa_family == AF_INET6) {
                *addr = addrs[i];
                return true;
            }
        }

        return false;
    }

    EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) {
        TBusService service = GetService();
        TBusKey key = GetKey(mess);
        TVector<TNetAddr> addrs;

        /// check for special local key
        if (key == YBUS_KEYLOCAL) {
            locator->GetLocalAddresses(service, addrs);
        } else {
            /// lookup address/port in the locator table
            locator->LocateAll(service, key, addrs);
        }

        if (addrs.size() == 0) {
            return MESSAGE_SERVICE_UNKNOWN;
        } else if (addrs.size() == 1) {
            *addr = addrs[0];
        } else {
            if (!GetIp6AddressFromVector(addrs, addr)) {
                /// default policy can't make choice for you here, overide GetDestination() function
                /// to implement custom routing strategy for your service.
                return MESSAGE_SERVICE_TOOMANY;
            }
        }

        return MESSAGE_OK;
    }

    static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) {
        return (const sockaddr_in*)a.Addr();
    }

    static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
        return (const sockaddr_in6*)a.Addr();
    }

    static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) {
        if (a1.Addr()->sa_family == a2.Addr()->sa_family) {
            if (a1.Addr()->sa_family == AF_INET) {
                return memcmp(&SockAddrIpV4(a1)->sin_addr, &SockAddrIpV4(a2)->sin_addr, sizeof(in_addr)) == 0;
            } else {
                return memcmp(&SockAddrIpV6(a1)->sin6_addr, &SockAddrIpV6(a2)->sin6_addr, sizeof(in6_addr)) == 0;
            }
        }
        return false;
    }

    TBusLocator::TBusLocator()
        : MyInterfaces(GetNetworkInterfaces())
    {
    }

    bool TBusLocator::TItem::operator<(const TItem& y) const {
        const TItem& x = *this;

        if (x.ServiceId == y.ServiceId) {
            return (x.End < y.End) || ((x.End == y.End) && CompareByHost(x.Addr, y.Addr) < 0);
        }
        return x.ServiceId < y.ServiceId;
    }

    bool TBusLocator::TItem::operator==(const TItem& y) const {
        return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr;
    }

    TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr)
        : ServiceId(serviceId)
        , Start(start)
        , End(end)
        , Addr(addr)
    {
    }

    bool TBusLocator::IsLocal(const TNetAddr& addr) {
        for (const auto& myInterface : MyInterfaces) {
            if (IsAddressEqual(addr, *myInterface.Address)) {
                return true;
            }
        }

        return false;
    }

    TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) {
        const char* c = ServiceIdSet.insert(name).first->c_str();
        return (ui64)c;
    }

    int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
        TGuard<TMutex> G(Lock);

        TServiceId serviceId = GetServiceId(service);
        for (size_t i = 0; i < starts.size(); ++i) {
            RegisterBreak(serviceId, starts[i], addr);
        }
        return 0;
    }

    int TBusLocator::RegisterBreak(TServiceId serviceId, const TBusKey start, const TNetAddr& addr) {
        TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr));
        TItems::const_iterator service_it =
            Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr()));

        THolder<TItem> left;
        THolder<TItem> right;
        if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) {
            if (it == Items.end()) {
                --it;
            }
            const TItem& item = *it;
            left.Reset(new TItem(serviceId, item.Start,
                                 Max<TBusKey>(item.Start, start - 1), item.Addr));
            right.Reset(new TItem(serviceId, start, item.End, addr));
            Items.erase(*it);
        } else {
            left.Reset(new TItem(serviceId, YBUS_KEYMIN, start, addr));
            if (start < YBUS_KEYMAX) {
                right.Reset(new TItem(serviceId, start + 1, YBUS_KEYMAX, addr));
            }
        }
        Items.insert(*left);
        Items.insert(*right);
        NormalizeBreaks(serviceId);
        return 0;
    }

    int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) {
        TGuard<TMutex> G(Lock);

        TServiceId serviceId = GetServiceId(service);
        return UnregisterBreak(serviceId, addr);
    }

    int TBusLocator::UnregisterBreak(TServiceId serviceId, const TNetAddr& addr) {
        int deleted = 0;
        TItems::iterator it = Items.begin();
        while (it != Items.end()) {
            const TItem& item = *it;
            if (item.ServiceId != serviceId) {
                ++it;
                continue;
            }
            TItems::iterator itErase = it++;
            if (item.ServiceId == serviceId && item.Addr == addr) {
                Items.erase(itErase);
                deleted += 1;
            }
        }

        if (Items.begin() == Items.end()) {
            return deleted;
        }
        TBusKey keyItem = YBUS_KEYMAX;
        it = Items.end();
        TItems::iterator first = it;
        do {
            --it;
            // item.Start is not used in set comparison function
            // so you can't violate set sort order by changing it
            // hence const_cast()
            TItem& item = const_cast<TItem&>(*it);
            if (item.ServiceId != serviceId) {
                continue;
            }
            first = it;
            if (item.End < keyItem) {
                item.End = keyItem;
            }
            keyItem = item.Start - 1;
        } while (it != Items.begin());

        if (first != Items.end() && first->Start != 0) {
            TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr);
            Items.insert(item);
        }

        NormalizeBreaks(serviceId);
        return deleted;
    }

    void TBusLocator::NormalizeBreaks(TServiceId serviceId) {
        TItems::const_iterator first = Items.lower_bound(TItem(serviceId, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
        TItems::const_iterator last = Items.end();

        if ((Items.end() != first) && (first->ServiceId == serviceId)) {
            if (serviceId != Max<TServiceId>()) {
                last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
            }

            --last;
            Y_ASSERT(Items.end() != last);
            Y_ASSERT(last->ServiceId == serviceId);

            TItem& beg = const_cast<TItem&>(*first);
            beg.Addr = last->Addr;
        }
    }

    int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) {
        TGuard<TMutex> G(Lock);
        Y_VERIFY(addrs.empty(), "Non emtpy addresses");

        TServiceId serviceId = GetServiceId(service);
        TItems::const_iterator it;

        for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
             it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End;
             ++it) {
            const TItem& item = *it;
            addrs.push_back(item.Addr);
        }

        if (addrs.size() == 0) {
            return -1;
        }
        return (int)addrs.size();
    }

    int TBusLocator::Locate(TBusService service, TBusKey key, TNetAddr* addr) {
        TGuard<TMutex> G(Lock);

        TServiceId serviceId = GetServiceId(service);
        TItems::const_iterator it;

        it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));

        if (it != Items.end()) {
            const TItem& item = *it;
            if (item.ServiceId == serviceId && item.Start <= key && key < item.End) {
                *addr = item.Addr;

                return 0;
            }
        }

        return -1;
    }

    int TBusLocator::GetLocalPort(TBusService service) {
        TGuard<TMutex> G(Lock);
        TServiceId serviceId = GetServiceId(service);
        TItems::const_iterator it;
        int port = 0;

        for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
            const TItem& item = *it;
            if (item.ServiceId != serviceId) {
                break;
            }

            if (IsLocal(item.Addr)) {
                if (port != 0 && port != GetAddrPort(item.Addr)) {
                    Y_ASSERT(0 && "Can't decide which port to use.");
                    return 0;
                }
                port = GetAddrPort(item.Addr);
            }
        }

        return port;
    }

    int TBusLocator::GetLocalAddresses(TBusService service, TVector<TNetAddr>& addrs) {
        TGuard<TMutex> G(Lock);
        TServiceId serviceId = GetServiceId(service);
        TItems::const_iterator it;

        for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
            const TItem& item = *it;
            if (item.ServiceId != serviceId) {
                break;
            }

            if (IsLocal(item.Addr)) {
                addrs.push_back(item.Addr);
            }
        }

        if (addrs.size() == 0) {
            return -1;
        }

        return (int)addrs.size();
    }

    int TBusLocator::LocateHost(TBusService service, TBusKey key, TString* host, int* port, bool* isLocal) {
        int ret;
        TNetAddr addr;
        ret = Locate(service, key, &addr);
        if (ret != 0) {
            return ret;
        }

        {
            TGuard<TMutex> G(Lock);
            THostAddrMap::const_iterator it = HostAddrMap.find(addr);
            if (it == HostAddrMap.end()) {
                return -1;
            }
            *host = it->second;
        }

        *port = GetAddrPort(addr);
        if (isLocal != nullptr) {
            *isLocal = IsLocal(addr);
        }
        return 0;
    }

    int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
        TGuard<TMutex> G(Lock);
        Y_VERIFY(keys.empty(), "Non empty keys");

        TServiceId serviceId = GetServiceId(service);
        TItems::const_iterator it;
        for (it = Items.begin(); it != Items.end(); ++it) {
            const TItem& item = *it;
            if (item.ServiceId != serviceId) {
                continue;
            }
            if (onlyLocal && !IsLocal(item.Addr)) {
                continue;
            }
            keys.push_back(std::make_pair(item.Start, item.End));
        }
        return (int)keys.size();
    }

    int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
        TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws
        {
            TGuard<TMutex> G(Lock);
            HostAddrMap[addr] = hostName;
        }
        Register(service, start, end, addr);
        return 0;
    }

    int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
        TNetAddr addr(na, requireVersion, preferVersion); // throws
        Register(service, start, end, addr);
        return 0;
    }

    int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) {
        TGuard<TMutex> G(Lock);

        TServiceId serviceId = GetServiceId(service);
        TItems::const_iterator it;

        TItem itemToReg(serviceId, start, end, addr);
        for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr()));
             it != Items.end() && it->ServiceId == serviceId;
             ++it) {
            const TItem& item = *it;
            if (item == itemToReg) {
                return 0;
            }
            if ((item.Start < start && start < item.End) || (item.Start < end && end < item.End)) {
                Y_FAIL("Overlap in registered keys with non-identical range");
            }
        }

        Items.insert(itemToReg);
        return 0;
    }

    int TBusLocator::Unregister(TBusService service, TBusKey start, TBusKey end) {
        TGuard<TMutex> G(Lock);
        TServiceId serviceId = GetServiceId(service);
        Items.erase(TItem(serviceId, start, end, TNetAddr()));
        return 0;
    }

}