aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/locator.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/locator.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/locator.cpp')
-rw-r--r--library/cpp/messagebus/locator.cpp427
1 files changed, 427 insertions, 0 deletions
diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp
new file mode 100644
index 0000000000..e38a35c426
--- /dev/null
+++ b/library/cpp/messagebus/locator.cpp
@@ -0,0 +1,427 @@
+////////////////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Implementation of locator service
+
+#include "locator.h"
+
+#include "ybus.h"
+
+#include <util/generic/hash_set.h>
+#include <util/system/hostname.h>
+
+namespace NBus {
+ using namespace NAddr;
+
+ static TIpPort GetAddrPort(const IRemoteAddr& addr) {
+ switch (addr.Addr()->sa_family) {
+ case AF_INET: {
+ return ntohs(((const sockaddr_in*)addr.Addr())->sin_port);
+ }
+
+ case AF_INET6: {
+ return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port);
+ }
+
+ default: {
+ ythrow yexception() << "not implemented";
+ break;
+ }
+ }
+ }
+
+ static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) {
+ for (size_t i = 1; i < addrs.size(); ++i) {
+ if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) {
+ return false;
+ }
+
+ if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) {
+ return false;
+ }
+ }
+
+ for (size_t i = 0; i < addrs.size(); ++i) {
+ if (addrs[i].Addr()->sa_family == AF_INET6) {
+ *addr = addrs[i];
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) {
+ TBusService service = GetService();
+ TBusKey key = GetKey(mess);
+ TVector<TNetAddr> addrs;
+
+ /// check for special local key
+ if (key == YBUS_KEYLOCAL) {
+ locator->GetLocalAddresses(service, addrs);
+ } else {
+ /// lookup address/port in the locator table
+ locator->LocateAll(service, key, addrs);
+ }
+
+ if (addrs.size() == 0) {
+ return MESSAGE_SERVICE_UNKNOWN;
+ } else if (addrs.size() == 1) {
+ *addr = addrs[0];
+ } else {
+ if (!GetIp6AddressFromVector(addrs, addr)) {
+ /// default policy can't make choice for you here, overide GetDestination() function
+ /// to implement custom routing strategy for your service.
+ return MESSAGE_SERVICE_TOOMANY;
+ }
+ }
+
+ return MESSAGE_OK;
+ }
+
+ static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) {
+ return (const sockaddr_in*)a.Addr();
+ }
+
+ static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
+ return (const sockaddr_in6*)a.Addr();
+ }
+
+ static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) {
+ if (a1.Addr()->sa_family == a2.Addr()->sa_family) {
+ if (a1.Addr()->sa_family == AF_INET) {
+ return memcmp(&SockAddrIpV4(a1)->sin_addr, &SockAddrIpV4(a2)->sin_addr, sizeof(in_addr)) == 0;
+ } else {
+ return memcmp(&SockAddrIpV6(a1)->sin6_addr, &SockAddrIpV6(a2)->sin6_addr, sizeof(in6_addr)) == 0;
+ }
+ }
+ return false;
+ }
+
+ TBusLocator::TBusLocator()
+ : MyInterfaces(GetNetworkInterfaces())
+ {
+ }
+
+ bool TBusLocator::TItem::operator<(const TItem& y) const {
+ const TItem& x = *this;
+
+ if (x.ServiceId == y.ServiceId) {
+ return (x.End < y.End) || ((x.End == y.End) && CompareByHost(x.Addr, y.Addr) < 0);
+ }
+ return x.ServiceId < y.ServiceId;
+ }
+
+ bool TBusLocator::TItem::operator==(const TItem& y) const {
+ return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr;
+ }
+
+ TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr)
+ : ServiceId(serviceId)
+ , Start(start)
+ , End(end)
+ , Addr(addr)
+ {
+ }
+
+ bool TBusLocator::IsLocal(const TNetAddr& addr) {
+ for (const auto& myInterface : MyInterfaces) {
+ if (IsAddressEqual(addr, *myInterface.Address)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) {
+ const char* c = ServiceIdSet.insert(name).first->c_str();
+ return (ui64)c;
+ }
+
+ int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
+ TGuard<TMutex> G(Lock);
+
+ TServiceId serviceId = GetServiceId(service);
+ for (size_t i = 0; i < starts.size(); ++i) {
+ RegisterBreak(serviceId, starts[i], addr);
+ }
+ return 0;
+ }
+
+ int TBusLocator::RegisterBreak(TServiceId serviceId, const TBusKey start, const TNetAddr& addr) {
+ TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr));
+ TItems::const_iterator service_it =
+ Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr()));
+
+ THolder<TItem> left;
+ THolder<TItem> right;
+ if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) {
+ if (it == Items.end()) {
+ --it;
+ }
+ const TItem& item = *it;
+ left.Reset(new TItem(serviceId, item.Start,
+ Max<TBusKey>(item.Start, start - 1), item.Addr));
+ right.Reset(new TItem(serviceId, start, item.End, addr));
+ Items.erase(*it);
+ } else {
+ left.Reset(new TItem(serviceId, YBUS_KEYMIN, start, addr));
+ if (start < YBUS_KEYMAX) {
+ right.Reset(new TItem(serviceId, start + 1, YBUS_KEYMAX, addr));
+ }
+ }
+ Items.insert(*left);
+ Items.insert(*right);
+ NormalizeBreaks(serviceId);
+ return 0;
+ }
+
+ int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) {
+ TGuard<TMutex> G(Lock);
+
+ TServiceId serviceId = GetServiceId(service);
+ return UnregisterBreak(serviceId, addr);
+ }
+
+ int TBusLocator::UnregisterBreak(TServiceId serviceId, const TNetAddr& addr) {
+ int deleted = 0;
+ TItems::iterator it = Items.begin();
+ while (it != Items.end()) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ ++it;
+ continue;
+ }
+ TItems::iterator itErase = it++;
+ if (item.ServiceId == serviceId && item.Addr == addr) {
+ Items.erase(itErase);
+ deleted += 1;
+ }
+ }
+
+ if (Items.begin() == Items.end()) {
+ return deleted;
+ }
+ TBusKey keyItem = YBUS_KEYMAX;
+ it = Items.end();
+ TItems::iterator first = it;
+ do {
+ --it;
+ // item.Start is not used in set comparison function
+ // so you can't violate set sort order by changing it
+ // hence const_cast()
+ TItem& item = const_cast<TItem&>(*it);
+ if (item.ServiceId != serviceId) {
+ continue;
+ }
+ first = it;
+ if (item.End < keyItem) {
+ item.End = keyItem;
+ }
+ keyItem = item.Start - 1;
+ } while (it != Items.begin());
+
+ if (first != Items.end() && first->Start != 0) {
+ TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr);
+ Items.insert(item);
+ }
+
+ NormalizeBreaks(serviceId);
+ return deleted;
+ }
+
+ void TBusLocator::NormalizeBreaks(TServiceId serviceId) {
+ TItems::const_iterator first = Items.lower_bound(TItem(serviceId, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
+ TItems::const_iterator last = Items.end();
+
+ if ((Items.end() != first) && (first->ServiceId == serviceId)) {
+ if (serviceId != Max<TServiceId>()) {
+ last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
+ }
+
+ --last;
+ Y_ASSERT(Items.end() != last);
+ Y_ASSERT(last->ServiceId == serviceId);
+
+ TItem& beg = const_cast<TItem&>(*first);
+ beg.Addr = last->Addr;
+ }
+ }
+
+ int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) {
+ TGuard<TMutex> G(Lock);
+ Y_VERIFY(addrs.empty(), "Non emtpy addresses");
+
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+
+ for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
+ it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End;
+ ++it) {
+ const TItem& item = *it;
+ addrs.push_back(item.Addr);
+ }
+
+ if (addrs.size() == 0) {
+ return -1;
+ }
+ return (int)addrs.size();
+ }
+
+ int TBusLocator::Locate(TBusService service, TBusKey key, TNetAddr* addr) {
+ TGuard<TMutex> G(Lock);
+
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+
+ it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
+
+ if (it != Items.end()) {
+ const TItem& item = *it;
+ if (item.ServiceId == serviceId && item.Start <= key && key < item.End) {
+ *addr = item.Addr;
+
+ return 0;
+ }
+ }
+
+ return -1;
+ }
+
+ int TBusLocator::GetLocalPort(TBusService service) {
+ TGuard<TMutex> G(Lock);
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+ int port = 0;
+
+ for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ break;
+ }
+
+ if (IsLocal(item.Addr)) {
+ if (port != 0 && port != GetAddrPort(item.Addr)) {
+ Y_ASSERT(0 && "Can't decide which port to use.");
+ return 0;
+ }
+ port = GetAddrPort(item.Addr);
+ }
+ }
+
+ return port;
+ }
+
+ int TBusLocator::GetLocalAddresses(TBusService service, TVector<TNetAddr>& addrs) {
+ TGuard<TMutex> G(Lock);
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+
+ for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ break;
+ }
+
+ if (IsLocal(item.Addr)) {
+ addrs.push_back(item.Addr);
+ }
+ }
+
+ if (addrs.size() == 0) {
+ return -1;
+ }
+
+ return (int)addrs.size();
+ }
+
+ int TBusLocator::LocateHost(TBusService service, TBusKey key, TString* host, int* port, bool* isLocal) {
+ int ret;
+ TNetAddr addr;
+ ret = Locate(service, key, &addr);
+ if (ret != 0) {
+ return ret;
+ }
+
+ {
+ TGuard<TMutex> G(Lock);
+ THostAddrMap::const_iterator it = HostAddrMap.find(addr);
+ if (it == HostAddrMap.end()) {
+ return -1;
+ }
+ *host = it->second;
+ }
+
+ *port = GetAddrPort(addr);
+ if (isLocal != nullptr) {
+ *isLocal = IsLocal(addr);
+ }
+ return 0;
+ }
+
+ int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
+ TGuard<TMutex> G(Lock);
+ Y_VERIFY(keys.empty(), "Non empty keys");
+
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+ for (it = Items.begin(); it != Items.end(); ++it) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ continue;
+ }
+ if (onlyLocal && !IsLocal(item.Addr)) {
+ continue;
+ }
+ keys.push_back(std::make_pair(item.Start, item.End));
+ }
+ return (int)keys.size();
+ }
+
+ int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
+ TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws
+ {
+ TGuard<TMutex> G(Lock);
+ HostAddrMap[addr] = hostName;
+ }
+ Register(service, start, end, addr);
+ return 0;
+ }
+
+ int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
+ TNetAddr addr(na, requireVersion, preferVersion); // throws
+ Register(service, start, end, addr);
+ return 0;
+ }
+
+ int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) {
+ TGuard<TMutex> G(Lock);
+
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+
+ TItem itemToReg(serviceId, start, end, addr);
+ for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr()));
+ it != Items.end() && it->ServiceId == serviceId;
+ ++it) {
+ const TItem& item = *it;
+ if (item == itemToReg) {
+ return 0;
+ }
+ if ((item.Start < start && start < item.End) || (item.Start < end && end < item.End)) {
+ Y_FAIL("Overlap in registered keys with non-identical range");
+ }
+ }
+
+ Items.insert(itemToReg);
+ return 0;
+ }
+
+ int TBusLocator::Unregister(TBusService service, TBusKey start, TBusKey end) {
+ TGuard<TMutex> G(Lock);
+ TServiceId serviceId = GetServiceId(service);
+ Items.erase(TItem(serviceId, start, end, TNetAddr()));
+ return 0;
+ }
+
+}