diff options
author | single <single@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
commit | 8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch) | |
tree | 4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus | |
parent | 5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff) | |
download | ydb-8ae96df130bbede609c3504aa9af1bc6ff5361b3.tar.gz |
Restoring authorship annotation for <single@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r-- | library/cpp/messagebus/acceptor.cpp | 18 | ||||
-rw-r--r-- | library/cpp/messagebus/acceptor.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/queue_for_actor.h | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/temp_tls_vector.h | 22 | ||||
-rw-r--r-- | library/cpp/messagebus/config/netaddr.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/config/netaddr.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/config/session_config.cpp | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/event_loop.cpp | 18 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/granup.h | 66 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/tokenquota.h | 126 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 100 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_connection.h | 20 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_connection_status.cpp | 16 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_server_session.cpp | 24 | ||||
-rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 26 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 166 | ||||
-rw-r--r-- | library/cpp/messagebus/www/www.cpp | 18 |
17 files changed, 318 insertions, 318 deletions
diff --git a/library/cpp/messagebus/acceptor.cpp b/library/cpp/messagebus/acceptor.cpp index 64a38619c2..de8810d02e 100644 --- a/library/cpp/messagebus/acceptor.cpp +++ b/library/cpp/messagebus/acceptor.cpp @@ -19,7 +19,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c : TActor<TAcceptor>(session->Queue->WorkQueue.Get()) , AcceptorId(acceptorId) , Session(session) - , GranStatus(session->Config.Secret.StatusFlushPeriod) + , GranStatus(session->Config.Secret.StatusFlushPeriod) { SetNonBlock(socket, true); @@ -30,7 +30,7 @@ TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, c Stats.Fd = socket; Stats.ListenAddr = addr; - SendStatus(TInstant::Now()); + SendStatus(TInstant::Now()); } void TAcceptor::Act(TDefaultTag) { @@ -40,8 +40,8 @@ void TAcceptor::Act(TDefaultTag) { return; } - TInstant now = TInstant::Now(); - + TInstant now = TInstant::Now(); + if (state == SS_SHUTDOWN_COMMAND) { if (!!Channel) { Channel->Unregister(); @@ -49,7 +49,7 @@ void TAcceptor::Act(TDefaultTag) { Stats.Fd = INVALID_SOCKET; } - SendStatus(now); + SendStatus(now); Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats); Stats.ResetIncremental(); @@ -96,7 +96,7 @@ void TAcceptor::Act(TDefaultTag) { Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept); - Stats.LastAcceptSuccessInstant = now; + Stats.LastAcceptSuccessInstant = now; ++Stats.AcceptSuccessCount; } @@ -105,11 +105,11 @@ void TAcceptor::Act(TDefaultTag) { Channel->EnableRead(); - SendStatus(now); + SendStatus(now); } -void TAcceptor::SendStatus(TInstant now) { - GranStatus.Listen.Update(Stats, now); +void TAcceptor::SendStatus(TInstant now) { + GranStatus.Listen.Update(Stats, now); } void TAcceptor::HandleEvent(SOCKET socket, void* cookie) { diff --git a/library/cpp/messagebus/acceptor.h b/library/cpp/messagebus/acceptor.h index 57cb010bf2..8ec2229d63 100644 --- a/library/cpp/messagebus/acceptor.h +++ b/library/cpp/messagebus/acceptor.h @@ -55,6 +55,6 @@ namespace NBus { TGranStatus GranStatus; }; - + } } diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h index 40fa536b82..d26a546296 100644 --- a/library/cpp/messagebus/actor/queue_for_actor.h +++ b/library/cpp/messagebus/actor/queue_for_actor.h @@ -60,15 +60,15 @@ namespace NActor { temp.Shrink(); } } - + template <typename TFunc> void DequeueAllLikelyEmpty(const TFunc& func) { if (Y_LIKELY(IsEmpty())) { return; } - + DequeueAll(func); - } + } }; } diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h index 675d92f5b0..407703d702 100644 --- a/library/cpp/messagebus/actor/temp_tls_vector.h +++ b/library/cpp/messagebus/actor/temp_tls_vector.h @@ -23,18 +23,18 @@ public: } ~TTempTlsVector() { - Clear(); - } - - void Clear() { + Clear(); + } + + void Clear() { Vector->clear(); } - + size_t Capacity() const noexcept { - return Vector->capacity(); - } - - void Shrink() { - Vector->shrink_to_fit(); - } + return Vector->capacity(); + } + + void Shrink() { + Vector->shrink_to_fit(); + } }; diff --git a/library/cpp/messagebus/config/netaddr.cpp b/library/cpp/messagebus/config/netaddr.cpp index 962ac538e2..c1cb356840 100644 --- a/library/cpp/messagebus/config/netaddr.cpp +++ b/library/cpp/messagebus/config/netaddr.cpp @@ -129,7 +129,7 @@ namespace NBus { ythrow TNetAddr::TError() << "cannot resolve " << host << ":" << port << " into " << Describe(requireVersion); } } - + TNetAddr::TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_ANY*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) : Ptr(MakeAddress(na, requireVersion, preferVersion)) { diff --git a/library/cpp/messagebus/config/netaddr.h b/library/cpp/messagebus/config/netaddr.h index b79c0cc355..ccb4b42810 100644 --- a/library/cpp/messagebus/config/netaddr.h +++ b/library/cpp/messagebus/config/netaddr.h @@ -36,14 +36,14 @@ namespace NBus { public: class TError: public yexception { }; - + TNetAddr(); TNetAddr(TAutoPtr<IRemoteAddr> addr); TNetAddr(const char* hostPort, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY); TNetAddr(TStringBuf host, int port, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY); TNetAddr(const TNetworkAddress& na, EIpVersion requireVersion = EIP_VERSION_ANY, EIpVersion preferVersion = EIP_VERSION_ANY); TNetAddr(const TNetworkAddress& na, const TAddrInfo& ai); - + bool operator==(const TNetAddr&) const; bool operator!=(const TNetAddr& other) const { return !(*this == other); diff --git a/library/cpp/messagebus/config/session_config.cpp b/library/cpp/messagebus/config/session_config.cpp index fbbbb106c9..17157b3dfa 100644 --- a/library/cpp/messagebus/config/session_config.cpp +++ b/library/cpp/messagebus/config/session_config.cpp @@ -120,7 +120,7 @@ void TBusSessionConfig::ConfigureLastGetopt(NLastGetopt::TOpts& opts, opts.AddLongOption(prefix + "max-message-size") .RequiredArgument("BYTES") .DefaultValue(ToString(MaxMessageSize)) - .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix); + .StoreMappedResultT<const char*>(&MaxMessageSize, &ParseWithKmgSuffix); opts.AddLongOption(prefix + "socket-recv-buffer-size") .RequiredArgument("BYTES") .DefaultValue(ToString(SocketRecvBufferSize)) diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp index f685135bed..fd2e726d0b 100644 --- a/library/cpp/messagebus/event_loop.cpp +++ b/library/cpp/messagebus/event_loop.cpp @@ -78,7 +78,7 @@ public: const char* Name; - TAtomic RunningState; + TAtomic RunningState; TAtomic StopSignal; TSystemEvent StoppedEvent; TData Data; @@ -143,7 +143,7 @@ void TEventLoop::Stop() { } bool TEventLoop::IsRunning() { - return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; + return AtomicGet(Impl->RunningState) == EVENT_LOOP_RUNNING; } TChannelPtr TEventLoop::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { @@ -277,7 +277,7 @@ TEventLoop::TImpl::TImpl(const char* name) } void TEventLoop::TImpl::Run() { - bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); + bool res = AtomicCas(&RunningState, EVENT_LOOP_RUNNING, EVENT_LOOP_CREATED); Y_VERIFY(res, "Invalid mbus event loop state"); if (!!Name) { @@ -320,21 +320,21 @@ void TEventLoop::TImpl::Run() { Data.clear(); } - res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING); + res = AtomicCas(&RunningState, EVENT_LOOP_STOPPED, EVENT_LOOP_RUNNING); Y_VERIFY(res); - + StoppedEvent.Signal(); } void TEventLoop::TImpl::Stop() { AtomicSet(StopSignal, 1); - if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { - Wakeup(); + if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) { + Wakeup(); - StoppedEvent.WaitI(); - } + StoppedEvent.WaitI(); + } } TChannelPtr TEventLoop::TImpl::Register(TSocket socket, TEventHandlerPtr eventHandler, void* cookie) { diff --git a/library/cpp/messagebus/misc/granup.h b/library/cpp/messagebus/misc/granup.h index 36ecfebc93..8b04aca597 100644 --- a/library/cpp/messagebus/misc/granup.h +++ b/library/cpp/messagebus/misc/granup.h @@ -1,50 +1,50 @@ -#pragma once - +#pragma once + #include <util/datetime/base.h> #include <util/system/guard.h> -#include <util/system/mutex.h> -#include <util/system/spinlock.h> - -namespace NBus { - template <typename TItem, typename TLocker = TSpinLock> - class TGranUp { - public: - TGranUp(TDuration gran) - : Gran(gran) +#include <util/system/mutex.h> +#include <util/system/spinlock.h> + +namespace NBus { + template <typename TItem, typename TLocker = TSpinLock> + class TGranUp { + public: + TGranUp(TDuration gran) + : Gran(gran) , Next(TInstant::MicroSeconds(0)) { } - + template <typename TFunctor> void Update(TFunctor functor, TInstant now, bool force = false) { if (force || now > Next) - Set(functor(), now); - } - + Set(functor(), now); + } + void Update(const TItem& item, TInstant now, bool force = false) { if (force || now > Next) - Set(item, now); - } - + Set(item, now); + } + TItem Get() const noexcept { TGuard<TLocker> guard(Lock); - - return Item; - } - - protected: + + return Item; + } + + protected: void Set(const TItem& item, TInstant now) { TGuard<TLocker> guard(Lock); - - Item = item; - - Next = now + Gran; - } - - private: - const TDuration Gran; + + Item = item; + + Next = now + Gran; + } + + private: + const TDuration Gran; TLocker Lock; TItem Item; TInstant Next; - }; -} + }; +} diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h index 190547fa54..954cf0f0d7 100644 --- a/library/cpp/messagebus/misc/tokenquota.h +++ b/library/cpp/messagebus/misc/tokenquota.h @@ -1,83 +1,83 @@ -#pragma once - -#include <util/system/atomic.h> - -namespace NBus { - /* Consumer and feeder quota model impl. - - Consumer thread only calls: - Acquire(), fetches tokens for usage from bucket; - Consume(), eats given amount of tokens, must not - be greater than Value() items; - - Other threads (feeders) calls: - Return(), put used tokens back to bucket; - */ - - class TTokenQuota { - public: - TTokenQuota(bool enabled, size_t tokens, size_t wake) - : Enabled(tokens > 0 ? enabled : false) - , Acquired(0) - , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) - , Tokens_(tokens) +#pragma once + +#include <util/system/atomic.h> + +namespace NBus { + /* Consumer and feeder quota model impl. + + Consumer thread only calls: + Acquire(), fetches tokens for usage from bucket; + Consume(), eats given amount of tokens, must not + be greater than Value() items; + + Other threads (feeders) calls: + Return(), put used tokens back to bucket; + */ + + class TTokenQuota { + public: + TTokenQuota(bool enabled, size_t tokens, size_t wake) + : Enabled(tokens > 0 ? enabled : false) + , Acquired(0) + , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) + , Tokens_(tokens) { Y_UNUSED(padd_); } - + bool Acquire(TAtomic level = 1, bool force = false) { level = Max(TAtomicBase(level), TAtomicBase(1)); - + if (Enabled && (Acquired < level || force)) { Acquired += AtomicSwap(&Tokens_, 0); - } - - return !Enabled || Acquired >= level; - } - + } + + return !Enabled || Acquired >= level; + } + void Consume(size_t items) { if (Enabled) { Y_ASSERT(Acquired >= TAtomicBase(items)); - - Acquired -= items; - } - } - + + Acquired -= items; + } + } + bool Return(size_t items_) noexcept { if (!Enabled || items_ == 0) - return false; - - const TAtomic items = items_; - const TAtomic value = AtomicAdd(Tokens_, items); - - return (value - items < WakeLev && value >= WakeLev); - } - + return false; + + const TAtomic items = items_; + const TAtomic value = AtomicAdd(Tokens_, items); + + return (value - items < WakeLev && value >= WakeLev); + } + bool IsEnabled() const noexcept { - return Enabled; - } - + return Enabled; + } + bool IsAboveWake() const noexcept { - return !Enabled || (WakeLev <= AtomicGet(Tokens_)); - } - + return !Enabled || (WakeLev <= AtomicGet(Tokens_)); + } + size_t Tokens() const noexcept { - return Acquired + AtomicGet(Tokens_); - } - + return Acquired + AtomicGet(Tokens_); + } + size_t Check(const TAtomic level) const noexcept { - return !Enabled || level <= Acquired; - } - - private: + return !Enabled || level <= Acquired; + } + + private: bool Enabled; TAtomicBase Acquired; - const TAtomicBase WakeLev; + const TAtomicBase WakeLev; TAtomic Tokens_; - - /* This padd requires for align Tokens_ member on its own - CPU cacheline. */ - + + /* This padd requires for align Tokens_ member on its own + CPU cacheline. */ + ui64 padd_; - }; -} + }; +} diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 22932569db..730fc0f554 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -48,11 +48,11 @@ namespace NBus { const TInstant now = TInstant::Now(); WriterFillStatus(); - + GranStatus.Writer.Update(WriterData.Status, now, true); GranStatus.Reader.Update(ReaderData.Status, now, true); } - + TRemoteConnection::~TRemoteConnection() { Y_VERIFY(ReplyQueue.IsEmpty()); } @@ -73,7 +73,7 @@ namespace NBus { bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { size_t left = Buffer.Size() - Offset; - + return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; } @@ -83,13 +83,13 @@ namespace NBus { Y_VERIFY(State == WRITER_FILLING, "state must be initial"); Channel = channel; } - + void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { Y_VERIFY(!Channel, "must not have channel"); Y_VERIFY(Buffer.Empty(), "buffer must be empty"); Channel = channel; } - + void TRemoteConnection::TWriterData::DropChannel() { if (!!Channel) { Channel->Unregister(); @@ -184,7 +184,7 @@ namespace NBus { ReaderData.Status.Fd = INVALID_SOCKET; return; } - + ReaderData.DropChannel(); ReaderData.Status.Fd = readSocket.Socket; @@ -232,10 +232,10 @@ namespace NBus { ReaderData.Status.Acts += 1; ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); - + if (AtomicGet(ReaderData.Down)) { ReaderData.DropChannel(); - + ReaderProcessStatusDown(); ReaderData.ShutdownComplete.Signal(); @@ -262,7 +262,7 @@ namespace NBus { } ReaderFlushMessages(); - } + } ReaderSendStatus(now); } @@ -275,109 +275,109 @@ namespace NBus { else if (!QuotaBytes.Acquire(bytes)) wakeFlags |= WAKE_QUOTA_BYTES; - + if (wakeFlags) { ReaderData.Status.QuotaExhausted++; - + WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); } - + return wakeFlags == 0; } - + void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { QuotaMsg.Consume(msg); QuotaBytes.Consume(bytes); } - + void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes)) ReadQuotaWakeup(); } - + void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) WriterGetWakeQueue()->EnqueueAndSchedule(0x0); } - + bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { bool rMsg = QuotaMsg.Return(items); bool rBytes = QuotaBytes.Return(bytes); - + return rMsg || rBytes; } - + void TRemoteConnection::ReadQuotaWakeup() { const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); - + if (mask && mask == WriterData.AwakeFlags) { WriterData.Status.ReaderWakeups++; WriterData.AwakeFlags = 0; - + ScheduleRead(); } } - + ui32 TRemoteConnection::WriteWakeFlags() const { ui32 awakeFlags = 0; - + if (QuotaMsg.IsAboveWake()) awakeFlags |= WAKE_QUOTA_MSG; - + if (QuotaBytes.IsAboveWake()) awakeFlags |= WAKE_QUOTA_BYTES; - + return awakeFlags; } - + bool TRemoteConnection::ReaderProcessBuffer() { TInstant now = TInstant::Now(); - + for (;;) { if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { break; } - + TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); - + if (header.Size < sizeof(TBusHeader)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } - + if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } - + if (header.Size > Config.MaxMessageSize) { LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); return false; } - + if (!ReaderData.HasBytesInBuf(header.Size)) { if (ReaderData.Offset == 0) { ReaderData.Buffer.Reserve(header.Size); } break; } - + if (!QuotaAcquire(1, header.Size)) return false; - + if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { return false; } - + ReaderData.Offset += header.Size; } - + ReaderData.Buffer.ChopHead(ReaderData.Offset); ReaderData.Offset = 0; @@ -408,7 +408,7 @@ namespace NBus { } Y_ASSERT(ReaderData.Buffer.Avail() > 0); - + ssize_t bytes; { TWhatThreadDoesPushPop pp("recv syscall"); @@ -454,7 +454,7 @@ namespace NBus { message != replyQueueTemp.rend(); ++message) { messages.push_back(message->MessagePtr.Release()); } - + WriterErrorMessages(messages, reason); replyQueueTemp.clear(); @@ -535,10 +535,10 @@ namespace NBus { ClearBeforeSendQueue(reasonForQueues); WriterGetReconnectQueue()->Clear(); WriterGetWakeQueue()->Clear(); - + TMessagesPtrs cleared; ClearOutgoingQueue(cleared, false); - + if (!Session->IsSource_) { for (auto& i : cleared) { TBusMessagePtrAndHeader h(i); @@ -548,10 +548,10 @@ namespace NBus { // and this part is not batch } } - + WriterErrorMessages(cleared, reason); } - + void TRemoteConnection::BeforeTryWrite() { } @@ -638,7 +638,7 @@ namespace NBus { WriterData.Status.Incremental.NetworkOps += 1; WriterData.Buffer.LeftProceed(bytes); - } + } WriterData.Buffer.Clear(); if (WriterData.Buffer.Capacity() > MaxBufferSize) { @@ -654,12 +654,12 @@ namespace NBus { WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); } else { ScheduleShutdown(status); - } + } } void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { ShutdownReason = status; - + AtomicSet(ReaderData.Down, 1); ScheduleRead(); @@ -856,7 +856,7 @@ namespace NBus { } TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; - + for (;;) { THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront()); if (!writeMessage) { @@ -944,12 +944,12 @@ namespace NBus { WriterErrorMessage(h.MessagePtr.Release(), status); } } - + void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) { TBusMessage* released = m.Release(); WriterErrorMessages(MakeArrayRef(&released, 1), status); } - + void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { ResetOneWayFlag(ms); @@ -958,17 +958,17 @@ namespace NBus { Session->InvokeOnError(m, status); } } - + void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { Y_VERIFY(Session->IsSource_, "state check"); TClientConnectionEvent event(type, ConnectionId, PeerAddr); TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get()); session->ClientHandler->OnClientConnectionEvent(event); } - + bool TRemoteConnection::IsAlive() const { return !AtomicGet(WriterData.Down); } - + } } diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h index 4538947368..5141a8ea9f 100644 --- a/library/cpp/messagebus/remote_connection.h +++ b/library/cpp/messagebus/remote_connection.h @@ -13,8 +13,8 @@ #include "storage.h" #include "vector_swaps.h" #include "ybus.h" -#include "misc/granup.h" -#include "misc/tokenquota.h" +#include "misc/granup.h" +#include "misc/tokenquota.h" #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/executor.h> @@ -49,7 +49,7 @@ namespace NBus { struct TWriterToReaderSocketMessage { TSocket Socket; ui32 SocketVersion; - + TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion) : Socket(socket) , SocketVersion(socketVersion) @@ -154,13 +154,13 @@ namespace NBus { virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0; bool MessageRead(TArrayRef<const char> dataRef, TInstant now); virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0; - + void CallSerialize(TBusMessage* msg, TBuffer& buffer) const; void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const; TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const; - + void ResetOneWayFlag(TArrayRef<TBusMessage*>); - + inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() { return this; } @@ -269,7 +269,7 @@ namespace NBus { TGranUp<TRemoteConnectionWriterStatus> Writer; TGranUp<TRemoteConnectionReaderStatus> Reader; }; - + TWriterData WriterData; TReaderData ReaderData; TGranStatus GranStatus; @@ -280,15 +280,15 @@ namespace NBus { // client connection only TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue; - + EMessageStatus ShutdownReason; }; inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept { return PeerAddr; } - + typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr; - + } } diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp index 2c48b2a287..05ae84791c 100644 --- a/library/cpp/messagebus/remote_connection_status.cpp +++ b/library/cpp/messagebus/remote_connection_status.cpp @@ -180,15 +180,15 @@ TString TRemoteConnectionStatus::PrintToString() const { p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls); } - p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6)); - + p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6)); + if (Server) { - p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6)); - p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6)); - p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6)); - p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6)); - } else { - p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6)); + p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6)); + p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6)); + p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6)); + p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6)); + } else { + p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6)); } p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false)); diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp index 6abbf88a60..34dd2153e2 100644 --- a/library/cpp/messagebus/remote_server_session.cpp +++ b/library/cpp/messagebus/remote_server_session.cpp @@ -24,9 +24,9 @@ TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue, { if (config.PerConnectionMaxInFlightBySize > 0) { if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize) - ythrow yexception() - << "too low PerConnectionMaxInFlightBySize value"; - } + ythrow yexception() + << "too low PerConnectionMaxInFlightBySize value"; + } } namespace NBus { @@ -87,7 +87,7 @@ void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps< void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) { if (Y_UNLIKELY(AtomicGet(Down))) { - ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get()); + ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get()); InvokeOnError(request.MessagePtr.Release(), MESSAGE_SHUTDOWN); } else { TWhatThreadDoesPushPop pp("OnMessage"); @@ -167,19 +167,19 @@ void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtr void TRemoteServerSession::ReleaseInWorkRequests(TRemoteConnection& con, TBusMessage* request) { Y_ASSERT((request->LocalFlags & MESSAGE_IN_WORK)); - request->LocalFlags &= ~MESSAGE_IN_WORK; - - const size_t size = request->GetHeader()->Size; + request->LocalFlags &= ~MESSAGE_IN_WORK; - con.QuotaReturnAside(1, size); - ServerOwnedMessages.ReleaseMultiple(1, size); + const size_t size = request->GetHeader()->Size; + + con.QuotaReturnAside(1, size); + ServerOwnedMessages.ReleaseMultiple(1, size); } void TRemoteServerSession::ReleaseInWork(TBusIdentity& ident) { - ident.SetInWork(false); - ident.Connection->QuotaReturnAside(1, ident.Size); + ident.SetInWork(false); + ident.Connection->QuotaReturnAside(1, ident.Size); - ServerOwnedMessages.ReleaseMultiple(1, ident.Size); + ServerOwnedMessages.ReleaseMultiple(1, ident.Size); } void TRemoteServerSession::ConvertInWork(TBusIdentity& req, TBusMessage* reply) { diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c4..7adaa1ae6d 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -389,14 +389,14 @@ void TBusSessionImpl::StatusUpdateCachedDump() { for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin(); acceptor != acceptors.end(); ++acceptor) { - const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get(); - - acceptorStatusSummary += status; - + const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get(); + + acceptorStatusSummary += status; + if (acceptor != acceptors.begin()) { ss << "\n"; } - ss << status.PrintToString(); + ss << status.PrintToString(); } r.Acceptors = ss.Str(); @@ -410,19 +410,19 @@ void TBusSessionImpl::StatusUpdateCachedDump() { if (connection != connections.begin()) { ss << "\n"; } - + TRemoteConnectionStatus status; status.Server = !IsSource_; - status.ReaderStatus = (*connection)->GranStatus.Reader.Get(); - status.WriterStatus = (*connection)->GranStatus.Writer.Get(); - + status.ReaderStatus = (*connection)->GranStatus.Reader.Get(); + status.WriterStatus = (*connection)->GranStatus.Writer.Get(); + ss << status.PrintToString(); - - r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus; - r.ConnectionStatusSummary.WriterStatus += status.WriterStatus; + + r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus; + r.ConnectionStatusSummary.WriterStatus += status.WriterStatus; } - r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString(); + r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString(); r.Connections = ss.Str(); } diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..c11d447224 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -962,103 +962,103 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.Sync.WaitForAndIncrement(3); } - + struct TServerForQuotaWake: public TExampleServer { TSystemEvent GoOn; TMutex OneLock; - - TOnMessageContext OneMessage; - - static TBusServerSessionConfig Config() { - TBusServerSessionConfig config; - - config.PerConnectionMaxInFlight = 1; - config.PerConnectionMaxInFlightBySize = 1500; - config.MaxMessageSize = 1024; - - return config; - } - - TServerForQuotaWake() - : TExampleServer("TServerForQuotaWake", Config()) + + TOnMessageContext OneMessage; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + + config.PerConnectionMaxInFlight = 1; + config.PerConnectionMaxInFlightBySize = 1500; + config.MaxMessageSize = 1024; + + return config; + } + + TServerForQuotaWake() + : TExampleServer("TServerForQuotaWake", Config()) { } - + ~TServerForQuotaWake() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessage(TOnMessageContext& req) override { - if (!GoOn.Wait(0)) { + if (!GoOn.Wait(0)) { TGuard<TMutex> guard(OneLock); - - UNIT_ASSERT(!OneMessage); - - OneMessage.Swap(req); - } else - TExampleServer::OnMessage(req); - } - - void WakeOne() { + + UNIT_ASSERT(!OneMessage); + + OneMessage.Swap(req); + } else + TExampleServer::OnMessage(req); + } + + void WakeOne() { TGuard<TMutex> guard(OneLock); - - UNIT_ASSERT(!!OneMessage); - - TExampleServer::OnMessage(OneMessage); - - TOnMessageContext().Swap(OneMessage); - } - }; - + + UNIT_ASSERT(!!OneMessage); + + TExampleServer::OnMessage(OneMessage); + + TOnMessageContext().Swap(OneMessage); + } + }; + Y_UNIT_TEST(WakeReaderOnQuota) { - const size_t test_msg_count = 64; - - TBusClientSessionConfig clientConfig; - - clientConfig.MaxInFlight = test_msg_count; - - TExampleClient client(clientConfig); - TServerForQuotaWake server; - TInstant start; - - client.MessageCount = test_msg_count; - - const NBus::TNetAddr addr = server.GetActualListenAddr(); - - for (unsigned count = 0;;) { - UNIT_ASSERT(count <= test_msg_count); - - TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); - EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); - - if (status == MESSAGE_OK) { - count++; - - } else if (status == MESSAGE_BUSY) { + const size_t test_msg_count = 64; + + TBusClientSessionConfig clientConfig; + + clientConfig.MaxInFlight = test_msg_count; + + TExampleClient client(clientConfig); + TServerForQuotaWake server; + TInstant start; + + client.MessageCount = test_msg_count; + + const NBus::TNetAddr addr = server.GetActualListenAddr(); + + for (unsigned count = 0;;) { + UNIT_ASSERT(count <= test_msg_count); + + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); + + if (status == MESSAGE_OK) { + count++; + + } else if (status == MESSAGE_BUSY) { if (count == test_msg_count) { - TInstant now = TInstant::Now(); - + TInstant now = TInstant::Now(); + if (start.GetValue() == 0) { - start = now; - + start = now; + // TODO: properly check that server is blocked } else if (start + TDuration::MilliSeconds(100) < now) { - break; - } - } - - Sleep(TDuration::MilliSeconds(10)); - - } else - UNIT_ASSERT(false); - } - - server.GoOn.Signal(); - server.WakeOne(); - - client.WaitReplies(); - - server.WaitForOnMessageCount(test_msg_count); + break; + } + } + + Sleep(TDuration::MilliSeconds(10)); + + } else + UNIT_ASSERT(false); + } + + server.GoOn.Signal(); + server.WakeOne(); + + client.WaitReplies(); + + server.WaitForOnMessageCount(test_msg_count); }; Y_UNIT_TEST(TestConnectionAttempts) { diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp index 62ec241d85..e501cbf4a9 100644 --- a/library/cpp/messagebus/www/www.cpp +++ b/library/cpp/messagebus/www/www.cpp @@ -200,12 +200,12 @@ struct TBusWww::TImpl { Queues.Add(s->GetQueue()); } - void RegisterQueue(TBusMessageQueuePtr q) { + void RegisterQueue(TBusMessageQueuePtr q) { Y_VERIFY(!!q); - TGuard<TMutex> g(Mutex); - Queues.Add(q); - } - + TGuard<TMutex> g(Mutex); + Queues.Add(q); + } + void RegisterModule(TBusModule* module) { Y_VERIFY(!!module); TGuard<TMutex> g(Mutex); @@ -824,10 +824,10 @@ void TBusWww::RegisterServerSession(TBusServerSessionPtr s) { Impl->RegisterServerSession(s); } -void TBusWww::RegisterQueue(TBusMessageQueuePtr q) { - Impl->RegisterQueue(q); -} - +void TBusWww::RegisterQueue(TBusMessageQueuePtr q) { + Impl->RegisterQueue(q); +} + void TBusWww::RegisterModule(TBusModule* module) { Impl->RegisterModule(module); } |