aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorAlexander Fokin <apfokin@gmail.com>2022-02-10 16:45:38 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:38 +0300
commitbf9e69a933f89af083d895185f01ed65e4d90766 (patch)
treeb2cc84ee7850122e7ccf51d0ea21e4fa7e7a5685 /library/cpp/messagebus
parent863a59a65247c24db7cb06789bc5cf79d04da32f (diff)
downloadydb-bf9e69a933f89af083d895185f01ed65e4d90766.tar.gz
Restoring authorship annotation for Alexander Fokin <apfokin@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/actor/executor.cpp2
-rw-r--r--library/cpp/messagebus/actor/executor.h2
-rw-r--r--library/cpp/messagebus/actor/ring_buffer.h4
-rw-r--r--library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h6
-rw-r--r--library/cpp/messagebus/debug_receiver/debug_receiver_proto.h2
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.cpp4
-rw-r--r--library/cpp/messagebus/remote_client_connection.cpp10
-rw-r--r--library/cpp/messagebus/remote_client_session.cpp6
-rw-r--r--library/cpp/messagebus/remote_server_connection.cpp4
-rw-r--r--library/cpp/messagebus/remote_server_connection.h2
-rw-r--r--library/cpp/messagebus/remote_server_session.cpp8
-rw-r--r--library/cpp/messagebus/session_impl.cpp4
-rw-r--r--library/cpp/messagebus/test/helper/example.cpp6
-rw-r--r--library/cpp/messagebus/test/helper/object_count_check.h4
-rw-r--r--library/cpp/messagebus/test/perftest/simple_proto.cpp2
-rw-r--r--library/cpp/messagebus/test/perftest/simple_proto.h2
-rw-r--r--library/cpp/messagebus/vector_swaps.h14
17 files changed, 41 insertions, 41 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
index f7244d5f19..7a2227a458 100644
--- a/library/cpp/messagebus/actor/executor.cpp
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -210,7 +210,7 @@ void TExecutor::Stop() {
ProcessWorkQueueHere();
}
-void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
+void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
if (wis.empty())
return;
diff --git a/library/cpp/messagebus/actor/executor.h b/library/cpp/messagebus/actor/executor.h
index e073b6dac6..7292d8be53 100644
--- a/library/cpp/messagebus/actor/executor.h
+++ b/library/cpp/messagebus/actor/executor.h
@@ -2,7 +2,7 @@
#include "ring_buffer_with_spin_lock.h"
-#include <util/generic/array_ref.h>
+#include <util/generic/array_ref.h>
#include <util/generic/vector.h>
#include <util/system/condvar.h>
#include <util/system/event.h>
diff --git a/library/cpp/messagebus/actor/ring_buffer.h b/library/cpp/messagebus/actor/ring_buffer.h
index 4aaf92bcbc..ec5706f7c7 100644
--- a/library/cpp/messagebus/actor/ring_buffer.h
+++ b/library/cpp/messagebus/actor/ring_buffer.h
@@ -80,7 +80,7 @@ public:
return WritePos == ReadPos;
}
- void PushAll(TArrayRef<const T> value) {
+ void PushAll(TArrayRef<const T> value) {
ReserveWritable(value.size());
ui32 secondSize;
@@ -107,7 +107,7 @@ public:
}
void Push(const T& t) {
- PushAll(MakeArrayRef(&t, 1));
+ PushAll(MakeArrayRef(&t, 1));
}
bool TryPop(T* r) {
diff --git a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
index 4ec1fc2aa4..f0b7cd90e4 100644
--- a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
+++ b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
@@ -21,7 +21,7 @@ public:
PushAll(t);
}
- void PushAll(TArrayRef<const T> collection) {
+ void PushAll(TArrayRef<const T> collection) {
if (collection.empty()) {
return;
}
@@ -59,7 +59,7 @@ public:
}
}
- bool PushAllAndTryPop(TArrayRef<const T> collection, T* r) {
+ bool PushAllAndTryPop(TArrayRef<const T> collection, T* r) {
if (collection.size() == 0) {
return TryPop(r);
} else {
@@ -67,7 +67,7 @@ public:
*r = collection[0];
if (collection.size() > 1) {
TGuard<TSpinLock> guard(SpinLock);
- RingBuffer.PushAll(MakeArrayRef(collection.data() + 1, collection.size() - 1));
+ RingBuffer.PushAll(MakeArrayRef(collection.data() + 1, collection.size() - 1));
AtomicSet(CachedSize, RingBuffer.Size());
}
} else {
diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h
index 768cd24446..d34710dcf7 100644
--- a/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h
+++ b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h
@@ -23,5 +23,5 @@ struct TDebugReceiverProtocol: public NBus::TBusProtocol {
void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override;
- TAutoPtr<NBus::TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
+ TAutoPtr<NBus::TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
};
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp
index 711a78b558..63415b3737 100644
--- a/library/cpp/messagebus/protobuf/ybusbuf.cpp
+++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp
@@ -37,7 +37,7 @@ void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
Types.push_back(mess.Release());
}
-TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
+TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
return Types;
}
@@ -63,7 +63,7 @@ void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
data.Advance(size);
}
-TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
+TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
TWhatThreadDoesPushPop pp("deserialize protobuf message");
TBusBufferBase* messageTemplate = FindType(messageType);
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp
index ffc544ac89..8c7a6db3a8 100644
--- a/library/cpp/messagebus/remote_client_connection.cpp
+++ b/library/cpp/messagebus/remote_client_connection.cpp
@@ -248,7 +248,7 @@ void TRemoteClientConnection::ScheduleTimeoutMessages() {
ScheduleWrite();
}
-void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) {
+void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) {
LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");
ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;
// TODO: close connection
@@ -265,7 +265,7 @@ void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool rec
GetSession()->ReleaseInFlight(result);
}
-void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
+void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
for (auto& message : messages) {
bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL;
@@ -281,7 +281,7 @@ void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> mes
// TODO: non-unique id?
}
- GetSession()->ReleaseInFlight({message.MessagePtr.Get()});
+ GetSession()->ReleaseInFlight({message.MessagePtr.Get()});
ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release());
} else {
ClientHandler->OnMessageSent(message.MessagePtr.Get());
@@ -314,7 +314,7 @@ EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool w
}
}
- GetSession()->AcquireInFlight({msg});
+ GetSession()->AcquireInFlight({msg});
EMessageStatus ret = MESSAGE_OK;
@@ -334,7 +334,7 @@ EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool w
return MESSAGE_OK;
clean:
msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
- GetSession()->ReleaseInFlight({msg});
+ GetSession()->ReleaseInFlight({msg});
return ret;
}
diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp
index a31c509e7b..3bc421944f 100644
--- a/library/cpp/messagebus/remote_client_session.cpp
+++ b/library/cpp/messagebus/remote_client_session.cpp
@@ -71,7 +71,7 @@ void TRemoteClientSession::FillStatus() {
StatusData.Status.InputPaused = false;
}
-void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> messages) {
+void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> messages) {
for (auto message : messages) {
Y_ASSERT(!(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT));
message->LocalFlags |= MESSAGE_IN_FLIGHT_ON_CLIENT;
@@ -79,7 +79,7 @@ void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> message
ClientRemoteInFlight.IncrementMultiple(messages.size());
}
-void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> messages) {
+void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> messages) {
for (auto message : messages) {
Y_ASSERT(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT);
message->LocalFlags &= ~MESSAGE_IN_FLIGHT_ON_CLIENT;
@@ -88,7 +88,7 @@ void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> message
}
void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) {
- ReleaseInFlight({request.Get()});
+ ReleaseInFlight({request.Get()});
if (Y_UNLIKELY(AtomicGet(Down))) {
InvokeOnError(request, MESSAGE_SHUTDOWN);
InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN);
diff --git a/library/cpp/messagebus/remote_server_connection.cpp b/library/cpp/messagebus/remote_server_connection.cpp
index 7de0c04c70..74be34ded9 100644
--- a/library/cpp/messagebus/remote_server_connection.cpp
+++ b/library/cpp/messagebus/remote_server_connection.cpp
@@ -51,7 +51,7 @@ bool TRemoteServerConnection::NeedInterruptRead() {
return !GetSession()->ServerOwnedMessages.TryWait();
}
-void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
+void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
TInstant now = TInstant::Now();
GetSession()->ReleaseInWorkResponses(messages);
@@ -64,7 +64,7 @@ void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> mes
}
}
-void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
+void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
TBusHeader header(dataRef);
// TODO: full version hex
LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal());
diff --git a/library/cpp/messagebus/remote_server_connection.h b/library/cpp/messagebus/remote_server_connection.h
index 9f4a1d3446..63d7f20646 100644
--- a/library/cpp/messagebus/remote_server_connection.h
+++ b/library/cpp/messagebus/remote_server_connection.h
@@ -2,7 +2,7 @@
#include "session_impl.h"
-#include <util/generic/object_counter.h>
+#include <util/generic/object_counter.h>
namespace NBus {
namespace NPrivate {
diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp
index b20aedd349..6abbf88a60 100644
--- a/library/cpp/messagebus/remote_server_session.cpp
+++ b/library/cpp/messagebus/remote_server_session.cpp
@@ -143,9 +143,9 @@ void TRemoteServerSession::FillStatus() {
StatusData.Status.InputPaused = ServerOwnedMessages.IsLocked();
}
-void TRemoteServerSession::AcquireInWorkRequests(TArrayRef<const TBusMessagePtrAndHeader> messages) {
+void TRemoteServerSession::AcquireInWorkRequests(TArrayRef<const TBusMessagePtrAndHeader> messages) {
TAtomicBase size = 0;
- for (auto message = messages.begin(); message != messages.end(); ++message) {
+ for (auto message = messages.begin(); message != messages.end(); ++message) {
Y_ASSERT(!(message->MessagePtr->LocalFlags & MESSAGE_IN_WORK));
message->MessagePtr->LocalFlags |= MESSAGE_IN_WORK;
size += message->MessagePtr->GetHeader()->Size;
@@ -154,9 +154,9 @@ void TRemoteServerSession::AcquireInWorkRequests(TArrayRef<const TBusMessagePtrA
ServerOwnedMessages.IncrementMultiple(messages.size(), size);
}
-void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtrAndHeader> responses) {
+void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtrAndHeader> responses) {
TAtomicBase size = 0;
- for (auto response = responses.begin(); response != responses.end(); ++response) {
+ for (auto response = responses.begin(); response != responses.end(); ++response) {
Y_ASSERT((response->MessagePtr->LocalFlags & MESSAGE_REPLY_IS_BEGING_SENT));
response->MessagePtr->LocalFlags &= ~MESSAGE_REPLY_IS_BEGING_SENT;
size += response->MessagePtr->RequestSize;
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index bcac44326f..ddf9f360c4 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -204,7 +204,7 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
}
}
-void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
+void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
for (size_t i = 0; i < addrs.size(); ++i) {
results[i] = GetInFlightImpl(addrs[i]);
@@ -220,7 +220,7 @@ size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) c
}
}
-void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
+void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
for (size_t i = 0; i < addrs.size(); ++i) {
results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp
index a488f96493..7c6d704042 100644
--- a/library/cpp/messagebus/test/helper/example.cpp
+++ b/library/cpp/messagebus/test/helper/example.cpp
@@ -7,7 +7,7 @@
using namespace NBus;
using namespace NBus::NTest;
-static void FillWithJunk(TArrayRef<char> data) {
+static void FillWithJunk(TArrayRef<char> data) {
TStringBuf junk =
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
@@ -21,7 +21,7 @@ static void FillWithJunk(TArrayRef<char> data) {
static TString JunkString(size_t len) {
TTempBuf temp(len);
- TArrayRef<char> tempArrayRef(temp.Data(), len);
+ TArrayRef<char> tempArrayRef(temp.Data(), len);
FillWithJunk(tempArrayRef);
return TString(tempArrayRef.data(), tempArrayRef.size());
@@ -87,7 +87,7 @@ void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
}
}
-TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
+TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
// TODO: check data
Y_UNUSED(payload);
diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h
index 1bc3f90917..1c4756e58c 100644
--- a/library/cpp/messagebus/test/helper/object_count_check.h
+++ b/library/cpp/messagebus/test/helper/object_count_check.h
@@ -10,7 +10,7 @@
#include <library/cpp/messagebus/oldmodule/module.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>
-#include <util/generic/object_counter.h>
+#include <util/generic/object_counter.h>
#include <util/system/type_name.h>
#include <util/stream/output.h>
@@ -29,7 +29,7 @@ struct TObjectCountCheck {
}
void operator()() {
- long oldValue = TObjectCounter<T>::ResetObjectCount();
+ long oldValue = TObjectCounter<T>::ResetObjectCount();
if (oldValue != 0) {
Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl;
Cerr << "won't check in this test" << Endl;
diff --git a/library/cpp/messagebus/test/perftest/simple_proto.cpp b/library/cpp/messagebus/test/perftest/simple_proto.cpp
index 463fe6528b..19d6c15b9d 100644
--- a/library/cpp/messagebus/test/perftest/simple_proto.cpp
+++ b/library/cpp/messagebus/test/perftest/simple_proto.cpp
@@ -12,7 +12,7 @@ void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
data.Append((const char*)&typed->Payload, 4);
}
-TAutoPtr<TBusMessage> TSimpleProtocol::Deserialize(ui16, TArrayRef<const char> payload) {
+TAutoPtr<TBusMessage> TSimpleProtocol::Deserialize(ui16, TArrayRef<const char> payload) {
if (payload.size() != 4) {
return nullptr;
}
diff --git a/library/cpp/messagebus/test/perftest/simple_proto.h b/library/cpp/messagebus/test/perftest/simple_proto.h
index 8479181809..4a0cc08db3 100644
--- a/library/cpp/messagebus/test/perftest/simple_proto.h
+++ b/library/cpp/messagebus/test/perftest/simple_proto.h
@@ -25,5 +25,5 @@ struct TSimpleProtocol: public NBus::TBusProtocol {
void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override;
- TAutoPtr<NBus::TBusMessage> Deserialize(ui16 ty, TArrayRef<const char> payload) override;
+ TAutoPtr<NBus::TBusMessage> Deserialize(ui16 ty, TArrayRef<const char> payload) override;
};
diff --git a/library/cpp/messagebus/vector_swaps.h b/library/cpp/messagebus/vector_swaps.h
index 1d727f58ee..b920bcf03e 100644
--- a/library/cpp/messagebus/vector_swaps.h
+++ b/library/cpp/messagebus/vector_swaps.h
@@ -1,13 +1,13 @@
#pragma once
-#include <util/generic/array_ref.h>
+#include <util/generic/array_ref.h>
#include <util/generic/noncopyable.h>
#include <util/generic/utility.h>
#include <util/system/yassert.h>
#include <stdlib.h>
-template <typename T, class A = std::allocator<T>>
+template <typename T, class A = std::allocator<T>>
class TVectorSwaps : TNonCopyable {
private:
T* Start;
@@ -40,12 +40,12 @@ public:
free(Start);
}
- operator TArrayRef<const T>() const {
- return MakeArrayRef(data(), size());
+ operator TArrayRef<const T>() const {
+ return MakeArrayRef(data(), size());
}
- operator TArrayRef<T>() {
- return MakeArrayRef(data(), size());
+ operator TArrayRef<T>() {
+ return MakeArrayRef(data(), size());
}
size_t capacity() const {
@@ -166,6 +166,6 @@ public:
}
void push_back(T& elem) {
- insert(end(), &elem, &elem + 1);
+ insert(end(), &elem, &elem + 1);
}
};