diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
commit | a5950576e397b1909261050b8c7da16db58f10b1 (patch) | |
tree | 7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/messagebus/event_loop.cpp | |
parent | 81eddc8c0b55990194e112b02d127b87d54164a9 (diff) | |
download | ydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/event_loop.cpp')
-rw-r--r-- | library/cpp/messagebus/event_loop.cpp | 330 |
1 files changed, 165 insertions, 165 deletions
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp index f685135bed..b1209d2b5c 100644 --- a/library/cpp/messagebus/event_loop.cpp +++ b/library/cpp/messagebus/event_loop.cpp @@ -5,50 +5,50 @@ #include <util/generic/hash.h> #include <util/network/pair.h> -#include <util/network/poller.h> +#include <util/network/poller.h> #include <util/system/event.h> #include <util/system/mutex.h> #include <util/system/thread.h> -#include <util/system/yassert.h> +#include <util/system/yassert.h> #include <util/thread/lfqueue.h> - + #include <errno.h> -using namespace NEventLoop; - -namespace { +using namespace NEventLoop; + +namespace { enum ERunningState { EVENT_LOOP_CREATED, EVENT_LOOP_RUNNING, EVENT_LOOP_STOPPED, }; - enum EOperation { - OP_READ = 1, - OP_WRITE = 2, - OP_READ_WRITE = OP_READ | OP_WRITE, - }; -} - -class TChannel::TImpl { -public: + enum EOperation { + OP_READ = 1, + OP_WRITE = 2, + OP_READ_WRITE = OP_READ | OP_WRITE, + }; +} + +class TChannel::TImpl { +public: TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie); ~TImpl(); - - void EnableRead(); - void DisableRead(); - void EnableWrite(); - void DisableWrite(); - - void Unregister(); - - SOCKET GetSocket() const; + + void EnableRead(); + void DisableRead(); + void EnableWrite(); + void DisableWrite(); + + void Unregister(); + + SOCKET GetSocket() const; TSocket GetSocketPtr() const; - + void Update(int pollerFlags, bool enable); void CallHandler(); - TEventLoop::TImpl* EventLoop; + TEventLoop::TImpl* EventLoop; TSocket Socket; TEventHandlerPtr EventHandler; void* Cookie; @@ -57,130 +57,130 @@ public: int CurrentFlags; bool Close; -}; - -class TEventLoop::TImpl { -public: +}; + +class TEventLoop::TImpl { +public: TImpl(const char* name); - - void Run(); - void Wakeup(); - void Stop(); - + + void Run(); + void Wakeup(); + void Stop(); + TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie); - void Unregister(SOCKET socket); - + void Unregister(SOCKET socket); + typedef THashMap<SOCKET, TChannelPtr> TData; - + void AddToPoller(SOCKET socket, void* cookie, int flags); - + TMutex Mutex; - + const char* Name; TAtomic RunningState; TAtomic StopSignal; TSystemEvent StoppedEvent; - TData Data; - + TData Data; + TLockFreeQueue<SOCKET> SocketsToRemove; - TSocketPoller Poller; - TSocketHolder WakeupReadSocket; - TSocketHolder WakeupWriteSocket; -}; - -TChannel::~TChannel() { -} - -void TChannel::EnableRead() { - Impl->EnableRead(); -} - -void TChannel::DisableRead() { - Impl->DisableRead(); -} - -void TChannel::EnableWrite() { - Impl->EnableWrite(); -} - -void TChannel::DisableWrite() { - Impl->DisableWrite(); -} - -void TChannel::Unregister() { - Impl->Unregister(); -} - -SOCKET TChannel::GetSocket() const { - return Impl->GetSocket(); -} - + TSocketPoller Poller; + TSocketHolder WakeupReadSocket; + TSocketHolder WakeupWriteSocket; +}; + +TChannel::~TChannel() { +} + +void TChannel::EnableRead() { + Impl->EnableRead(); +} + +void TChannel::DisableRead() { + Impl->DisableRead(); +} + +void TChannel::EnableWrite() { + Impl->EnableWrite(); +} + +void TChannel::DisableWrite() { + Impl->DisableWrite(); +} + +void TChannel::Unregister() { + Impl->Unregister(); +} + +SOCKET TChannel::GetSocket() const { + return Impl->GetSocket(); +} + TSocket TChannel::GetSocketPtr() const { return Impl->GetSocketPtr(); } TChannel::TChannel(TImpl* impl) : Impl(impl) -{ -} - +{ +} + TEventLoop::TEventLoop(const char* name) : Impl(new TImpl(name)) -{ -} - +{ +} + TEventLoop::~TEventLoop() { -} - -void TEventLoop::Run() { - Impl->Run(); -} - -void TEventLoop::Stop() { - Impl->Stop(); -} - +} + +void TEventLoop::Run() { + Impl->Run(); +} + +void TEventLoop::Stop() { + Impl->Stop(); +} + bool TEventLoop::IsRunning() { return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; } TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { return Impl->Register(socket, eventHandler, cookie); -} - +} + TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie) - : EventLoop(eventLoop) - , Socket(socket) + : EventLoop(eventLoop) + , Socket(socket) , EventHandler(eventHandler) , Cookie(cookie) , CurrentFlags(0) , Close(false) -{ -} - +{ +} + TChannel::TImpl::~TImpl() { Y_ASSERT(Close); } -void TChannel::TImpl::EnableRead() { +void TChannel::TImpl::EnableRead() { Update(OP_READ, true); -} - -void TChannel::TImpl::DisableRead() { +} + +void TChannel::TImpl::DisableRead() { Update(OP_READ, false); -} - -void TChannel::TImpl::EnableWrite() { +} + +void TChannel::TImpl::EnableWrite() { Update(OP_WRITE, true); -} - -void TChannel::TImpl::DisableWrite() { +} + +void TChannel::TImpl::DisableWrite() { Update(OP_WRITE, false); -} - -void TChannel::TImpl::Unregister() { +} + +void TChannel::TImpl::Unregister() { TGuard<TMutex> guard(Mutex); if (Close) { @@ -196,8 +196,8 @@ void TChannel::TImpl::Unregister() { EventLoop->SocketsToRemove.Enqueue(Socket); EventLoop->Wakeup(); -} - +} + void TChannel::TImpl::Update(int flags, bool enable) { TGuard<TMutex> guard(Mutex); @@ -221,10 +221,10 @@ void TChannel::TImpl::Update(int flags, bool enable) { CurrentFlags = newFlags; } -SOCKET TChannel::TImpl::GetSocket() const { - return Socket; -} - +SOCKET TChannel::TImpl::GetSocket() const { + return Socket; +} + TSocket TChannel::TImpl::GetSocketPtr() const { return Socket; } @@ -256,27 +256,27 @@ TEventLoop::TImpl::TImpl(const char* name) : Name(name) , RunningState(EVENT_LOOP_CREATED) , StopSignal(0) -{ - SOCKET wakeupSockets[2]; - +{ + SOCKET wakeupSockets[2]; + if (SocketPair(wakeupSockets) < 0) { Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText()); } - - TSocketHolder wakeupReadSocket(wakeupSockets[0]); - TSocketHolder wakeupWriteSocket(wakeupSockets[1]); - - WakeupReadSocket.Swap(wakeupReadSocket); - WakeupWriteSocket.Swap(wakeupWriteSocket); - - SetNonBlock(WakeupWriteSocket, true); - SetNonBlock(WakeupReadSocket, true); - - Poller.WaitRead(WakeupReadSocket, + + TSocketHolder wakeupReadSocket(wakeupSockets[0]); + TSocketHolder wakeupWriteSocket(wakeupSockets[1]); + + WakeupReadSocket.Swap(wakeupReadSocket); + WakeupWriteSocket.Swap(wakeupWriteSocket); + + SetNonBlock(WakeupWriteSocket, true); + SetNonBlock(WakeupReadSocket, true); + + Poller.WaitRead(WakeupReadSocket, reinterpret_cast<void*>(this)); -} - -void TEventLoop::TImpl::Run() { +} + +void TEventLoop::TImpl::Run() { bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); Y_VERIFY(res, "Invalid mbus event loop state"); @@ -285,30 +285,30 @@ void TEventLoop::TImpl::Run() { } while (AtomicGet(StopSignal) == 0) { - void* cookies[1024]; + void* cookies[1024]; const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies)); - - void** end = cookies + count; - for (void** c = cookies; c != end; ++c) { + + void** end = cookies + count; + for (void** c = cookies; c != end; ++c) { TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c); - + if (*c == this) { - char buf[0x1000]; + char buf[0x1000]; if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) { Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText()); } - continue; - } - + continue; + } + s->CallHandler(); - } + } SOCKET socket = -1; while (SocketsToRemove.Dequeue(&socket)) { TGuard<TMutex> guard(Mutex); Y_VERIFY(Data.erase(socket) == 1, "must be removed once"); } - } + } { TGuard<TMutex> guard(Mutex); @@ -325,9 +325,9 @@ void TEventLoop::TImpl::Run() { Y_VERIFY(res); StoppedEvent.Signal(); -} - -void TEventLoop::TImpl::Stop() { +} + +void TEventLoop::TImpl::Stop() { AtomicSet(StopSignal, 1); if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { @@ -335,36 +335,36 @@ void TEventLoop::TImpl::Stop() { StoppedEvent.WaitI(); } -} - +} + TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie)); - + TGuard<TMutex> guard(Mutex); - + Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted"); - + return channel; -} - -void TEventLoop::TImpl::Wakeup() { +} + +void TEventLoop::TImpl::Wakeup() { if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) { if (LastSystemError() != EAGAIN) { Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText()); } - } -} - -void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { - if (flags == OP_READ) { + } +} + +void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { + if (flags == OP_READ) { Poller.WaitReadOneShot(socket, cookie); - } else if (flags == OP_WRITE) { + } else if (flags == OP_WRITE) { Poller.WaitWriteOneShot(socket, cookie); - } else if (flags == OP_READ_WRITE) { + } else if (flags == OP_READ_WRITE) { Poller.WaitReadWriteOneShot(socket, cookie); - } else { + } else { Y_FAIL("Wrong flags: %d", int(flags)); - } -} + } +} |