aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/session_impl.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/session_impl.h')
-rw-r--r--library/cpp/messagebus/session_impl.h259
1 files changed, 259 insertions, 0 deletions
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h
new file mode 100644
index 0000000000..90ef246ff8
--- /dev/null
+++ b/library/cpp/messagebus/session_impl.h
@@ -0,0 +1,259 @@
+#pragma once
+
+#include "acceptor_status.h"
+#include "async_result.h"
+#include "event_loop.h"
+#include "netaddr.h"
+#include "remote_connection.h"
+#include "remote_connection_status.h"
+#include "session_job_count.h"
+#include "shutdown_state.h"
+#include "ybus.h"
+
+#include <library/cpp/messagebus/actor/actor.h>
+#include <library/cpp/messagebus/actor/queue_in_actor.h>
+#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
+
+#include <library/cpp/threading/future/legacy_future.h>
+
+#include <util/generic/array_ref.h>
+#include <util/generic/string.h>
+
+namespace NBus {
+ namespace NPrivate {
+ typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr;
+ typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;
+
+ typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;
+
+ typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
+ typedef TVector<TAcceptorPtr> TAcceptorsPtrs;
+
+ struct TConnectionsAcceptorsSnapshot {
+ TVector<TRemoteConnectionPtr> Connections;
+ TVector<TAcceptorPtr> Acceptors;
+ ui64 LastConnectionId;
+ ui64 LastAcceptorId;
+
+ TConnectionsAcceptorsSnapshot();
+ };
+
+ typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr;
+
+ struct TOnAccept {
+ SOCKET s;
+ TNetAddr addr;
+ TInstant now;
+ };
+
+ struct TStatusTag {};
+ struct TConnectionTag {};
+
+ struct TDeadConnectionTag {};
+ struct TRemoveTag {};
+
+ struct TBusSessionImpl
+ : public virtual TBusSession,
+ private ::NActor::TActor<TBusSessionImpl, TStatusTag>,
+ private ::NActor::TActor<TBusSessionImpl, TConnectionTag>
+
+ ,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>
+
+ ,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> {
+ friend class TAcceptor;
+ friend class TRemoteConnection;
+ friend class TRemoteServerConnection;
+ friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>;
+ friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>;
+
+ public:
+ ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() {
+ return this;
+ }
+
+ ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() {
+ return this;
+ }
+
+ ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() {
+ return this;
+ }
+
+ typedef TGuard<TMutex> TConnectionsGuard;
+
+ TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
+ IBusErrorHandler* handler,
+ const TBusSessionConfig& config, const TString& name);
+
+ ~TBusSessionImpl() override;
+
+ void Shutdown() override;
+ bool IsDown();
+
+ size_t GetInFlightImpl(const TNetAddr& addr) const;
+ size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;
+
+ void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
+ void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
+
+ virtual void FillStatus();
+ TSessionDumpStatus GetStatusRecordInternal() override;
+ TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override;
+ TConnectionStatusMonRecord GetStatusProtobuf() override;
+ TString GetStatusSingleLine() override;
+
+ void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&);
+ void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&);
+ void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&);
+ void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&);
+ void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&);
+ void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr);
+ void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>);
+ void StatusUpdateCachedDump();
+ void StatusUpdateCachedDumpIfNecessary(TInstant now);
+ void Act(TStatusTag);
+ void Act(TConnectionTag);
+
+ TBusProtocol* GetProto() const noexcept override;
+ const TBusSessionConfig* GetConfig() const noexcept override;
+ TBusMessageQueue* GetQueue() const noexcept override;
+ TString GetNameInternal() override;
+
+ virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;
+
+ void Listen(int port, TBusMessageQueue* q);
+ void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q);
+ TBusConnection* Accept(SOCKET listen);
+
+ inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() {
+ return this;
+ }
+ inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() {
+ return this;
+ }
+
+ typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;
+
+ void SendSnapshotToStatusActor();
+
+ void InsertConnectionLockAcquired(TRemoteConnection* connection);
+ void InsertAcceptorLockAcquired(TAcceptor* acceptor);
+
+ void GetConnections(TVector<TRemoteConnectionPtr>*);
+ void GetAcceptors(TVector<TAcceptorPtr>*);
+ void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*);
+ void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*);
+
+ TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create);
+ TRemoteConnectionPtr GetConnectionById(ui64 id);
+ TAcceptorPtr GetAcceptorById(ui64 id);
+
+ void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);
+
+ void Cron();
+
+ TBusSessionJobCount JobCount;
+
+ // TODO: replace with actor
+ TMutex ConnectionsLock;
+
+ struct TImpl;
+ THolder<TImpl> Impl;
+
+ const bool IsSource_;
+
+ TBusMessageQueue* const Queue;
+ TBusProtocol* const Proto;
+ // copied to be available after Proto dies
+ const TString ProtoName;
+
+ IBusErrorHandler* const ErrorHandler;
+ TUseCountHolder HandlerUseCountHolder;
+ TBusSessionConfig Config; // TODO: make const
+
+ NEventLoop::TEventLoop WriteEventLoop;
+ NEventLoop::TEventLoop ReadEventLoop;
+ THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread;
+ THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread;
+
+ THashMap<ui64, TRemoteConnectionPtr> ConnectionsById;
+ TAddrRemoteConnections Connections;
+ TAcceptorsPtrs Acceptors;
+
+ struct TStatusData {
+ TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
+ ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;
+
+ TAtomicShutdownState ShutdownState;
+
+ TBusSessionStatus Status;
+
+ TSessionDumpStatus StatusDumpCached;
+ TMutex StatusDumpCachedMutex;
+ TInstant StatusDumpCachedLastUpdate;
+
+ TStatusData();
+ };
+ TStatusData StatusData;
+
+ struct TConnectionsData {
+ TAtomicShutdownState ShutdownState;
+
+ TConnectionsData();
+ };
+ TConnectionsData ConnectionsData;
+
+ ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus,
+ TStatusTag, TDeadConnectionTag>*
+ GetDeadConnectionWriterStatusQueue() {
+ return this;
+ }
+
+ ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus,
+ TStatusTag, TDeadConnectionTag>*
+ GetDeadConnectionReaderStatusQueue() {
+ return this;
+ }
+
+ ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus,
+ TStatusTag, TDeadConnectionTag>*
+ GetDeadAcceptorStatusQueue() {
+ return this;
+ }
+
+ template <typename TItem>
+ ::NActor::IQueueInActor<TItem>* GetQueue() {
+ return this;
+ }
+
+ ui64 LastAcceptorId;
+ ui64 LastConnectionId;
+
+ TAtomic Down;
+ TSystemEvent ShutdownCompleteEvent;
+ };
+
+ inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
+ return Proto;
+ }
+
+ inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept {
+ return &Config;
+ }
+
+ inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept {
+ return Queue;
+ }
+
+ }
+}