diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/event_loop.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/event_loop.cpp')
-rw-r--r-- | library/cpp/messagebus/event_loop.cpp | 330 |
1 files changed, 165 insertions, 165 deletions
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp index f685135bed..b82bd023e5 100644 --- a/library/cpp/messagebus/event_loop.cpp +++ b/library/cpp/messagebus/event_loop.cpp @@ -1,8 +1,8 @@ #include "event_loop.h" - + #include "network.h" -#include "thread_extra.h" - +#include "thread_extra.h" + #include <util/generic/hash.h> #include <util/network/pair.h> #include <util/network/poller.h> @@ -10,19 +10,19 @@ #include <util/system/mutex.h> #include <util/system/thread.h> #include <util/system/yassert.h> -#include <util/thread/lfqueue.h> +#include <util/thread/lfqueue.h> #include <errno.h> - + using namespace NEventLoop; namespace { - enum ERunningState { - EVENT_LOOP_CREATED, - EVENT_LOOP_RUNNING, - EVENT_LOOP_STOPPED, - }; - + enum ERunningState { + EVENT_LOOP_CREATED, + EVENT_LOOP_RUNNING, + EVENT_LOOP_STOPPED, + }; + enum EOperation { OP_READ = 1, OP_WRITE = 2, @@ -32,8 +32,8 @@ namespace { class TChannel::TImpl { public: - TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie); - ~TImpl(); + TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie); + ~TImpl(); void EnableRead(); void DisableRead(); @@ -43,48 +43,48 @@ public: void Unregister(); SOCKET GetSocket() const; - TSocket GetSocketPtr() const; - - void Update(int pollerFlags, bool enable); - void CallHandler(); + TSocket GetSocketPtr() const; + void Update(int pollerFlags, bool enable); + void CallHandler(); + TEventLoop::TImpl* EventLoop; - TSocket Socket; - TEventHandlerPtr EventHandler; - void* Cookie; - - TMutex Mutex; - - int CurrentFlags; - bool Close; + TSocket Socket; + TEventHandlerPtr EventHandler; + void* Cookie; + + TMutex Mutex; + + int CurrentFlags; + bool Close; }; class TEventLoop::TImpl { public: - TImpl(const char* name); + TImpl(const char* name); void Run(); void Wakeup(); void Stop(); - TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie); + TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie); void Unregister(SOCKET socket); typedef THashMap<SOCKET, TChannelPtr> TData; - void AddToPoller(SOCKET socket, void* cookie, int flags); + void AddToPoller(SOCKET socket, void* cookie, int flags); - TMutex Mutex; - - const char* Name; + TMutex Mutex; + const char* Name; + TAtomic RunningState; TAtomic StopSignal; TSystemEvent StoppedEvent; TData Data; - TLockFreeQueue<SOCKET> SocketsToRemove; - + TLockFreeQueue<SOCKET> SocketsToRemove; + TSocketPoller Poller; TSocketHolder WakeupReadSocket; TSocketHolder WakeupWriteSocket; @@ -117,17 +117,17 @@ SOCKET TChannel::GetSocket() const { return Impl->GetSocket(); } -TSocket TChannel::GetSocketPtr() const { - return Impl->GetSocketPtr(); -} - -TChannel::TChannel(TImpl* impl) - : Impl(impl) +TSocket TChannel::GetSocketPtr() const { + return Impl->GetSocketPtr(); +} + +TChannel::TChannel(TImpl* impl) + : Impl(impl) { } -TEventLoop::TEventLoop(const char* name) - : Impl(new TImpl(name)) +TEventLoop::TEventLoop(const char* name) + : Impl(new TImpl(name)) { } @@ -142,126 +142,126 @@ void TEventLoop::Stop() { Impl->Stop(); } -bool TEventLoop::IsRunning() { +bool TEventLoop::IsRunning() { return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; +} + +TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { + return Impl->Register(socket, eventHandler, cookie); } -TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { - return Impl->Register(socket, eventHandler, cookie); -} - -TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie) +TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie) : EventLoop(eventLoop) , Socket(socket) - , EventHandler(eventHandler) - , Cookie(cookie) - , CurrentFlags(0) - , Close(false) + , EventHandler(eventHandler) + , Cookie(cookie) + , CurrentFlags(0) + , Close(false) { } -TChannel::TImpl::~TImpl() { +TChannel::TImpl::~TImpl() { Y_ASSERT(Close); -} - +} + void TChannel::TImpl::EnableRead() { - Update(OP_READ, true); + Update(OP_READ, true); } void TChannel::TImpl::DisableRead() { - Update(OP_READ, false); + Update(OP_READ, false); } void TChannel::TImpl::EnableWrite() { - Update(OP_WRITE, true); + Update(OP_WRITE, true); } void TChannel::TImpl::DisableWrite() { - Update(OP_WRITE, false); + Update(OP_WRITE, false); } void TChannel::TImpl::Unregister() { - TGuard<TMutex> guard(Mutex); - - if (Close) { - return; - } - - Close = true; - if (CurrentFlags != 0) { - EventLoop->Poller.Unwait(Socket); - CurrentFlags = 0; - } - EventHandler.Drop(); - - EventLoop->SocketsToRemove.Enqueue(Socket); - EventLoop->Wakeup(); -} - -void TChannel::TImpl::Update(int flags, bool enable) { - TGuard<TMutex> guard(Mutex); - - if (Close) { - return; - } - - int newFlags = enable ? (CurrentFlags | flags) : (CurrentFlags & ~flags); - - if (CurrentFlags == newFlags) { - return; - } - - if (!newFlags) { - EventLoop->Poller.Unwait(Socket); - } else { - void* cookie = reinterpret_cast<void*>(this); - EventLoop->AddToPoller(Socket, cookie, newFlags); - } - - CurrentFlags = newFlags; -} - + TGuard<TMutex> guard(Mutex); + + if (Close) { + return; + } + + Close = true; + if (CurrentFlags != 0) { + EventLoop->Poller.Unwait(Socket); + CurrentFlags = 0; + } + EventHandler.Drop(); + + EventLoop->SocketsToRemove.Enqueue(Socket); + EventLoop->Wakeup(); +} + +void TChannel::TImpl::Update(int flags, bool enable) { + TGuard<TMutex> guard(Mutex); + + if (Close) { + return; + } + + int newFlags = enable ? (CurrentFlags | flags) : (CurrentFlags & ~flags); + + if (CurrentFlags == newFlags) { + return; + } + + if (!newFlags) { + EventLoop->Poller.Unwait(Socket); + } else { + void* cookie = reinterpret_cast<void*>(this); + EventLoop->AddToPoller(Socket, cookie, newFlags); + } + + CurrentFlags = newFlags; +} + SOCKET TChannel::TImpl::GetSocket() const { return Socket; } -TSocket TChannel::TImpl::GetSocketPtr() const { - return Socket; -} - -void TChannel::TImpl::CallHandler() { - TEventHandlerPtr handler; - - { - TGuard<TMutex> guard(Mutex); - - // other thread may have re-added socket to epoll - // so even if CurrentFlags is 0, epoll may fire again - // so please use non-blocking operations - CurrentFlags = 0; - - if (Close) { - return; - } - - handler = EventHandler; - } - - if (!!handler) { - handler->HandleEvent(Socket, Cookie); - } -} - -TEventLoop::TImpl::TImpl(const char* name) - : Name(name) - , RunningState(EVENT_LOOP_CREATED) +TSocket TChannel::TImpl::GetSocketPtr() const { + return Socket; +} + +void TChannel::TImpl::CallHandler() { + TEventHandlerPtr handler; + + { + TGuard<TMutex> guard(Mutex); + + // other thread may have re-added socket to epoll + // so even if CurrentFlags is 0, epoll may fire again + // so please use non-blocking operations + CurrentFlags = 0; + + if (Close) { + return; + } + + handler = EventHandler; + } + + if (!!handler) { + handler->HandleEvent(Socket, Cookie); + } +} + +TEventLoop::TImpl::TImpl(const char* name) + : Name(name) + , RunningState(EVENT_LOOP_CREATED) , StopSignal(0) { SOCKET wakeupSockets[2]; - if (SocketPair(wakeupSockets) < 0) { + if (SocketPair(wakeupSockets) < 0) { Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText()); - } + } TSocketHolder wakeupReadSocket(wakeupSockets[0]); TSocketHolder wakeupWriteSocket(wakeupSockets[1]); @@ -279,91 +279,91 @@ TEventLoop::TImpl::TImpl(const char* name) void TEventLoop::TImpl::Run() { bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); Y_VERIFY(res, "Invalid mbus event loop state"); - - if (!!Name) { + + if (!!Name) { SetCurrentThreadName(Name); - } - + } + while (AtomicGet(StopSignal) == 0) { void* cookies[1024]; const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies)); void** end = cookies + count; for (void** c = cookies; c != end; ++c) { - TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c); + TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c); - if (*c == this) { + if (*c == this) { char buf[0x1000]; - if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) { + if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) { Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText()); - } + } continue; } - s->CallHandler(); + s->CallHandler(); } - + SOCKET socket = -1; while (SocketsToRemove.Dequeue(&socket)) { - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(Data.erase(socket) == 1, "must be removed once"); - } + } } - - { - TGuard<TMutex> guard(Mutex); + + { + TGuard<TMutex> guard(Mutex); for (auto& it : Data) { it.second->Unregister(); - } - - // release file descriptors - Data.clear(); - } - + } + + // release file descriptors + Data.clear(); + } + res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING); - + Y_VERIFY(res); - StoppedEvent.Signal(); + StoppedEvent.Signal(); } void TEventLoop::TImpl::Stop() { AtomicSet(StopSignal, 1); - + if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { Wakeup(); - + StoppedEvent.WaitI(); } } -TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { +TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); + + TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie)); - TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie)); - - TGuard<TMutex> guard(Mutex); + TGuard<TMutex> guard(Mutex); Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted"); - return channel; + return channel; } void TEventLoop::TImpl::Wakeup() { if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) { - if (LastSystemError() != EAGAIN) { + if (LastSystemError() != EAGAIN) { Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText()); - } + } } } void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { if (flags == OP_READ) { - Poller.WaitReadOneShot(socket, cookie); + Poller.WaitReadOneShot(socket, cookie); } else if (flags == OP_WRITE) { - Poller.WaitWriteOneShot(socket, cookie); + Poller.WaitWriteOneShot(socket, cookie); } else if (flags == OP_READ_WRITE) { - Poller.WaitReadWriteOneShot(socket, cookie); + Poller.WaitReadWriteOneShot(socket, cookie); } else { Y_FAIL("Wrong flags: %d", int(flags)); } |