aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/event_loop.cpp
diff options
context:
space:
mode:
authoryazevnul <yazevnul@yandex-team.ru>2022-02-10 16:46:46 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:46 +0300
commit8cbc307de0221f84c80c42dcbe07d40727537e2c (patch)
tree625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/messagebus/event_loop.cpp
parent30d1ef3941e0dc835be7609de5ebee66958f215a (diff)
downloadydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/event_loop.cpp')
-rw-r--r--library/cpp/messagebus/event_loop.cpp30
1 files changed, 15 insertions, 15 deletions
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index f685135bed..1dbbec1657 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -4,11 +4,11 @@
#include "thread_extra.h"
#include <util/generic/hash.h>
-#include <util/network/pair.h>
+#include <util/network/pair.h>
#include <util/network/poller.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/system/yassert.h>
#include <util/thread/lfqueue.h>
@@ -161,7 +161,7 @@ TChannel::TImpl::TImpl(TEventLoop::TImpl* eventLoop, TSocket socket, TEventHandl
}
TChannel::TImpl::~TImpl() {
- Y_ASSERT(Close);
+ Y_ASSERT(Close);
}
void TChannel::TImpl::EnableRead() {
@@ -260,7 +260,7 @@ TEventLoop::TImpl::TImpl(const char* name)
SOCKET wakeupSockets[2];
if (SocketPair(wakeupSockets) < 0) {
- Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText());
+ Y_FAIL("failed to create socket pair for wakeup sockets: %s", LastSystemErrorText());
}
TSocketHolder wakeupReadSocket(wakeupSockets[0]);
@@ -278,7 +278,7 @@ TEventLoop::TImpl::TImpl(const char* name)
void TEventLoop::TImpl::Run() {
bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
- Y_VERIFY(res, "Invalid mbus event loop state");
+ Y_VERIFY(res, "Invalid mbus event loop state");
if (!!Name) {
SetCurrentThreadName(Name);
@@ -286,7 +286,7 @@ void TEventLoop::TImpl::Run() {
while (AtomicGet(StopSignal) == 0) {
void* cookies[1024];
- const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
+ const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
void** end = cookies + count;
for (void** c = cookies; c != end; ++c) {
@@ -295,7 +295,7 @@ void TEventLoop::TImpl::Run() {
if (*c == this) {
char buf[0x1000];
if (NBus::NPrivate::SocketRecv(WakeupReadSocket, buf) < 0) {
- Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText());
+ Y_FAIL("failed to recv from wakeup socket: %s", LastSystemErrorText());
}
continue;
}
@@ -306,14 +306,14 @@ void TEventLoop::TImpl::Run() {
SOCKET socket = -1;
while (SocketsToRemove.Dequeue(&socket)) {
TGuard<TMutex> guard(Mutex);
- Y_VERIFY(Data.erase(socket) == 1, "must be removed once");
+ Y_VERIFY(Data.erase(socket) == 1, "must be removed once");
}
}
{
TGuard<TMutex> guard(Mutex);
- for (auto& it : Data) {
- it.second->Unregister();
+ for (auto& it : Data) {
+ it.second->Unregister();
}
// release file descriptors
@@ -322,7 +322,7 @@ void TEventLoop::TImpl::Run() {
res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING);
- Y_VERIFY(res);
+ Y_VERIFY(res);
StoppedEvent.Signal();
}
@@ -338,13 +338,13 @@ void TEventLoop::TImpl::Stop() {
}
TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
- Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
+ Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
TChannelPtr channel = new TChannel(new TChannel::TImpl(this, socket, eventHandler, cookie));
TGuard<TMutex> guard(Mutex);
- Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted");
+ Y_VERIFY(Data.insert(std::make_pair(socket, channel)).second, "must not be already inserted");
return channel;
}
@@ -352,7 +352,7 @@ TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHa
void TEventLoop::TImpl::Wakeup() {
if (NBus::NPrivate::SocketSend(WakeupWriteSocket, TArrayRef<const char>("", 1)) < 0) {
if (LastSystemError() != EAGAIN) {
- Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText());
+ Y_FAIL("failed to send to wakeup socket: %s", LastSystemErrorText());
}
}
}
@@ -365,6 +365,6 @@ void TEventLoop::TImpl::AddToPoller(SOCKET socket, void* cookie, int flags) {
} else if (flags == OP_READ_WRITE) {
Poller.WaitReadWriteOneShot(socket, cookie);
} else {
- Y_FAIL("Wrong flags: %d", int(flags));
+ Y_FAIL("Wrong flags: %d", int(flags));
}
}