aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_client_session.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_client_session.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_client_session.cpp')
-rw-r--r--library/cpp/messagebus/remote_client_session.cpp127
1 files changed, 127 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_client_session.cpp b/library/cpp/messagebus/remote_client_session.cpp
new file mode 100644
index 0000000000..3bc421944f
--- /dev/null
+++ b/library/cpp/messagebus/remote_client_session.cpp
@@ -0,0 +1,127 @@
+#include "remote_client_session.h"
+
+#include "mb_lwtrace.h"
+#include "remote_client_connection.h"
+
+#include <library/cpp/messagebus/scheduler/scheduler.h>
+
+#include <util/generic/cast.h>
+#include <util/system/defaults.h>
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TRemoteClientSession::TRemoteClientSession(TBusMessageQueue* queue,
+ TBusProtocol* proto, IBusClientHandler* handler,
+ const TBusClientSessionConfig& config, const TString& name)
+ : TBusSessionImpl(true, queue, proto, handler, config, name)
+ , ClientRemoteInFlight(config.MaxInFlight, "ClientRemoteInFlight")
+ , ClientHandler(handler)
+{
+}
+
+TRemoteClientSession::~TRemoteClientSession() {
+ //Cerr << "~TRemoteClientSession" << Endl;
+}
+
+void TRemoteClientSession::OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) {
+ TAutoPtr<TVectorSwaps<TBusMessagePtrAndHeader>> temp(new TVectorSwaps<TBusMessagePtrAndHeader>);
+ temp->swap(newMsg);
+ c->ReplyQueue.EnqueueAll(temp);
+ c->ScheduleWrite();
+}
+
+EMessageStatus TRemoteClientSession::SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay) {
+ if (Y_UNLIKELY(IsDown())) {
+ return MESSAGE_SHUTDOWN;
+ }
+
+ TBusSocketAddr resolvedAddr;
+ EMessageStatus ret = GetMessageDestination(msg, addr, &resolvedAddr);
+ if (ret != MESSAGE_OK) {
+ return ret;
+ }
+
+ msg->ReplyTo = resolvedAddr;
+
+ TRemoteConnectionPtr c = ((TBusSessionImpl*)this)->GetConnection(resolvedAddr, true);
+ Y_ASSERT(!!c);
+
+ return CheckedCast<TRemoteClientConnection*>(c.Get())->SendMessageImpl(msg, wait, oneWay);
+}
+
+EMessageStatus TRemoteClientSession::SendMessage(TBusMessage* msg, const TNetAddr* addr, bool wait) {
+ return SendMessageImpl(msg, addr, wait, false);
+}
+
+EMessageStatus TRemoteClientSession::SendMessageOneWay(TBusMessage* pMes, const TNetAddr* addr, bool wait) {
+ return SendMessageImpl(pMes, addr, wait, true);
+}
+
+int TRemoteClientSession::GetInFlight() const noexcept {
+ return ClientRemoteInFlight.GetCurrent();
+}
+
+void TRemoteClientSession::FillStatus() {
+ TBusSessionImpl::FillStatus();
+
+ StatusData.Status.InFlightCount = ClientRemoteInFlight.GetCurrent();
+ StatusData.Status.InputPaused = false;
+}
+
+void TRemoteClientSession::AcquireInFlight(TArrayRef<TBusMessage* const> messages) {
+ for (auto message : messages) {
+ Y_ASSERT(!(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT));
+ message->LocalFlags |= MESSAGE_IN_FLIGHT_ON_CLIENT;
+ }
+ ClientRemoteInFlight.IncrementMultiple(messages.size());
+}
+
+void TRemoteClientSession::ReleaseInFlight(TArrayRef<TBusMessage* const> messages) {
+ for (auto message : messages) {
+ Y_ASSERT(message->LocalFlags & MESSAGE_IN_FLIGHT_ON_CLIENT);
+ message->LocalFlags &= ~MESSAGE_IN_FLIGHT_ON_CLIENT;
+ }
+ ClientRemoteInFlight.ReleaseMultiple(messages.size());
+}
+
+void TRemoteClientSession::ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) {
+ ReleaseInFlight({request.Get()});
+ if (Y_UNLIKELY(AtomicGet(Down))) {
+ InvokeOnError(request, MESSAGE_SHUTDOWN);
+ InvokeOnError(response.MessagePtr.Release(), MESSAGE_SHUTDOWN);
+
+ TRemoteConnectionReaderIncrementalStatus counter;
+ LWPROBE(Error, ToString(MESSAGE_SHUTDOWN), "", "");
+ counter.StatusCounter[MESSAGE_SHUTDOWN] += 1;
+ GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(counter);
+ } else {
+ TWhatThreadDoesPushPop pp("OnReply");
+ ClientHandler->OnReply(request, response.MessagePtr.Release());
+ }
+}
+
+EMessageStatus TRemoteClientSession::GetMessageDestination(TBusMessage* mess, const TNetAddr* addrp, TBusSocketAddr* dest) {
+ if (addrp) {
+ *dest = *addrp;
+ } else {
+ TNetAddr tmp;
+ EMessageStatus ret = const_cast<TBusProtocol*>(GetProto())->GetDestination(this, mess, GetQueue()->GetLocator(), &tmp);
+ if (ret != MESSAGE_OK) {
+ return ret;
+ }
+ *dest = tmp;
+ }
+ return MESSAGE_OK;
+}
+
+void TRemoteClientSession::OpenConnection(const TNetAddr& addr) {
+ GetConnection(addr)->OpenConnection();
+}
+
+TBusClientConnectionPtr TRemoteClientSession::GetConnection(const TNetAddr& addr) {
+ // TODO: GetConnection should not open
+ return CheckedCast<TRemoteClientConnection*>(((TBusSessionImpl*)this)->GetConnection(addr, true).Get());
+}