diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/remote_client_session.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_client_session.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_client_session.cpp | 196 |
1 files changed, 98 insertions, 98 deletions
diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp index 3bc421944f..70c20b9063 100644 --- a/library/cpp/messagebus/remote_client_session.cpp +++ b/library/cpp/messagebus/remote_client_session.cpp @@ -1,127 +1,127 @@ -#include "remote_client_session.h" +#include "remote_client_session.h" #include "mb_lwtrace.h" -#include "remote_client_connection.h" - +#include "remote_client_connection.h" + #include <library/cpp/messagebus/scheduler/scheduler.h> #include <util/generic/cast.h> #include <util/system/defaults.h> - -LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) - -using namespace NBus; -using namespace NBus::NPrivate; - -TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue, + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue, TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name) - : TBusSessionImpl(true, queue, proto, handler, config, name) - , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight") - , ClientHandler(handler) + : TBusSessionImpl(true, queue, proto, handler, config, name) + , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight") + , ClientHandler(handler) { } -TRemoteClientSession::~TRemoteClientSession() { - //Cerr << "~TRemoteClientSession" << Endl; -} - -void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) { +TRemoteClientSession::~TRemoteClientSession() { + //Cerr << "~TRemoteClientSession" << Endl; +} + +void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) { TAutoPtr<TVectorSwaps<TBusMessagePtrAndHeader>> temp(new TVectorSwaps<TBusMessagePtrAndHeader>); - temp->swap(newMsg); - c->ReplyQueue.EnqueueAll(temp); - c->ScheduleWrite(); + temp->swap(newMsg); + c->ReplyQueue.EnqueueAll(temp); + c->ScheduleWrite(); } - -EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) { + +EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) { if (Y_UNLIKELY(IsDown())) { - return MESSAGE_SHUTDOWN; - } - - TBusSocketAddr resolvedAddr; - EMessageStatus ret = GetMessageDestination(msg, addr, &resolvedAddr); - if (ret != MESSAGE_OK) { - return ret; - } - - msg->ReplyTo = resolvedAddr; - + return MESSAGE_SHUTDOWN; + } + + TBusSocketAddr resolvedAddr; + EMessageStatus ret = GetMessageDestination(msg, addr, &resolvedAddr); + if (ret != MESSAGE_OK) { + return ret; + } + + msg->ReplyTo = resolvedAddr; + TRemoteConnectionPtr c = ((TBusSessionImpl*)this)->GetConnection(resolvedAddr, true); Y_ASSERT(!!c); - - return CheckedCast<TRemoteClientConnection*>(c.Get())->SendMessageImpl(msg, wait, oneWay); -} - -EMessageStatus TRemoteClientSession::SendMessage(TBusMessage* msg, const TNetAddr* addr, bool wait) { - return SendMessageImpl(msg, addr, wait, false); -} - -EMessageStatus TRemoteClientSession::SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr, bool wait) { - return SendMessageImpl(pMes, addr, wait, true); -} - + + return CheckedCast<TRemoteClientConnection*>(c.Get())->SendMessageImpl(msg, wait, oneWay); +} + +EMessageStatus TRemoteClientSession::SendMessage(TBusMessage* msg, const TNetAddr* addr, bool wait) { + return SendMessageImpl(msg, addr, wait, false); +} + +EMessageStatus TRemoteClientSession::SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr, bool wait) { + return SendMessageImpl(pMes, addr, wait, true); +} + int TRemoteClientSession::GetInFlight() const noexcept { - return ClientRemoteInFlight.GetCurrent(); -} - -void TRemoteClientSession::FillStatus() { - TBusSessionImpl::FillStatus(); - - StatusData.Status.InFlightCount = ClientRemoteInFlight.GetCurrent(); - StatusData.Status.InputPaused = false; -} - + return ClientRemoteInFlight.GetCurrent(); +} + +void TRemoteClientSession::FillStatus() { + TBusSessionImpl::FillStatus(); + + StatusData.Status.InFlightCount = ClientRemoteInFlight.GetCurrent(); + StatusData.Status.InputPaused = false; +} + void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> messages) { for (auto message : messages) { Y_ASSERT(!(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT)); message->LocalFlags |= MESSAGE_IN_FLIGHT_ON_CLIENT; - } - ClientRemoteInFlight.IncrementMultiple(messages.size()); -} - + } + ClientRemoteInFlight.IncrementMultiple(messages.size()); +} + void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> messages) { for (auto message : messages) { Y_ASSERT(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT); message->LocalFlags &= ~MESSAGE_IN_FLIGHT_ON_CLIENT; - } - ClientRemoteInFlight.ReleaseMultiple(messages.size()); -} - -void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) { + } + ClientRemoteInFlight.ReleaseMultiple(messages.size()); +} + +void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) { ReleaseInFlight({request.Get()}); if (Y_UNLIKELY(AtomicGet(Down))) { - InvokeOnError(request, MESSAGE_SHUTDOWN); - InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN); - - TRemoteConnectionReaderIncrementalStatus counter; - LWPROBE(Error, ToString(MESSAGE_SHUTDOWN), "", ""); - counter.StatusCounter[MESSAGE_SHUTDOWN] += 1; - GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(counter); - } else { - TWhatThreadDoesPushPop pp("OnReply"); - ClientHandler->OnReply(request, response.MessagePtr.Release()); - } -} - + InvokeOnError(request, MESSAGE_SHUTDOWN); + InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN); + + TRemoteConnectionReaderIncrementalStatus counter; + LWPROBE(Error, ToString(MESSAGE_SHUTDOWN), "", ""); + counter.StatusCounter[MESSAGE_SHUTDOWN] += 1; + GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(counter); + } else { + TWhatThreadDoesPushPop pp("OnReply"); + ClientHandler->OnReply(request, response.MessagePtr.Release()); + } +} + EMessageStatus TRemoteClientSession::GetMessageDestination(TBusMessage* mess, const TNetAddr* addrp, TBusSocketAddr* dest) { - if (addrp) { - *dest = *addrp; - } else { - TNetAddr tmp; - EMessageStatus ret = const_cast<TBusProtocol*>(GetProto())->GetDestination(this, mess, GetQueue()->GetLocator(), &tmp); - if (ret != MESSAGE_OK) { - return ret; - } - *dest = tmp; - } - return MESSAGE_OK; -} - -void TRemoteClientSession::OpenConnection(const TNetAddr& addr) { - GetConnection(addr)->OpenConnection(); -} - -TBusClientConnectionPtr TRemoteClientSession::GetConnection(const TNetAddr& addr) { - // TODO: GetConnection should not open + if (addrp) { + *dest = *addrp; + } else { + TNetAddr tmp; + EMessageStatus ret = const_cast<TBusProtocol*>(GetProto())->GetDestination(this, mess, GetQueue()->GetLocator(), &tmp); + if (ret != MESSAGE_OK) { + return ret; + } + *dest = tmp; + } + return MESSAGE_OK; +} + +void TRemoteClientSession::OpenConnection(const TNetAddr& addr) { + GetConnection(addr)->OpenConnection(); +} + +TBusClientConnectionPtr TRemoteClientSession::GetConnection(const TNetAddr& addr) { + // TODO: GetConnection should not open return CheckedCast<TRemoteClientConnection*>(((TBusSessionImpl*)this)->GetConnection(addr, true).Get()); -} +} |