aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/locator.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/locator.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/locator.cpp')
-rw-r--r--library/cpp/messagebus/locator.cpp662
1 files changed, 331 insertions, 331 deletions
diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp
index e38a35c426..f0f8da7cb0 100644
--- a/library/cpp/messagebus/locator.cpp
+++ b/library/cpp/messagebus/locator.cpp
@@ -1,5 +1,5 @@
////////////////////////////////////////////////////////////////////////////
-/// \file
+/// \file
/// \brief Implementation of locator service
#include "locator.h"
@@ -10,418 +10,418 @@
#include <util/system/hostname.h>
namespace NBus {
- using namespace NAddr;
-
- static TIpPort GetAddrPort(const IRemoteAddr& addr) {
- switch (addr.Addr()->sa_family) {
- case AF_INET: {
- return ntohs(((const sockaddr_in*)addr.Addr())->sin_port);
- }
-
- case AF_INET6: {
- return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port);
- }
-
- default: {
- ythrow yexception() << "not implemented";
- break;
- }
+ using namespace NAddr;
+
+ static TIpPort GetAddrPort(const IRemoteAddr& addr) {
+ switch (addr.Addr()->sa_family) {
+ case AF_INET: {
+ return ntohs(((const sockaddr_in*)addr.Addr())->sin_port);
+ }
+
+ case AF_INET6: {
+ return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port);
+ }
+
+ default: {
+ ythrow yexception() << "not implemented";
+ break;
+ }
}
- }
-
- static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) {
- for (size_t i = 1; i < addrs.size(); ++i) {
- if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) {
- return false;
- }
-
- if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) {
- return false;
- }
+ }
+
+ static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) {
+ for (size_t i = 1; i < addrs.size(); ++i) {
+ if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) {
+ return false;
+ }
+
+ if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) {
+ return false;
+ }
}
- for (size_t i = 0; i < addrs.size(); ++i) {
- if (addrs[i].Addr()->sa_family == AF_INET6) {
- *addr = addrs[i];
- return true;
- }
+ for (size_t i = 0; i < addrs.size(); ++i) {
+ if (addrs[i].Addr()->sa_family == AF_INET6) {
+ *addr = addrs[i];
+ return true;
+ }
}
-
- return false;
+
+ return false;
}
- EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) {
- TBusService service = GetService();
- TBusKey key = GetKey(mess);
- TVector<TNetAddr> addrs;
-
- /// check for special local key
- if (key == YBUS_KEYLOCAL) {
- locator->GetLocalAddresses(service, addrs);
- } else {
- /// lookup address/port in the locator table
- locator->LocateAll(service, key, addrs);
+ EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) {
+ TBusService service = GetService();
+ TBusKey key = GetKey(mess);
+ TVector<TNetAddr> addrs;
+
+ /// check for special local key
+ if (key == YBUS_KEYLOCAL) {
+ locator->GetLocalAddresses(service, addrs);
+ } else {
+ /// lookup address/port in the locator table
+ locator->LocateAll(service, key, addrs);
}
- if (addrs.size() == 0) {
- return MESSAGE_SERVICE_UNKNOWN;
- } else if (addrs.size() == 1) {
- *addr = addrs[0];
- } else {
- if (!GetIp6AddressFromVector(addrs, addr)) {
- /// default policy can't make choice for you here, overide GetDestination() function
- /// to implement custom routing strategy for your service.
- return MESSAGE_SERVICE_TOOMANY;
- }
+ if (addrs.size() == 0) {
+ return MESSAGE_SERVICE_UNKNOWN;
+ } else if (addrs.size() == 1) {
+ *addr = addrs[0];
+ } else {
+ if (!GetIp6AddressFromVector(addrs, addr)) {
+ /// default policy can't make choice for you here, overide GetDestination() function
+ /// to implement custom routing strategy for your service.
+ return MESSAGE_SERVICE_TOOMANY;
+ }
}
-
- return MESSAGE_OK;
+
+ return MESSAGE_OK;
}
- static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) {
- return (const sockaddr_in*)a.Addr();
+ static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) {
+ return (const sockaddr_in*)a.Addr();
}
- static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
- return (const sockaddr_in6*)a.Addr();
+ static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
+ return (const sockaddr_in6*)a.Addr();
}
- static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) {
- if (a1.Addr()->sa_family == a2.Addr()->sa_family) {
- if (a1.Addr()->sa_family == AF_INET) {
- return memcmp(&SockAddrIpV4(a1)->sin_addr, &SockAddrIpV4(a2)->sin_addr, sizeof(in_addr)) == 0;
- } else {
- return memcmp(&SockAddrIpV6(a1)->sin6_addr, &SockAddrIpV6(a2)->sin6_addr, sizeof(in6_addr)) == 0;
- }
+ static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) {
+ if (a1.Addr()->sa_family == a2.Addr()->sa_family) {
+ if (a1.Addr()->sa_family == AF_INET) {
+ return memcmp(&SockAddrIpV4(a1)->sin_addr, &SockAddrIpV4(a2)->sin_addr, sizeof(in_addr)) == 0;
+ } else {
+ return memcmp(&SockAddrIpV6(a1)->sin6_addr, &SockAddrIpV6(a2)->sin6_addr, sizeof(in6_addr)) == 0;
+ }
}
- return false;
+ return false;
}
- TBusLocator::TBusLocator()
- : MyInterfaces(GetNetworkInterfaces())
- {
- }
+ TBusLocator::TBusLocator()
+ : MyInterfaces(GetNetworkInterfaces())
+ {
+ }
- bool TBusLocator::TItem::operator<(const TItem& y) const {
- const TItem& x = *this;
+ bool TBusLocator::TItem::operator<(const TItem& y) const {
+ const TItem& x = *this;
- if (x.ServiceId == y.ServiceId) {
- return (x.End < y.End) || ((x.End == y.End) && CompareByHost(x.Addr, y.Addr) < 0);
+ if (x.ServiceId == y.ServiceId) {
+ return (x.End < y.End) || ((x.End == y.End) && CompareByHost(x.Addr, y.Addr) < 0);
}
- return x.ServiceId < y.ServiceId;
+ return x.ServiceId < y.ServiceId;
}
- bool TBusLocator::TItem::operator==(const TItem& y) const {
- return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr;
- }
+ bool TBusLocator::TItem::operator==(const TItem& y) const {
+ return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr;
+ }
- TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr)
- : ServiceId(serviceId)
- , Start(start)
- , End(end)
- , Addr(addr)
- {
+ TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr)
+ : ServiceId(serviceId)
+ , Start(start)
+ , End(end)
+ , Addr(addr)
+ {
}
- bool TBusLocator::IsLocal(const TNetAddr& addr) {
- for (const auto& myInterface : MyInterfaces) {
- if (IsAddressEqual(addr, *myInterface.Address)) {
- return true;
- }
- }
+ bool TBusLocator::IsLocal(const TNetAddr& addr) {
+ for (const auto& myInterface : MyInterfaces) {
+ if (IsAddressEqual(addr, *myInterface.Address)) {
+ return true;
+ }
+ }
- return false;
- }
+ return false;
+ }
- TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) {
- const char* c = ServiceIdSet.insert(name).first->c_str();
- return (ui64)c;
+ TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) {
+ const char* c = ServiceIdSet.insert(name).first->c_str();
+ return (ui64)c;
}
- int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
- TGuard<TMutex> G(Lock);
+ int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
+ TGuard<TMutex> G(Lock);
- TServiceId serviceId = GetServiceId(service);
- for (size_t i = 0; i < starts.size(); ++i) {
- RegisterBreak(serviceId, starts[i], addr);
- }
- return 0;
+ TServiceId serviceId = GetServiceId(service);
+ for (size_t i = 0; i < starts.size(); ++i) {
+ RegisterBreak(serviceId, starts[i], addr);
+ }
+ return 0;
}
- int TBusLocator::RegisterBreak(TServiceId serviceId, const TBusKey start, const TNetAddr& addr) {
- TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr));
- TItems::const_iterator service_it =
+ int TBusLocator::RegisterBreak(TServiceId serviceId, const TBusKey start, const TNetAddr& addr) {
+ TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr));
+ TItems::const_iterator service_it =
Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr()));
- THolder<TItem> left;
- THolder<TItem> right;
- if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) {
- if (it == Items.end()) {
- --it;
- }
- const TItem& item = *it;
- left.Reset(new TItem(serviceId, item.Start,
- Max<TBusKey>(item.Start, start - 1), item.Addr));
- right.Reset(new TItem(serviceId, start, item.End, addr));
- Items.erase(*it);
- } else {
- left.Reset(new TItem(serviceId, YBUS_KEYMIN, start, addr));
- if (start < YBUS_KEYMAX) {
- right.Reset(new TItem(serviceId, start + 1, YBUS_KEYMAX, addr));
- }
+ THolder<TItem> left;
+ THolder<TItem> right;
+ if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) {
+ if (it == Items.end()) {
+ --it;
+ }
+ const TItem& item = *it;
+ left.Reset(new TItem(serviceId, item.Start,
+ Max<TBusKey>(item.Start, start - 1), item.Addr));
+ right.Reset(new TItem(serviceId, start, item.End, addr));
+ Items.erase(*it);
+ } else {
+ left.Reset(new TItem(serviceId, YBUS_KEYMIN, start, addr));
+ if (start < YBUS_KEYMAX) {
+ right.Reset(new TItem(serviceId, start + 1, YBUS_KEYMAX, addr));
+ }
}
- Items.insert(*left);
- Items.insert(*right);
- NormalizeBreaks(serviceId);
- return 0;
- }
-
- int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) {
- TGuard<TMutex> G(Lock);
-
- TServiceId serviceId = GetServiceId(service);
- return UnregisterBreak(serviceId, addr);
+ Items.insert(*left);
+ Items.insert(*right);
+ NormalizeBreaks(serviceId);
+ return 0;
}
- int TBusLocator::UnregisterBreak(TServiceId serviceId, const TNetAddr& addr) {
- int deleted = 0;
- TItems::iterator it = Items.begin();
- while (it != Items.end()) {
- const TItem& item = *it;
- if (item.ServiceId != serviceId) {
- ++it;
- continue;
- }
- TItems::iterator itErase = it++;
- if (item.ServiceId == serviceId && item.Addr == addr) {
- Items.erase(itErase);
- deleted += 1;
- }
+ int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) {
+ TGuard<TMutex> G(Lock);
+
+ TServiceId serviceId = GetServiceId(service);
+ return UnregisterBreak(serviceId, addr);
+ }
+
+ int TBusLocator::UnregisterBreak(TServiceId serviceId, const TNetAddr& addr) {
+ int deleted = 0;
+ TItems::iterator it = Items.begin();
+ while (it != Items.end()) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ ++it;
+ continue;
+ }
+ TItems::iterator itErase = it++;
+ if (item.ServiceId == serviceId && item.Addr == addr) {
+ Items.erase(itErase);
+ deleted += 1;
+ }
+ }
+
+ if (Items.begin() == Items.end()) {
+ return deleted;
+ }
+ TBusKey keyItem = YBUS_KEYMAX;
+ it = Items.end();
+ TItems::iterator first = it;
+ do {
+ --it;
+ // item.Start is not used in set comparison function
+ // so you can't violate set sort order by changing it
+ // hence const_cast()
+ TItem& item = const_cast<TItem&>(*it);
+ if (item.ServiceId != serviceId) {
+ continue;
+ }
+ first = it;
+ if (item.End < keyItem) {
+ item.End = keyItem;
+ }
+ keyItem = item.Start - 1;
+ } while (it != Items.begin());
+
+ if (first != Items.end() && first->Start != 0) {
+ TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr);
+ Items.insert(item);
}
- if (Items.begin() == Items.end()) {
- return deleted;
- }
- TBusKey keyItem = YBUS_KEYMAX;
- it = Items.end();
- TItems::iterator first = it;
- do {
- --it;
- // item.Start is not used in set comparison function
- // so you can't violate set sort order by changing it
- // hence const_cast()
- TItem& item = const_cast<TItem&>(*it);
- if (item.ServiceId != serviceId) {
- continue;
- }
- first = it;
- if (item.End < keyItem) {
- item.End = keyItem;
- }
- keyItem = item.Start - 1;
- } while (it != Items.begin());
-
- if (first != Items.end() && first->Start != 0) {
- TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr);
- Items.insert(item);
- }
-
- NormalizeBreaks(serviceId);
+ NormalizeBreaks(serviceId);
return deleted;
}
- void TBusLocator::NormalizeBreaks(TServiceId serviceId) {
- TItems::const_iterator first = Items.lower_bound(TItem(serviceId, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
- TItems::const_iterator last = Items.end();
+ void TBusLocator::NormalizeBreaks(TServiceId serviceId) {
+ TItems::const_iterator first = Items.lower_bound(TItem(serviceId, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
+ TItems::const_iterator last = Items.end();
- if ((Items.end() != first) && (first->ServiceId == serviceId)) {
- if (serviceId != Max<TServiceId>()) {
- last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
- }
+ if ((Items.end() != first) && (first->ServiceId == serviceId)) {
+ if (serviceId != Max<TServiceId>()) {
+ last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
+ }
- --last;
- Y_ASSERT(Items.end() != last);
- Y_ASSERT(last->ServiceId == serviceId);
+ --last;
+ Y_ASSERT(Items.end() != last);
+ Y_ASSERT(last->ServiceId == serviceId);
- TItem& beg = const_cast<TItem&>(*first);
- beg.Addr = last->Addr;
+ TItem& beg = const_cast<TItem&>(*first);
+ beg.Addr = last->Addr;
}
}
- int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) {
- TGuard<TMutex> G(Lock);
- Y_VERIFY(addrs.empty(), "Non emtpy addresses");
+ int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) {
+ TGuard<TMutex> G(Lock);
+ Y_VERIFY(addrs.empty(), "Non emtpy addresses");
- TServiceId serviceId = GetServiceId(service);
- TItems::const_iterator it;
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
- for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
- it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End;
- ++it) {
- const TItem& item = *it;
- addrs.push_back(item.Addr);
- }
+ for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
+ it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End;
+ ++it) {
+ const TItem& item = *it;
+ addrs.push_back(item.Addr);
+ }
- if (addrs.size() == 0) {
- return -1;
- }
- return (int)addrs.size();
+ if (addrs.size() == 0) {
+ return -1;
+ }
+ return (int)addrs.size();
}
+
+ int TBusLocator::Locate(TBusService service, TBusKey key, TNetAddr* addr) {
+ TGuard<TMutex> G(Lock);
- int TBusLocator::Locate(TBusService service, TBusKey key, TNetAddr* addr) {
- TGuard<TMutex> G(Lock);
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
- TServiceId serviceId = GetServiceId(service);
- TItems::const_iterator it;
+ it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
- it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
+ if (it != Items.end()) {
+ const TItem& item = *it;
+ if (item.ServiceId == serviceId && item.Start <= key && key < item.End) {
+ *addr = item.Addr;
- if (it != Items.end()) {
- const TItem& item = *it;
- if (item.ServiceId == serviceId && item.Start <= key && key < item.End) {
- *addr = item.Addr;
+ return 0;
+ }
+ }
+
+ return -1;
+ }
- return 0;
- }
+ int TBusLocator::GetLocalPort(TBusService service) {
+ TGuard<TMutex> G(Lock);
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+ int port = 0;
+
+ for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ break;
+ }
+
+ if (IsLocal(item.Addr)) {
+ if (port != 0 && port != GetAddrPort(item.Addr)) {
+ Y_ASSERT(0 && "Can't decide which port to use.");
+ return 0;
+ }
+ port = GetAddrPort(item.Addr);
+ }
}
- return -1;
+ return port;
}
- int TBusLocator::GetLocalPort(TBusService service) {
- TGuard<TMutex> G(Lock);
- TServiceId serviceId = GetServiceId(service);
- TItems::const_iterator it;
- int port = 0;
-
- for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
- const TItem& item = *it;
- if (item.ServiceId != serviceId) {
- break;
- }
-
- if (IsLocal(item.Addr)) {
- if (port != 0 && port != GetAddrPort(item.Addr)) {
- Y_ASSERT(0 && "Can't decide which port to use.");
- return 0;
- }
- port = GetAddrPort(item.Addr);
- }
- }
+ int TBusLocator::GetLocalAddresses(TBusService service, TVector<TNetAddr>& addrs) {
+ TGuard<TMutex> G(Lock);
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
- return port;
- }
+ for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ break;
+ }
- int TBusLocator::GetLocalAddresses(TBusService service, TVector<TNetAddr>& addrs) {
- TGuard<TMutex> G(Lock);
- TServiceId serviceId = GetServiceId(service);
- TItems::const_iterator it;
-
- for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
- const TItem& item = *it;
- if (item.ServiceId != serviceId) {
- break;
- }
-
- if (IsLocal(item.Addr)) {
- addrs.push_back(item.Addr);
- }
+ if (IsLocal(item.Addr)) {
+ addrs.push_back(item.Addr);
+ }
}
- if (addrs.size() == 0) {
- return -1;
+ if (addrs.size() == 0) {
+ return -1;
}
- return (int)addrs.size();
+ return (int)addrs.size();
}
- int TBusLocator::LocateHost(TBusService service, TBusKey key, TString* host, int* port, bool* isLocal) {
- int ret;
- TNetAddr addr;
- ret = Locate(service, key, &addr);
- if (ret != 0) {
- return ret;
- }
-
- {
- TGuard<TMutex> G(Lock);
- THostAddrMap::const_iterator it = HostAddrMap.find(addr);
- if (it == HostAddrMap.end()) {
- return -1;
- }
- *host = it->second;
- }
-
- *port = GetAddrPort(addr);
- if (isLocal != nullptr) {
- *isLocal = IsLocal(addr);
- }
- return 0;
+ int TBusLocator::LocateHost(TBusService service, TBusKey key, TString* host, int* port, bool* isLocal) {
+ int ret;
+ TNetAddr addr;
+ ret = Locate(service, key, &addr);
+ if (ret != 0) {
+ return ret;
+ }
+
+ {
+ TGuard<TMutex> G(Lock);
+ THostAddrMap::const_iterator it = HostAddrMap.find(addr);
+ if (it == HostAddrMap.end()) {
+ return -1;
+ }
+ *host = it->second;
+ }
+
+ *port = GetAddrPort(addr);
+ if (isLocal != nullptr) {
+ *isLocal = IsLocal(addr);
+ }
+ return 0;
}
- int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
+ int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
TGuard<TMutex> G(Lock);
- Y_VERIFY(keys.empty(), "Non empty keys");
-
- TServiceId serviceId = GetServiceId(service);
- TItems::const_iterator it;
- for (it = Items.begin(); it != Items.end(); ++it) {
- const TItem& item = *it;
- if (item.ServiceId != serviceId) {
- continue;
- }
- if (onlyLocal && !IsLocal(item.Addr)) {
- continue;
- }
- keys.push_back(std::make_pair(item.Start, item.End));
+ Y_VERIFY(keys.empty(), "Non empty keys");
+
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+ for (it = Items.begin(); it != Items.end(); ++it) {
+ const TItem& item = *it;
+ if (item.ServiceId != serviceId) {
+ continue;
+ }
+ if (onlyLocal && !IsLocal(item.Addr)) {
+ continue;
+ }
+ keys.push_back(std::make_pair(item.Start, item.End));
}
- return (int)keys.size();
+ return (int)keys.size();
}
- int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
- TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws
- {
- TGuard<TMutex> G(Lock);
- HostAddrMap[addr] = hostName;
- }
- Register(service, start, end, addr);
- return 0;
+ int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
+ TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws
+ {
+ TGuard<TMutex> G(Lock);
+ HostAddrMap[addr] = hostName;
+ }
+ Register(service, start, end, addr);
+ return 0;
}
- int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
- TNetAddr addr(na, requireVersion, preferVersion); // throws
- Register(service, start, end, addr);
- return 0;
+ int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
+ TNetAddr addr(na, requireVersion, preferVersion); // throws
+ Register(service, start, end, addr);
+ return 0;
}
- int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) {
+ int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) {
TGuard<TMutex> G(Lock);
- TServiceId serviceId = GetServiceId(service);
- TItems::const_iterator it;
-
- TItem itemToReg(serviceId, start, end, addr);
- for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr()));
- it != Items.end() && it->ServiceId == serviceId;
- ++it) {
- const TItem& item = *it;
- if (item == itemToReg) {
- return 0;
- }
- if ((item.Start < start && start < item.End) || (item.Start < end && end < item.End)) {
- Y_FAIL("Overlap in registered keys with non-identical range");
- }
- }
-
- Items.insert(itemToReg);
- return 0;
- }
-
- int TBusLocator::Unregister(TBusService service, TBusKey start, TBusKey end) {
- TGuard<TMutex> G(Lock);
- TServiceId serviceId = GetServiceId(service);
- Items.erase(TItem(serviceId, start, end, TNetAddr()));
- return 0;
+ TServiceId serviceId = GetServiceId(service);
+ TItems::const_iterator it;
+
+ TItem itemToReg(serviceId, start, end, addr);
+ for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr()));
+ it != Items.end() && it->ServiceId == serviceId;
+ ++it) {
+ const TItem& item = *it;
+ if (item == itemToReg) {
+ return 0;
+ }
+ if ((item.Start < start && start < item.End) || (item.Start < end && end < item.End)) {
+ Y_FAIL("Overlap in registered keys with non-identical range");
+ }
+ }
+
+ Items.insert(itemToReg);
+ return 0;
+ }
+
+ int TBusLocator::Unregister(TBusService service, TBusKey start, TBusKey end) {
+ TGuard<TMutex> G(Lock);
+ TServiceId serviceId = GetServiceId(service);
+ Items.erase(TItem(serviceId, start, end, TNetAddr()));
+ return 0;
}
}