aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_client_session.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_client_session.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_client_session.h')
-rw-r--r--library/cpp/messagebus/remote_client_session.h59
1 files changed, 59 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_client_session.h b/library/cpp/messagebus/remote_client_session.h
new file mode 100644
index 0000000000..7160d0dae9
--- /dev/null
+++ b/library/cpp/messagebus/remote_client_session.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include "remote_client_session_semaphore.h"
+#include "session_impl.h"
+
+#include <util/generic/array_ref.h>
+#include <util/generic/object_counter.h>
+
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4250) // 'NBus::NPrivate::TRemoteClientSession' : inherits 'NBus::NPrivate::TBusSessionImpl::NBus::NPrivate::TBusSessionImpl::GetConfig' via dominance
+#endif
+
+namespace NBus {
+ namespace NPrivate {
+ using TRemoteClientSessionPtr = TIntrusivePtr<TRemoteClientSession>;
+
+ class TRemoteClientSession: public TBusClientSession, public TBusSessionImpl {
+ friend class TRemoteClientConnection;
+ friend class TInvokeOnReply;
+
+ public:
+ TObjectCounter<TRemoteClientSession> ObjectCounter;
+
+ TRemoteClientSessionSemaphore ClientRemoteInFlight;
+ IBusClientHandler* const ClientHandler;
+
+ public:
+ TRemoteClientSession(TBusMessageQueue* queue, TBusProtocol* proto,
+ IBusClientHandler* handler,
+ const TBusSessionConfig& config, const TString& name);
+
+ ~TRemoteClientSession() override;
+
+ void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) override;
+
+ EMessageStatus SendMessageImpl(TBusMessage* msg, const TNetAddr* addr, bool wait, bool oneWay);
+ EMessageStatus SendMessage(TBusMessage* msg, const TNetAddr* addr = nullptr, bool wait = false) override;
+ EMessageStatus SendMessageOneWay(TBusMessage* msg, const TNetAddr* addr = nullptr, bool wait = false) override;
+
+ int GetInFlight() const noexcept override;
+ void FillStatus() override;
+ void AcquireInFlight(TArrayRef<TBusMessage* const> messages);
+ void ReleaseInFlight(TArrayRef<TBusMessage* const> messages);
+ void ReleaseInFlightAndCallOnReply(TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response);
+
+ EMessageStatus GetMessageDestination(TBusMessage* mess, const TNetAddr* addrp, TBusSocketAddr* dest);
+
+ void OpenConnection(const TNetAddr&) override;
+
+ TBusClientConnectionPtr GetConnection(const TNetAddr&) override;
+ };
+
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+ }
+}