aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:49 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:49 +0300
commit7489e4682331202b9c7d863c0898eb83d7b12c2b (patch)
tree9142afc54d335ea52910662635b898e79e192e49 /library/cpp/messagebus
parenta5950576e397b1909261050b8c7da16db58f10b1 (diff)
downloadydb-7489e4682331202b9c7d863c0898eb83d7b12c2b.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/config/netaddr.h10
-rw-r--r--library/cpp/messagebus/coreconn.cpp4
-rw-r--r--library/cpp/messagebus/coreconn.h24
-rw-r--r--library/cpp/messagebus/event_loop.cpp330
-rw-r--r--library/cpp/messagebus/event_loop.h108
-rw-r--r--library/cpp/messagebus/messqueue.cpp10
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp16
-rw-r--r--library/cpp/messagebus/oldmodule/module.h2
-rw-r--r--library/cpp/messagebus/remote_client_session.cpp12
-rw-r--r--library/cpp/messagebus/remote_client_session.h6
-rw-r--r--library/cpp/messagebus/remote_connection.cpp58
-rw-r--r--library/cpp/messagebus/remote_connection.h26
-rw-r--r--library/cpp/messagebus/remote_server_session.cpp8
-rw-r--r--library/cpp/messagebus/remote_server_session.h8
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.cpp72
-rw-r--r--library/cpp/messagebus/scheduler/scheduler.h32
-rw-r--r--library/cpp/messagebus/session_impl.cpp72
-rw-r--r--library/cpp/messagebus/session_impl.h36
-rw-r--r--library/cpp/messagebus/storage.cpp26
-rw-r--r--library/cpp/messagebus/storage.h20
-rw-r--r--library/cpp/messagebus/test/perftest/perftest.cpp8
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp4
-rw-r--r--library/cpp/messagebus/ybus.h2
23 files changed, 447 insertions, 447 deletions
diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h
index 573458ba72..b79c0cc355 100644
--- a/library/cpp/messagebus/config/netaddr.h
+++ b/library/cpp/messagebus/config/netaddr.h
@@ -1,6 +1,6 @@
#pragma once
-#include <util/digest/numeric.h>
+#include <util/digest/numeric.h>
#include <util/generic/hash.h>
#include <util/generic/ptr.h>
#include <util/generic/strbuf.h>
@@ -74,13 +74,13 @@ namespace NBus {
switch (s->sa_family) {
case AF_INET:
return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa->sin_addr), sizeof(sa->sin_addr))), IntHashImpl(sa->sin_port));
-
+
case AF_INET6:
return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa6->sin6_addr), sizeof(sa6->sin6_addr))), IntHashImpl(sa6->sin6_port));
}
-
+
return ComputeHash(TStringBuf(reinterpret_cast<const char*>(s), a.Len()));
- }
+ }
};
-
+
}
diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp
index d9436f15d7..d9411bb5db 100644
--- a/library/cpp/messagebus/coreconn.cpp
+++ b/library/cpp/messagebus/coreconn.cpp
@@ -2,10 +2,10 @@
#include "remote_connection.h"
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
#include <util/generic/yexception.h>
#include <util/network/socket.h>
-#include <util/string/util.h>
+#include <util/string/util.h>
#include <util/system/thread.h>
namespace NBus {
diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h
index f6ec07bef4..fca228d82e 100644
--- a/library/cpp/messagebus/coreconn.h
+++ b/library/cpp/messagebus/coreconn.h
@@ -5,24 +5,24 @@
/// \brief Definitions for asynchonous connection queue
#include "base.h"
-#include "event_loop.h"
+#include "event_loop.h"
#include "netaddr.h"
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
#include <util/generic/list.h>
-#include <util/generic/map.h>
-#include <util/generic/set.h>
+#include <util/generic/map.h>
+#include <util/generic/set.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
#include <util/network/address.h>
#include <util/network/ip.h>
-#include <util/network/poller.h>
-#include <util/string/util.h>
-#include <util/system/condvar.h>
+#include <util/network/poller.h>
+#include <util/string/util.h>
+#include <util/system/condvar.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
-#include <util/thread/lfqueue.h>
+#include <util/thread/lfqueue.h>
#include <deque>
#include <utility>
@@ -31,9 +31,9 @@
#undef NO_ERROR
#endif
-#define BUS_WORKER_CONDVAR
-//#define BUS_WORKER_MIXED
-
+#define BUS_WORKER_CONDVAR
+//#define BUS_WORKER_MIXED
+
namespace NBus {
class TBusConnection;
class TBusConnectionFactory;
@@ -64,4 +64,4 @@ namespace NBus {
POLL_WRITE
};
-}
+}
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index b1209d2b5c..f685135bed 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -5,50 +5,50 @@
#include <util/generic/hash.h>
#include <util/network/pair.h>
-#include <util/network/poller.h>
+#include <util/network/poller.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
-#include <util/system/yassert.h>
+#include <util/system/yassert.h>
#include <util/thread/lfqueue.h>
-
+
#include <errno.h>
-using namespace NEventLoop;
-
-namespace {
+using namespace NEventLoop;
+
+namespace {
enum ERunningState {
EVENT_LOOP_CREATED,
EVENT_LOOP_RUNNING,
EVENT_LOOP_STOPPED,
};
- enum EOperation {
- OP_READ = 1,
- OP_WRITE = 2,
- OP_READ_WRITE = OP_READ | OP_WRITE,
- };
-}
-
-class TChannel::TImpl {
-public:
+ enum EOperation {
+ OP_READ = 1,
+ OP_WRITE = 2,
+ OP_READ_WRITE = OP_READ | OP_WRITE,
+ };
+}
+
+class TChannel::TImpl {
+public:
TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie);
~TImpl();
-
- void EnableRead();
- void DisableRead();
- void EnableWrite();
- void DisableWrite();
-
- void Unregister();
-
- SOCKET GetSocket() const;
+
+ void EnableRead();
+ void DisableRead();
+ void EnableWrite();
+ void DisableWrite();
+
+ void Unregister();
+
+ SOCKET GetSocket() const;
TSocket GetSocketPtr() const;
-
+
void Update(int pollerFlags, bool enable);
void CallHandler();
- TEventLoop::TImpl* EventLoop;
+ TEventLoop::TImpl* EventLoop;
TSocket Socket;
TEventHandlerPtr EventHandler;
void* Cookie;
@@ -57,130 +57,130 @@ public:
int CurrentFlags;
bool Close;
-};
-
-class TEventLoop::TImpl {
-public:
+};
+
+class TEventLoop::TImpl {
+public:
TImpl(const char* name);
-
- void Run();
- void Wakeup();
- void Stop();
-
+
+ void Run();
+ void Wakeup();
+ void Stop();
+
TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie);
- void Unregister(SOCKET socket);
-
+ void Unregister(SOCKET socket);
+
typedef THashMap<SOCKET, TChannelPtr> TData;
-
+
void AddToPoller(SOCKET socket, void* cookie, int flags);
-
+
TMutex Mutex;
-
+
const char* Name;
TAtomic RunningState;
TAtomic StopSignal;
TSystemEvent StoppedEvent;
- TData Data;
-
+ TData Data;
+
TLockFreeQueue<SOCKET> SocketsToRemove;
- TSocketPoller Poller;
- TSocketHolder WakeupReadSocket;
- TSocketHolder WakeupWriteSocket;
-};
-
-TChannel::~TChannel() {
-}
-
-void TChannel::EnableRead() {
- Impl->EnableRead();
-}
-
-void TChannel::DisableRead() {
- Impl->DisableRead();
-}
-
-void TChannel::EnableWrite() {
- Impl->EnableWrite();
-}
-
-void TChannel::DisableWrite() {
- Impl->DisableWrite();
-}
-
-void TChannel::Unregister() {
- Impl->Unregister();
-}
-
-SOCKET TChannel::GetSocket() const {
- return Impl->GetSocket();
-}
-
+ TSocketPoller Poller;
+ TSocketHolder WakeupReadSocket;
+ TSocketHolder WakeupWriteSocket;
+};
+
+TChannel::~TChannel() {
+}
+
+void TChannel::EnableRead() {
+ Impl->EnableRead();
+}
+
+void TChannel::DisableRead() {
+ Impl->DisableRead();
+}
+
+void TChannel::EnableWrite() {
+ Impl->EnableWrite();
+}
+
+void TChannel::DisableWrite() {
+ Impl->DisableWrite();
+}
+
+void TChannel::Unregister() {
+ Impl->Unregister();
+}
+
+SOCKET TChannel::GetSocket() const {
+ return Impl->GetSocket();
+}
+
TSocket TChannel::GetSocketPtr() const {
return Impl->GetSocketPtr();
}
TChannel::TChannel(TImpl* impl)
: Impl(impl)
-{
-}
-
+{
+}
+
TEventLoop::TEventLoop(const char* name)
: Impl(new TImpl(name))
-{
-}
-
+{
+}
+
TEventLoop::~TEventLoop() {
-}
-
-void TEventLoop::Run() {
- Impl->Run();
-}
-
-void TEventLoop::Stop() {
- Impl->Stop();
-}
-
+}
+
+void TEventLoop::Run() {
+ Impl->Run();
+}
+
+void TEventLoop::Stop() {
+ Impl->Stop();
+}
+
bool TEventLoop::IsRunning() {
return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
}
TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
return Impl->Register(socket, eventHandler, cookie);
-}
-
+}
+
TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie)
- : EventLoop(eventLoop)
- , Socket(socket)
+ : EventLoop(eventLoop)
+ , Socket(socket)
, EventHandler(eventHandler)
, Cookie(cookie)
, CurrentFlags(0)
, Close(false)
-{
-}
-
+{
+}
+
TChannel::TImpl::~TImpl() {
Y_ASSERT(Close);
}
-void TChannel::TImpl::EnableRead() {
+void TChannel::TImpl::EnableRead() {
Update(OP_READ, true);
-}
-
-void TChannel::TImpl::DisableRead() {
+}
+
+void TChannel::TImpl::DisableRead() {
Update(OP_READ, false);
-}
-
-void TChannel::TImpl::EnableWrite() {
+}
+
+void TChannel::TImpl::EnableWrite() {
Update(OP_WRITE, true);
-}
-
-void TChannel::TImpl::DisableWrite() {
+}
+
+void TChannel::TImpl::DisableWrite() {
Update(OP_WRITE, false);
-}
-
-void TChannel::TImpl::Unregister() {
+}
+
+void TChannel::TImpl::Unregister() {
TGuard<TMutex> guard(Mutex);
if (Close) {
@@ -196,8 +196,8 @@ void TChannel::TImpl::Unregister() {
EventLoop->SocketsToRemove.Enqueue(Socket);
EventLoop->Wakeup();
-}
-
+}
+
void TChannel::TImpl::Update(int flags, bool enable) {
TGuard<TMutex> guard(Mutex);
@@ -221,10 +221,10 @@ void TChannel::TImpl::Update(int flags, bool enable) {
CurrentFlags = newFlags;
}
-SOCKET TChannel::TImpl::GetSocket() const {
- return Socket;
-}
-
+SOCKET TChannel::TImpl::GetSocket() const {
+ return Socket;
+}
+
TSocket TChannel::TImpl::GetSocketPtr() const {
return Socket;
}
@@ -256,27 +256,27 @@ TEventLoop::TImpl::TImpl(const char* name)
: Name(name)
, RunningState(EVENT_LOOP_CREATED)
, StopSignal(0)
-{
- SOCKET wakeupSockets[2];
-
+{
+ SOCKET wakeupSockets[2];
+
if (SocketPair(wakeupSockets) < 0) {
Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText());
}
-
- TSocketHolder wakeupReadSocket(wakeupSockets[0]);
- TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
-
- WakeupReadSocket.Swap(wakeupReadSocket);
- WakeupWriteSocket.Swap(wakeupWriteSocket);
-
- SetNonBlock(WakeupWriteSocket, true);
- SetNonBlock(WakeupReadSocket, true);
-
- Poller.WaitRead(WakeupReadSocket,
+
+ TSocketHolder wakeupReadSocket(wakeupSockets[0]);
+ TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
+
+ WakeupReadSocket.Swap(wakeupReadSocket);
+ WakeupWriteSocket.Swap(wakeupWriteSocket);
+
+ SetNonBlock(WakeupWriteSocket, true);
+ SetNonBlock(WakeupReadSocket, true);
+
+ Poller.WaitRead(WakeupReadSocket,
reinterpret_cast<void*>(this));
-}
-
-void TEventLoop::TImpl::Run() {
+}
+
+void TEventLoop::TImpl::Run() {
bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
Y_VERIFY(res, "Invalid mbus event loop state");
@@ -285,30 +285,30 @@ void TEventLoop::TImpl::Run() {
}
while (AtomicGet(StopSignal) == 0) {
- void* cookies[1024];
+ void* cookies[1024];
const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
-
- void** end = cookies + count;
- for (void** c = cookies; c != end; ++c) {
+
+ void** end = cookies + count;
+ for (void** c = cookies; c != end; ++c) {
TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c);
-
+
if (*c == this) {
- char buf[0x1000];
+ char buf[0x1000];
if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) {
Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText());
}
- continue;
- }
-
+ continue;
+ }
+
s->CallHandler();
- }
+ }
SOCKET socket = -1;
while (SocketsToRemove.Dequeue(&socket)) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(Data.erase(socket) == 1, "must be removed once");
}
- }
+ }
{
TGuard<TMutex> guard(Mutex);
@@ -325,9 +325,9 @@ void TEventLoop::TImpl::Run() {
Y_VERIFY(res);
StoppedEvent.Signal();
-}
-
-void TEventLoop::TImpl::Stop() {
+}
+
+void TEventLoop::TImpl::Stop() {
AtomicSet(StopSignal, 1);
if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
@@ -335,36 +335,36 @@ void TEventLoop::TImpl::Stop() {
StoppedEvent.WaitI();
}
-}
-
+}
+
TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie));
-
+
TGuard<TMutex> guard(Mutex);
-
+
Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted");
-
+
return channel;
-}
-
-void TEventLoop::TImpl::Wakeup() {
+}
+
+void TEventLoop::TImpl::Wakeup() {
if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) {
if (LastSystemError() != EAGAIN) {
Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText());
}
- }
-}
-
-void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
- if (flags == OP_READ) {
+ }
+}
+
+void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
+ if (flags == OP_READ) {
Poller.WaitReadOneShot(socket, cookie);
- } else if (flags == OP_WRITE) {
+ } else if (flags == OP_WRITE) {
Poller.WaitWriteOneShot(socket, cookie);
- } else if (flags == OP_READ_WRITE) {
+ } else if (flags == OP_READ_WRITE) {
Poller.WaitReadWriteOneShot(socket, cookie);
- } else {
+ } else {
Y_FAIL("Wrong flags: %d", int(flags));
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/event_loop.h b/library/cpp/messagebus/event_loop.h
index 677ade2fff..d5b0a53b0c 100644
--- a/library/cpp/messagebus/event_loop.h
+++ b/library/cpp/messagebus/event_loop.h
@@ -1,72 +1,72 @@
-#pragma once
-
+#pragma once
+
#include <util/generic/object_counter.h>
-#include <util/generic/ptr.h>
-#include <util/network/init.h>
+#include <util/generic/ptr.h>
+#include <util/network/init.h>
#include <util/network/socket.h>
-
-namespace NEventLoop {
- struct IEventHandler
+
+namespace NEventLoop {
+ struct IEventHandler
: public TAtomicRefCount<IEventHandler> {
virtual void HandleEvent(SOCKET socket, void* cookie) = 0;
virtual ~IEventHandler() {
}
- };
-
- typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr;
-
- class TEventLoop;
-
+ };
+
+ typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr;
+
+ class TEventLoop;
+
// TODO: make TChannel itself a pointer
// to avoid confusion with Drop and Unregister
- class TChannel
+ class TChannel
: public TAtomicRefCount<TChannel> {
- public:
- ~TChannel();
-
- void EnableRead();
- void DisableRead();
- void EnableWrite();
- void DisableWrite();
-
- void Unregister();
-
- SOCKET GetSocket() const;
+ public:
+ ~TChannel();
+
+ void EnableRead();
+ void DisableRead();
+ void EnableWrite();
+ void DisableWrite();
+
+ void Unregister();
+
+ SOCKET GetSocket() const;
TSocket GetSocketPtr() const;
-
- private:
- class TImpl;
- friend class TEventLoop;
-
+
+ private:
+ class TImpl;
+ friend class TEventLoop;
+
TObjectCounter<TChannel> ObjectCounter;
TChannel(TImpl*);
-
- private:
- THolder<TImpl> Impl;
- };
-
- typedef TIntrusivePtr<TChannel> TChannelPtr;
-
- class TEventLoop {
- public:
+
+ private:
+ THolder<TImpl> Impl;
+ };
+
+ typedef TIntrusivePtr<TChannel> TChannelPtr;
+
+ class TEventLoop {
+ public:
TEventLoop(const char* name = nullptr);
- ~TEventLoop();
-
- void Run();
- void Stop();
+ ~TEventLoop();
+
+ void Run();
+ void Stop();
bool IsRunning();
-
+
TChannelPtr Register(TSocket socket, TEventHandlerPtr, void* cookie = nullptr);
-
- private:
- class TImpl;
- friend class TChannel;
-
+
+ private:
+ class TImpl;
+ friend class TChannel;
+
TObjectCounter<TEventLoop> ObjectCounter;
- private:
- THolder<TImpl> Impl;
- };
-
-}
+ private:
+ THolder<TImpl> Impl;
+ };
+
+}
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 9176496252..3474d62705 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -56,7 +56,7 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e
}
TBusMessageQueue::~TBusMessageQueue() {
- Stop();
+ Stop();
}
void TBusMessageQueue::Stop() {
@@ -127,7 +127,7 @@ TString TBusMessageQueue::GetStatus(ui16 flags) const {
TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
Add(session.Get());
- return session.Get();
+ return session.Get();
}
TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
@@ -189,10 +189,10 @@ void TBusMessageQueue::DestroyAllSessions() {
}
}
-void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) {
- Scheduler.Schedule(i);
+void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) {
+ Scheduler.Schedule(i);
}
-
+
TString TBusMessageQueue::GetNameInternal() const {
return Config.Name;
}
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 8a64811979..24bd778799 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -79,19 +79,19 @@ namespace NBus {
TBusModuleImpl* const Module;
};
-
+
struct TModuleServerHandler
: public IBusServerHandler {
TModuleServerHandler(TBusModuleImpl* module)
: Module(module)
{
}
-
+
void OnMessage(TOnMessageContext& msg) override;
-
+
TBusModuleImpl* const Module;
};
-
+
struct TBusModuleImpl: public TBusModuleInternal {
TBusModule* const Module;
@@ -677,7 +677,7 @@ namespace NBus {
return true;
}
-
+
bool TBusModule::Shutdown() {
Impl->Shutdown();
@@ -702,7 +702,7 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
Impl->Queue = queue;
return true;
}
-
+
int TBusModule::GetModuleSessionInFlight() const {
return Impl->Size();
}
@@ -781,11 +781,11 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) {
if (jobCount == 0) {
ShutdownCondVar.BroadCast();
}
- }
+ }
}
job->JobStorageIterator = TList<TJobRunner*>::iterator();
-}
+}
void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) {
TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage();
diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h
index 1b75c4df46..8d1c4a5d52 100644
--- a/library/cpp/messagebus/oldmodule/module.h
+++ b/library/cpp/messagebus/oldmodule/module.h
@@ -407,4 +407,4 @@ namespace NBus {
TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config);
};
-}
+}
diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp
index 7bd6e115c7..3bc421944f 100644
--- a/library/cpp/messagebus/remote_client_session.cpp
+++ b/library/cpp/messagebus/remote_client_session.cpp
@@ -1,10 +1,10 @@
#include "remote_client_session.h"
-
+
#include "mb_lwtrace.h"
#include "remote_client_connection.h"
#include <library/cpp/messagebus/scheduler/scheduler.h>
-
+
#include <util/generic/cast.h>
#include <util/system/defaults.h>
@@ -19,9 +19,9 @@ TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue,
: TBusSessionImpl(true, queue, proto, handler, config, name)
, ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight")
, ClientHandler(handler)
-{
-}
-
+{
+}
+
TRemoteClientSession::~TRemoteClientSession() {
//Cerr << "~TRemoteClientSession" << Endl;
}
@@ -31,7 +31,7 @@ void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<
temp->swap(newMsg);
c->ReplyQueue.EnqueueAll(temp);
c->ScheduleWrite();
-}
+}
EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) {
if (Y_UNLIKELY(IsDown())) {
diff --git a/library/cpp/messagebus/remote_client_session.h b/library/cpp/messagebus/remote_client_session.h
index f619d4d86a..7160d0dae9 100644
--- a/library/cpp/messagebus/remote_client_session.h
+++ b/library/cpp/messagebus/remote_client_session.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "remote_client_session_semaphore.h"
#include "session_impl.h"
@@ -14,7 +14,7 @@
namespace NBus {
namespace NPrivate {
using TRemoteClientSessionPtr = TIntrusivePtr<TRemoteClientSession>;
-
+
class TRemoteClientSession: public TBusClientSession, public TBusSessionImpl {
friend class TRemoteClientConnection;
friend class TInvokeOnReply;
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 59b58f7797..22932569db 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -7,9 +7,9 @@
#include "remote_client_session.h"
#include "remote_server_session.h"
#include "session_impl.h"
-
+
#include <library/cpp/messagebus/actor/what_thread_does.h>
-
+
#include <util/generic/cast.h>
#include <util/network/init.h>
#include <util/system/atomic.h>
@@ -44,7 +44,7 @@ namespace NBus {
WriterData.Status.ConnectionId = connectionId;
WriterData.Status.PeerAddr = PeerAddr;
ReaderData.Status.ConnectionId = connectionId;
-
+
const TInstant now = TInstant::Now();
WriterFillStatus();
@@ -70,7 +70,7 @@ namespace NBus {
Y_VERIFY(AtomicGet(Down));
Y_VERIFY(SendQueue.Empty());
}
-
+
bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
size_t left = Buffer.Size() - Offset;
@@ -137,7 +137,7 @@ namespace NBus {
void TRemoteConnection::Shutdown(EMessageStatus status) {
ScheduleShutdown(status);
-
+
ReaderData.ShutdownComplete.WaitI();
WriterData.ShutdownComplete.WaitI();
}
@@ -145,15 +145,15 @@ namespace NBus {
void TRemoteConnection::TryConnect() {
Y_FAIL("TryConnect is client connection only operation");
}
-
+
void TRemoteConnection::ScheduleRead() {
GetReaderActor()->Schedule();
}
-
+
void TRemoteConnection::ScheduleWrite() {
GetWriterActor()->Schedule();
}
-
+
void TRemoteConnection::WriterRotateCounters() {
if (!WriterData.TimeToRotateCounters.FetchTask()) {
return;
@@ -383,7 +383,7 @@ namespace NBus {
if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
ReaderData.Status.Incremental.BufferDrops += 1;
-
+
TBuffer temp;
// probably should use another constant
temp.Reserve(Config.DefaultBufferSize);
@@ -391,7 +391,7 @@ namespace NBus {
ReaderData.Buffer.Swap(temp);
}
-
+
return true;
}
@@ -406,7 +406,7 @@ namespace NBus {
ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
}
}
-
+
Y_ASSERT(ReaderData.Buffer.Avail() > 0);
ssize_t bytes;
@@ -465,27 +465,27 @@ namespace NBus {
if (!Session->IsSource_) {
message->SendTime = now.MilliSeconds();
}
-
+
WriterData.SendQueue.PushBack(message);
}
-
+
void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) {
BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now));
- }
-
+ }
+
void TRemoteConnection::WriterFillInFlight() {
// this is hack for TLoadBalancedProtocol
WriterFillStatus();
AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight());
}
-
+
const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
WriterRotateCounters();
WriterFillStatus();
return WriterData.Status;
}
-
+
void TRemoteConnection::WriterFillStatus() {
if (!!WriterData.Channel) {
WriterData.Status.Fd = WriterData.Channel->GetSocket();
@@ -644,11 +644,11 @@ namespace NBus {
if (WriterData.Buffer.Capacity() > MaxBufferSize) {
WriterData.Status.Incremental.BufferDrops += 1;
WriterData.Buffer.Reset();
- }
+ }
WriterData.State = WRITER_FILLING;
}
-
+
void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) {
if (Session->IsSource_) {
WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
@@ -662,11 +662,11 @@ namespace NBus {
AtomicSet(ReaderData.Down, 1);
ScheduleRead();
-
+
AtomicSet(WriterData.Down, 1);
ScheduleWrite();
}
-
+
void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const {
size_t posForAssertion = buffer.Size();
Proto->Serialize(msg, buffer);
@@ -688,12 +688,12 @@ namespace NBus {
}
}
-
+
void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
size_t pos = data->Size();
-
+
size_t dataSize;
-
+
bool compressionRequested = msg->IsCompressed();
if (compressionRequested) {
@@ -821,20 +821,20 @@ namespace NBus {
TBusMessagePtrAndHeader h(r);
r->RecvTime = now;
-
+
QuotaConsume(1, header.Size);
ReaderData.ReadMessages.push_back(h);
if (ReaderData.ReadMessages.size() >= 100) {
ReaderFlushMessages();
}
-
+
return true;
}
void TRemoteConnection::WriterFillBuffer() {
Y_ASSERT(WriterData.State == WRITER_FILLING);
-
+
Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
@@ -868,7 +868,7 @@ namespace NBus {
WriterData.CorkUntil = TInstant::Now() + Config.Cork;
}
}
-
+
size_t sizeBeforeSerialize = WriterData.Buffer.Size();
TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
@@ -952,7 +952,7 @@ namespace NBus {
void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
ResetOneWayFlag(ms);
-
+
WriterData.Status.Incremental.StatusCounter[status] += ms.size();
for (auto m : ms) {
Session->InvokeOnError(m, status);
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h
index ee0665774d..4538947368 100644
--- a/library/cpp/messagebus/remote_connection.h
+++ b/library/cpp/messagebus/remote_connection.h
@@ -1,8 +1,8 @@
-#pragma once
-
+#pragma once
+
#include "async_result.h"
#include "defs.h"
-#include "event_loop.h"
+#include "event_loop.h"
#include "left_right_buffer.h"
#include "lfqueue_batch.h"
#include "message_ptr_and_header.h"
@@ -15,7 +15,7 @@
#include "ybus.h"
#include "misc/granup.h"
#include "misc/tokenquota.h"
-
+
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/executor.h>
#include <library/cpp/messagebus/actor/queue_for_actor.h>
@@ -96,7 +96,7 @@ namespace NBus {
void Shutdown(EMessageStatus status);
inline const TNetAddr& GetAddr() const noexcept;
-
+
private:
friend class TScheduleConnect;
friend class TWorkIO;
@@ -111,14 +111,14 @@ namespace NBus {
bool ReaderProcessBuffer();
bool ReaderFillBuffer();
void ReaderFlushMessages();
-
+
void ReadQuotaWakeup();
ui32 WriteWakeFlags() const;
-
+
virtual bool NeedInterruptRead() {
return false;
}
-
+
public:
virtual void TryConnect();
void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage);
@@ -174,7 +174,7 @@ namespace NBus {
void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status);
// takes ownership of ms
void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status);
-
+
void FireClientConnectionEvent(TClientConnectionEvent::EType);
size_t GetInFlight();
@@ -207,14 +207,14 @@ namespace NBus {
NEventLoop::TChannelPtr Channel;
ui32 SocketVersion;
-
+
TRemoteConnectionWriterStatus Status;
TInstant StatusLastSendTime;
-
+
TLocalTasks TimeToRotateCounters;
TAtomic InFlight;
-
+
TTimedMessages SendQueue;
ui32 AwakeFlags;
EWriterState State;
@@ -290,5 +290,5 @@ namespace NBus {
typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
- }
+ }
}
diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp
index 12765ab9b4..6abbf88a60 100644
--- a/library/cpp/messagebus/remote_server_session.cpp
+++ b/library/cpp/messagebus/remote_server_session.cpp
@@ -21,14 +21,14 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue,
: TBusSessionImpl(false, queue, proto, handler, config, name)
, ServerOwnedMessages(config.MaxInFlight, config.MaxInFlightBySize, "ServerOwnedMessages")
, ServerHandler(handler)
-{
+{
if (config.PerConnectionMaxInFlightBySize > 0) {
if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize)
ythrow yexception()
<< "too low PerConnectionMaxInFlightBySize value";
}
-}
-
+}
+
namespace NBus {
namespace NPrivate {
class TInvokeOnMessage: public IWorkItem {
@@ -83,7 +83,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<
JobCount.Add(workQueueTemp.GetVector()->size());
Queue->EnqueueWork(*workQueueTemp.GetVector());
}
-}
+}
void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) {
if (Y_UNLIKELY(AtomicGet(Down))) {
diff --git a/library/cpp/messagebus/remote_server_session.h b/library/cpp/messagebus/remote_server_session.h
index c70dde00e2..f5c266a7f7 100644
--- a/library/cpp/messagebus/remote_server_session.h
+++ b/library/cpp/messagebus/remote_server_session.h
@@ -1,8 +1,8 @@
-#pragma once
-
+#pragma once
+
#include "remote_server_session_semaphore.h"
#include "session_impl.h"
-
+
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance
@@ -12,7 +12,7 @@ namespace NBus {
namespace NPrivate {
class TRemoteServerSession: public TBusServerSession, public TBusSessionImpl {
friend class TRemoteServerConnection;
-
+
private:
TObjectCounter<TRemoteServerSession> ObjectCounter;
diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp
index 5c0686d32a..5a5fe52894 100644
--- a/library/cpp/messagebus/scheduler/scheduler.cpp
+++ b/library/cpp/messagebus/scheduler/scheduler.cpp
@@ -1,37 +1,37 @@
-#include "scheduler.h"
-
-#include <util/datetime/base.h>
-#include <util/generic/algorithm.h>
-#include <util/generic/yexception.h>
-
+#include "scheduler.h"
+
+#include <util/datetime/base.h>
+#include <util/generic/algorithm.h>
+#include <util/generic/yexception.h>
+
//#include "dummy_debugger.h"
using namespace NBus;
using namespace NBus::NPrivate;
-
-class TScheduleDeadlineCompare {
-public:
+
+class TScheduleDeadlineCompare {
+public:
bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept {
return i1->GetScheduleTime() > i2->GetScheduleTime();
- }
-};
-
-TScheduler::TScheduler()
+ }
+};
+
+TScheduler::TScheduler()
: StopThread(false)
, Thread([&] { this->SchedulerThread(); })
-{
-}
-
-TScheduler::~TScheduler() {
+{
+}
+
+TScheduler::~TScheduler() {
Y_VERIFY(StopThread, "state check");
-}
-
+}
+
size_t TScheduler::Size() const {
TGuard<TLock> guard(Lock);
return Items.size() + (!!NextItem ? 1 : 0);
}
-void TScheduler::Stop() {
+void TScheduler::Stop() {
{
TGuard<TLock> guard(Lock);
Y_VERIFY(!StopThread, "Scheduler already stopped");
@@ -46,28 +46,28 @@ void TScheduler::Stop() {
for (auto& item : Items) {
item.Destroy();
- }
-}
-
-void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) {
+ }
+}
+
+void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) {
TGuard<TLock> lock(Lock);
if (StopThread)
- return;
+ return;
if (!!NextItem) {
if (i->GetScheduleTime() < NextItem->GetScheduleTime()) {
DoSwap(i, NextItem);
}
- }
+ }
- Items.push_back(i);
+ Items.push_back(i);
PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
FillNextItem();
CondVar.Signal();
-}
-
+}
+
void TScheduler::FillNextItem() {
if (!NextItem && !Items.empty()) {
PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
@@ -76,22 +76,22 @@ void TScheduler::FillNextItem() {
}
}
-void TScheduler::SchedulerThread() {
+void TScheduler::SchedulerThread() {
for (;;) {
IScheduleItemAutoPtr current;
- {
+ {
TGuard<TLock> guard(Lock);
if (StopThread) {
break;
- }
+ }
if (!!NextItem) {
CondVar.WaitD(Lock, NextItem->GetScheduleTime());
} else {
CondVar.WaitI(Lock);
- }
+ }
if (StopThread) {
break;
@@ -106,7 +106,7 @@ void TScheduler::SchedulerThread() {
}
current = NextItem.Release();
- }
+ }
current->Do();
current.Destroy();
@@ -115,5 +115,5 @@ void TScheduler::SchedulerThread() {
TGuard<TLock> guard(Lock);
FillNextItem();
}
- }
-}
+ }
+}
diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h
index 996bf30f8c..afcc0de55d 100644
--- a/library/cpp/messagebus/scheduler/scheduler.h
+++ b/library/cpp/messagebus/scheduler/scheduler.h
@@ -1,16 +1,16 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/threading/future/legacy_future.h>
#include <util/datetime/base.h>
#include <util/generic/object_counter.h>
-#include <util/generic/ptr.h>
-#include <util/generic/vector.h>
-#include <util/system/atomic.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/system/atomic.h>
#include <util/system/condvar.h>
#include <util/system/mutex.h>
-#include <util/system/thread.h>
-
+#include <util/system/thread.h>
+
namespace NBus {
namespace NPrivate {
class IScheduleItem {
@@ -25,30 +25,30 @@ namespace NBus {
private:
TInstant ScheduleTime;
};
-
+
using IScheduleItemAutoPtr = TAutoPtr<IScheduleItem>;
-
+
class TScheduler {
public:
TScheduler();
~TScheduler();
void Stop();
void Schedule(TAutoPtr<IScheduleItem> i);
-
+
size_t Size() const;
-
+
private:
void SchedulerThread();
-
+
void FillNextItem();
-
+
private:
TVector<IScheduleItemAutoPtr> Items;
IScheduleItemAutoPtr NextItem;
typedef TMutex TLock;
TLock Lock;
TCondVar CondVar;
-
+
TObjectCounter<TScheduler> ObjectCounter;
bool StopThread;
@@ -63,6 +63,6 @@ namespace NBus {
inline TInstant IScheduleItem::GetScheduleTime() const noexcept {
return ScheduleTime;
}
-
+
}
-}
+}
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index 76790221ec..ddf9f360c4 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -14,7 +14,7 @@ using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NEventLoop;
-
+
namespace {
class TScheduleSession: public IScheduleItem {
public:
@@ -95,7 +95,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
: TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
, TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
, Impl(new TImpl)
- , IsSource_(isSource)
+ , IsSource_(isSource)
, Queue(queue)
, Proto(proto)
, ProtoName(Proto->GetService())
@@ -106,16 +106,16 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
, ReadEventLoop("rd-el")
, LastAcceptorId(0)
, LastConnectionId(0)
- , Down(0)
-{
+ , Down(0)
+{
Impl->DeadAcceptorStatusSummary.Summary = true;
ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop))));
WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
-}
-
+}
+
TBusSessionImpl::~TBusSessionImpl() {
Y_VERIFY(Down);
Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
@@ -160,11 +160,11 @@ void TBusSessionImpl::Shutdown() {
TGuard<TMutex> guard(ConnectionsLock);
Acceptors.clear();
}
-
+
for (auto& acceptor : acceptors) {
acceptor->Shutdown();
- }
-
+ }
+
// shutdown connections
TVector<TRemoteConnectionPtr> cs;
GetConnections(&cs);
@@ -189,12 +189,12 @@ void TBusSessionImpl::Shutdown() {
HandlerUseCountHolder.Reset();
ShutdownCompleteEvent.Signal();
-}
-
+}
+
bool TBusSessionImpl::IsDown() {
- return static_cast<bool>(AtomicGet(Down));
-}
-
+ return static_cast<bool>(AtomicGet(Down));
+}
+
size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
if (!!conn) {
@@ -202,8 +202,8 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
} else {
return 0;
}
-}
-
+}
+
void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
for (size_t i = 0; i < addrs.size(); ++i) {
@@ -427,7 +427,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
}
r.Config = Config;
-
+
TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
StatusData.StatusDumpCached = r;
}
@@ -490,7 +490,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
Y_ASSERT(q == Queue);
int actualPort = -1;
-
+
for (const TBindResult& br : bindTo) {
if (actualPort == -1) {
actualPort = br.Addr.GetPort();
@@ -502,14 +502,14 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu
}
TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
-
+
TConnectionsGuard guard(ConnectionsLock);
InsertAcceptorLockAcquired(acceptor.Get());
- }
+ }
Config.ListenPort = actualPort;
-}
-
+}
+
void TBusSessionImpl::SendSnapshotToStatusActor() {
//Y_ASSERT(ConnectionsLock.IsLocked());
@@ -604,24 +604,24 @@ void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message,
TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) {
TConnectionsGuard guard(ConnectionsLock);
-
- TAddrRemoteConnections::const_iterator it = Connections.find(addr);
- if (it != Connections.end()) {
- return it->second;
- }
-
- if (!create) {
- return TRemoteConnectionPtr();
- }
-
+
+ TAddrRemoteConnections::const_iterator it = Connections.find(addr);
+ if (it != Connections.end()) {
+ return it->second;
+ }
+
+ if (!create) {
+ return TRemoteConnectionPtr();
+ }
+
Y_VERIFY(IsSource_, "must be source");
TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
InsertConnectionLockAcquired(c.Get());
-
- return c;
-}
-
+
+ return c;
+}
+
void TBusSessionImpl::Cron() {
TVector<TRemoteConnectionPtr> connections;
GetConnections(&connections);
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h
index d980ce6ce3..90ef246ff8 100644
--- a/library/cpp/messagebus/session_impl.h
+++ b/library/cpp/messagebus/session_impl.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "acceptor_status.h"
#include "async_result.h"
#include "event_loop.h"
@@ -9,7 +9,7 @@
#include "session_job_count.h"
#include "shutdown_state.h"
#include "ybus.h"
-
+
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/queue_in_actor.h>
#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
@@ -25,7 +25,7 @@ namespace NBus {
typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;
typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;
-
+
typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
typedef TVector<TAcceptorPtr> TAcceptorsPtrs;
@@ -34,7 +34,7 @@ namespace NBus {
TVector<TAcceptorPtr> Acceptors;
ui64 LastConnectionId;
ui64 LastAcceptorId;
-
+
TConnectionsAcceptorsSnapshot();
};
@@ -96,13 +96,13 @@ namespace NBus {
const TBusSessionConfig& config, const TString& name);
~TBusSessionImpl() override;
-
+
void Shutdown() override;
bool IsDown();
size_t GetInFlightImpl(const TNetAddr& addr) const;
size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;
-
+
void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
@@ -123,29 +123,29 @@ namespace NBus {
void StatusUpdateCachedDumpIfNecessary(TInstant now);
void Act(TStatusTag);
void Act(TConnectionTag);
-
+
TBusProtocol* GetProto() const noexcept override;
const TBusSessionConfig* GetConfig() const noexcept override;
TBusMessageQueue* GetQueue() const noexcept override;
TString GetNameInternal() override;
virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;
-
+
void Listen(int port, TBusMessageQueue* q);
void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q);
TBusConnection* Accept(SOCKET listen);
-
+
inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() {
return this;
}
inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() {
return this;
}
-
+
typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;
void SendSnapshotToStatusActor();
-
+
void InsertConnectionLockAcquired(TRemoteConnection* connection);
void InsertAcceptorLockAcquired(TAcceptor* acceptor);
@@ -159,7 +159,7 @@ namespace NBus {
TAcceptorPtr GetAcceptorById(ui64 id);
void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);
-
+
void Cron();
TBusSessionJobCount JobCount;
@@ -193,7 +193,7 @@ namespace NBus {
struct TStatusData {
TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;
-
+
TAtomicShutdownState ShutdownState;
TBusSessionStatus Status;
@@ -246,14 +246,14 @@ namespace NBus {
inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
return Proto;
}
-
+
inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept {
return &Config;
}
-
+
inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept {
return Queue;
}
-
- }
+
+ }
}
diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp
index 6743f0abe4..efefc87340 100644
--- a/library/cpp/messagebus/storage.cpp
+++ b/library/cpp/messagebus/storage.cpp
@@ -6,7 +6,7 @@ namespace NBus {
namespace NPrivate {
TTimedMessages::TTimedMessages() {
}
-
+
TTimedMessages::~TTimedMessages() {
Y_VERIFY(Items.empty());
}
@@ -33,14 +33,14 @@ namespace NBus {
size_t TTimedMessages::Size() const {
return Items.size();
}
-
+
void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
Clear(r);
return;
}
-
+
while (!Items.empty()) {
TItem& i = *Items.front();
if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
@@ -50,14 +50,14 @@ namespace NBus {
Items.pop_front();
}
}
-
+
void TTimedMessages::Clear(TMessagesPtrs* r) {
while (!Items.empty()) {
r->push_back(Items.front()->Message.Release());
Items.pop_front();
}
}
-
+
TSyncAckMessages::TSyncAckMessages() {
KeyToMessage.set_empty_key(0);
KeyToMessage.set_deleted_key(1);
@@ -66,14 +66,14 @@ namespace NBus {
TSyncAckMessages::~TSyncAckMessages() {
Y_VERIFY(KeyToMessage.empty());
Y_VERIFY(TimedItems.empty());
- }
-
+ }
+
void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
// Perform garbage collection if `TimedMessages` contain too many junk data
if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
Gc();
}
-
+
TValue value = {m.MessagePtr.Release()};
std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
@@ -95,7 +95,7 @@ namespace NBus {
return v.Message;
}
-
+
void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
// shortcut
if (before == TInstant::Max()) {
@@ -110,7 +110,7 @@ namespace NBus {
if (TInstant::MilliSeconds(i.SendTime) > before) {
break;
}
-
+
TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
if (itMessage != KeyToMessage.end()) {
@@ -133,7 +133,7 @@ namespace NBus {
void TSyncAckMessages::Gc() {
TDeque<TTimedItem> tmp;
-
+
for (auto& timedItem : TimedItems) {
if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
continue;
@@ -143,7 +143,7 @@ namespace NBus {
TimedItems.swap(tmp);
}
-
+
void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
for (auto message : messages) {
TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
@@ -158,4 +158,4 @@ namespace NBus {
}
}
-}
+}
diff --git a/library/cpp/messagebus/storage.h b/library/cpp/messagebus/storage.h
index 3f8de480a1..7d168844ed 100644
--- a/library/cpp/messagebus/storage.h
+++ b/library/cpp/messagebus/storage.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include "message_ptr_and_header.h"
#include "moved.h"
#include "ybus.h"
@@ -18,7 +18,7 @@ namespace NBus {
public:
TTimedMessages();
~TTimedMessages();
-
+
struct TItem {
THolder<TBusMessage> Message;
@@ -36,31 +36,31 @@ namespace NBus {
void Timeout(TInstant before, TMessagesPtrs* r);
void Clear(TMessagesPtrs* r);
-
+
private:
TItems Items;
};
-
+
class TSyncAckMessages : TNonCopyable {
public:
TSyncAckMessages();
~TSyncAckMessages();
-
+
void Push(TBusMessagePtrAndHeader& m);
TBusMessage* Pop(TBusKey id);
void Timeout(TInstant before, TMessagesPtrs* r);
void Clear(TMessagesPtrs* r);
-
+
size_t Size() const {
return KeyToMessage.size();
}
-
+
void RemoveAll(const TMessagesPtrs&);
-
+
void Gc();
-
+
void DumpState();
private:
diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp
index e4ea37f0f3..8489319278 100644
--- a/library/cpp/messagebus/test/perftest/perftest.cpp
+++ b/library/cpp/messagebus/test/perftest/perftest.cpp
@@ -605,10 +605,10 @@ int main(int argc, char* argv[]) {
opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort);
opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize);
opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes);
- opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
- opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
- opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
- opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
+ opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure);
+ opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay);
+ opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run);
+ opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1");
opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true);
opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool")
.RequiredArgument("BOOL")
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index a358339513..040f9b7702 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -1,5 +1,5 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/fixed_port.h>
#include <library/cpp/messagebus/test/helper/hanging_server.h>
@@ -13,7 +13,7 @@
#include <utility>
using namespace NBus;
-using namespace NBus::NTest;
+using namespace NBus::NTest;
namespace {
struct TExampleClientSlowOnMessageSent: public TExampleClient {
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h
index bb21cf1c99..de21ad8521 100644
--- a/library/cpp/messagebus/ybus.h
+++ b/library/cpp/messagebus/ybus.h
@@ -193,7 +193,7 @@ namespace NBus {
void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session);
void Remove(TBusSession* session);
};
-
+
/////////////////////////////////////////////////////////////////
/// Factory methods to construct message queue
TBusMessageQueuePtr CreateMessageQueue(const char* name = "");