diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_client_session.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_client_session.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_client_session.cpp | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp new file mode 100644 index 0000000000..3bc421944f --- /dev/null +++ b/library/cpp/messagebus/remote_client_session.cpp @@ -0,0 +1,127 @@ +#include "remote_client_session.h" + +#include "mb_lwtrace.h" +#include "remote_client_connection.h" + +#include <library/cpp/messagebus/scheduler/scheduler.h> + +#include <util/generic/cast.h> +#include <util/system/defaults.h> + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue, + TBusProtocol* proto, IBusClientHandler* handler, + const TBusClientSessionConfig& config, const TString& name) + : TBusSessionImpl(true, queue, proto, handler, config, name) + , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight") + , ClientHandler(handler) +{ +} + +TRemoteClientSession::~TRemoteClientSession() { + //Cerr << "~TRemoteClientSession" << Endl; +} + +void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) { + TAutoPtr<TVectorSwaps<TBusMessagePtrAndHeader>> temp(new TVectorSwaps<TBusMessagePtrAndHeader>); + temp->swap(newMsg); + c->ReplyQueue.EnqueueAll(temp); + c->ScheduleWrite(); +} + +EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) { + if (Y_UNLIKELY(IsDown())) { + return MESSAGE_SHUTDOWN; + } + + TBusSocketAddr resolvedAddr; + EMessageStatus ret = GetMessageDestination(msg, addr, &resolvedAddr); + if (ret != MESSAGE_OK) { + return ret; + } + + msg->ReplyTo = resolvedAddr; + + TRemoteConnectionPtr c = ((TBusSessionImpl*)this)->GetConnection(resolvedAddr, true); + Y_ASSERT(!!c); + + return CheckedCast<TRemoteClientConnection*>(c.Get())->SendMessageImpl(msg, wait, oneWay); +} + +EMessageStatus TRemoteClientSession::SendMessage(TBusMessage* msg, const TNetAddr* addr, bool wait) { + return SendMessageImpl(msg, addr, wait, false); +} + +EMessageStatus TRemoteClientSession::SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr, bool wait) { + return SendMessageImpl(pMes, addr, wait, true); +} + +int TRemoteClientSession::GetInFlight() const noexcept { + return ClientRemoteInFlight.GetCurrent(); +} + +void TRemoteClientSession::FillStatus() { + TBusSessionImpl::FillStatus(); + + StatusData.Status.InFlightCount = ClientRemoteInFlight.GetCurrent(); + StatusData.Status.InputPaused = false; +} + +void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> messages) { + for (auto message : messages) { + Y_ASSERT(!(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT)); + message->LocalFlags |= MESSAGE_IN_FLIGHT_ON_CLIENT; + } + ClientRemoteInFlight.IncrementMultiple(messages.size()); +} + +void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> messages) { + for (auto message : messages) { + Y_ASSERT(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT); + message->LocalFlags &= ~MESSAGE_IN_FLIGHT_ON_CLIENT; + } + ClientRemoteInFlight.ReleaseMultiple(messages.size()); +} + +void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) { + ReleaseInFlight({request.Get()}); + if (Y_UNLIKELY(AtomicGet(Down))) { + InvokeOnError(request, MESSAGE_SHUTDOWN); + InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN); + + TRemoteConnectionReaderIncrementalStatus counter; + LWPROBE(Error, ToString(MESSAGE_SHUTDOWN), "", ""); + counter.StatusCounter[MESSAGE_SHUTDOWN] += 1; + GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(counter); + } else { + TWhatThreadDoesPushPop pp("OnReply"); + ClientHandler->OnReply(request, response.MessagePtr.Release()); + } +} + +EMessageStatus TRemoteClientSession::GetMessageDestination(TBusMessage* mess, const TNetAddr* addrp, TBusSocketAddr* dest) { + if (addrp) { + *dest = *addrp; + } else { + TNetAddr tmp; + EMessageStatus ret = const_cast<TBusProtocol*>(GetProto())->GetDestination(this, mess, GetQueue()->GetLocator(), &tmp); + if (ret != MESSAGE_OK) { + return ret; + } + *dest = tmp; + } + return MESSAGE_OK; +} + +void TRemoteClientSession::OpenConnection(const TNetAddr& addr) { + GetConnection(addr)->OpenConnection(); +} + +TBusClientConnectionPtr TRemoteClientSession::GetConnection(const TNetAddr& addr) { + // TODO: GetConnection should not open + return CheckedCast<TRemoteClientConnection*>(((TBusSessionImpl*)this)->GetConnection(addr, true).Get()); +} |