diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_server_session.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_server_session.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_server_session.cpp | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_server_session.cpp b/library/cpp/messagebus/remote_server_session.cpp new file mode 100644 index 0000000000..6abbf88a60 --- /dev/null +++ b/library/cpp/messagebus/remote_server_session.cpp @@ -0,0 +1,206 @@ +#include "remote_server_session.h" + +#include "remote_connection.h" +#include "remote_server_connection.h" + +#include <library/cpp/messagebus/actor/temp_tls_vector.h> + +#include <util/generic/cast.h> +#include <util/stream/output.h> +#include <util/system/yassert.h> + +#include <typeinfo> + +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteServerSession::TRemoteServerSession(TBusMessageQueue* queue, + TBusProtocol* proto, IBusServerHandler* handler, + const TBusServerSessionConfig& config, const TString& name) + : TBusSessionImpl(false, queue, proto, handler, config, name) + , ServerOwnedMessages(config.MaxInFlight, config.MaxInFlightBySize, "ServerOwnedMessages") + , ServerHandler(handler) +{ + if (config.PerConnectionMaxInFlightBySize > 0) { + if (config.PerConnectionMaxInFlightBySize < config.MaxMessageSize) + ythrow yexception() + << "too low PerConnectionMaxInFlightBySize value"; + } +} + +namespace NBus { + namespace NPrivate { + class TInvokeOnMessage: public IWorkItem { + private: + TRemoteServerSession* RemoteServerSession; + TBusMessagePtrAndHeader Request; + TIntrusivePtr<TRemoteServerConnection> Connection; + + public: + TInvokeOnMessage(TRemoteServerSession* session, TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& connection) + : RemoteServerSession(session) + { + Y_ASSERT(!!connection); + Connection.Swap(connection); + + Request.Swap(request); + } + + void DoWork() override { + THolder<TInvokeOnMessage> holder(this); + RemoteServerSession->InvokeOnMessage(Request, Connection); + // TODO: TRemoteServerSessionSemaphore should be enough + RemoteServerSession->JobCount.Decrement(); + } + }; + + } +} + +void TRemoteServerSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& messages) { + AcquireInWorkRequests(messages); + + bool executeInPool = Config.ExecuteOnMessageInWorkerPool; + + TTempTlsVector< ::IWorkItem*> workQueueTemp; + + if (executeInPool) { + workQueueTemp.GetVector()->reserve(messages.size()); + } + + for (auto& message : messages) { + // TODO: incref once + TIntrusivePtr<TRemoteServerConnection> connection(CheckedCast<TRemoteServerConnection*>(c)); + if (executeInPool) { + workQueueTemp.GetVector()->push_back(new TInvokeOnMessage(this, message, connection)); + } else { + InvokeOnMessage(message, connection); + } + } + + if (executeInPool) { + JobCount.Add(workQueueTemp.GetVector()->size()); + Queue->EnqueueWork(*workQueueTemp.GetVector()); + } +} + +void TRemoteServerSession::InvokeOnMessage(TBusMessagePtrAndHeader& request, TIntrusivePtr<TRemoteServerConnection>& conn) { + if (Y_UNLIKELY(AtomicGet(Down))) { + ReleaseInWorkRequests(*conn.Get(), request.MessagePtr.Get()); + InvokeOnError(request.MessagePtr.Release(), MESSAGE_SHUTDOWN); + } else { + TWhatThreadDoesPushPop pp("OnMessage"); + + TBusIdentity ident; + + ident.Connection.Swap(conn); + request.MessagePtr->GetIdentity(ident); + + Y_ASSERT(request.MessagePtr->LocalFlags & MESSAGE_IN_WORK); + DoSwap(request.MessagePtr->LocalFlags, ident.LocalFlags); + + ident.RecvTime = request.MessagePtr->RecvTime; + +#ifndef NDEBUG + auto& message = *request.MessagePtr; + ident.SetMessageType(typeid(message)); +#endif + + TOnMessageContext context(request.MessagePtr.Release(), ident, this); + ServerHandler->OnMessage(context); + } +} + +EMessageStatus TRemoteServerSession::ForgetRequest(const TBusIdentity& ident) { + ReleaseInWork(const_cast<TBusIdentity&>(ident)); + + return MESSAGE_OK; +} + +EMessageStatus TRemoteServerSession::SendReply(const TBusIdentity& ident, TBusMessage* reply) { + reply->CheckClean(); + + ConvertInWork(const_cast<TBusIdentity&>(ident), reply); + + reply->RecvTime = ident.RecvTime; + + ident.Connection->Send(reply); + + return MESSAGE_OK; +} + +int TRemoteServerSession::GetInFlight() const noexcept { + return ServerOwnedMessages.GetCurrentCount(); +} + +void TRemoteServerSession::FillStatus() { + TBusSessionImpl::FillStatus(); + + // TODO: weird + StatusData.Status.InFlightCount = ServerOwnedMessages.GetCurrentCount(); + StatusData.Status.InFlightSize = ServerOwnedMessages.GetCurrentSize(); + StatusData.Status.InputPaused = ServerOwnedMessages.IsLocked(); +} + +void TRemoteServerSession::AcquireInWorkRequests(TArrayRef<const TBusMessagePtrAndHeader> messages) { + TAtomicBase size = 0; + for (auto message = messages.begin(); message != messages.end(); ++message) { + Y_ASSERT(!(message->MessagePtr->LocalFlags & MESSAGE_IN_WORK)); + message->MessagePtr->LocalFlags |= MESSAGE_IN_WORK; + size += message->MessagePtr->GetHeader()->Size; + } + + ServerOwnedMessages.IncrementMultiple(messages.size(), size); +} + +void TRemoteServerSession::ReleaseInWorkResponses(TArrayRef<const TBusMessagePtrAndHeader> responses) { + TAtomicBase size = 0; + for (auto response = responses.begin(); response != responses.end(); ++response) { + Y_ASSERT((response->MessagePtr->LocalFlags & MESSAGE_REPLY_IS_BEGING_SENT)); + response->MessagePtr->LocalFlags &= ~MESSAGE_REPLY_IS_BEGING_SENT; + size += response->MessagePtr->RequestSize; + } + + ServerOwnedMessages.ReleaseMultiple(responses.size(), size); +} + +void TRemoteServerSession::ReleaseInWorkRequests(TRemoteConnection& con, TBusMessage* request) { + Y_ASSERT((request->LocalFlags & MESSAGE_IN_WORK)); + request->LocalFlags &= ~MESSAGE_IN_WORK; + + const size_t size = request->GetHeader()->Size; + + con.QuotaReturnAside(1, size); + ServerOwnedMessages.ReleaseMultiple(1, size); +} + +void TRemoteServerSession::ReleaseInWork(TBusIdentity& ident) { + ident.SetInWork(false); + ident.Connection->QuotaReturnAside(1, ident.Size); + + ServerOwnedMessages.ReleaseMultiple(1, ident.Size); +} + +void TRemoteServerSession::ConvertInWork(TBusIdentity& req, TBusMessage* reply) { + reply->SetIdentity(req); + + req.SetInWork(false); + Y_ASSERT(!(reply->LocalFlags & MESSAGE_REPLY_IS_BEGING_SENT)); + reply->LocalFlags |= MESSAGE_REPLY_IS_BEGING_SENT; + reply->RequestSize = req.Size; +} + +void TRemoteServerSession::Shutdown() { + ServerOwnedMessages.Stop(); + TBusSessionImpl::Shutdown(); +} + +void TRemoteServerSession::PauseInput(bool pause) { + ServerOwnedMessages.PauseByUsed(pause); +} + +unsigned TRemoteServerSession::GetActualListenPort() { + Y_VERIFY(Config.ListenPort > 0, "state check"); + return Config.ListenPort; +} |