aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorsingle <single@yandex-team.ru>2022-02-10 16:50:29 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:29 +0300
commit8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch)
tree4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus
parent5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff)
downloadydb-8ae96df130bbede609c3504aa9af1bc6ff5361b3.tar.gz
Restoring authorship annotation for <single@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/acceptor.cpp18
-rw-r--r--library/cpp/messagebus/acceptor.h2
-rw-r--r--library/cpp/messagebus/actor/queue_for_actor.h6
-rw-r--r--library/cpp/messagebus/actor/temp_tls_vector.h22
-rw-r--r--library/cpp/messagebus/config/netaddr.cpp2
-rw-r--r--library/cpp/messagebus/config/netaddr.h4
-rw-r--r--library/cpp/messagebus/config/session_config.cpp2
-rw-r--r--library/cpp/messagebus/event_loop.cpp18
-rw-r--r--library/cpp/messagebus/misc/granup.h66
-rw-r--r--library/cpp/messagebus/misc/tokenquota.h126
-rw-r--r--library/cpp/messagebus/remote_connection.cpp100
-rw-r--r--library/cpp/messagebus/remote_connection.h20
-rw-r--r--library/cpp/messagebus/remote_connection_status.cpp16
-rw-r--r--library/cpp/messagebus/remote_server_session.cpp24
-rw-r--r--library/cpp/messagebus/session_impl.cpp26
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp166
-rw-r--r--library/cpp/messagebus/www/www.cpp18
17 files changed, 318 insertions, 318 deletions
diff --git a/library/cpp/messagebus/acceptor.cpp b/library/cpp/messagebus/acceptor.cpp
index 64a38619c2..de8810d02e 100644
--- a/library/cpp/messagebus/acceptor.cpp
+++ b/library/cpp/messagebus/acceptor.cpp
@@ -19,7 +19,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c
: TActor<TAcceptor>(session->Queue->WorkQueue.Get())
, AcceptorId(acceptorId)
, Session(session)
- , GranStatus(session->Config.Secret.StatusFlushPeriod)
+ , GranStatus(session->Config.Secret.StatusFlushPeriod)
{
SetNonBlock(socket, true);
@@ -30,7 +30,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c
Stats.Fd = socket;
Stats.ListenAddr = addr;
- SendStatus(TInstant::Now());
+ SendStatus(TInstant::Now());
}
void TAcceptor::Act(TDefaultTag) {
@@ -40,8 +40,8 @@ void TAcceptor::Act(TDefaultTag) {
return;
}
- TInstant now = TInstant::Now();
-
+ TInstant now = TInstant::Now();
+
if (state == SS_SHUTDOWN_COMMAND) {
if (!!Channel) {
Channel->Unregister();
@@ -49,7 +49,7 @@ void TAcceptor::Act(TDefaultTag) {
Stats.Fd = INVALID_SOCKET;
}
- SendStatus(now);
+ SendStatus(now);
Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
Stats.ResetIncremental();
@@ -96,7 +96,7 @@ void TAcceptor::Act(TDefaultTag) {
Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
- Stats.LastAcceptSuccessInstant = now;
+ Stats.LastAcceptSuccessInstant = now;
++Stats.AcceptSuccessCount;
}
@@ -105,11 +105,11 @@ void TAcceptor::Act(TDefaultTag) {
Channel->EnableRead();
- SendStatus(now);
+ SendStatus(now);
}
-void TAcceptor::SendStatus(TInstant now) {
- GranStatus.Listen.Update(Stats, now);
+void TAcceptor::SendStatus(TInstant now) {
+ GranStatus.Listen.Update(Stats, now);
}
void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
diff --git a/library/cpp/messagebus/acceptor.h b/library/cpp/messagebus/acceptor.h
index 57cb010bf2..8ec2229d63 100644
--- a/library/cpp/messagebus/acceptor.h
+++ b/library/cpp/messagebus/acceptor.h
@@ -55,6 +55,6 @@ namespace NBus {
TGranStatus GranStatus;
};
-
+
}
}
diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h
index 40fa536b82..d26a546296 100644
--- a/library/cpp/messagebus/actor/queue_for_actor.h
+++ b/library/cpp/messagebus/actor/queue_for_actor.h
@@ -60,15 +60,15 @@ namespace NActor {
temp.Shrink();
}
}
-
+
template <typename TFunc>
void DequeueAllLikelyEmpty(const TFunc& func) {
if (Y_LIKELY(IsEmpty())) {
return;
}
-
+
DequeueAll(func);
- }
+ }
};
}
diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h
index 675d92f5b0..407703d702 100644
--- a/library/cpp/messagebus/actor/temp_tls_vector.h
+++ b/library/cpp/messagebus/actor/temp_tls_vector.h
@@ -23,18 +23,18 @@ public:
}
~TTempTlsVector() {
- Clear();
- }
-
- void Clear() {
+ Clear();
+ }
+
+ void Clear() {
Vector->clear();
}
-
+
size_t Capacity() const noexcept {
- return Vector->capacity();
- }
-
- void Shrink() {
- Vector->shrink_to_fit();
- }
+ return Vector->capacity();
+ }
+
+ void Shrink() {
+ Vector->shrink_to_fit();
+ }
};
diff --git a/library/cpp/messagebus/config/netaddr.cpp b/library/cpp/messagebus/config/netaddr.cpp
index 962ac538e2..c1cb356840 100644
--- a/library/cpp/messagebus/config/netaddr.cpp
+++ b/library/cpp/messagebus/config/netaddr.cpp
@@ -129,7 +129,7 @@ namespace NBus {
ythrow TNetAddr::TError() << "cannot resolve " << host << ":" << port << " into " << Describe(requireVersion);
}
}
-
+
TNetAddr::TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_ANY*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/)
: Ptr(MakeAddress(na, requireVersion, preferVersion))
{
diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h
index b79c0cc355..ccb4b42810 100644
--- a/library/cpp/messagebus/config/netaddr.h
+++ b/library/cpp/messagebus/config/netaddr.h
@@ -36,14 +36,14 @@ namespace NBus {
public:
class TError: public yexception {
};
-
+
TNetAddr();
TNetAddr(TAutoPtr<IRemoteAddr> addr);
TNetAddr(const char* hostPort, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY);
TNetAddr(TStringBuf host, int port, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY);
TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY);
TNetAddr(const TNetworkAddress& na, const TAddrInfo& ai);
-
+
bool operator==(const TNetAddr&) const;
bool operator!=(const TNetAddr& other) const {
return !(*this == other);
diff --git a/library/cpp/messagebus/config/session_config.cpp b/library/cpp/messagebus/config/session_config.cpp
index fbbbb106c9..17157b3dfa 100644
--- a/library/cpp/messagebus/config/session_config.cpp
+++ b/library/cpp/messagebus/config/session_config.cpp
@@ -120,7 +120,7 @@ void TBusSessionConfig::ConfigureLastGetopt(NLastGetopt::TOpts& opts,
opts.AddLongOption(prefix + "max-message-size")
.RequiredArgument("BYTES")
.DefaultValue(ToString(MaxMessageSize))
- .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix);
+ .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix);
opts.AddLongOption(prefix + "socket-recv-buffer-size")
.RequiredArgument("BYTES")
.DefaultValue(ToString(SocketRecvBufferSize))
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index f685135bed..fd2e726d0b 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -78,7 +78,7 @@ public:
const char* Name;
- TAtomic RunningState;
+ TAtomic RunningState;
TAtomic StopSignal;
TSystemEvent StoppedEvent;
TData Data;
@@ -143,7 +143,7 @@ void TEventLoop::Stop() {
}
bool TEventLoop::IsRunning() {
- return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
+ return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING;
}
TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
@@ -277,7 +277,7 @@ TEventLoop::TImpl::TImpl(const char* name)
}
void TEventLoop::TImpl::Run() {
- bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
+ bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED);
Y_VERIFY(res, "Invalid mbus event loop state");
if (!!Name) {
@@ -320,21 +320,21 @@ void TEventLoop::TImpl::Run() {
Data.clear();
}
- res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING);
+ res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING);
Y_VERIFY(res);
-
+
StoppedEvent.Signal();
}
void TEventLoop::TImpl::Stop() {
AtomicSet(StopSignal, 1);
- if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
- Wakeup();
+ if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
+ Wakeup();
- StoppedEvent.WaitI();
- }
+ StoppedEvent.WaitI();
+ }
}
TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) {
diff --git a/library/cpp/messagebus/misc/granup.h b/library/cpp/messagebus/misc/granup.h
index 36ecfebc93..8b04aca597 100644
--- a/library/cpp/messagebus/misc/granup.h
+++ b/library/cpp/messagebus/misc/granup.h
@@ -1,50 +1,50 @@
-#pragma once
-
+#pragma once
+
#include <util/datetime/base.h>
#include <util/system/guard.h>
-#include <util/system/mutex.h>
-#include <util/system/spinlock.h>
-
-namespace NBus {
- template <typename TItem, typename TLocker = TSpinLock>
- class TGranUp {
- public:
- TGranUp(TDuration gran)
- : Gran(gran)
+#include <util/system/mutex.h>
+#include <util/system/spinlock.h>
+
+namespace NBus {
+ template <typename TItem, typename TLocker = TSpinLock>
+ class TGranUp {
+ public:
+ TGranUp(TDuration gran)
+ : Gran(gran)
, Next(TInstant::MicroSeconds(0))
{
}
-
+
template <typename TFunctor>
void Update(TFunctor functor, TInstant now, bool force = false) {
if (force || now > Next)
- Set(functor(), now);
- }
-
+ Set(functor(), now);
+ }
+
void Update(const TItem& item, TInstant now, bool force = false) {
if (force || now > Next)
- Set(item, now);
- }
-
+ Set(item, now);
+ }
+
TItem Get() const noexcept {
TGuard<TLocker> guard(Lock);
-
- return Item;
- }
-
- protected:
+
+ return Item;
+ }
+
+ protected:
void Set(const TItem& item, TInstant now) {
TGuard<TLocker> guard(Lock);
-
- Item = item;
-
- Next = now + Gran;
- }
-
- private:
- const TDuration Gran;
+
+ Item = item;
+
+ Next = now + Gran;
+ }
+
+ private:
+ const TDuration Gran;
TLocker Lock;
TItem Item;
TInstant Next;
- };
-}
+ };
+}
diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h
index 190547fa54..954cf0f0d7 100644
--- a/library/cpp/messagebus/misc/tokenquota.h
+++ b/library/cpp/messagebus/misc/tokenquota.h
@@ -1,83 +1,83 @@
-#pragma once
-
-#include <util/system/atomic.h>
-
-namespace NBus {
- /* Consumer and feeder quota model impl.
-
- Consumer thread only calls:
- Acquire(), fetches tokens for usage from bucket;
- Consume(), eats given amount of tokens, must not
- be greater than Value() items;
-
- Other threads (feeders) calls:
- Return(), put used tokens back to bucket;
- */
-
- class TTokenQuota {
- public:
- TTokenQuota(bool enabled, size_t tokens, size_t wake)
- : Enabled(tokens > 0 ? enabled : false)
- , Acquired(0)
- , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0)
- , Tokens_(tokens)
+#pragma once
+
+#include <util/system/atomic.h>
+
+namespace NBus {
+ /* Consumer and feeder quota model impl.
+
+ Consumer thread only calls:
+ Acquire(), fetches tokens for usage from bucket;
+ Consume(), eats given amount of tokens, must not
+ be greater than Value() items;
+
+ Other threads (feeders) calls:
+ Return(), put used tokens back to bucket;
+ */
+
+ class TTokenQuota {
+ public:
+ TTokenQuota(bool enabled, size_t tokens, size_t wake)
+ : Enabled(tokens > 0 ? enabled : false)
+ , Acquired(0)
+ , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0)
+ , Tokens_(tokens)
{
Y_UNUSED(padd_);
}
-
+
bool Acquire(TAtomic level = 1, bool force = false) {
level = Max(TAtomicBase(level), TAtomicBase(1));
-
+
if (Enabled && (Acquired < level || force)) {
Acquired += AtomicSwap(&Tokens_, 0);
- }
-
- return !Enabled || Acquired >= level;
- }
-
+ }
+
+ return !Enabled || Acquired >= level;
+ }
+
void Consume(size_t items) {
if (Enabled) {
Y_ASSERT(Acquired >= TAtomicBase(items));
-
- Acquired -= items;
- }
- }
-
+
+ Acquired -= items;
+ }
+ }
+
bool Return(size_t items_) noexcept {
if (!Enabled || items_ == 0)
- return false;
-
- const TAtomic items = items_;
- const TAtomic value = AtomicAdd(Tokens_, items);
-
- return (value - items < WakeLev && value >= WakeLev);
- }
-
+ return false;
+
+ const TAtomic items = items_;
+ const TAtomic value = AtomicAdd(Tokens_, items);
+
+ return (value - items < WakeLev && value >= WakeLev);
+ }
+
bool IsEnabled() const noexcept {
- return Enabled;
- }
-
+ return Enabled;
+ }
+
bool IsAboveWake() const noexcept {
- return !Enabled || (WakeLev <= AtomicGet(Tokens_));
- }
-
+ return !Enabled || (WakeLev <= AtomicGet(Tokens_));
+ }
+
size_t Tokens() const noexcept {
- return Acquired + AtomicGet(Tokens_);
- }
-
+ return Acquired + AtomicGet(Tokens_);
+ }
+
size_t Check(const TAtomic level) const noexcept {
- return !Enabled || level <= Acquired;
- }
-
- private:
+ return !Enabled || level <= Acquired;
+ }
+
+ private:
bool Enabled;
TAtomicBase Acquired;
- const TAtomicBase WakeLev;
+ const TAtomicBase WakeLev;
TAtomic Tokens_;
-
- /* This padd requires for align Tokens_ member on its own
- CPU cacheline. */
-
+
+ /* This padd requires for align Tokens_ member on its own
+ CPU cacheline. */
+
ui64 padd_;
- };
-}
+ };
+}
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 22932569db..730fc0f554 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -48,11 +48,11 @@ namespace NBus {
const TInstant now = TInstant::Now();
WriterFillStatus();
-
+
GranStatus.Writer.Update(WriterData.Status, now, true);
GranStatus.Reader.Update(ReaderData.Status, now, true);
}
-
+
TRemoteConnection::~TRemoteConnection() {
Y_VERIFY(ReplyQueue.IsEmpty());
}
@@ -73,7 +73,7 @@ namespace NBus {
bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
size_t left = Buffer.Size() - Offset;
-
+
return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
}
@@ -83,13 +83,13 @@ namespace NBus {
Y_VERIFY(State == WRITER_FILLING, "state must be initial");
Channel = channel;
}
-
+
void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) {
Y_VERIFY(!Channel, "must not have channel");
Y_VERIFY(Buffer.Empty(), "buffer must be empty");
Channel = channel;
}
-
+
void TRemoteConnection::TWriterData::DropChannel() {
if (!!Channel) {
Channel->Unregister();
@@ -184,7 +184,7 @@ namespace NBus {
ReaderData.Status.Fd = INVALID_SOCKET;
return;
}
-
+
ReaderData.DropChannel();
ReaderData.Status.Fd = readSocket.Socket;
@@ -232,10 +232,10 @@ namespace NBus {
ReaderData.Status.Acts += 1;
ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
-
+
if (AtomicGet(ReaderData.Down)) {
ReaderData.DropChannel();
-
+
ReaderProcessStatusDown();
ReaderData.ShutdownComplete.Signal();
@@ -262,7 +262,7 @@ namespace NBus {
}
ReaderFlushMessages();
- }
+ }
ReaderSendStatus(now);
}
@@ -275,109 +275,109 @@ namespace NBus {
else if (!QuotaBytes.Acquire(bytes))
wakeFlags |= WAKE_QUOTA_BYTES;
-
+
if (wakeFlags) {
ReaderData.Status.QuotaExhausted++;
-
+
WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags);
}
-
+
return wakeFlags == 0;
}
-
+
void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) {
QuotaMsg.Consume(msg);
QuotaBytes.Consume(bytes);
}
-
+
void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) {
if (QuotaReturnValues(items, bytes))
ReadQuotaWakeup();
}
-
+
void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) {
if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down))
WriterGetWakeQueue()->EnqueueAndSchedule(0x0);
}
-
+
bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) {
bool rMsg = QuotaMsg.Return(items);
bool rBytes = QuotaBytes.Return(bytes);
-
+
return rMsg || rBytes;
}
-
+
void TRemoteConnection::ReadQuotaWakeup() {
const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags();
-
+
if (mask && mask == WriterData.AwakeFlags) {
WriterData.Status.ReaderWakeups++;
WriterData.AwakeFlags = 0;
-
+
ScheduleRead();
}
}
-
+
ui32 TRemoteConnection::WriteWakeFlags() const {
ui32 awakeFlags = 0;
-
+
if (QuotaMsg.IsAboveWake())
awakeFlags |= WAKE_QUOTA_MSG;
-
+
if (QuotaBytes.IsAboveWake())
awakeFlags |= WAKE_QUOTA_BYTES;
-
+
return awakeFlags;
}
-
+
bool TRemoteConnection::ReaderProcessBuffer() {
TInstant now = TInstant::Now();
-
+
for (;;) {
if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) {
break;
}
-
+
TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset));
-
+
if (header.Size < sizeof(TBusHeader)) {
LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
return false;
}
-
+
if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) {
LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
return false;
}
-
+
if (header.Size > Config.MaxMessageSize) {
LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false);
return false;
}
-
+
if (!ReaderData.HasBytesInBuf(header.Size)) {
if (ReaderData.Offset == 0) {
ReaderData.Buffer.Reserve(header.Size);
}
break;
}
-
+
if (!QuotaAcquire(1, header.Size))
return false;
-
+
if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) {
return false;
}
-
+
ReaderData.Offset += header.Size;
}
-
+
ReaderData.Buffer.ChopHead(ReaderData.Offset);
ReaderData.Offset = 0;
@@ -408,7 +408,7 @@ namespace NBus {
}
Y_ASSERT(ReaderData.Buffer.Avail() > 0);
-
+
ssize_t bytes;
{
TWhatThreadDoesPushPop pp("recv syscall");
@@ -454,7 +454,7 @@ namespace NBus {
message != replyQueueTemp.rend(); ++message) {
messages.push_back(message->MessagePtr.Release());
}
-
+
WriterErrorMessages(messages, reason);
replyQueueTemp.clear();
@@ -535,10 +535,10 @@ namespace NBus {
ClearBeforeSendQueue(reasonForQueues);
WriterGetReconnectQueue()->Clear();
WriterGetWakeQueue()->Clear();
-
+
TMessagesPtrs cleared;
ClearOutgoingQueue(cleared, false);
-
+
if (!Session->IsSource_) {
for (auto& i : cleared) {
TBusMessagePtrAndHeader h(i);
@@ -548,10 +548,10 @@ namespace NBus {
// and this part is not batch
}
}
-
+
WriterErrorMessages(cleared, reason);
}
-
+
void TRemoteConnection::BeforeTryWrite() {
}
@@ -638,7 +638,7 @@ namespace NBus {
WriterData.Status.Incremental.NetworkOps += 1;
WriterData.Buffer.LeftProceed(bytes);
- }
+ }
WriterData.Buffer.Clear();
if (WriterData.Buffer.Capacity() > MaxBufferSize) {
@@ -654,12 +654,12 @@ namespace NBus {
WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
} else {
ScheduleShutdown(status);
- }
+ }
}
void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
ShutdownReason = status;
-
+
AtomicSet(ReaderData.Down, 1);
ScheduleRead();
@@ -856,7 +856,7 @@ namespace NBus {
}
TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
-
+
for (;;) {
THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront());
if (!writeMessage) {
@@ -944,12 +944,12 @@ namespace NBus {
WriterErrorMessage(h.MessagePtr.Release(), status);
}
}
-
+
void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) {
TBusMessage* released = m.Release();
WriterErrorMessages(MakeArrayRef(&released, 1), status);
}
-
+
void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
ResetOneWayFlag(ms);
@@ -958,17 +958,17 @@ namespace NBus {
Session->InvokeOnError(m, status);
}
}
-
+
void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) {
Y_VERIFY(Session->IsSource_, "state check");
TClientConnectionEvent event(type, ConnectionId, PeerAddr);
TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get());
session->ClientHandler->OnClientConnectionEvent(event);
}
-
+
bool TRemoteConnection::IsAlive() const {
return !AtomicGet(WriterData.Down);
}
-
+
}
}
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h
index 4538947368..5141a8ea9f 100644
--- a/library/cpp/messagebus/remote_connection.h
+++ b/library/cpp/messagebus/remote_connection.h
@@ -13,8 +13,8 @@
#include "storage.h"
#include "vector_swaps.h"
#include "ybus.h"
-#include "misc/granup.h"
-#include "misc/tokenquota.h"
+#include "misc/granup.h"
+#include "misc/tokenquota.h"
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/executor.h>
@@ -49,7 +49,7 @@ namespace NBus {
struct TWriterToReaderSocketMessage {
TSocket Socket;
ui32 SocketVersion;
-
+
TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion)
: Socket(socket)
, SocketVersion(socketVersion)
@@ -154,13 +154,13 @@ namespace NBus {
virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0;
bool MessageRead(TArrayRef<const char> dataRef, TInstant now);
virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0;
-
+
void CallSerialize(TBusMessage* msg, TBuffer& buffer) const;
void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const;
TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const;
-
+
void ResetOneWayFlag(TArrayRef<TBusMessage*>);
-
+
inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() {
return this;
}
@@ -269,7 +269,7 @@ namespace NBus {
TGranUp<TRemoteConnectionWriterStatus> Writer;
TGranUp<TRemoteConnectionReaderStatus> Reader;
};
-
+
TWriterData WriterData;
TReaderData ReaderData;
TGranStatus GranStatus;
@@ -280,15 +280,15 @@ namespace NBus {
// client connection only
TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue;
-
+
EMessageStatus ShutdownReason;
};
inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept {
return PeerAddr;
}
-
+
typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
-
+
}
}
diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp
index 2c48b2a287..05ae84791c 100644
--- a/library/cpp/messagebus/remote_connection_status.cpp
+++ b/library/cpp/messagebus/remote_connection_status.cpp
@@ -180,15 +180,15 @@ TString TRemoteConnectionStatus::PrintToString() const {
p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls);
}
- p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6));
-
+ p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6));
+
if (Server) {
- p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6));
- p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6));
- p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6));
- p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6));
- } else {
- p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6));
+ p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6));
+ p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6));
+ p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6));
+ p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6));
+ } else {
+ p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6));
}
p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false));
diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp
index 6abbf88a60..34dd2153e2 100644
--- a/library/cpp/messagebus/remote_server_session.cpp
+++ b/library/cpp/messagebus/remote_server_session.cpp
@@ -24,9 +24,9 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue,
{
if (config.PerConnectionMaxInFlightBySize > 0) {
if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize)
- ythrow yexception()
- << "too low PerConnectionMaxInFlightBySize value";
- }
+ ythrow yexception()
+ << "too low PerConnectionMaxInFlightBySize value";
+ }
}
namespace NBus {
@@ -87,7 +87,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<
void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) {
if (Y_UNLIKELY(AtomicGet(Down))) {
- ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get());
+ ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get());
InvokeOnError(request.MessagePtr.Release(), MESSAGE_SHUTDOWN);
} else {
TWhatThreadDoesPushPop pp("OnMessage");
@@ -167,19 +167,19 @@ void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtr
void TRemoteServerSession::ReleaseInWorkRequests(TRemoteConnection& con, TBusMessage* request) {
Y_ASSERT((request->LocalFlags & MESSAGE_IN_WORK));
- request->LocalFlags &= ~MESSAGE_IN_WORK;
-
- const size_t size = request->GetHeader()->Size;
+ request->LocalFlags &= ~MESSAGE_IN_WORK;
- con.QuotaReturnAside(1, size);
- ServerOwnedMessages.ReleaseMultiple(1, size);
+ const size_t size = request->GetHeader()->Size;
+
+ con.QuotaReturnAside(1, size);
+ ServerOwnedMessages.ReleaseMultiple(1, size);
}
void TRemoteServerSession::ReleaseInWork(TBusIdentity& ident) {
- ident.SetInWork(false);
- ident.Connection->QuotaReturnAside(1, ident.Size);
+ ident.SetInWork(false);
+ ident.Connection->QuotaReturnAside(1, ident.Size);
- ServerOwnedMessages.ReleaseMultiple(1, ident.Size);
+ ServerOwnedMessages.ReleaseMultiple(1, ident.Size);
}
void TRemoteServerSession::ConvertInWork(TBusIdentity& req, TBusMessage* reply) {
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index ddf9f360c4..7adaa1ae6d 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -389,14 +389,14 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin();
acceptor != acceptors.end(); ++acceptor) {
- const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
-
- acceptorStatusSummary += status;
-
+ const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
+
+ acceptorStatusSummary += status;
+
if (acceptor != acceptors.begin()) {
ss << "\n";
}
- ss << status.PrintToString();
+ ss << status.PrintToString();
}
r.Acceptors = ss.Str();
@@ -410,19 +410,19 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
if (connection != connections.begin()) {
ss << "\n";
}
-
+
TRemoteConnectionStatus status;
status.Server = !IsSource_;
- status.ReaderStatus = (*connection)->GranStatus.Reader.Get();
- status.WriterStatus = (*connection)->GranStatus.Writer.Get();
-
+ status.ReaderStatus = (*connection)->GranStatus.Reader.Get();
+ status.WriterStatus = (*connection)->GranStatus.Writer.Get();
+
ss << status.PrintToString();
-
- r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus;
- r.ConnectionStatusSummary.WriterStatus += status.WriterStatus;
+
+ r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus;
+ r.ConnectionStatusSummary.WriterStatus += status.WriterStatus;
}
- r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString();
+ r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString();
r.Connections = ss.Str();
}
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..c11d447224 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -962,103 +962,103 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.Sync.WaitForAndIncrement(3);
}
-
+
struct TServerForQuotaWake: public TExampleServer {
TSystemEvent GoOn;
TMutex OneLock;
-
- TOnMessageContext OneMessage;
-
- static TBusServerSessionConfig Config() {
- TBusServerSessionConfig config;
-
- config.PerConnectionMaxInFlight = 1;
- config.PerConnectionMaxInFlightBySize = 1500;
- config.MaxMessageSize = 1024;
-
- return config;
- }
-
- TServerForQuotaWake()
- : TExampleServer("TServerForQuotaWake", Config())
+
+ TOnMessageContext OneMessage;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+
+ config.PerConnectionMaxInFlight = 1;
+ config.PerConnectionMaxInFlightBySize = 1500;
+ config.MaxMessageSize = 1024;
+
+ return config;
+ }
+
+ TServerForQuotaWake()
+ : TExampleServer("TServerForQuotaWake", Config())
{
}
-
+
~TServerForQuotaWake() override {
- Session->Shutdown();
- }
-
+ Session->Shutdown();
+ }
+
void OnMessage(TOnMessageContext& req) override {
- if (!GoOn.Wait(0)) {
+ if (!GoOn.Wait(0)) {
TGuard<TMutex> guard(OneLock);
-
- UNIT_ASSERT(!OneMessage);
-
- OneMessage.Swap(req);
- } else
- TExampleServer::OnMessage(req);
- }
-
- void WakeOne() {
+
+ UNIT_ASSERT(!OneMessage);
+
+ OneMessage.Swap(req);
+ } else
+ TExampleServer::OnMessage(req);
+ }
+
+ void WakeOne() {
TGuard<TMutex> guard(OneLock);
-
- UNIT_ASSERT(!!OneMessage);
-
- TExampleServer::OnMessage(OneMessage);
-
- TOnMessageContext().Swap(OneMessage);
- }
- };
-
+
+ UNIT_ASSERT(!!OneMessage);
+
+ TExampleServer::OnMessage(OneMessage);
+
+ TOnMessageContext().Swap(OneMessage);
+ }
+ };
+
Y_UNIT_TEST(WakeReaderOnQuota) {
- const size_t test_msg_count = 64;
-
- TBusClientSessionConfig clientConfig;
-
- clientConfig.MaxInFlight = test_msg_count;
-
- TExampleClient client(clientConfig);
- TServerForQuotaWake server;
- TInstant start;
-
- client.MessageCount = test_msg_count;
-
- const NBus::TNetAddr addr = server.GetActualListenAddr();
-
- for (unsigned count = 0;;) {
- UNIT_ASSERT(count <= test_msg_count);
-
- TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
- EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
-
- if (status == MESSAGE_OK) {
- count++;
-
- } else if (status == MESSAGE_BUSY) {
+ const size_t test_msg_count = 64;
+
+ TBusClientSessionConfig clientConfig;
+
+ clientConfig.MaxInFlight = test_msg_count;
+
+ TExampleClient client(clientConfig);
+ TServerForQuotaWake server;
+ TInstant start;
+
+ client.MessageCount = test_msg_count;
+
+ const NBus::TNetAddr addr = server.GetActualListenAddr();
+
+ for (unsigned count = 0;;) {
+ UNIT_ASSERT(count <= test_msg_count);
+
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
+
+ if (status == MESSAGE_OK) {
+ count++;
+
+ } else if (status == MESSAGE_BUSY) {
if (count == test_msg_count) {
- TInstant now = TInstant::Now();
-
+ TInstant now = TInstant::Now();
+
if (start.GetValue() == 0) {
- start = now;
-
+ start = now;
+
// TODO: properly check that server is blocked
} else if (start + TDuration::MilliSeconds(100) < now) {
- break;
- }
- }
-
- Sleep(TDuration::MilliSeconds(10));
-
- } else
- UNIT_ASSERT(false);
- }
-
- server.GoOn.Signal();
- server.WakeOne();
-
- client.WaitReplies();
-
- server.WaitForOnMessageCount(test_msg_count);
+ break;
+ }
+ }
+
+ Sleep(TDuration::MilliSeconds(10));
+
+ } else
+ UNIT_ASSERT(false);
+ }
+
+ server.GoOn.Signal();
+ server.WakeOne();
+
+ client.WaitReplies();
+
+ server.WaitForOnMessageCount(test_msg_count);
};
Y_UNIT_TEST(TestConnectionAttempts) {
diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp
index 62ec241d85..e501cbf4a9 100644
--- a/library/cpp/messagebus/www/www.cpp
+++ b/library/cpp/messagebus/www/www.cpp
@@ -200,12 +200,12 @@ struct TBusWww::TImpl {
Queues.Add(s->GetQueue());
}
- void RegisterQueue(TBusMessageQueuePtr q) {
+ void RegisterQueue(TBusMessageQueuePtr q) {
Y_VERIFY(!!q);
- TGuard<TMutex> g(Mutex);
- Queues.Add(q);
- }
-
+ TGuard<TMutex> g(Mutex);
+ Queues.Add(q);
+ }
+
void RegisterModule(TBusModule* module) {
Y_VERIFY(!!module);
TGuard<TMutex> g(Mutex);
@@ -824,10 +824,10 @@ void TBusWww::RegisterServerSession(TBusServerSessionPtr s) {
Impl->RegisterServerSession(s);
}
-void TBusWww::RegisterQueue(TBusMessageQueuePtr q) {
- Impl->RegisterQueue(q);
-}
-
+void TBusWww::RegisterQueue(TBusMessageQueuePtr q) {
+ Impl->RegisterQueue(q);
+}
+
void TBusWww::RegisterModule(TBusModule* module) {
Impl->RegisterModule(module);
}