diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:49 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:49 +0300 |
commit | 7489e4682331202b9c7d863c0898eb83d7b12c2b (patch) | |
tree | 9142afc54d335ea52910662635b898e79e192e49 /library/cpp/messagebus | |
parent | a5950576e397b1909261050b8c7da16db58f10b1 (diff) | |
download | ydb-7489e4682331202b9c7d863c0898eb83d7b12c2b.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus')
23 files changed, 447 insertions, 447 deletions
diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h index 573458ba72..b79c0cc355 100644 --- a/library/cpp/messagebus/config/netaddr.h +++ b/library/cpp/messagebus/config/netaddr.h @@ -1,6 +1,6 @@ #pragma once -#include <util/digest/numeric.h> +#include <util/digest/numeric.h> #include <util/generic/hash.h> #include <util/generic/ptr.h> #include <util/generic/strbuf.h> @@ -74,13 +74,13 @@ namespace NBus { switch (s->sa_family) { case AF_INET: return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa->sin_addr), sizeof(sa->sin_addr))), IntHashImpl(sa->sin_port)); - + case AF_INET6: return CombineHashes<size_t>(ComputeHash(TStringBuf(reinterpret_cast<const char*>(&sa6->sin6_addr), sizeof(sa6->sin6_addr))), IntHashImpl(sa6->sin6_port)); } - + return ComputeHash(TStringBuf(reinterpret_cast<const char*>(s), a.Len())); - } + } }; - + } diff --git a/library/cpp/messagebus/coreconn.cpp b/library/cpp/messagebus/coreconn.cpp index d9436f15d7..d9411bb5db 100644 --- a/library/cpp/messagebus/coreconn.cpp +++ b/library/cpp/messagebus/coreconn.cpp @@ -2,10 +2,10 @@ #include "remote_connection.h" -#include <util/datetime/base.h> +#include <util/datetime/base.h> #include <util/generic/yexception.h> #include <util/network/socket.h> -#include <util/string/util.h> +#include <util/string/util.h> #include <util/system/thread.h> namespace NBus { diff --git a/library/cpp/messagebus/coreconn.h b/library/cpp/messagebus/coreconn.h index f6ec07bef4..fca228d82e 100644 --- a/library/cpp/messagebus/coreconn.h +++ b/library/cpp/messagebus/coreconn.h @@ -5,24 +5,24 @@ /// \brief Definitions for asynchonous connection queue #include "base.h" -#include "event_loop.h" +#include "event_loop.h" #include "netaddr.h" -#include <util/datetime/base.h> +#include <util/datetime/base.h> #include <util/generic/algorithm.h> #include <util/generic/list.h> -#include <util/generic/map.h> -#include <util/generic/set.h> +#include <util/generic/map.h> +#include <util/generic/set.h> #include <util/generic/string.h> #include <util/generic/vector.h> #include <util/network/address.h> #include <util/network/ip.h> -#include <util/network/poller.h> -#include <util/string/util.h> -#include <util/system/condvar.h> +#include <util/network/poller.h> +#include <util/string/util.h> +#include <util/system/condvar.h> #include <util/system/mutex.h> #include <util/system/thread.h> -#include <util/thread/lfqueue.h> +#include <util/thread/lfqueue.h> #include <deque> #include <utility> @@ -31,9 +31,9 @@ #undef NO_ERROR #endif -#define BUS_WORKER_CONDVAR -//#define BUS_WORKER_MIXED - +#define BUS_WORKER_CONDVAR +//#define BUS_WORKER_MIXED + namespace NBus { class TBusConnection; class TBusConnectionFactory; @@ -64,4 +64,4 @@ namespace NBus { POLL_WRITE }; -} +} diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp index b1209d2b5c..f685135bed 100644 --- a/library/cpp/messagebus/event_loop.cpp +++ b/library/cpp/messagebus/event_loop.cpp @@ -5,50 +5,50 @@ #include <util/generic/hash.h> #include <util/network/pair.h> -#include <util/network/poller.h> +#include <util/network/poller.h> #include <util/system/event.h> #include <util/system/mutex.h> #include <util/system/thread.h> -#include <util/system/yassert.h> +#include <util/system/yassert.h> #include <util/thread/lfqueue.h> - + #include <errno.h> -using namespace NEventLoop; - -namespace { +using namespace NEventLoop; + +namespace { enum ERunningState { EVENT_LOOP_CREATED, EVENT_LOOP_RUNNING, EVENT_LOOP_STOPPED, }; - enum EOperation { - OP_READ = 1, - OP_WRITE = 2, - OP_READ_WRITE = OP_READ | OP_WRITE, - }; -} - -class TChannel::TImpl { -public: + enum EOperation { + OP_READ = 1, + OP_WRITE = 2, + OP_READ_WRITE = OP_READ | OP_WRITE, + }; +} + +class TChannel::TImpl { +public: TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie); ~TImpl(); - - void EnableRead(); - void DisableRead(); - void EnableWrite(); - void DisableWrite(); - - void Unregister(); - - SOCKET GetSocket() const; + + void EnableRead(); + void DisableRead(); + void EnableWrite(); + void DisableWrite(); + + void Unregister(); + + SOCKET GetSocket() const; TSocket GetSocketPtr() const; - + void Update(int pollerFlags, bool enable); void CallHandler(); - TEventLoop::TImpl* EventLoop; + TEventLoop::TImpl* EventLoop; TSocket Socket; TEventHandlerPtr EventHandler; void* Cookie; @@ -57,130 +57,130 @@ public: int CurrentFlags; bool Close; -}; - -class TEventLoop::TImpl { -public: +}; + +class TEventLoop::TImpl { +public: TImpl(const char* name); - - void Run(); - void Wakeup(); - void Stop(); - + + void Run(); + void Wakeup(); + void Stop(); + TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie); - void Unregister(SOCKET socket); - + void Unregister(SOCKET socket); + typedef THashMap<SOCKET, TChannelPtr> TData; - + void AddToPoller(SOCKET socket, void* cookie, int flags); - + TMutex Mutex; - + const char* Name; TAtomic RunningState; TAtomic StopSignal; TSystemEvent StoppedEvent; - TData Data; - + TData Data; + TLockFreeQueue<SOCKET> SocketsToRemove; - TSocketPoller Poller; - TSocketHolder WakeupReadSocket; - TSocketHolder WakeupWriteSocket; -}; - -TChannel::~TChannel() { -} - -void TChannel::EnableRead() { - Impl->EnableRead(); -} - -void TChannel::DisableRead() { - Impl->DisableRead(); -} - -void TChannel::EnableWrite() { - Impl->EnableWrite(); -} - -void TChannel::DisableWrite() { - Impl->DisableWrite(); -} - -void TChannel::Unregister() { - Impl->Unregister(); -} - -SOCKET TChannel::GetSocket() const { - return Impl->GetSocket(); -} - + TSocketPoller Poller; + TSocketHolder WakeupReadSocket; + TSocketHolder WakeupWriteSocket; +}; + +TChannel::~TChannel() { +} + +void TChannel::EnableRead() { + Impl->EnableRead(); +} + +void TChannel::DisableRead() { + Impl->DisableRead(); +} + +void TChannel::EnableWrite() { + Impl->EnableWrite(); +} + +void TChannel::DisableWrite() { + Impl->DisableWrite(); +} + +void TChannel::Unregister() { + Impl->Unregister(); +} + +SOCKET TChannel::GetSocket() const { + return Impl->GetSocket(); +} + TSocket TChannel::GetSocketPtr() const { return Impl->GetSocketPtr(); } TChannel::TChannel(TImpl* impl) : Impl(impl) -{ -} - +{ +} + TEventLoop::TEventLoop(const char* name) : Impl(new TImpl(name)) -{ -} - +{ +} + TEventLoop::~TEventLoop() { -} - -void TEventLoop::Run() { - Impl->Run(); -} - -void TEventLoop::Stop() { - Impl->Stop(); -} - +} + +void TEventLoop::Run() { + Impl->Run(); +} + +void TEventLoop::Stop() { + Impl->Stop(); +} + bool TEventLoop::IsRunning() { return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; } TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { return Impl->Register(socket, eventHandler, cookie); -} - +} + TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie) - : EventLoop(eventLoop) - , Socket(socket) + : EventLoop(eventLoop) + , Socket(socket) , EventHandler(eventHandler) , Cookie(cookie) , CurrentFlags(0) , Close(false) -{ -} - +{ +} + TChannel::TImpl::~TImpl() { Y_ASSERT(Close); } -void TChannel::TImpl::EnableRead() { +void TChannel::TImpl::EnableRead() { Update(OP_READ, true); -} - -void TChannel::TImpl::DisableRead() { +} + +void TChannel::TImpl::DisableRead() { Update(OP_READ, false); -} - -void TChannel::TImpl::EnableWrite() { +} + +void TChannel::TImpl::EnableWrite() { Update(OP_WRITE, true); -} - -void TChannel::TImpl::DisableWrite() { +} + +void TChannel::TImpl::DisableWrite() { Update(OP_WRITE, false); -} - -void TChannel::TImpl::Unregister() { +} + +void TChannel::TImpl::Unregister() { TGuard<TMutex> guard(Mutex); if (Close) { @@ -196,8 +196,8 @@ void TChannel::TImpl::Unregister() { EventLoop->SocketsToRemove.Enqueue(Socket); EventLoop->Wakeup(); -} - +} + void TChannel::TImpl::Update(int flags, bool enable) { TGuard<TMutex> guard(Mutex); @@ -221,10 +221,10 @@ void TChannel::TImpl::Update(int flags, bool enable) { CurrentFlags = newFlags; } -SOCKET TChannel::TImpl::GetSocket() const { - return Socket; -} - +SOCKET TChannel::TImpl::GetSocket() const { + return Socket; +} + TSocket TChannel::TImpl::GetSocketPtr() const { return Socket; } @@ -256,27 +256,27 @@ TEventLoop::TImpl::TImpl(const char* name) : Name(name) , RunningState(EVENT_LOOP_CREATED) , StopSignal(0) -{ - SOCKET wakeupSockets[2]; - +{ + SOCKET wakeupSockets[2]; + if (SocketPair(wakeupSockets) < 0) { Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText()); } - - TSocketHolder wakeupReadSocket(wakeupSockets[0]); - TSocketHolder wakeupWriteSocket(wakeupSockets[1]); - - WakeupReadSocket.Swap(wakeupReadSocket); - WakeupWriteSocket.Swap(wakeupWriteSocket); - - SetNonBlock(WakeupWriteSocket, true); - SetNonBlock(WakeupReadSocket, true); - - Poller.WaitRead(WakeupReadSocket, + + TSocketHolder wakeupReadSocket(wakeupSockets[0]); + TSocketHolder wakeupWriteSocket(wakeupSockets[1]); + + WakeupReadSocket.Swap(wakeupReadSocket); + WakeupWriteSocket.Swap(wakeupWriteSocket); + + SetNonBlock(WakeupWriteSocket, true); + SetNonBlock(WakeupReadSocket, true); + + Poller.WaitRead(WakeupReadSocket, reinterpret_cast<void*>(this)); -} - -void TEventLoop::TImpl::Run() { +} + +void TEventLoop::TImpl::Run() { bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); Y_VERIFY(res, "Invalid mbus event loop state"); @@ -285,30 +285,30 @@ void TEventLoop::TImpl::Run() { } while (AtomicGet(StopSignal) == 0) { - void* cookies[1024]; + void* cookies[1024]; const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies)); - - void** end = cookies + count; - for (void** c = cookies; c != end; ++c) { + + void** end = cookies + count; + for (void** c = cookies; c != end; ++c) { TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c); - + if (*c == this) { - char buf[0x1000]; + char buf[0x1000]; if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) { Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText()); } - continue; - } - + continue; + } + s->CallHandler(); - } + } SOCKET socket = -1; while (SocketsToRemove.Dequeue(&socket)) { TGuard<TMutex> guard(Mutex); Y_VERIFY(Data.erase(socket) == 1, "must be removed once"); } - } + } { TGuard<TMutex> guard(Mutex); @@ -325,9 +325,9 @@ void TEventLoop::TImpl::Run() { Y_VERIFY(res); StoppedEvent.Signal(); -} - -void TEventLoop::TImpl::Stop() { +} + +void TEventLoop::TImpl::Stop() { AtomicSet(StopSignal, 1); if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { @@ -335,36 +335,36 @@ void TEventLoop::TImpl::Stop() { StoppedEvent.WaitI(); } -} - +} + TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie)); - + TGuard<TMutex> guard(Mutex); - + Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted"); - + return channel; -} - -void TEventLoop::TImpl::Wakeup() { +} + +void TEventLoop::TImpl::Wakeup() { if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) { if (LastSystemError() != EAGAIN) { Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText()); } - } -} - -void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { - if (flags == OP_READ) { + } +} + +void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { + if (flags == OP_READ) { Poller.WaitReadOneShot(socket, cookie); - } else if (flags == OP_WRITE) { + } else if (flags == OP_WRITE) { Poller.WaitWriteOneShot(socket, cookie); - } else if (flags == OP_READ_WRITE) { + } else if (flags == OP_READ_WRITE) { Poller.WaitReadWriteOneShot(socket, cookie); - } else { + } else { Y_FAIL("Wrong flags: %d", int(flags)); - } -} + } +} diff --git a/library/cpp/messagebus/event_loop.h b/library/cpp/messagebus/event_loop.h index 677ade2fff..d5b0a53b0c 100644 --- a/library/cpp/messagebus/event_loop.h +++ b/library/cpp/messagebus/event_loop.h @@ -1,72 +1,72 @@ -#pragma once - +#pragma once + #include <util/generic/object_counter.h> -#include <util/generic/ptr.h> -#include <util/network/init.h> +#include <util/generic/ptr.h> +#include <util/network/init.h> #include <util/network/socket.h> - -namespace NEventLoop { - struct IEventHandler + +namespace NEventLoop { + struct IEventHandler : public TAtomicRefCount<IEventHandler> { virtual void HandleEvent(SOCKET socket, void* cookie) = 0; virtual ~IEventHandler() { } - }; - - typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr; - - class TEventLoop; - + }; + + typedef TIntrusivePtr<IEventHandler> TEventHandlerPtr; + + class TEventLoop; + // TODO: make TChannel itself a pointer // to avoid confusion with Drop and Unregister - class TChannel + class TChannel : public TAtomicRefCount<TChannel> { - public: - ~TChannel(); - - void EnableRead(); - void DisableRead(); - void EnableWrite(); - void DisableWrite(); - - void Unregister(); - - SOCKET GetSocket() const; + public: + ~TChannel(); + + void EnableRead(); + void DisableRead(); + void EnableWrite(); + void DisableWrite(); + + void Unregister(); + + SOCKET GetSocket() const; TSocket GetSocketPtr() const; - - private: - class TImpl; - friend class TEventLoop; - + + private: + class TImpl; + friend class TEventLoop; + TObjectCounter<TChannel> ObjectCounter; TChannel(TImpl*); - - private: - THolder<TImpl> Impl; - }; - - typedef TIntrusivePtr<TChannel> TChannelPtr; - - class TEventLoop { - public: + + private: + THolder<TImpl> Impl; + }; + + typedef TIntrusivePtr<TChannel> TChannelPtr; + + class TEventLoop { + public: TEventLoop(const char* name = nullptr); - ~TEventLoop(); - - void Run(); - void Stop(); + ~TEventLoop(); + + void Run(); + void Stop(); bool IsRunning(); - + TChannelPtr Register(TSocket socket, TEventHandlerPtr, void* cookie = nullptr); - - private: - class TImpl; - friend class TChannel; - + + private: + class TImpl; + friend class TChannel; + TObjectCounter<TEventLoop> ObjectCounter; - private: - THolder<TImpl> Impl; - }; - -} + private: + THolder<TImpl> Impl; + }; + +} diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index 9176496252..3474d62705 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -56,7 +56,7 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e } TBusMessageQueue::~TBusMessageQueue() { - Stop(); + Stop(); } void TBusMessageQueue::Stop() { @@ -127,7 +127,7 @@ TString TBusMessageQueue::GetStatus(ui16 flags) const { TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) { TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name)); Add(session.Get()); - return session.Get(); + return session.Get(); } TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) { @@ -189,10 +189,10 @@ void TBusMessageQueue::DestroyAllSessions() { } } -void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) { - Scheduler.Schedule(i); +void TBusMessageQueue::Schedule(IScheduleItemAutoPtr i) { + Scheduler.Schedule(i); } - + TString TBusMessageQueue::GetNameInternal() const { return Config.Name; } diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 8a64811979..24bd778799 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -79,19 +79,19 @@ namespace NBus { TBusModuleImpl* const Module; }; - + struct TModuleServerHandler : public IBusServerHandler { TModuleServerHandler(TBusModuleImpl* module) : Module(module) { } - + void OnMessage(TOnMessageContext& msg) override; - + TBusModuleImpl* const Module; }; - + struct TBusModuleImpl: public TBusModuleInternal { TBusModule* const Module; @@ -677,7 +677,7 @@ namespace NBus { return true; } - + bool TBusModule::Shutdown() { Impl->Shutdown(); @@ -702,7 +702,7 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { Impl->Queue = queue; return true; } - + int TBusModule::GetModuleSessionInFlight() const { return Impl->Size(); } @@ -781,11 +781,11 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) { if (jobCount == 0) { ShutdownCondVar.BroadCast(); } - } + } } job->JobStorageIterator = TList<TJobRunner*>::iterator(); -} +} void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageContext& context) { TBusMessage* msg = !!msg0 ? msg0.Get() : context.GetMessage(); diff --git a/library/cpp/messagebus/oldmodule/module.h b/library/cpp/messagebus/oldmodule/module.h index 1b75c4df46..8d1c4a5d52 100644 --- a/library/cpp/messagebus/oldmodule/module.h +++ b/library/cpp/messagebus/oldmodule/module.h @@ -407,4 +407,4 @@ namespace NBus { TBusStarter* CreateDefaultStarter(TBusMessageQueue& unused, const TBusSessionConfig& config); }; -} +} diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp index 7bd6e115c7..3bc421944f 100644 --- a/library/cpp/messagebus/remote_client_session.cpp +++ b/library/cpp/messagebus/remote_client_session.cpp @@ -1,10 +1,10 @@ #include "remote_client_session.h" - + #include "mb_lwtrace.h" #include "remote_client_connection.h" #include <library/cpp/messagebus/scheduler/scheduler.h> - + #include <util/generic/cast.h> #include <util/system/defaults.h> @@ -19,9 +19,9 @@ TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue, : TBusSessionImpl(true, queue, proto, handler, config, name) , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight") , ClientHandler(handler) -{ -} - +{ +} + TRemoteClientSession::~TRemoteClientSession() { //Cerr << "~TRemoteClientSession" << Endl; } @@ -31,7 +31,7 @@ void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps< temp->swap(newMsg); c->ReplyQueue.EnqueueAll(temp); c->ScheduleWrite(); -} +} EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) { if (Y_UNLIKELY(IsDown())) { diff --git a/library/cpp/messagebus/remote_client_session.h b/library/cpp/messagebus/remote_client_session.h index f619d4d86a..7160d0dae9 100644 --- a/library/cpp/messagebus/remote_client_session.h +++ b/library/cpp/messagebus/remote_client_session.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "remote_client_session_semaphore.h" #include "session_impl.h" @@ -14,7 +14,7 @@ namespace NBus { namespace NPrivate { using TRemoteClientSessionPtr = TIntrusivePtr<TRemoteClientSession>; - + class TRemoteClientSession: public TBusClientSession, public TBusSessionImpl { friend class TRemoteClientConnection; friend class TInvokeOnReply; diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 59b58f7797..22932569db 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -7,9 +7,9 @@ #include "remote_client_session.h" #include "remote_server_session.h" #include "session_impl.h" - + #include <library/cpp/messagebus/actor/what_thread_does.h> - + #include <util/generic/cast.h> #include <util/network/init.h> #include <util/system/atomic.h> @@ -44,7 +44,7 @@ namespace NBus { WriterData.Status.ConnectionId = connectionId; WriterData.Status.PeerAddr = PeerAddr; ReaderData.Status.ConnectionId = connectionId; - + const TInstant now = TInstant::Now(); WriterFillStatus(); @@ -70,7 +70,7 @@ namespace NBus { Y_VERIFY(AtomicGet(Down)); Y_VERIFY(SendQueue.Empty()); } - + bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { size_t left = Buffer.Size() - Offset; @@ -137,7 +137,7 @@ namespace NBus { void TRemoteConnection::Shutdown(EMessageStatus status) { ScheduleShutdown(status); - + ReaderData.ShutdownComplete.WaitI(); WriterData.ShutdownComplete.WaitI(); } @@ -145,15 +145,15 @@ namespace NBus { void TRemoteConnection::TryConnect() { Y_FAIL("TryConnect is client connection only operation"); } - + void TRemoteConnection::ScheduleRead() { GetReaderActor()->Schedule(); } - + void TRemoteConnection::ScheduleWrite() { GetWriterActor()->Schedule(); } - + void TRemoteConnection::WriterRotateCounters() { if (!WriterData.TimeToRotateCounters.FetchTask()) { return; @@ -383,7 +383,7 @@ namespace NBus { if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { ReaderData.Status.Incremental.BufferDrops += 1; - + TBuffer temp; // probably should use another constant temp.Reserve(Config.DefaultBufferSize); @@ -391,7 +391,7 @@ namespace NBus { ReaderData.Buffer.Swap(temp); } - + return true; } @@ -406,7 +406,7 @@ namespace NBus { ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); } } - + Y_ASSERT(ReaderData.Buffer.Avail() > 0); ssize_t bytes; @@ -465,27 +465,27 @@ namespace NBus { if (!Session->IsSource_) { message->SendTime = now.MilliSeconds(); } - + WriterData.SendQueue.PushBack(message); } - + void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); - } - + } + void TRemoteConnection::WriterFillInFlight() { // this is hack for TLoadBalancedProtocol WriterFillStatus(); AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); } - + const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { WriterRotateCounters(); WriterFillStatus(); return WriterData.Status; } - + void TRemoteConnection::WriterFillStatus() { if (!!WriterData.Channel) { WriterData.Status.Fd = WriterData.Channel->GetSocket(); @@ -644,11 +644,11 @@ namespace NBus { if (WriterData.Buffer.Capacity() > MaxBufferSize) { WriterData.Status.Incremental.BufferDrops += 1; WriterData.Buffer.Reset(); - } + } WriterData.State = WRITER_FILLING; } - + void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { if (Session->IsSource_) { WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); @@ -662,11 +662,11 @@ namespace NBus { AtomicSet(ReaderData.Down, 1); ScheduleRead(); - + AtomicSet(WriterData.Down, 1); ScheduleWrite(); } - + void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { size_t posForAssertion = buffer.Size(); Proto->Serialize(msg, buffer); @@ -688,12 +688,12 @@ namespace NBus { } } - + void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { size_t pos = data->Size(); - + size_t dataSize; - + bool compressionRequested = msg->IsCompressed(); if (compressionRequested) { @@ -821,20 +821,20 @@ namespace NBus { TBusMessagePtrAndHeader h(r); r->RecvTime = now; - + QuotaConsume(1, header.Size); ReaderData.ReadMessages.push_back(h); if (ReaderData.ReadMessages.size() >= 100) { ReaderFlushMessages(); } - + return true; } void TRemoteConnection::WriterFillBuffer() { Y_ASSERT(WriterData.State == WRITER_FILLING); - + Y_ASSERT(WriterData.Buffer.LeftSize() == 0); if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { @@ -868,7 +868,7 @@ namespace NBus { WriterData.CorkUntil = TInstant::Now() + Config.Cork; } } - + size_t sizeBeforeSerialize = WriterData.Buffer.Size(); TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; @@ -952,7 +952,7 @@ namespace NBus { void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { ResetOneWayFlag(ms); - + WriterData.Status.Incremental.StatusCounter[status] += ms.size(); for (auto m : ms) { Session->InvokeOnError(m, status); diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h index ee0665774d..4538947368 100644 --- a/library/cpp/messagebus/remote_connection.h +++ b/library/cpp/messagebus/remote_connection.h @@ -1,8 +1,8 @@ -#pragma once - +#pragma once + #include "async_result.h" #include "defs.h" -#include "event_loop.h" +#include "event_loop.h" #include "left_right_buffer.h" #include "lfqueue_batch.h" #include "message_ptr_and_header.h" @@ -15,7 +15,7 @@ #include "ybus.h" #include "misc/granup.h" #include "misc/tokenquota.h" - + #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/executor.h> #include <library/cpp/messagebus/actor/queue_for_actor.h> @@ -96,7 +96,7 @@ namespace NBus { void Shutdown(EMessageStatus status); inline const TNetAddr& GetAddr() const noexcept; - + private: friend class TScheduleConnect; friend class TWorkIO; @@ -111,14 +111,14 @@ namespace NBus { bool ReaderProcessBuffer(); bool ReaderFillBuffer(); void ReaderFlushMessages(); - + void ReadQuotaWakeup(); ui32 WriteWakeFlags() const; - + virtual bool NeedInterruptRead() { return false; } - + public: virtual void TryConnect(); void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage); @@ -174,7 +174,7 @@ namespace NBus { void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status); // takes ownership of ms void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status); - + void FireClientConnectionEvent(TClientConnectionEvent::EType); size_t GetInFlight(); @@ -207,14 +207,14 @@ namespace NBus { NEventLoop::TChannelPtr Channel; ui32 SocketVersion; - + TRemoteConnectionWriterStatus Status; TInstant StatusLastSendTime; - + TLocalTasks TimeToRotateCounters; TAtomic InFlight; - + TTimedMessages SendQueue; ui32 AwakeFlags; EWriterState State; @@ -290,5 +290,5 @@ namespace NBus { typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr; - } + } } diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp index 12765ab9b4..6abbf88a60 100644 --- a/library/cpp/messagebus/remote_server_session.cpp +++ b/library/cpp/messagebus/remote_server_session.cpp @@ -21,14 +21,14 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue, : TBusSessionImpl(false, queue, proto, handler, config, name) , ServerOwnedMessages(config.MaxInFlight, config.MaxInFlightBySize, "ServerOwnedMessages") , ServerHandler(handler) -{ +{ if (config.PerConnectionMaxInFlightBySize > 0) { if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize) ythrow yexception() << "too low PerConnectionMaxInFlightBySize value"; } -} - +} + namespace NBus { namespace NPrivate { class TInvokeOnMessage: public IWorkItem { @@ -83,7 +83,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps< JobCount.Add(workQueueTemp.GetVector()->size()); Queue->EnqueueWork(*workQueueTemp.GetVector()); } -} +} void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) { if (Y_UNLIKELY(AtomicGet(Down))) { diff --git a/library/cpp/messagebus/remote_server_session.h b/library/cpp/messagebus/remote_server_session.h index c70dde00e2..f5c266a7f7 100644 --- a/library/cpp/messagebus/remote_server_session.h +++ b/library/cpp/messagebus/remote_server_session.h @@ -1,8 +1,8 @@ -#pragma once - +#pragma once + #include "remote_server_session_semaphore.h" #include "session_impl.h" - + #ifdef _MSC_VER #pragma warning(push) #pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance @@ -12,7 +12,7 @@ namespace NBus { namespace NPrivate { class TRemoteServerSession: public TBusServerSession, public TBusSessionImpl { friend class TRemoteServerConnection; - + private: TObjectCounter<TRemoteServerSession> ObjectCounter; diff --git a/library/cpp/messagebus/scheduler/scheduler.cpp b/library/cpp/messagebus/scheduler/scheduler.cpp index 5c0686d32a..5a5fe52894 100644 --- a/library/cpp/messagebus/scheduler/scheduler.cpp +++ b/library/cpp/messagebus/scheduler/scheduler.cpp @@ -1,37 +1,37 @@ -#include "scheduler.h" - -#include <util/datetime/base.h> -#include <util/generic/algorithm.h> -#include <util/generic/yexception.h> - +#include "scheduler.h" + +#include <util/datetime/base.h> +#include <util/generic/algorithm.h> +#include <util/generic/yexception.h> + //#include "dummy_debugger.h" using namespace NBus; using namespace NBus::NPrivate; - -class TScheduleDeadlineCompare { -public: + +class TScheduleDeadlineCompare { +public: bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept { return i1->GetScheduleTime() > i2->GetScheduleTime(); - } -}; - -TScheduler::TScheduler() + } +}; + +TScheduler::TScheduler() : StopThread(false) , Thread([&] { this->SchedulerThread(); }) -{ -} - -TScheduler::~TScheduler() { +{ +} + +TScheduler::~TScheduler() { Y_VERIFY(StopThread, "state check"); -} - +} + size_t TScheduler::Size() const { TGuard<TLock> guard(Lock); return Items.size() + (!!NextItem ? 1 : 0); } -void TScheduler::Stop() { +void TScheduler::Stop() { { TGuard<TLock> guard(Lock); Y_VERIFY(!StopThread, "Scheduler already stopped"); @@ -46,28 +46,28 @@ void TScheduler::Stop() { for (auto& item : Items) { item.Destroy(); - } -} - -void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { + } +} + +void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) { TGuard<TLock> lock(Lock); if (StopThread) - return; + return; if (!!NextItem) { if (i->GetScheduleTime() < NextItem->GetScheduleTime()) { DoSwap(i, NextItem); } - } + } - Items.push_back(i); + Items.push_back(i); PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); FillNextItem(); CondVar.Signal(); -} - +} + void TScheduler::FillNextItem() { if (!NextItem && !Items.empty()) { PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare()); @@ -76,22 +76,22 @@ void TScheduler::FillNextItem() { } } -void TScheduler::SchedulerThread() { +void TScheduler::SchedulerThread() { for (;;) { IScheduleItemAutoPtr current; - { + { TGuard<TLock> guard(Lock); if (StopThread) { break; - } + } if (!!NextItem) { CondVar.WaitD(Lock, NextItem->GetScheduleTime()); } else { CondVar.WaitI(Lock); - } + } if (StopThread) { break; @@ -106,7 +106,7 @@ void TScheduler::SchedulerThread() { } current = NextItem.Release(); - } + } current->Do(); current.Destroy(); @@ -115,5 +115,5 @@ void TScheduler::SchedulerThread() { TGuard<TLock> guard(Lock); FillNextItem(); } - } -} + } +} diff --git a/library/cpp/messagebus/scheduler/scheduler.h b/library/cpp/messagebus/scheduler/scheduler.h index 996bf30f8c..afcc0de55d 100644 --- a/library/cpp/messagebus/scheduler/scheduler.h +++ b/library/cpp/messagebus/scheduler/scheduler.h @@ -1,16 +1,16 @@ -#pragma once - +#pragma once + #include <library/cpp/threading/future/legacy_future.h> #include <util/datetime/base.h> #include <util/generic/object_counter.h> -#include <util/generic/ptr.h> -#include <util/generic/vector.h> -#include <util/system/atomic.h> +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/system/atomic.h> #include <util/system/condvar.h> #include <util/system/mutex.h> -#include <util/system/thread.h> - +#include <util/system/thread.h> + namespace NBus { namespace NPrivate { class IScheduleItem { @@ -25,30 +25,30 @@ namespace NBus { private: TInstant ScheduleTime; }; - + using IScheduleItemAutoPtr = TAutoPtr<IScheduleItem>; - + class TScheduler { public: TScheduler(); ~TScheduler(); void Stop(); void Schedule(TAutoPtr<IScheduleItem> i); - + size_t Size() const; - + private: void SchedulerThread(); - + void FillNextItem(); - + private: TVector<IScheduleItemAutoPtr> Items; IScheduleItemAutoPtr NextItem; typedef TMutex TLock; TLock Lock; TCondVar CondVar; - + TObjectCounter<TScheduler> ObjectCounter; bool StopThread; @@ -63,6 +63,6 @@ namespace NBus { inline TInstant IScheduleItem::GetScheduleTime() const noexcept { return ScheduleTime; } - + } -} +} diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index 76790221ec..ddf9f360c4 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -14,7 +14,7 @@ using namespace NActor; using namespace NBus; using namespace NBus::NPrivate; using namespace NEventLoop; - + namespace { class TScheduleSession: public IScheduleItem { public: @@ -95,7 +95,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get()) , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get()) , Impl(new TImpl) - , IsSource_(isSource) + , IsSource_(isSource) , Queue(queue) , Proto(proto) , ProtoName(Proto->GetService()) @@ -106,16 +106,16 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro , ReadEventLoop("rd-el") , LastAcceptorId(0) , LastConnectionId(0) - , Down(0) -{ + , Down(0) +{ Impl->DeadAcceptorStatusSummary.Summary = true; ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop)))); WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop)))); Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod))); -} - +} + TBusSessionImpl::~TBusSessionImpl() { Y_VERIFY(Down); Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero())); @@ -160,11 +160,11 @@ void TBusSessionImpl::Shutdown() { TGuard<TMutex> guard(ConnectionsLock); Acceptors.clear(); } - + for (auto& acceptor : acceptors) { acceptor->Shutdown(); - } - + } + // shutdown connections TVector<TRemoteConnectionPtr> cs; GetConnections(&cs); @@ -189,12 +189,12 @@ void TBusSessionImpl::Shutdown() { HandlerUseCountHolder.Reset(); ShutdownCompleteEvent.Signal(); -} - +} + bool TBusSessionImpl::IsDown() { - return static_cast<bool>(AtomicGet(Down)); -} - + return static_cast<bool>(AtomicGet(Down)); +} + size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const { TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); if (!!conn) { @@ -202,8 +202,8 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const { } else { return 0; } -} - +} + void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const { Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); for (size_t i = 0; i < addrs.size(); ++i) { @@ -427,7 +427,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() { } r.Config = Config; - + TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex); StatusData.StatusDumpCached = r; } @@ -490,7 +490,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) { void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) { Y_ASSERT(q == Queue); int actualPort = -1; - + for (const TBindResult& br : bindTo) { if (actualPort == -1) { actualPort = br.Addr.GetPort(); @@ -502,14 +502,14 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu } TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr)); - + TConnectionsGuard guard(ConnectionsLock); InsertAcceptorLockAcquired(acceptor.Get()); - } + } Config.ListenPort = actualPort; -} - +} + void TBusSessionImpl::SendSnapshotToStatusActor() { //Y_ASSERT(ConnectionsLock.IsLocked()); @@ -604,24 +604,24 @@ void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message, TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) { TConnectionsGuard guard(ConnectionsLock); - - TAddrRemoteConnections::const_iterator it = Connections.find(addr); - if (it != Connections.end()) { - return it->second; - } - - if (!create) { - return TRemoteConnectionPtr(); - } - + + TAddrRemoteConnections::const_iterator it = Connections.find(addr); + if (it != Connections.end()) { + return it->second; + } + + if (!create) { + return TRemoteConnectionPtr(); + } + Y_VERIFY(IsSource_, "must be source"); TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr())); InsertConnectionLockAcquired(c.Get()); - - return c; -} - + + return c; +} + void TBusSessionImpl::Cron() { TVector<TRemoteConnectionPtr> connections; GetConnections(&connections); diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h index d980ce6ce3..90ef246ff8 100644 --- a/library/cpp/messagebus/session_impl.h +++ b/library/cpp/messagebus/session_impl.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "acceptor_status.h" #include "async_result.h" #include "event_loop.h" @@ -9,7 +9,7 @@ #include "session_job_count.h" #include "shutdown_state.h" #include "ybus.h" - + #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/queue_in_actor.h> #include <library/cpp/messagebus/monitoring/mon_proto.pb.h> @@ -25,7 +25,7 @@ namespace NBus { typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr; typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr; - + typedef TIntrusivePtr<TAcceptor> TAcceptorPtr; typedef TVector<TAcceptorPtr> TAcceptorsPtrs; @@ -34,7 +34,7 @@ namespace NBus { TVector<TAcceptorPtr> Acceptors; ui64 LastConnectionId; ui64 LastAcceptorId; - + TConnectionsAcceptorsSnapshot(); }; @@ -96,13 +96,13 @@ namespace NBus { const TBusSessionConfig& config, const TString& name); ~TBusSessionImpl() override; - + void Shutdown() override; bool IsDown(); size_t GetInFlightImpl(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; - + void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; @@ -123,29 +123,29 @@ namespace NBus { void StatusUpdateCachedDumpIfNecessary(TInstant now); void Act(TStatusTag); void Act(TConnectionTag); - + TBusProtocol* GetProto() const noexcept override; const TBusSessionConfig* GetConfig() const noexcept override; TBusMessageQueue* GetQueue() const noexcept override; TString GetNameInternal() override; virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0; - + void Listen(int port, TBusMessageQueue* q); void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q); TBusConnection* Accept(SOCKET listen); - + inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() { return this; } inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() { return this; } - + typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections; void SendSnapshotToStatusActor(); - + void InsertConnectionLockAcquired(TRemoteConnection* connection); void InsertAcceptorLockAcquired(TAcceptor* acceptor); @@ -159,7 +159,7 @@ namespace NBus { TAcceptorPtr GetAcceptorById(ui64 id); void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus); - + void Cron(); TBusSessionJobCount JobCount; @@ -193,7 +193,7 @@ namespace NBus { struct TStatusData { TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot; ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue; - + TAtomicShutdownState ShutdownState; TBusSessionStatus Status; @@ -246,14 +246,14 @@ namespace NBus { inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { return Proto; } - + inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept { return &Config; } - + inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept { return Queue; } - - } + + } } diff --git a/library/cpp/messagebus/storage.cpp b/library/cpp/messagebus/storage.cpp index 6743f0abe4..efefc87340 100644 --- a/library/cpp/messagebus/storage.cpp +++ b/library/cpp/messagebus/storage.cpp @@ -6,7 +6,7 @@ namespace NBus { namespace NPrivate { TTimedMessages::TTimedMessages() { } - + TTimedMessages::~TTimedMessages() { Y_VERIFY(Items.empty()); } @@ -33,14 +33,14 @@ namespace NBus { size_t TTimedMessages::Size() const { return Items.size(); } - + void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) { // shortcut if (before == TInstant::Max()) { Clear(r); return; } - + while (!Items.empty()) { TItem& i = *Items.front(); if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) { @@ -50,14 +50,14 @@ namespace NBus { Items.pop_front(); } } - + void TTimedMessages::Clear(TMessagesPtrs* r) { while (!Items.empty()) { r->push_back(Items.front()->Message.Release()); Items.pop_front(); } } - + TSyncAckMessages::TSyncAckMessages() { KeyToMessage.set_empty_key(0); KeyToMessage.set_deleted_key(1); @@ -66,14 +66,14 @@ namespace NBus { TSyncAckMessages::~TSyncAckMessages() { Y_VERIFY(KeyToMessage.empty()); Y_VERIFY(TimedItems.empty()); - } - + } + void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) { // Perform garbage collection if `TimedMessages` contain too many junk data if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) { Gc(); } - + TValue value = {m.MessagePtr.Release()}; std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value)); @@ -95,7 +95,7 @@ namespace NBus { return v.Message; } - + void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) { // shortcut if (before == TInstant::Max()) { @@ -110,7 +110,7 @@ namespace NBus { if (TInstant::MilliSeconds(i.SendTime) > before) { break; } - + TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key); if (itMessage != KeyToMessage.end()) { @@ -133,7 +133,7 @@ namespace NBus { void TSyncAckMessages::Gc() { TDeque<TTimedItem> tmp; - + for (auto& timedItem : TimedItems) { if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) { continue; @@ -143,7 +143,7 @@ namespace NBus { TimedItems.swap(tmp); } - + void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) { for (auto message : messages) { TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id); @@ -158,4 +158,4 @@ namespace NBus { } } -} +} diff --git a/library/cpp/messagebus/storage.h b/library/cpp/messagebus/storage.h index 3f8de480a1..7d168844ed 100644 --- a/library/cpp/messagebus/storage.h +++ b/library/cpp/messagebus/storage.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include "message_ptr_and_header.h" #include "moved.h" #include "ybus.h" @@ -18,7 +18,7 @@ namespace NBus { public: TTimedMessages(); ~TTimedMessages(); - + struct TItem { THolder<TBusMessage> Message; @@ -36,31 +36,31 @@ namespace NBus { void Timeout(TInstant before, TMessagesPtrs* r); void Clear(TMessagesPtrs* r); - + private: TItems Items; }; - + class TSyncAckMessages : TNonCopyable { public: TSyncAckMessages(); ~TSyncAckMessages(); - + void Push(TBusMessagePtrAndHeader& m); TBusMessage* Pop(TBusKey id); void Timeout(TInstant before, TMessagesPtrs* r); void Clear(TMessagesPtrs* r); - + size_t Size() const { return KeyToMessage.size(); } - + void RemoveAll(const TMessagesPtrs&); - + void Gc(); - + void DumpState(); private: diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index e4ea37f0f3..8489319278 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -605,10 +605,10 @@ int main(int argc, char* argv[]) { opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); - opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); - opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); - opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); - opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); + opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); + opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); + opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); + opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") .RequiredArgument("BOOL") diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index a358339513..040f9b7702 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -1,5 +1,5 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/fixed_port.h> #include <library/cpp/messagebus/test/helper/hanging_server.h> @@ -13,7 +13,7 @@ #include <utility> using namespace NBus; -using namespace NBus::NTest; +using namespace NBus::NTest; namespace { struct TExampleClientSlowOnMessageSent: public TExampleClient { diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index bb21cf1c99..de21ad8521 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -193,7 +193,7 @@ namespace NBus { void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session); void Remove(TBusSession* session); }; - + ///////////////////////////////////////////////////////////////// /// Factory methods to construct message queue TBusMessageQueuePtr CreateMessageQueue(const char* name = ""); |