diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/locator.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/locator.cpp')
-rw-r--r-- | library/cpp/messagebus/locator.cpp | 427 |
1 files changed, 427 insertions, 0 deletions
diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp new file mode 100644 index 0000000000..e38a35c426 --- /dev/null +++ b/library/cpp/messagebus/locator.cpp @@ -0,0 +1,427 @@ +//////////////////////////////////////////////////////////////////////////// +/// \file +/// \brief Implementation of locator service + +#include "locator.h" + +#include "ybus.h" + +#include <util/generic/hash_set.h> +#include <util/system/hostname.h> + +namespace NBus { + using namespace NAddr; + + static TIpPort GetAddrPort(const IRemoteAddr& addr) { + switch (addr.Addr()->sa_family) { + case AF_INET: { + return ntohs(((const sockaddr_in*)addr.Addr())->sin_port); + } + + case AF_INET6: { + return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port); + } + + default: { + ythrow yexception() << "not implemented"; + break; + } + } + } + + static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) { + for (size_t i = 1; i < addrs.size(); ++i) { + if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) { + return false; + } + + if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) { + return false; + } + } + + for (size_t i = 0; i < addrs.size(); ++i) { + if (addrs[i].Addr()->sa_family == AF_INET6) { + *addr = addrs[i]; + return true; + } + } + + return false; + } + + EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) { + TBusService service = GetService(); + TBusKey key = GetKey(mess); + TVector<TNetAddr> addrs; + + /// check for special local key + if (key == YBUS_KEYLOCAL) { + locator->GetLocalAddresses(service, addrs); + } else { + /// lookup address/port in the locator table + locator->LocateAll(service, key, addrs); + } + + if (addrs.size() == 0) { + return MESSAGE_SERVICE_UNKNOWN; + } else if (addrs.size() == 1) { + *addr = addrs[0]; + } else { + if (!GetIp6AddressFromVector(addrs, addr)) { + /// default policy can't make choice for you here, overide GetDestination() function + /// to implement custom routing strategy for your service. + return MESSAGE_SERVICE_TOOMANY; + } + } + + return MESSAGE_OK; + } + + static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) { + return (const sockaddr_in*)a.Addr(); + } + + static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) { + return (const sockaddr_in6*)a.Addr(); + } + + static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) { + if (a1.Addr()->sa_family == a2.Addr()->sa_family) { + if (a1.Addr()->sa_family == AF_INET) { + return memcmp(&SockAddrIpV4(a1)->sin_addr, &SockAddrIpV4(a2)->sin_addr, sizeof(in_addr)) == 0; + } else { + return memcmp(&SockAddrIpV6(a1)->sin6_addr, &SockAddrIpV6(a2)->sin6_addr, sizeof(in6_addr)) == 0; + } + } + return false; + } + + TBusLocator::TBusLocator() + : MyInterfaces(GetNetworkInterfaces()) + { + } + + bool TBusLocator::TItem::operator<(const TItem& y) const { + const TItem& x = *this; + + if (x.ServiceId == y.ServiceId) { + return (x.End < y.End) || ((x.End == y.End) && CompareByHost(x.Addr, y.Addr) < 0); + } + return x.ServiceId < y.ServiceId; + } + + bool TBusLocator::TItem::operator==(const TItem& y) const { + return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr; + } + + TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr) + : ServiceId(serviceId) + , Start(start) + , End(end) + , Addr(addr) + { + } + + bool TBusLocator::IsLocal(const TNetAddr& addr) { + for (const auto& myInterface : MyInterfaces) { + if (IsAddressEqual(addr, *myInterface.Address)) { + return true; + } + } + + return false; + } + + TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) { + const char* c = ServiceIdSet.insert(name).first->c_str(); + return (ui64)c; + } + + int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) { + TGuard<TMutex> G(Lock); + + TServiceId serviceId = GetServiceId(service); + for (size_t i = 0; i < starts.size(); ++i) { + RegisterBreak(serviceId, starts[i], addr); + } + return 0; + } + + int TBusLocator::RegisterBreak(TServiceId serviceId, const TBusKey start, const TNetAddr& addr) { + TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr)); + TItems::const_iterator service_it = + Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); + + THolder<TItem> left; + THolder<TItem> right; + if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) { + if (it == Items.end()) { + --it; + } + const TItem& item = *it; + left.Reset(new TItem(serviceId, item.Start, + Max<TBusKey>(item.Start, start - 1), item.Addr)); + right.Reset(new TItem(serviceId, start, item.End, addr)); + Items.erase(*it); + } else { + left.Reset(new TItem(serviceId, YBUS_KEYMIN, start, addr)); + if (start < YBUS_KEYMAX) { + right.Reset(new TItem(serviceId, start + 1, YBUS_KEYMAX, addr)); + } + } + Items.insert(*left); + Items.insert(*right); + NormalizeBreaks(serviceId); + return 0; + } + + int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) { + TGuard<TMutex> G(Lock); + + TServiceId serviceId = GetServiceId(service); + return UnregisterBreak(serviceId, addr); + } + + int TBusLocator::UnregisterBreak(TServiceId serviceId, const TNetAddr& addr) { + int deleted = 0; + TItems::iterator it = Items.begin(); + while (it != Items.end()) { + const TItem& item = *it; + if (item.ServiceId != serviceId) { + ++it; + continue; + } + TItems::iterator itErase = it++; + if (item.ServiceId == serviceId && item.Addr == addr) { + Items.erase(itErase); + deleted += 1; + } + } + + if (Items.begin() == Items.end()) { + return deleted; + } + TBusKey keyItem = YBUS_KEYMAX; + it = Items.end(); + TItems::iterator first = it; + do { + --it; + // item.Start is not used in set comparison function + // so you can't violate set sort order by changing it + // hence const_cast() + TItem& item = const_cast<TItem&>(*it); + if (item.ServiceId != serviceId) { + continue; + } + first = it; + if (item.End < keyItem) { + item.End = keyItem; + } + keyItem = item.Start - 1; + } while (it != Items.begin()); + + if (first != Items.end() && first->Start != 0) { + TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr); + Items.insert(item); + } + + NormalizeBreaks(serviceId); + return deleted; + } + + void TBusLocator::NormalizeBreaks(TServiceId serviceId) { + TItems::const_iterator first = Items.lower_bound(TItem(serviceId, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr())); + TItems::const_iterator last = Items.end(); + + if ((Items.end() != first) && (first->ServiceId == serviceId)) { + if (serviceId != Max<TServiceId>()) { + last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr())); + } + + --last; + Y_ASSERT(Items.end() != last); + Y_ASSERT(last->ServiceId == serviceId); + + TItem& beg = const_cast<TItem&>(*first); + beg.Addr = last->Addr; + } + } + + int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) { + TGuard<TMutex> G(Lock); + Y_VERIFY(addrs.empty(), "Non emtpy addresses"); + + TServiceId serviceId = GetServiceId(service); + TItems::const_iterator it; + + for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr())); + it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End; + ++it) { + const TItem& item = *it; + addrs.push_back(item.Addr); + } + + if (addrs.size() == 0) { + return -1; + } + return (int)addrs.size(); + } + + int TBusLocator::Locate(TBusService service, TBusKey key, TNetAddr* addr) { + TGuard<TMutex> G(Lock); + + TServiceId serviceId = GetServiceId(service); + TItems::const_iterator it; + + it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr())); + + if (it != Items.end()) { + const TItem& item = *it; + if (item.ServiceId == serviceId && item.Start <= key && key < item.End) { + *addr = item.Addr; + + return 0; + } + } + + return -1; + } + + int TBusLocator::GetLocalPort(TBusService service) { + TGuard<TMutex> G(Lock); + TServiceId serviceId = GetServiceId(service); + TItems::const_iterator it; + int port = 0; + + for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) { + const TItem& item = *it; + if (item.ServiceId != serviceId) { + break; + } + + if (IsLocal(item.Addr)) { + if (port != 0 && port != GetAddrPort(item.Addr)) { + Y_ASSERT(0 && "Can't decide which port to use."); + return 0; + } + port = GetAddrPort(item.Addr); + } + } + + return port; + } + + int TBusLocator::GetLocalAddresses(TBusService service, TVector<TNetAddr>& addrs) { + TGuard<TMutex> G(Lock); + TServiceId serviceId = GetServiceId(service); + TItems::const_iterator it; + + for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) { + const TItem& item = *it; + if (item.ServiceId != serviceId) { + break; + } + + if (IsLocal(item.Addr)) { + addrs.push_back(item.Addr); + } + } + + if (addrs.size() == 0) { + return -1; + } + + return (int)addrs.size(); + } + + int TBusLocator::LocateHost(TBusService service, TBusKey key, TString* host, int* port, bool* isLocal) { + int ret; + TNetAddr addr; + ret = Locate(service, key, &addr); + if (ret != 0) { + return ret; + } + + { + TGuard<TMutex> G(Lock); + THostAddrMap::const_iterator it = HostAddrMap.find(addr); + if (it == HostAddrMap.end()) { + return -1; + } + *host = it->second; + } + + *port = GetAddrPort(addr); + if (isLocal != nullptr) { + *isLocal = IsLocal(addr); + } + return 0; + } + + int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) { + TGuard<TMutex> G(Lock); + Y_VERIFY(keys.empty(), "Non empty keys"); + + TServiceId serviceId = GetServiceId(service); + TItems::const_iterator it; + for (it = Items.begin(); it != Items.end(); ++it) { + const TItem& item = *it; + if (item.ServiceId != serviceId) { + continue; + } + if (onlyLocal && !IsLocal(item.Addr)) { + continue; + } + keys.push_back(std::make_pair(item.Start, item.End)); + } + return (int)keys.size(); + } + + int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) { + TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws + { + TGuard<TMutex> G(Lock); + HostAddrMap[addr] = hostName; + } + Register(service, start, end, addr); + return 0; + } + + int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) { + TNetAddr addr(na, requireVersion, preferVersion); // throws + Register(service, start, end, addr); + return 0; + } + + int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) { + TGuard<TMutex> G(Lock); + + TServiceId serviceId = GetServiceId(service); + TItems::const_iterator it; + + TItem itemToReg(serviceId, start, end, addr); + for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr())); + it != Items.end() && it->ServiceId == serviceId; + ++it) { + const TItem& item = *it; + if (item == itemToReg) { + return 0; + } + if ((item.Start < start && start < item.End) || (item.Start < end && end < item.End)) { + Y_FAIL("Overlap in registered keys with non-identical range"); + } + } + + Items.insert(itemToReg); + return 0; + } + + int TBusLocator::Unregister(TBusService service, TBusKey start, TBusKey end) { + TGuard<TMutex> G(Lock); + TServiceId serviceId = GetServiceId(service); + Items.erase(TItem(serviceId, start, end, TNetAddr())); + return 0; + } + +} |