diff options
author | stanly <stanly@yandex-team.ru> | 2022-02-10 16:46:49 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:49 +0300 |
commit | cde218e65dfef5ce03a48d641fd8f7913cf17b2d (patch) | |
tree | d3349caea4095825a55b5ba24fe758067b29ce6f /library/cpp/messagebus | |
parent | 9f813499b4ef585cb3c2bb93de93ef003daf4fc4 (diff) | |
download | ydb-cde218e65dfef5ce03a48d641fd8f7913cf17b2d.tar.gz |
Restoring authorship annotation for <stanly@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r-- | library/cpp/messagebus/actor/executor.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/base.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/config/netaddr.cpp | 32 | ||||
-rw-r--r-- | library/cpp/messagebus/config/netaddr.h | 28 | ||||
-rw-r--r-- | library/cpp/messagebus/coreconn.cpp | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/coreconn.h | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/locator.cpp | 40 | ||||
-rw-r--r-- | library/cpp/messagebus/messqueue.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/netaddr.h | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/rain_check/core/task.cpp | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/rain_check/core/task.h | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/rain_check/messagebus/messagebus_server.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/session.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/ybus.h | 4 |
14 files changed, 78 insertions, 78 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp index 7a2227a458..6325dd3821 100644 --- a/library/cpp/messagebus/actor/executor.cpp +++ b/library/cpp/messagebus/actor/executor.cpp @@ -233,7 +233,7 @@ void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) { } } -size_t TExecutor::GetWorkQueueSize() const { +size_t TExecutor::GetWorkQueueSize() const { return WorkItems.Size(); } @@ -259,7 +259,7 @@ TString TExecutor::GetStatusSingleLine() const { return ss.Str(); } -TExecutorStatus TExecutor::GetStatusRecordInternal() const { +TExecutorStatus TExecutor::GetStatusRecordInternal() const { TExecutorStatus r; r.WorkQueueSize = GetWorkQueueSize(); @@ -279,7 +279,7 @@ TExecutorStatus TExecutor::GetStatusRecordInternal() const { return r; } -bool TExecutor::IsInExecutorThread() const { +bool TExecutor::IsInExecutorThread() const { return ThreadCurrentExecutor == this; } diff --git a/library/cpp/messagebus/base.h b/library/cpp/messagebus/base.h index 79fccc312e..7448e6ede1 100644 --- a/library/cpp/messagebus/base.h +++ b/library/cpp/messagebus/base.h @@ -1,7 +1,7 @@ #pragma once -#include <util/system/defaults.h> - +#include <util/system/defaults.h> + namespace NBus { /// millis since epoch using TBusInstant = ui64; diff --git a/library/cpp/messagebus/config/netaddr.cpp b/library/cpp/messagebus/config/netaddr.cpp index 962ac538e2..f82378c345 100644 --- a/library/cpp/messagebus/config/netaddr.cpp +++ b/library/cpp/messagebus/config/netaddr.cpp @@ -4,7 +4,7 @@ #include <cstdlib> -namespace NBus { +namespace NBus { const char* ToCString(EIpVersion ipVersion) { switch (ipVersion) { case EIP_VERSION_ANY: @@ -37,12 +37,12 @@ namespace NBus { { } }; - + static bool Compare(const IRemoteAddr& l, const IRemoteAddr& r) noexcept { if (l.Addr()->sa_family != r.Addr()->sa_family) { return false; } - + switch (l.Addr()->sa_family) { case AF_INET: { return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr)) == 0 && @@ -53,25 +53,25 @@ namespace NBus { return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr)) == 0 && ((const sockaddr_in6*)l.Addr())->sin6_port == ((const sockaddr_in6*)r.Addr())->sin6_port; } - } - + } + return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len())) == 0; - } - + } + TNetAddr::TNetAddr() : Ptr(new TOpaqueAddr) { } - + TNetAddr::TNetAddr(TAutoPtr<IRemoteAddr> addr) : Ptr(addr) { Y_VERIFY(!!Ptr); } - + namespace { using namespace NAddr; - + const char* Describe(EIpVersion version) { switch (version) { case EIP_VERSION_4: @@ -150,7 +150,7 @@ namespace NBus { socklen_t TNetAddr::Len() const { return Ptr->Len(); } - + int TNetAddr::GetPort() const { switch (Ptr->Addr()->sa_family) { case AF_INET: @@ -162,11 +162,11 @@ namespace NBus { throw 1; } } - + bool TNetAddr::IsIpv4() const { return Ptr->Addr()->sa_family == AF_INET; } - + bool TNetAddr::IsIpv6() const { return Ptr->Addr()->sa_family == AF_INET6; } @@ -177,7 +177,7 @@ namespace NBus { } -template <> +template <> void Out<NBus::TNetAddr>(IOutputStream& out, const NBus::TNetAddr& addr) { - Out<NAddr::IRemoteAddr>(out, addr); -} + Out<NAddr::IRemoteAddr>(out, addr); +} diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h index b79c0cc355..2e40f1464b 100644 --- a/library/cpp/messagebus/config/netaddr.h +++ b/library/cpp/messagebus/config/netaddr.h @@ -1,22 +1,22 @@ -#pragma once - +#pragma once + #include <util/digest/numeric.h> #include <util/generic/hash.h> -#include <util/generic/ptr.h> +#include <util/generic/ptr.h> #include <util/generic/strbuf.h> -#include <util/generic/vector.h> -#include <util/network/address.h> - -namespace NBus { +#include <util/generic/vector.h> +#include <util/network/address.h> + +namespace NBus { using namespace NAddr; - + /// IP protocol version. enum EIpVersion { EIP_VERSION_4 = 1, EIP_VERSION_6 = 2, EIP_VERSION_ANY = EIP_VERSION_4 | EIP_VERSION_6, }; - + inline bool IsFamilyAllowed(ui16 sa_family, EIpVersion ipVersion) { if (ipVersion == EIP_VERSION_4 && sa_family != AF_INET) { return false; @@ -51,26 +51,26 @@ namespace NBus { inline explicit operator bool() const noexcept { return !!Ptr; } - + const sockaddr* Addr() const override; socklen_t Len() const override; - + bool IsIpv4() const; bool IsIpv6() const; int GetPort() const; - + private: TAtomicSharedPtr<IRemoteAddr> Ptr; }; using TSockAddrInVector = TVector<TNetAddr>; - + struct TNetAddrHostPortHash { inline size_t operator()(const TNetAddr& a) const { const sockaddr* s = a.Addr(); const sockaddr_in* const sa = reinterpret_cast<const sockaddr_in*>(s); const sockaddr_in6* const sa6 = reinterpret_cast<const sockaddr_in6*>(s); - + switch (s->sa_family) { case AF_INET: return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa->sin_addr), sizeof(sa->sin_addr))), IntHashImpl(sa->sin_port)); diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp index d9411bb5db..75b355d4a1 100644 --- a/library/cpp/messagebus/coreconn.cpp +++ b/library/cpp/messagebus/coreconn.cpp @@ -1,7 +1,7 @@ -#include "coreconn.h" +#include "coreconn.h" #include "remote_connection.h" - + #include <util/datetime/base.h> #include <util/generic/yexception.h> #include <util/network/socket.h> diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h index fca228d82e..39e085bf31 100644 --- a/library/cpp/messagebus/coreconn.h +++ b/library/cpp/messagebus/coreconn.h @@ -4,10 +4,10 @@ /// \file /// \brief Definitions for asynchonous connection queue -#include "base.h" +#include "base.h" #include "event_loop.h" -#include "netaddr.h" - +#include "netaddr.h" + #include <util/datetime/base.h> #include <util/generic/algorithm.h> #include <util/generic/list.h> @@ -15,7 +15,7 @@ #include <util/generic/set.h> #include <util/generic/string.h> #include <util/generic/vector.h> -#include <util/network/address.h> +#include <util/network/address.h> #include <util/network/ip.h> #include <util/network/poller.h> #include <util/string/util.h> @@ -58,7 +58,7 @@ namespace NBus { exc << maxConnect; } }; - + enum EPollType { POLL_READ, POLL_WRITE diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp index e38a35c426..242b9b3120 100644 --- a/library/cpp/messagebus/locator.cpp +++ b/library/cpp/messagebus/locator.cpp @@ -7,8 +7,8 @@ #include "ybus.h" #include <util/generic/hash_set.h> -#include <util/system/hostname.h> - +#include <util/system/hostname.h> + namespace NBus { using namespace NAddr; @@ -17,7 +17,7 @@ namespace NBus { case AF_INET: { return ntohs(((const sockaddr_in*)addr.Addr())->sin_port); } - + case AF_INET6: { return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port); } @@ -26,9 +26,9 @@ namespace NBus { ythrow yexception() << "not implemented"; break; } - } + } } - + static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) { for (size_t i = 1; i < addrs.size(); ++i) { if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) { @@ -38,18 +38,18 @@ namespace NBus { if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) { return false; } - } - + } + for (size_t i = 0; i < addrs.size(); ++i) { if (addrs[i].Addr()->sa_family == AF_INET6) { *addr = addrs[i]; return true; } - } + } return false; - } - + } + EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) { TBusService service = GetService(); TBusKey key = GetKey(mess); @@ -61,8 +61,8 @@ namespace NBus { } else { /// lookup address/port in the locator table locator->LocateAll(service, key, addrs); - } - + } + if (addrs.size() == 0) { return MESSAGE_SERVICE_UNKNOWN; } else if (addrs.size() == 1) { @@ -73,15 +73,15 @@ namespace NBus { /// to implement custom routing strategy for your service. return MESSAGE_SERVICE_TOOMANY; } - } + } return MESSAGE_OK; - } - + } + static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) { return (const sockaddr_in*)a.Addr(); - } - + } + static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) { return (const sockaddr_in6*)a.Addr(); } @@ -96,7 +96,7 @@ namespace NBus { } return false; } - + TBusLocator::TBusLocator() : MyInterfaces(GetNetworkInterfaces()) { @@ -137,7 +137,7 @@ namespace NBus { const char* c = ServiceIdSet.insert(name).first->c_str(); return (ui64)c; } - + int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) { TGuard<TMutex> G(Lock); @@ -399,7 +399,7 @@ namespace NBus { TServiceId serviceId = GetServiceId(service); TItems::const_iterator it; - + TItem itemToReg(serviceId, start, end, addr); for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr())); it != Items.end() && it->ServiceId == serviceId; diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index 3474d62705..e0e3d5f296 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -78,7 +78,7 @@ bool TBusMessageQueue::IsRunning() { return AtomicGet(Running); } -TBusMessageQueueStatus TBusMessageQueue::GetStatusRecordInternal() const { +TBusMessageQueueStatus TBusMessageQueue::GetStatusRecordInternal() const { TBusMessageQueueStatus r; r.ExecutorStatus = WorkQueue->GetStatusRecordInternal(); r.Config = Config; diff --git a/library/cpp/messagebus/netaddr.h b/library/cpp/messagebus/netaddr.h index f915c8c574..f5c441b12f 100644 --- a/library/cpp/messagebus/netaddr.h +++ b/library/cpp/messagebus/netaddr.h @@ -1,4 +1,4 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/config/netaddr.h> - + diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp index a098437d53..f0ba062d8e 100644 --- a/library/cpp/messagebus/rain_check/core/task.cpp +++ b/library/cpp/messagebus/rain_check/core/task.cpp @@ -211,6 +211,6 @@ TString TTaskRunnerBase::GetStatusSingleLine() { return TypeName(*Impl); } -bool NRainCheck::AreWeInsideTask() { +bool NRainCheck::AreWeInsideTask() { return ThreadCurrentTask != nullptr; -} +} diff --git a/library/cpp/messagebus/rain_check/core/task.h b/library/cpp/messagebus/rain_check/core/task.h index 7d8778bcda..2eebd6e01c 100644 --- a/library/cpp/messagebus/rain_check/core/task.h +++ b/library/cpp/messagebus/rain_check/core/task.h @@ -178,7 +178,7 @@ namespace NRainCheck { } }; - // Check that current method executed inside some task. - bool AreWeInsideTask(); - + // Check that current method executed inside some task. + bool AreWeInsideTask(); + } diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h index 1334f05fe4..429c8e34d0 100644 --- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h +++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h @@ -5,7 +5,7 @@ #include <library/cpp/messagebus/ybus.h> -#include <util/system/yassert.h> +#include <util/system/yassert.h> namespace NRainCheck { class TBusTaskStarter: public NBus::IBusServerHandler { diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp index 46a7ece6a8..78f848beb2 100644 --- a/library/cpp/messagebus/session.cpp +++ b/library/cpp/messagebus/session.cpp @@ -24,11 +24,11 @@ namespace NBus { case AF_INET6: { return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr)); } - } - + } + return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len())); - } - + } + bool operator<(const TNetAddr& a1, const TNetAddr& a2) { return CompareByHost(a1, a2) < 0; } diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index de21ad8521..f1a32de69d 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -3,7 +3,7 @@ /// Asynchronous Messaging Library implements framework for sending and /// receiving messages between loosely connected processes. -#include "coreconn.h" +#include "coreconn.h" #include "defs.h" #include "handler.h" #include "handler_impl.h" @@ -17,7 +17,7 @@ #include "session.h" #include "session_config.h" #include "socket_addr.h" - + #include <library/cpp/messagebus/actor/executor.h> #include <library/cpp/messagebus/scheduler/scheduler.h> |