aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/event_loop.cpp
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:47 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:47 +0300
commita5950576e397b1909261050b8c7da16db58f10b1 (patch)
tree7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/messagebus/event_loop.cpp
parent81eddc8c0b55990194e112b02d127b87d54164a9 (diff)
downloadydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/event_loop.cpp')
-rw-r--r--library/cpp/messagebus/event_loop.cpp330
1 files changed, 165 insertions, 165 deletions
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index f685135bed..b1209d2b5c 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -5,50 +5,50 @@
#include <util/generic/hash.h>
#include <util/network/pair.h>
-#include <util/network/poller.h>
+#include <util/network/poller.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
-#include <util/system/yassert.h>
+#include <util/system/yassert.h>
#include <util/thread/lfqueue.h>
-
+
#include <errno.h>
-using namespace NEventLoop;
-
-namespace {
+using namespace NEventLoop;
+
+namespace {
enum ERunningState {
EVENT_LOOP_CREATED,
EVENT_LOOP_RUNNING,
EVENT_LOOP_STOPPED,
};
- enum EOperation {
- OP_READ = 1,
- OP_WRITE = 2,
- OP_READ_WRITE = OP_READ | OP_WRITE,
- };
-}
-
-class TChannel::TImpl {
-public:
+ enum EOperation {
+ OP_READ = 1,
+ OP_WRITE = 2,
+ OP_READ_WRITE = OP_READ | OP_WRITE,
+ };
+}
+
+class TChannel::TImpl {
+public:
TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr, void* cookie);
~TImpl();
-
- void EnableRead();
- void DisableRead();
- void EnableWrite();
- void DisableWrite();
-
- void Unregister();
-
- SOCKET GetSocket() const;
+
+ void EnableRead();
+ void DisableRead();
+ void EnableWrite();
+ void DisableWrite();
+
+ void Unregister();
+
+ SOCKET GetSocket() const;
TSocket GetSocketPtr() const;
-
+
void Update(int pollerFlags, bool enable);
void CallHandler();
- TEventLoop::TImpl* EventLoop;
+ TEventLoop::TImpl* EventLoop;
TSocket Socket;
TEventHandlerPtr EventHandler;
void* Cookie;
@@ -57,130 +57,130 @@ public:
int CurrentFlags;
bool Close;
-};
-
-class TEventLoop::TImpl {
-public:
+};
+
+class TEventLoop::TImpl {
+public:
TImpl(const char* name);
-
- void Run();
- void Wakeup();
- void Stop();
-
+
+ void Run();
+ void Wakeup();
+ void Stop();
+
TChannelPtr Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie);
- void Unregister(SOCKET socket);
-
+ void Unregister(SOCKET socket);
+
typedef THashMap<SOCKET, TChannelPtr> TData;
-
+
void AddToPoller(SOCKET socket, void* cookie, int flags);
-
+
TMutex Mutex;
-
+
const char* Name;
TAtomic RunningState;
TAtomic StopSignal;
TSystemEvent StoppedEvent;
- TData Data;
-
+ TData Data;
+
TLockFreeQueue<SOCKET> SocketsToRemove;
- TSocketPoller Poller;
- TSocketHolder WakeupReadSocket;
- TSocketHolder WakeupWriteSocket;
-};
-
-TChannel::~TChannel() {
-}
-
-void TChannel::EnableRead() {
- Impl->EnableRead();
-}
-
-void TChannel::DisableRead() {
- Impl->DisableRead();
-}
-
-void TChannel::EnableWrite() {
- Impl->EnableWrite();
-}
-
-void TChannel::DisableWrite() {
- Impl->DisableWrite();
-}
-
-void TChannel::Unregister() {
- Impl->Unregister();
-}
-
-SOCKET TChannel::GetSocket() const {
- return Impl->GetSocket();
-}
-
+ TSocketPoller Poller;
+ TSocketHolder WakeupReadSocket;
+ TSocketHolder WakeupWriteSocket;
+};
+
+TChannel::~TChannel() {
+}
+
+void TChannel::EnableRead() {
+ Impl->EnableRead();
+}
+
+void TChannel::DisableRead() {
+ Impl->DisableRead();
+}
+
+void TChannel::EnableWrite() {
+ Impl->EnableWrite();
+}
+
+void TChannel::DisableWrite() {
+ Impl->DisableWrite();
+}
+
+void TChannel::Unregister() {
+ Impl->Unregister();
+}
+
+SOCKET TChannel::GetSocket() const {
+ return Impl->GetSocket();
+}
+
TSocket TChannel::GetSocketPtr() const {
return Impl->GetSocketPtr();
}
TChannel::TChannel(TImpl* impl)
: Impl(impl)
-{
-}
-
+{
+}
+
TEventLoop::TEventLoop(const char* name)
: Impl(new TImpl(name))
-{
-}
-
+{
+}
+
TEventLoop::~TEventLoop() {
-}
-
-void TEventLoop::Run() {
- Impl->Run();
-}
-
-void TEventLoop::Stop() {
- Impl->Stop();
-}
-
+}
+
+void TEventLoop::Run() {
+ Impl->Run();
+}
+
+void TEventLoop::Stop() {
+ Impl->Stop();
+}
+
bool TEventLoop::IsRunning() {
return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
}
TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
return Impl->Register(socket, eventHandler, cookie);
-}
-
+}
+
TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandlerPtr eventHandler, void* cookie)
- : EventLoop(eventLoop)
- , Socket(socket)
+ : EventLoop(eventLoop)
+ , Socket(socket)
, EventHandler(eventHandler)
, Cookie(cookie)
, CurrentFlags(0)
, Close(false)
-{
-}
-
+{
+}
+
TChannel::TImpl::~TImpl() {
Y_ASSERT(Close);
}
-void TChannel::TImpl::EnableRead() {
+void TChannel::TImpl::EnableRead() {
Update(OP_READ, true);
-}
-
-void TChannel::TImpl::DisableRead() {
+}
+
+void TChannel::TImpl::DisableRead() {
Update(OP_READ, false);
-}
-
-void TChannel::TImpl::EnableWrite() {
+}
+
+void TChannel::TImpl::EnableWrite() {
Update(OP_WRITE, true);
-}
-
-void TChannel::TImpl::DisableWrite() {
+}
+
+void TChannel::TImpl::DisableWrite() {
Update(OP_WRITE, false);
-}
-
-void TChannel::TImpl::Unregister() {
+}
+
+void TChannel::TImpl::Unregister() {
TGuard<TMutex> guard(Mutex);
if (Close) {
@@ -196,8 +196,8 @@ void TChannel::TImpl::Unregister() {
EventLoop->SocketsToRemove.Enqueue(Socket);
EventLoop->Wakeup();
-}
-
+}
+
void TChannel::TImpl::Update(int flags, bool enable) {
TGuard<TMutex> guard(Mutex);
@@ -221,10 +221,10 @@ void TChannel::TImpl::Update(int flags, bool enable) {
CurrentFlags = newFlags;
}
-SOCKET TChannel::TImpl::GetSocket() const {
- return Socket;
-}
-
+SOCKET TChannel::TImpl::GetSocket() const {
+ return Socket;
+}
+
TSocket TChannel::TImpl::GetSocketPtr() const {
return Socket;
}
@@ -256,27 +256,27 @@ TEventLoop::TImpl::TImpl(const char* name)
: Name(name)
, RunningState(EVENT_LOOP_CREATED)
, StopSignal(0)
-{
- SOCKET wakeupSockets[2];
-
+{
+ SOCKET wakeupSockets[2];
+
if (SocketPair(wakeupSockets) < 0) {
Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText());
}
-
- TSocketHolder wakeupReadSocket(wakeupSockets[0]);
- TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
-
- WakeupReadSocket.Swap(wakeupReadSocket);
- WakeupWriteSocket.Swap(wakeupWriteSocket);
-
- SetNonBlock(WakeupWriteSocket, true);
- SetNonBlock(WakeupReadSocket, true);
-
- Poller.WaitRead(WakeupReadSocket,
+
+ TSocketHolder wakeupReadSocket(wakeupSockets[0]);
+ TSocketHolder wakeupWriteSocket(wakeupSockets[1]);
+
+ WakeupReadSocket.Swap(wakeupReadSocket);
+ WakeupWriteSocket.Swap(wakeupWriteSocket);
+
+ SetNonBlock(WakeupWriteSocket, true);
+ SetNonBlock(WakeupReadSocket, true);
+
+ Poller.WaitRead(WakeupReadSocket,
reinterpret_cast<void*>(this));
-}
-
-void TEventLoop::TImpl::Run() {
+}
+
+void TEventLoop::TImpl::Run() {
bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
Y_VERIFY(res, "Invalid mbus event loop state");
@@ -285,30 +285,30 @@ void TEventLoop::TImpl::Run() {
}
while (AtomicGet(StopSignal) == 0) {
- void* cookies[1024];
+ void* cookies[1024];
const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
-
- void** end = cookies + count;
- for (void** c = cookies; c != end; ++c) {
+
+ void** end = cookies + count;
+ for (void** c = cookies; c != end; ++c) {
TChannel::TImpl* s = reinterpret_cast<TChannel::TImpl*>(*c);
-
+
if (*c == this) {
- char buf[0x1000];
+ char buf[0x1000];
if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) {
Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText());
}
- continue;
- }
-
+ continue;
+ }
+
s->CallHandler();
- }
+ }
SOCKET socket = -1;
while (SocketsToRemove.Dequeue(&socket)) {
TGuard<TMutex> guard(Mutex);
Y_VERIFY(Data.erase(socket) == 1, "must be removed once");
}
- }
+ }
{
TGuard<TMutex> guard(Mutex);
@@ -325,9 +325,9 @@ void TEventLoop::TImpl::Run() {
Y_VERIFY(res);
StoppedEvent.Signal();
-}
-
-void TEventLoop::TImpl::Stop() {
+}
+
+void TEventLoop::TImpl::Stop() {
AtomicSet(StopSignal, 1);
if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
@@ -335,36 +335,36 @@ void TEventLoop::TImpl::Stop() {
StoppedEvent.WaitI();
}
-}
-
+}
+
TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie));
-
+
TGuard<TMutex> guard(Mutex);
-
+
Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted");
-
+
return channel;
-}
-
-void TEventLoop::TImpl::Wakeup() {
+}
+
+void TEventLoop::TImpl::Wakeup() {
if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) {
if (LastSystemError() != EAGAIN) {
Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText());
}
- }
-}
-
-void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
- if (flags == OP_READ) {
+ }
+}
+
+void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
+ if (flags == OP_READ) {
Poller.WaitReadOneShot(socket, cookie);
- } else if (flags == OP_WRITE) {
+ } else if (flags == OP_WRITE) {
Poller.WaitWriteOneShot(socket, cookie);
- } else if (flags == OP_READ_WRITE) {
+ } else if (flags == OP_READ_WRITE) {
Poller.WaitReadWriteOneShot(socket, cookie);
- } else {
+ } else {
Y_FAIL("Wrong flags: %d", int(flags));
- }
-}
+ }
+}