diff options
author | Alexander Gololobov <davenger@yandex-team.com> | 2022-02-10 16:47:37 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:37 +0300 |
commit | 39608cdb86363c75ce55b2b9a69841c3b71f22cf (patch) | |
tree | 4ec132c1665bd4d68e3628aa18d937c70d32413b /library/cpp/messagebus | |
parent | 54295b9bd4dc45c54d804084fd846d945148a7f0 (diff) | |
download | ydb-39608cdb86363c75ce55b2b9a69841c3b71f22cf.tar.gz |
Restoring authorship annotation for Alexander Gololobov <davenger@yandex-team.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r-- | library/cpp/messagebus/actor/executor.cpp | 20 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/thread_extra.h | 22 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/what_thread_does.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/config/defs.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/latch.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/local_tasks.h | 12 | ||||
-rw-r--r-- | library/cpp/messagebus/message.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/protobuf/ybusbuf.cpp | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_connection_status.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 70 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 20 |
13 files changed, 93 insertions, 93 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp index 7a2227a458..b58e07f4bd 100644 --- a/library/cpp/messagebus/actor/executor.cpp +++ b/library/cpp/messagebus/actor/executor.cpp @@ -18,7 +18,7 @@ using namespace NActor::NPrivate; namespace { struct THistoryInternal { struct TRecord { - TAtomic MaxQueueSize; + TAtomic MaxQueueSize; TRecord() : MaxQueueSize() @@ -27,7 +27,7 @@ namespace { TExecutorHistory::THistoryRecord Capture() { TExecutorHistory::THistoryRecord r; - r.MaxQueueSize = AtomicGet(MaxQueueSize); + r.MaxQueueSize = AtomicGet(MaxQueueSize); return r; } }; @@ -237,14 +237,14 @@ size_t TExecutor::GetWorkQueueSize() const { return WorkItems.Size(); } -using namespace NTSAN; - +using namespace NTSAN; + ui32 TExecutor::GetMaxQueueSizeAndClear() const { ui32 max = 0; for (unsigned i = 0; i < WorkerThreads.size(); ++i) { - TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData); - max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize)); - RelaxedStore<ui32>(&wtls->MaxQueueSize, 0); + TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData); + max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize)); + RelaxedStore<ui32>(&wtls->MaxQueueSize, 0); } return max; } @@ -269,7 +269,7 @@ TExecutorStatus TExecutor::GetStatusRecordInternal() const { ss << "work items: " << GetWorkQueueSize() << "\n"; ss << "workers:\n"; for (unsigned i = 0; i < WorkerThreads.size(); ++i) { - ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n"; + ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n"; } r.Status = ss.Str(); } @@ -299,8 +299,8 @@ TAutoPtr<IWorkItem> TExecutor::DequeueWork() { auto& wtls = TlsRef(WorkerThreadLocalData); - if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) { - RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize); + if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) { + RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize); } return wi; diff --git a/library/cpp/messagebus/actor/thread_extra.h b/library/cpp/messagebus/actor/thread_extra.h index b5aa151618..e4f37a9760 100644 --- a/library/cpp/messagebus/actor/thread_extra.h +++ b/library/cpp/messagebus/actor/thread_extra.h @@ -2,28 +2,28 @@ #include <util/thread/singleton.h> -namespace NTSAN { +namespace NTSAN { template <typename T> inline void RelaxedStore(volatile T* a, T x) { static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); -#ifdef _win_ +#ifdef _win_ *a = x; -#else +#else __atomic_store_n(a, x, __ATOMIC_RELAXED); -#endif +#endif } - + template <typename T> inline T RelaxedLoad(volatile T* a) { -#ifdef _win_ +#ifdef _win_ return *a; -#else +#else return __atomic_load_n(a, __ATOMIC_RELAXED); -#endif +#endif } - -} - + +} + void SetCurrentThreadName(const char* name); namespace NThreadExtra { diff --git a/library/cpp/messagebus/actor/what_thread_does.cpp b/library/cpp/messagebus/actor/what_thread_does.cpp index bebb6a888c..bce5ccd15e 100644 --- a/library/cpp/messagebus/actor/what_thread_does.cpp +++ b/library/cpp/messagebus/actor/what_thread_does.cpp @@ -1,6 +1,6 @@ #include "what_thread_does.h" -#include "thread_extra.h" +#include "thread_extra.h" #include <util/system/tls.h> @@ -8,13 +8,13 @@ Y_POD_STATIC_THREAD(const char*) WhatThreadDoes; const char* PushWhatThreadDoes(const char* what) { - const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes); - NTSAN::RelaxedStore(&WhatThreadDoes, what); + const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes); + NTSAN::RelaxedStore(&WhatThreadDoes, what); return r; } void PopWhatThreadDoes(const char* prev) { - NTSAN::RelaxedStore(&WhatThreadDoes, prev); + NTSAN::RelaxedStore(&WhatThreadDoes, prev); } const char** WhatThreadDoesLocation() { diff --git a/library/cpp/messagebus/config/defs.h b/library/cpp/messagebus/config/defs.h index 92b1df9969..5db4ef4dae 100644 --- a/library/cpp/messagebus/config/defs.h +++ b/library/cpp/messagebus/config/defs.h @@ -70,7 +70,7 @@ namespace NBus { inline bool IsBusKeyValid(TBusKey key) { return key != YBUS_KEYINVALID && key != YBUS_KEYMAX && key > YBUS_KEYLOCAL; } - + #define YBUS_VERSION 0 #define YBUS_INFINITE (1u << 30u) diff --git a/library/cpp/messagebus/latch.h b/library/cpp/messagebus/latch.h index 373f4c0e13..3677ee7c29 100644 --- a/library/cpp/messagebus/latch.h +++ b/library/cpp/messagebus/latch.h @@ -23,7 +23,7 @@ public: } TGuard<TMutex> guard(Mutex); - while (AtomicGet(Locked) == 1) { + while (AtomicGet(Locked) == 1) { CondVar.WaitI(Mutex); } } @@ -39,7 +39,7 @@ public: } TGuard<TMutex> guard(Mutex); - AtomicSet(Locked, 0); + AtomicSet(Locked, 0); CondVar.BroadCast(); } diff --git a/library/cpp/messagebus/local_tasks.h b/library/cpp/messagebus/local_tasks.h index d8e801a457..c92d197ca5 100644 --- a/library/cpp/messagebus/local_tasks.h +++ b/library/cpp/messagebus/local_tasks.h @@ -1,23 +1,23 @@ #pragma once -#include <util/system/atomic.h> - +#include <util/system/atomic.h> + class TLocalTasks { private: - TAtomic GotTasks; + TAtomic GotTasks; public: TLocalTasks() - : GotTasks(0) + : GotTasks(0) { } void AddTask() { - AtomicSet(GotTasks, 1); + AtomicSet(GotTasks, 1); } bool FetchTask() { - bool gotTasks = AtomicCas(&GotTasks, 0, 1); + bool gotTasks = AtomicCas(&GotTasks, 0, 1); return gotTasks; } }; diff --git a/library/cpp/messagebus/message.h b/library/cpp/messagebus/message.h index 005ca10c65..bf57f13dde 100644 --- a/library/cpp/messagebus/message.h +++ b/library/cpp/messagebus/message.h @@ -155,7 +155,7 @@ namespace NBus { inline bool IsVersionNegotiation(const NBus::TBusHeader& header) { return header.Id == 0 && header.Size == sizeof(TBusHeader); } - + ////////////////////////////////////////////////////////// /// \brief Base class for all messages passed in the system diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..a322a2366f 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -777,7 +777,7 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) { Y_VERIFY(jobCount >= 0, "decremented too much"); Jobs.erase(job->JobStorageIterator); - if (AtomicGet(State) == STOPPED) { + if (AtomicGet(State) == STOPPED) { if (jobCount == 0) { ShutdownCondVar.BroadCast(); } @@ -804,11 +804,11 @@ void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageCon } void TBusModuleImpl::Shutdown() { - if (AtomicGet(State) != TBusModuleImpl::RUNNING) { - AtomicSet(State, TBusModuleImpl::STOPPED); + if (AtomicGet(State) != TBusModuleImpl::RUNNING) { + AtomicSet(State, TBusModuleImpl::STOPPED); return; } - AtomicSet(State, TBusModuleImpl::STOPPED); + AtomicSet(State, TBusModuleImpl::STOPPED); for (auto& clientSession : ClientSessions) { clientSession->Shutdown(); diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp index 63415b3737..90ff132942 100644 --- a/library/cpp/messagebus/protobuf/ybusbuf.cpp +++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp @@ -75,12 +75,12 @@ TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRe // clone the base TAutoPtr<TBusBufferBase> bmess = messageTemplate->New(); - // Need to override protobuf message size limit - // NOTE: the payload size has already been checked against session MaxMessageSize - google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size()); + // Need to override protobuf message size limit + // NOTE: the payload size has already been checked against session MaxMessageSize + google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size()); input.SetTotalBytesLimit(payload.size()); - - bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage(); + + bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage(); if (!ok) { return nullptr; } diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 22932569db..2113b5622f 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -186,7 +186,7 @@ namespace NBus { } ReaderData.DropChannel(); - + ReaderData.Status.Fd = readSocket.Socket; ReaderData.SocketVersion = readSocket.SocketVersion; @@ -393,8 +393,8 @@ namespace NBus { } return true; - } - + } + bool TRemoteConnection::ReaderFillBuffer() { if (!ReaderData.BufferMore()) return true; diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp index 2c48b2a287..c34c875536 100644 --- a/library/cpp/messagebus/remote_connection_status.cpp +++ b/library/cpp/messagebus/remote_connection_status.cpp @@ -202,7 +202,7 @@ TString TRemoteConnectionStatus::PrintToString() const { p.AddRow("write buffer cap", LeftPad(WriterStatus.BufferSize, 12)); p.AddRow("read buffer cap", LeftPad(ReaderStatus.BufferSize, 12)); - + p.AddRow("write buffer drops", LeftPad(WriterStatus.Incremental.BufferDrops, 10)); p.AddRow("read buffer drops", LeftPad(ReaderStatus.Incremental.BufferDrops, 10)); diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..e771a933ca 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -313,9 +313,9 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TMutex Lock_; TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages; - TDelayReplyServer() - : MessageReceivedEvent(TEventResetType::rAuto) - { + TDelayReplyServer() + : MessageReceivedEvent(TEventResetType::rAuto) + { Bus = CreateMessageQueue("TDelayReplyServer"); TBusServerSessionConfig sessionConfig; sessionConfig.SendTimeout = 1000; @@ -617,30 +617,30 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } Y_UNIT_TEST(ServerMessageReservedIds) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - TNetAddr serverAddr = server.GetActualListenAddr(); - - TExampleClient client; - - client.SendMessagesWaitReplies(2, serverAddr); - - // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side - - TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); - req->GetHeader()->Id = 2; - client.Session->SendMessageAutoPtr(req, &serverAddr); - client.MessageCount = 1; - client.WaitForError(MESSAGE_DELIVERY_FAILED); - - req.Reset(new TExampleRequest(&client.Proto.RequestCount)); - req->GetHeader()->Id = YBUS_KEYLOCAL; - client.Session->SendMessageAutoPtr(req, &serverAddr); - client.MessageCount = 1; - client.WaitForError(MESSAGE_DELIVERY_FAILED); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, serverAddr); + + // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side + + TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Id = 2; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + client.WaitForError(MESSAGE_DELIVERY_FAILED); + + req.Reset(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Id = YBUS_KEYLOCAL; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + Y_UNIT_TEST(TestGetInFlightForDestination) { TObjectCountCheck objectCountCheck; @@ -661,7 +661,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { break; } } - UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2); size_t inFlight = client.Session->GetInFlight(addr); // 4 is for messagebus1 that adds inFlight counter twice for some reason @@ -731,10 +731,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { // check reset is possible here message->Reset(); - // intentionally don't destroy the message - // we will try to resend it + // intentionally don't destroy the message + // we will try to resend it Y_UNUSED(message.Release()); - + TestSync.CheckAndIncrement(1); } }; @@ -760,8 +760,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { // check reset is possible here message->Reset(); client.TestSync.CheckAndIncrement(3); - - delete message; + + delete message; } Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) { @@ -865,8 +865,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { request.SetVersionInternal(0xF); // max output.Write(&request, sizeof(request)); - UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true); - + UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true); + TStreamSocketInput input(&socket); TBusHeader response; diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 9c21227e2b..bc78c5238a 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -93,7 +93,7 @@ public: TExampleProtocol Proto; public: - TAtomic NumMessages; + TAtomic NumMessages; NullServer() { NumMessages = 0; @@ -119,7 +119,7 @@ public: /// tell session to forget this message and never expect any reply mess.ForgetRequest(); - AtomicIncrement(NumMessages); + AtomicIncrement(NumMessages); } /// this handler should not be called because this server does not send replies @@ -139,10 +139,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { client.Work(); // wait until all client message are delivered - UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); + UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); // assert correct number of messages - UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); } @@ -196,7 +196,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { TBusClientSessionConfig sessionConfig; sessionConfig.SendTimeout = 1; sessionConfig.ConnectTimeout = 1; - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); return sessionConfig; } @@ -245,11 +245,11 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { first = false; } - // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. - // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get - // serailized and written to the socket buffer, so the write queue gets drained and there are - // no messages to timeout when periodic timeout check happens. - + // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. + // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get + // serailized and written to the socket buffer, so the write queue gets drained and there are + // no messages to timeout when periodic timeout check happens. + client.GotError.WaitI(); } } |