aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorstanly <stanly@yandex-team.ru>2022-02-10 16:46:49 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:49 +0300
commitcde218e65dfef5ce03a48d641fd8f7913cf17b2d (patch)
treed3349caea4095825a55b5ba24fe758067b29ce6f /library/cpp/messagebus
parent9f813499b4ef585cb3c2bb93de93ef003daf4fc4 (diff)
downloadydb-cde218e65dfef5ce03a48d641fd8f7913cf17b2d.tar.gz
Restoring authorship annotation for <stanly@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/actor/executor.cpp6
-rw-r--r--library/cpp/messagebus/base.h4
-rw-r--r--library/cpp/messagebus/config/netaddr.cpp32
-rw-r--r--library/cpp/messagebus/config/netaddr.h28
-rw-r--r--library/cpp/messagebus/coreconn.cpp4
-rw-r--r--library/cpp/messagebus/coreconn.h10
-rw-r--r--library/cpp/messagebus/locator.cpp40
-rw-r--r--library/cpp/messagebus/messqueue.cpp2
-rw-r--r--library/cpp/messagebus/netaddr.h6
-rw-r--r--library/cpp/messagebus/rain_check/core/task.cpp4
-rw-r--r--library/cpp/messagebus/rain_check/core/task.h6
-rw-r--r--library/cpp/messagebus/rain_check/messagebus/messagebus_server.h2
-rw-r--r--library/cpp/messagebus/session.cpp8
-rw-r--r--library/cpp/messagebus/ybus.h4
14 files changed, 78 insertions, 78 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
index 7a2227a458..6325dd3821 100644
--- a/library/cpp/messagebus/actor/executor.cpp
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -233,7 +233,7 @@ void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
}
}
-size_t TExecutor::GetWorkQueueSize() const {
+size_t TExecutor::GetWorkQueueSize() const {
return WorkItems.Size();
}
@@ -259,7 +259,7 @@ TString TExecutor::GetStatusSingleLine() const {
return ss.Str();
}
-TExecutorStatus TExecutor::GetStatusRecordInternal() const {
+TExecutorStatus TExecutor::GetStatusRecordInternal() const {
TExecutorStatus r;
r.WorkQueueSize = GetWorkQueueSize();
@@ -279,7 +279,7 @@ TExecutorStatus TExecutor::GetStatusRecordInternal() const {
return r;
}
-bool TExecutor::IsInExecutorThread() const {
+bool TExecutor::IsInExecutorThread() const {
return ThreadCurrentExecutor == this;
}
diff --git a/library/cpp/messagebus/base.h b/library/cpp/messagebus/base.h
index 79fccc312e..7448e6ede1 100644
--- a/library/cpp/messagebus/base.h
+++ b/library/cpp/messagebus/base.h
@@ -1,7 +1,7 @@
#pragma once
-#include <util/system/defaults.h>
-
+#include <util/system/defaults.h>
+
namespace NBus {
/// millis since epoch
using TBusInstant = ui64;
diff --git a/library/cpp/messagebus/config/netaddr.cpp b/library/cpp/messagebus/config/netaddr.cpp
index 962ac538e2..f82378c345 100644
--- a/library/cpp/messagebus/config/netaddr.cpp
+++ b/library/cpp/messagebus/config/netaddr.cpp
@@ -4,7 +4,7 @@
#include <cstdlib>
-namespace NBus {
+namespace NBus {
const char* ToCString(EIpVersion ipVersion) {
switch (ipVersion) {
case EIP_VERSION_ANY:
@@ -37,12 +37,12 @@ namespace NBus {
{
}
};
-
+
static bool Compare(const IRemoteAddr& l, const IRemoteAddr& r) noexcept {
if (l.Addr()->sa_family != r.Addr()->sa_family) {
return false;
}
-
+
switch (l.Addr()->sa_family) {
case AF_INET: {
return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr)) == 0 &&
@@ -53,25 +53,25 @@ namespace NBus {
return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr)) == 0 &&
((const sockaddr_in6*)l.Addr())->sin6_port == ((const sockaddr_in6*)r.Addr())->sin6_port;
}
- }
-
+ }
+
return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len())) == 0;
- }
-
+ }
+
TNetAddr::TNetAddr()
: Ptr(new TOpaqueAddr)
{
}
-
+
TNetAddr::TNetAddr(TAutoPtr<IRemoteAddr> addr)
: Ptr(addr)
{
Y_VERIFY(!!Ptr);
}
-
+
namespace {
using namespace NAddr;
-
+
const char* Describe(EIpVersion version) {
switch (version) {
case EIP_VERSION_4:
@@ -150,7 +150,7 @@ namespace NBus {
socklen_t TNetAddr::Len() const {
return Ptr->Len();
}
-
+
int TNetAddr::GetPort() const {
switch (Ptr->Addr()->sa_family) {
case AF_INET:
@@ -162,11 +162,11 @@ namespace NBus {
throw 1;
}
}
-
+
bool TNetAddr::IsIpv4() const {
return Ptr->Addr()->sa_family == AF_INET;
}
-
+
bool TNetAddr::IsIpv6() const {
return Ptr->Addr()->sa_family == AF_INET6;
}
@@ -177,7 +177,7 @@ namespace NBus {
}
-template <>
+template <>
void Out<NBus::TNetAddr>(IOutputStream& out, const NBus::TNetAddr& addr) {
- Out<NAddr::IRemoteAddr>(out, addr);
-}
+ Out<NAddr::IRemoteAddr>(out, addr);
+}
diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h
index b79c0cc355..2e40f1464b 100644
--- a/library/cpp/messagebus/config/netaddr.h
+++ b/library/cpp/messagebus/config/netaddr.h
@@ -1,22 +1,22 @@
-#pragma once
-
+#pragma once
+
#include <util/digest/numeric.h>
#include <util/generic/hash.h>
-#include <util/generic/ptr.h>
+#include <util/generic/ptr.h>
#include <util/generic/strbuf.h>
-#include <util/generic/vector.h>
-#include <util/network/address.h>
-
-namespace NBus {
+#include <util/generic/vector.h>
+#include <util/network/address.h>
+
+namespace NBus {
using namespace NAddr;
-
+
/// IP protocol version.
enum EIpVersion {
EIP_VERSION_4 = 1,
EIP_VERSION_6 = 2,
EIP_VERSION_ANY = EIP_VERSION_4 | EIP_VERSION_6,
};
-
+
inline bool IsFamilyAllowed(ui16 sa_family, EIpVersion ipVersion) {
if (ipVersion == EIP_VERSION_4 && sa_family != AF_INET) {
return false;
@@ -51,26 +51,26 @@ namespace NBus {
inline explicit operator bool() const noexcept {
return !!Ptr;
}
-
+
const sockaddr* Addr() const override;
socklen_t Len() const override;
-
+
bool IsIpv4() const;
bool IsIpv6() const;
int GetPort() const;
-
+
private:
TAtomicSharedPtr<IRemoteAddr> Ptr;
};
using TSockAddrInVector = TVector<TNetAddr>;
-
+
struct TNetAddrHostPortHash {
inline size_t operator()(const TNetAddr& a) const {
const sockaddr* s = a.Addr();
const sockaddr_in* const sa = reinterpret_cast<const sockaddr_in*>(s);
const sockaddr_in6* const sa6 = reinterpret_cast<const sockaddr_in6*>(s);
-
+
switch (s->sa_family) {
case AF_INET:
return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa->sin_addr), sizeof(sa->sin_addr))), IntHashImpl(sa->sin_port));
diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp
index d9411bb5db..75b355d4a1 100644
--- a/library/cpp/messagebus/coreconn.cpp
+++ b/library/cpp/messagebus/coreconn.cpp
@@ -1,7 +1,7 @@
-#include "coreconn.h"
+#include "coreconn.h"
#include "remote_connection.h"
-
+
#include <util/datetime/base.h>
#include <util/generic/yexception.h>
#include <util/network/socket.h>
diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h
index fca228d82e..39e085bf31 100644
--- a/library/cpp/messagebus/coreconn.h
+++ b/library/cpp/messagebus/coreconn.h
@@ -4,10 +4,10 @@
/// \file
/// \brief Definitions for asynchonous connection queue
-#include "base.h"
+#include "base.h"
#include "event_loop.h"
-#include "netaddr.h"
-
+#include "netaddr.h"
+
#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
#include <util/generic/list.h>
@@ -15,7 +15,7 @@
#include <util/generic/set.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
-#include <util/network/address.h>
+#include <util/network/address.h>
#include <util/network/ip.h>
#include <util/network/poller.h>
#include <util/string/util.h>
@@ -58,7 +58,7 @@ namespace NBus {
exc << maxConnect;
}
};
-
+
enum EPollType {
POLL_READ,
POLL_WRITE
diff --git a/library/cpp/messagebus/locator.cpp b/library/cpp/messagebus/locator.cpp
index e38a35c426..242b9b3120 100644
--- a/library/cpp/messagebus/locator.cpp
+++ b/library/cpp/messagebus/locator.cpp
@@ -7,8 +7,8 @@
#include "ybus.h"
#include <util/generic/hash_set.h>
-#include <util/system/hostname.h>
-
+#include <util/system/hostname.h>
+
namespace NBus {
using namespace NAddr;
@@ -17,7 +17,7 @@ namespace NBus {
case AF_INET: {
return ntohs(((const sockaddr_in*)addr.Addr())->sin_port);
}
-
+
case AF_INET6: {
return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port);
}
@@ -26,9 +26,9 @@ namespace NBus {
ythrow yexception() << "not implemented";
break;
}
- }
+ }
}
-
+
static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) {
for (size_t i = 1; i < addrs.size(); ++i) {
if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) {
@@ -38,18 +38,18 @@ namespace NBus {
if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) {
return false;
}
- }
-
+ }
+
for (size_t i = 0; i < addrs.size(); ++i) {
if (addrs[i].Addr()->sa_family == AF_INET6) {
*addr = addrs[i];
return true;
}
- }
+ }
return false;
- }
-
+ }
+
EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) {
TBusService service = GetService();
TBusKey key = GetKey(mess);
@@ -61,8 +61,8 @@ namespace NBus {
} else {
/// lookup address/port in the locator table
locator->LocateAll(service, key, addrs);
- }
-
+ }
+
if (addrs.size() == 0) {
return MESSAGE_SERVICE_UNKNOWN;
} else if (addrs.size() == 1) {
@@ -73,15 +73,15 @@ namespace NBus {
/// to implement custom routing strategy for your service.
return MESSAGE_SERVICE_TOOMANY;
}
- }
+ }
return MESSAGE_OK;
- }
-
+ }
+
static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) {
return (const sockaddr_in*)a.Addr();
- }
-
+ }
+
static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
return (const sockaddr_in6*)a.Addr();
}
@@ -96,7 +96,7 @@ namespace NBus {
}
return false;
}
-
+
TBusLocator::TBusLocator()
: MyInterfaces(GetNetworkInterfaces())
{
@@ -137,7 +137,7 @@ namespace NBus {
const char* c = ServiceIdSet.insert(name).first->c_str();
return (ui64)c;
}
-
+
int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
TGuard<TMutex> G(Lock);
@@ -399,7 +399,7 @@ namespace NBus {
TServiceId serviceId = GetServiceId(service);
TItems::const_iterator it;
-
+
TItem itemToReg(serviceId, start, end, addr);
for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr()));
it != Items.end() && it->ServiceId == serviceId;
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 3474d62705..e0e3d5f296 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -78,7 +78,7 @@ bool TBusMessageQueue::IsRunning() {
return AtomicGet(Running);
}
-TBusMessageQueueStatus TBusMessageQueue::GetStatusRecordInternal() const {
+TBusMessageQueueStatus TBusMessageQueue::GetStatusRecordInternal() const {
TBusMessageQueueStatus r;
r.ExecutorStatus = WorkQueue->GetStatusRecordInternal();
r.Config = Config;
diff --git a/library/cpp/messagebus/netaddr.h b/library/cpp/messagebus/netaddr.h
index f915c8c574..f5c441b12f 100644
--- a/library/cpp/messagebus/netaddr.h
+++ b/library/cpp/messagebus/netaddr.h
@@ -1,4 +1,4 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/config/netaddr.h>
-
+
diff --git a/library/cpp/messagebus/rain_check/core/task.cpp b/library/cpp/messagebus/rain_check/core/task.cpp
index a098437d53..f0ba062d8e 100644
--- a/library/cpp/messagebus/rain_check/core/task.cpp
+++ b/library/cpp/messagebus/rain_check/core/task.cpp
@@ -211,6 +211,6 @@ TString TTaskRunnerBase::GetStatusSingleLine() {
return TypeName(*Impl);
}
-bool NRainCheck::AreWeInsideTask() {
+bool NRainCheck::AreWeInsideTask() {
return ThreadCurrentTask != nullptr;
-}
+}
diff --git a/library/cpp/messagebus/rain_check/core/task.h b/library/cpp/messagebus/rain_check/core/task.h
index 7d8778bcda..2eebd6e01c 100644
--- a/library/cpp/messagebus/rain_check/core/task.h
+++ b/library/cpp/messagebus/rain_check/core/task.h
@@ -178,7 +178,7 @@ namespace NRainCheck {
}
};
- // Check that current method executed inside some task.
- bool AreWeInsideTask();
-
+ // Check that current method executed inside some task.
+ bool AreWeInsideTask();
+
}
diff --git a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h
index 1334f05fe4..429c8e34d0 100644
--- a/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h
+++ b/library/cpp/messagebus/rain_check/messagebus/messagebus_server.h
@@ -5,7 +5,7 @@
#include <library/cpp/messagebus/ybus.h>
-#include <util/system/yassert.h>
+#include <util/system/yassert.h>
namespace NRainCheck {
class TBusTaskStarter: public NBus::IBusServerHandler {
diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp
index 46a7ece6a8..78f848beb2 100644
--- a/library/cpp/messagebus/session.cpp
+++ b/library/cpp/messagebus/session.cpp
@@ -24,11 +24,11 @@ namespace NBus {
case AF_INET6: {
return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr));
}
- }
-
+ }
+
return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len()));
- }
-
+ }
+
bool operator<(const TNetAddr& a1, const TNetAddr& a2) {
return CompareByHost(a1, a2) < 0;
}
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h
index de21ad8521..f1a32de69d 100644
--- a/library/cpp/messagebus/ybus.h
+++ b/library/cpp/messagebus/ybus.h
@@ -3,7 +3,7 @@
/// Asynchronous Messaging Library implements framework for sending and
/// receiving messages between loosely connected processes.
-#include "coreconn.h"
+#include "coreconn.h"
#include "defs.h"
#include "handler.h"
#include "handler_impl.h"
@@ -17,7 +17,7 @@
#include "session.h"
#include "session_config.h"
#include "socket_addr.h"
-
+
#include <library/cpp/messagebus/actor/executor.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>