aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_client_session.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/remote_client_session.cpp
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_client_session.cpp')
-rw-r--r--library/cpp/messagebus/remote_client_session.cpp196
1 files changed, 98 insertions, 98 deletions
diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp
index 70c20b9063..3bc421944f 100644
--- a/library/cpp/messagebus/remote_client_session.cpp
+++ b/library/cpp/messagebus/remote_client_session.cpp
@@ -1,127 +1,127 @@
-#include "remote_client_session.h"
+#include "remote_client_session.h"
#include "mb_lwtrace.h"
-#include "remote_client_connection.h"
-
+#include "remote_client_connection.h"
+
#include <library/cpp/messagebus/scheduler/scheduler.h>
#include <util/generic/cast.h>
#include <util/system/defaults.h>
-
-LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
-
-using namespace NBus;
-using namespace NBus::NPrivate;
-
-TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue,
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue,
TBusProtocol* proto, IBusClientHandler* handler,
const TBusClientSessionConfig& config, const TString& name)
- : TBusSessionImpl(true, queue, proto, handler, config, name)
- , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight")
- , ClientHandler(handler)
+ : TBusSessionImpl(true, queue, proto, handler, config, name)
+ , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight")
+ , ClientHandler(handler)
{
}
-TRemoteClientSession::~TRemoteClientSession() {
- //Cerr << "~TRemoteClientSession" << Endl;
-}
-
-void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) {
+TRemoteClientSession::~TRemoteClientSession() {
+ //Cerr << "~TRemoteClientSession" << Endl;
+}
+
+void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) {
TAutoPtr<TVectorSwaps<TBusMessagePtrAndHeader>> temp(new TVectorSwaps<TBusMessagePtrAndHeader>);
- temp->swap(newMsg);
- c->ReplyQueue.EnqueueAll(temp);
- c->ScheduleWrite();
+ temp->swap(newMsg);
+ c->ReplyQueue.EnqueueAll(temp);
+ c->ScheduleWrite();
}
-
-EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) {
+
+EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) {
if (Y_UNLIKELY(IsDown())) {
- return MESSAGE_SHUTDOWN;
- }
-
- TBusSocketAddr resolvedAddr;
- EMessageStatus ret = GetMessageDestination(msg, addr, &resolvedAddr);
- if (ret != MESSAGE_OK) {
- return ret;
- }
-
- msg->ReplyTo = resolvedAddr;
-
+ return MESSAGE_SHUTDOWN;
+ }
+
+ TBusSocketAddr resolvedAddr;
+ EMessageStatus ret = GetMessageDestination(msg, addr, &resolvedAddr);
+ if (ret != MESSAGE_OK) {
+ return ret;
+ }
+
+ msg->ReplyTo = resolvedAddr;
+
TRemoteConnectionPtr c = ((TBusSessionImpl*)this)->GetConnection(resolvedAddr, true);
Y_ASSERT(!!c);
-
- return CheckedCast<TRemoteClientConnection*>(c.Get())->SendMessageImpl(msg, wait, oneWay);
-}
-
-EMessageStatus TRemoteClientSession::SendMessage(TBusMessage* msg, const TNetAddr* addr, bool wait) {
- return SendMessageImpl(msg, addr, wait, false);
-}
-
-EMessageStatus TRemoteClientSession::SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr, bool wait) {
- return SendMessageImpl(pMes, addr, wait, true);
-}
-
+
+ return CheckedCast<TRemoteClientConnection*>(c.Get())->SendMessageImpl(msg, wait, oneWay);
+}
+
+EMessageStatus TRemoteClientSession::SendMessage(TBusMessage* msg, const TNetAddr* addr, bool wait) {
+ return SendMessageImpl(msg, addr, wait, false);
+}
+
+EMessageStatus TRemoteClientSession::SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr, bool wait) {
+ return SendMessageImpl(pMes, addr, wait, true);
+}
+
int TRemoteClientSession::GetInFlight() const noexcept {
- return ClientRemoteInFlight.GetCurrent();
-}
-
-void TRemoteClientSession::FillStatus() {
- TBusSessionImpl::FillStatus();
-
- StatusData.Status.InFlightCount = ClientRemoteInFlight.GetCurrent();
- StatusData.Status.InputPaused = false;
-}
-
+ return ClientRemoteInFlight.GetCurrent();
+}
+
+void TRemoteClientSession::FillStatus() {
+ TBusSessionImpl::FillStatus();
+
+ StatusData.Status.InFlightCount = ClientRemoteInFlight.GetCurrent();
+ StatusData.Status.InputPaused = false;
+}
+
void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> messages) {
for (auto message : messages) {
Y_ASSERT(!(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT));
message->LocalFlags |= MESSAGE_IN_FLIGHT_ON_CLIENT;
- }
- ClientRemoteInFlight.IncrementMultiple(messages.size());
-}
-
+ }
+ ClientRemoteInFlight.IncrementMultiple(messages.size());
+}
+
void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> messages) {
for (auto message : messages) {
Y_ASSERT(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT);
message->LocalFlags &= ~MESSAGE_IN_FLIGHT_ON_CLIENT;
- }
- ClientRemoteInFlight.ReleaseMultiple(messages.size());
-}
-
-void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) {
+ }
+ ClientRemoteInFlight.ReleaseMultiple(messages.size());
+}
+
+void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) {
ReleaseInFlight({request.Get()});
if (Y_UNLIKELY(AtomicGet(Down))) {
- InvokeOnError(request, MESSAGE_SHUTDOWN);
- InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN);
-
- TRemoteConnectionReaderIncrementalStatus counter;
- LWPROBE(Error, ToString(MESSAGE_SHUTDOWN), "", "");
- counter.StatusCounter[MESSAGE_SHUTDOWN] += 1;
- GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(counter);
- } else {
- TWhatThreadDoesPushPop pp("OnReply");
- ClientHandler->OnReply(request, response.MessagePtr.Release());
- }
-}
-
+ InvokeOnError(request, MESSAGE_SHUTDOWN);
+ InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN);
+
+ TRemoteConnectionReaderIncrementalStatus counter;
+ LWPROBE(Error, ToString(MESSAGE_SHUTDOWN), "", "");
+ counter.StatusCounter[MESSAGE_SHUTDOWN] += 1;
+ GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(counter);
+ } else {
+ TWhatThreadDoesPushPop pp("OnReply");
+ ClientHandler->OnReply(request, response.MessagePtr.Release());
+ }
+}
+
EMessageStatus TRemoteClientSession::GetMessageDestination(TBusMessage* mess, const TNetAddr* addrp, TBusSocketAddr* dest) {
- if (addrp) {
- *dest = *addrp;
- } else {
- TNetAddr tmp;
- EMessageStatus ret = const_cast<TBusProtocol*>(GetProto())->GetDestination(this, mess, GetQueue()->GetLocator(), &tmp);
- if (ret != MESSAGE_OK) {
- return ret;
- }
- *dest = tmp;
- }
- return MESSAGE_OK;
-}
-
-void TRemoteClientSession::OpenConnection(const TNetAddr& addr) {
- GetConnection(addr)->OpenConnection();
-}
-
-TBusClientConnectionPtr TRemoteClientSession::GetConnection(const TNetAddr& addr) {
- // TODO: GetConnection should not open
+ if (addrp) {
+ *dest = *addrp;
+ } else {
+ TNetAddr tmp;
+ EMessageStatus ret = const_cast<TBusProtocol*>(GetProto())->GetDestination(this, mess, GetQueue()->GetLocator(), &tmp);
+ if (ret != MESSAGE_OK) {
+ return ret;
+ }
+ *dest = tmp;
+ }
+ return MESSAGE_OK;
+}
+
+void TRemoteClientSession::OpenConnection(const TNetAddr& addr) {
+ GetConnection(addr)->OpenConnection();
+}
+
+TBusClientConnectionPtr TRemoteClientSession::GetConnection(const TNetAddr& addr) {
+ // TODO: GetConnection should not open
return CheckedCast<TRemoteClientConnection*>(((TBusSessionImpl*)this)->GetConnection(addr, true).Get());
-}
+}