aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/event_loop.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/event_loop.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/event_loop.cpp')
-rw-r--r--library/cpp/messagebus/event_loop.cpp330
1 files changed, 165 insertions, 165 deletions
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index f685135bed..b82bd023e5 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -1,8 +1,8 @@
#include "event_loop.h"
-
+
#include "network.h"
-#include "thread_extra.h"
-
+#include "thread_extra.h"
+
#include <util/generic/hash.h>
#include <util/network/pair.h>
#include <util/network/poller.h>
@@ -10,19 +10,19 @@
#include <util/system/mutex.h>
#include <util/system/thread.h>
#include <util/system/yassert.h>
-#include <util/thread/lfqueue.h>
+#include <util/thread/lfqueue.h>
#include <errno.h>
-
+
using namespace NEventLoop;
namespace {
- enum ERunningState {
- EVENT_LOOP_CREATED,
- EVENT_LOOP_RUNNING,
- EVENT_LOOP_STOPPED,
- };
-
+ enum ERunningState {
+ EVENT_LOOP_CREATED,
+ EVENT_LOOP_RUNNING,
+ EVENT_LOOP_STOPPED,
+ };
+
enum EOperation {
OP_READ = 1,
OP_WRITE = 2,
@@ -32,8 +32,8 @@ namespace {
class TChannel::TImpl {
public:
- TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie);
- ~TImpl();
+ TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie);
+ ~TImpl();
void EnableRead();
void DisableRead();
@@ -43,48 +43,48 @@ public:
void Unregister();
SOCKET GetSocket() const;
- TSocket GetSocketPtr() const;
-
- void Update(int pollerFlags, bool enable);
- void CallHandler();
+ TSocket GetSocketPtr() const;
+ void Update(int pollerFlags, bool enable);
+ void CallHandler();
+
TEventLoop::TImpl* EventLoop;
- TSocket Socket;
- TEventHandlerPtr EventHandler;
- void* Cookie;
-
- TMutex Mutex;
-
- int CurrentFlags;
- bool Close;
+ TSocket Socket;
+ TEventHandlerPtr EventHandler;
+ void* Cookie;
+
+ TMutex Mutex;
+
+ int CurrentFlags;
+ bool Close;
};
class TEventLoop::TImpl {
public:
- TImpl(const char* name);
+ TImpl(const char* name);
void Run();
void Wakeup();
void Stop();
- TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie);
+ TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie);
void Unregister(SOCKET socket);
typedef THashMap<SOCKET, TChannelPtr> TData;
- void AddToPoller(SOCKET socket, void* cookie, int flags);
+ void AddToPoller(SOCKET socket, void* cookie, int flags);
- TMutex Mutex;
-
- const char* Name;
+ TMutex Mutex;
+ const char* Name;
+
TAtomic RunningState;
TAtomic StopSignal;
TSystemEvent StoppedEvent;
TData Data;
- TLockFreeQueue<SOCKET> SocketsToRemove;
-
+ TLockFreeQueue<SOCKET> SocketsToRemove;
+
TSocketPoller Poller;
TSocketHolder WakeupReadSocket;
TSocketHolder WakeupWriteSocket;
@@ -117,17 +117,17 @@ SOCKET TChannel::GetSocket() const {
return Impl->GetSocket();
}
-TSocket TChannel::GetSocketPtr() const {
- return Impl->GetSocketPtr();
-}
-
-TChannel::TChannel(TImpl* impl)
- : Impl(impl)
+TSocket TChannel::GetSocketPtr() const {
+ return Impl->GetSocketPtr();
+}
+
+TChannel::TChannel(TImpl* impl)
+ : Impl(impl)
{
}
-TEventLoop::TEventLoop(const char* name)
- : Impl(new TImpl(name))
+TEventLoop::TEventLoop(const char* name)
+ : Impl(new TImpl(name))
{
}
@@ -142,126 +142,126 @@ void TEventLoop::Stop() {
Impl->Stop();
}
-bool TEventLoop::IsRunning() {
+bool TEventLoop::IsRunning() {
return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
+}
+
+TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
+ return Impl->Register(socket, eventHandler, cookie);
}
-TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
- return Impl->Register(socket, eventHandler, cookie);
-}
-
-TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie)
+TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie)
: EventLoop(eventLoop)
, Socket(socket)
- , EventHandler(eventHandler)
- , Cookie(cookie)
- , CurrentFlags(0)
- , Close(false)
+ , EventHandler(eventHandler)
+ , Cookie(cookie)
+ , CurrentFlags(0)
+ , Close(false)
{
}
-TChannel::TImpl::~TImpl() {
+TChannel::TImpl::~TImpl() {
Y_ASSERT(Close);
-}
-
+}
+
void TChannel::TImpl::EnableRead() {
- Update(OP_READ, true);
+ Update(OP_READ, true);
}
void TChannel::TImpl::DisableRead() {
- Update(OP_READ, false);
+ Update(OP_READ, false);
}
void TChannel::TImpl::EnableWrite() {
- Update(OP_WRITE, true);
+ Update(OP_WRITE, true);
}
void TChannel::TImpl::DisableWrite() {
- Update(OP_WRITE, false);
+ Update(OP_WRITE, false);
}
void TChannel::TImpl::Unregister() {
- TGuard<TMutex> guard(Mutex);
-
- if (Close) {
- return;
- }
-
- Close = true;
- if (CurrentFlags != 0) {
- EventLoop->Poller.Unwait(Socket);
- CurrentFlags = 0;
- }
- EventHandler.Drop();
-
- EventLoop->SocketsToRemove.Enqueue(Socket);
- EventLoop->Wakeup();
-}
-
-void TChannel::TImpl::Update(int flags, bool enable) {
- TGuard<TMutex> guard(Mutex);
-
- if (Close) {
- return;
- }
-
- int newFlags = enable ? (CurrentFlags | flags) : (CurrentFlags & ~flags);
-
- if (CurrentFlags == newFlags) {
- return;
- }
-
- if (!newFlags) {
- EventLoop->Poller.Unwait(Socket);
- } else {
- void* cookie = reinterpret_cast<void*>(this);
- EventLoop->AddToPoller(Socket, cookie, newFlags);
- }
-
- CurrentFlags = newFlags;
-}
-
+ TGuard<TMutex> guard(Mutex);
+
+ if (Close) {
+ return;
+ }
+
+ Close = true;
+ if (CurrentFlags != 0) {
+ EventLoop->Poller.Unwait(Socket);
+ CurrentFlags = 0;
+ }
+ EventHandler.Drop();
+
+ EventLoop->SocketsToRemove.Enqueue(Socket);
+ EventLoop->Wakeup();
+}
+
+void TChannel::TImpl::Update(int flags, bool enable) {
+ TGuard<TMutex> guard(Mutex);
+
+ if (Close) {
+ return;
+ }
+
+ int newFlags = enable ? (CurrentFlags | flags) : (CurrentFlags & ~flags);
+
+ if (CurrentFlags == newFlags) {
+ return;
+ }
+
+ if (!newFlags) {
+ EventLoop->Poller.Unwait(Socket);
+ } else {
+ void* cookie = reinterpret_cast<void*>(this);
+ EventLoop->AddToPoller(Socket, cookie, newFlags);
+ }
+
+ CurrentFlags = newFlags;
+}
+
SOCKET TChannel::TImpl::GetSocket() const {
return Socket;
}
-TSocket TChannel::TImpl::GetSocketPtr() const {
- return Socket;
-}
-
-void TChannel::TImpl::CallHandler() {
- TEventHandlerPtr handler;
-
- {
- TGuard<TMutex> guard(Mutex);
-
- // other thread may have re-added socket to epoll
- // so even if CurrentFlags is 0, epoll may fire again
- // so please use non-blocking operations
- CurrentFlags = 0;
-
- if (Close) {
- return;
- }
-
- handler = EventHandler;
- }
-
- if (!!handler) {
- handler->HandleEvent(Socket, Cookie);
- }
-}
-
-TEventLoop::TImpl::TImpl(const char* name)
- : Name(name)
- , RunningState(EVENT_LOOP_CREATED)
+TSocket TChannel::TImpl::GetSocketPtr() const {
+ return Socket;
+}
+
+void TChannel::TImpl::CallHandler() {
+ TEventHandlerPtr handler;
+
+ {
+ TGuard<TMutex> guard(Mutex);
+
+ // other thread may have re-added socket to epoll
+ // so even if CurrentFlags is 0, epoll may fire again
+ // so please use non-blocking operations
+ CurrentFlags = 0;
+
+ if (Close) {
+ return;
+ }
+
+ handler = EventHandler;
+ }
+
+ if (!!handler) {
+ handler->HandleEvent(Socket, Cookie);
+ }
+}
+
+TEventLoop::TImpl::TImpl(const char* name)
+ : Name(name)
+ , RunningState(EVENT_LOOP_CREATED)
, StopSignal(0)
{
SOCKET wakeupSockets[2];
- if (SocketPair(wakeupSockets) < 0) {
+ if (SocketPair(wakeupSockets) < 0) {
Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText());
- }
+ }
TSocketHolder wakeupReadSocket(wakeupSockets[0]);
TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
@@ -279,91 +279,91 @@ TEventLoop::TImpl::TImpl(const char* name)
void TEventLoop::TImpl::Run() {
bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
Y_VERIFY(res, "Invalid mbus event loop state");
-
- if (!!Name) {
+
+ if (!!Name) {
SetCurrentThreadName(Name);
- }
-
+ }
+
while (AtomicGet(StopSignal) == 0) {
void* cookies[1024];
const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
void** end = cookies + count;
for (void** c = cookies; c != end; ++c) {
- TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c);
+ TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c);
- if (*c == this) {
+ if (*c == this) {
char buf[0x1000];
- if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) {
+ if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) {
Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText());
- }
+ }
continue;
}
- s->CallHandler();
+ s->CallHandler();
}
-
+
SOCKET socket = -1;
while (SocketsToRemove.Dequeue(&socket)) {
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(Data.erase(socket) == 1, "must be removed once");
- }
+ }
}
-
- {
- TGuard<TMutex> guard(Mutex);
+
+ {
+ TGuard<TMutex> guard(Mutex);
for (auto& it : Data) {
it.second->Unregister();
- }
-
- // release file descriptors
- Data.clear();
- }
-
+ }
+
+ // release file descriptors
+ Data.clear();
+ }
+
res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING);
-
+
Y_VERIFY(res);
- StoppedEvent.Signal();
+ StoppedEvent.Signal();
}
void TEventLoop::TImpl::Stop() {
AtomicSet(StopSignal, 1);
-
+
if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
Wakeup();
-
+
StoppedEvent.WaitI();
}
}
-TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
+TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
+
+ TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie));
- TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie));
-
- TGuard<TMutex> guard(Mutex);
+ TGuard<TMutex> guard(Mutex);
Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted");
- return channel;
+ return channel;
}
void TEventLoop::TImpl::Wakeup() {
if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) {
- if (LastSystemError() != EAGAIN) {
+ if (LastSystemError() != EAGAIN) {
Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText());
- }
+ }
}
}
void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
if (flags == OP_READ) {
- Poller.WaitReadOneShot(socket, cookie);
+ Poller.WaitReadOneShot(socket, cookie);
} else if (flags == OP_WRITE) {
- Poller.WaitWriteOneShot(socket, cookie);
+ Poller.WaitWriteOneShot(socket, cookie);
} else if (flags == OP_READ_WRITE) {
- Poller.WaitReadWriteOneShot(socket, cookie);
+ Poller.WaitReadWriteOneShot(socket, cookie);
} else {
Y_FAIL("Wrong flags: %d", int(flags));
}