diff options
author | yazevnul <yazevnul@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
commit | 8cbc307de0221f84c80c42dcbe07d40727537e2c (patch) | |
tree | 625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/messagebus/event_loop.cpp | |
parent | 30d1ef3941e0dc835be7609de5ebee66958f215a (diff) | |
download | ydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz |
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/event_loop.cpp')
-rw-r--r-- | library/cpp/messagebus/event_loop.cpp | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp index f685135bed..1dbbec1657 100644 --- a/library/cpp/messagebus/event_loop.cpp +++ b/library/cpp/messagebus/event_loop.cpp @@ -4,11 +4,11 @@ #include "thread_extra.h" #include <util/generic/hash.h> -#include <util/network/pair.h> +#include <util/network/pair.h> #include <util/network/poller.h> #include <util/system/event.h> #include <util/system/mutex.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/system/yassert.h> #include <util/thread/lfqueue.h> @@ -161,7 +161,7 @@ TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandl } TChannel::TImpl::~TImpl() { - Y_ASSERT(Close); + Y_ASSERT(Close); } void TChannel::TImpl::EnableRead() { @@ -260,7 +260,7 @@ TEventLoop::TImpl::TImpl(const char* name) SOCKET wakeupSockets[2]; if (SocketPair(wakeupSockets) < 0) { - Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText()); + Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText()); } TSocketHolder wakeupReadSocket(wakeupSockets[0]); @@ -278,7 +278,7 @@ TEventLoop::TImpl::TImpl(const char* name) void TEventLoop::TImpl::Run() { bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); - Y_VERIFY(res, "Invalid mbus event loop state"); + Y_VERIFY(res, "Invalid mbus event loop state"); if (!!Name) { SetCurrentThreadName(Name); @@ -286,7 +286,7 @@ void TEventLoop::TImpl::Run() { while (AtomicGet(StopSignal) == 0) { void* cookies[1024]; - const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies)); + const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies)); void** end = cookies + count; for (void** c = cookies; c != end; ++c) { @@ -295,7 +295,7 @@ void TEventLoop::TImpl::Run() { if (*c == this) { char buf[0x1000]; if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) { - Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText()); + Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText()); } continue; } @@ -306,14 +306,14 @@ void TEventLoop::TImpl::Run() { SOCKET socket = -1; while (SocketsToRemove.Dequeue(&socket)) { TGuard<TMutex> guard(Mutex); - Y_VERIFY(Data.erase(socket) == 1, "must be removed once"); + Y_VERIFY(Data.erase(socket) == 1, "must be removed once"); } } { TGuard<TMutex> guard(Mutex); - for (auto& it : Data) { - it.second->Unregister(); + for (auto& it : Data) { + it.second->Unregister(); } // release file descriptors @@ -322,7 +322,7 @@ void TEventLoop::TImpl::Run() { res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING); - Y_VERIFY(res); + Y_VERIFY(res); StoppedEvent.Signal(); } @@ -338,13 +338,13 @@ void TEventLoop::TImpl::Stop() { } TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { - Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); + Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie)); TGuard<TMutex> guard(Mutex); - Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted"); + Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted"); return channel; } @@ -352,7 +352,7 @@ TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHa void TEventLoop::TImpl::Wakeup() { if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) { if (LastSystemError() != EAGAIN) { - Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText()); + Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText()); } } } @@ -365,6 +365,6 @@ void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) { } else if (flags == OP_READ_WRITE) { Poller.WaitReadWriteOneShot(socket, cookie); } else { - Y_FAIL("Wrong flags: %d", int(flags)); + Y_FAIL("Wrong flags: %d", int(flags)); } } |