aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/session_impl.h
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.h')
-rw-r--r--library/cpp/messagebus/session_impl.h408
1 files changed, 204 insertions, 204 deletions
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h
index e099c798b9..90ef246ff8 100644
--- a/library/cpp/messagebus/session_impl.h
+++ b/library/cpp/messagebus/session_impl.h
@@ -19,241 +19,241 @@
#include <util/generic/array_ref.h>
#include <util/generic/string.h>
-namespace NBus {
- namespace NPrivate {
- typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr;
- typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;
-
- typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;
-
- typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
- typedef TVector<TAcceptorPtr> TAcceptorsPtrs;
-
- struct TConnectionsAcceptorsSnapshot {
- TVector<TRemoteConnectionPtr> Connections;
- TVector<TAcceptorPtr> Acceptors;
- ui64 LastConnectionId;
- ui64 LastAcceptorId;
-
- TConnectionsAcceptorsSnapshot();
- };
-
- typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr;
-
- struct TOnAccept {
- SOCKET s;
- TNetAddr addr;
- TInstant now;
- };
-
- struct TStatusTag {};
- struct TConnectionTag {};
-
- struct TDeadConnectionTag {};
- struct TRemoveTag {};
-
- struct TBusSessionImpl
- : public virtual TBusSession,
- private ::NActor::TActor<TBusSessionImpl, TStatusTag>,
- private ::NActor::TActor<TBusSessionImpl, TConnectionTag>
-
- ,
- private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>,
- private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>,
- private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>
-
- ,
- private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>,
- private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> {
- friend class TAcceptor;
- friend class TRemoteConnection;
- friend class TRemoteServerConnection;
- friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>;
- friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>;
- friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>;
-
- public:
- ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() {
- return this;
- }
+namespace NBus {
+ namespace NPrivate {
+ typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr;
+ typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;
+
+ typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;
+
+ typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
+ typedef TVector<TAcceptorPtr> TAcceptorsPtrs;
+
+ struct TConnectionsAcceptorsSnapshot {
+ TVector<TRemoteConnectionPtr> Connections;
+ TVector<TAcceptorPtr> Acceptors;
+ ui64 LastConnectionId;
+ ui64 LastAcceptorId;
+
+ TConnectionsAcceptorsSnapshot();
+ };
+
+ typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr;
+
+ struct TOnAccept {
+ SOCKET s;
+ TNetAddr addr;
+ TInstant now;
+ };
+
+ struct TStatusTag {};
+ struct TConnectionTag {};
+
+ struct TDeadConnectionTag {};
+ struct TRemoveTag {};
+
+ struct TBusSessionImpl
+ : public virtual TBusSession,
+ private ::NActor::TActor<TBusSessionImpl, TStatusTag>,
+ private ::NActor::TActor<TBusSessionImpl, TConnectionTag>
+
+ ,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>
+
+ ,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>,
+ private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> {
+ friend class TAcceptor;
+ friend class TRemoteConnection;
+ friend class TRemoteServerConnection;
+ friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>;
+ friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>;
+ friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>;
+
+ public:
+ ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() {
+ return this;
+ }
- ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() {
- return this;
- }
-
- ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() {
- return this;
- }
-
- typedef TGuard<TMutex> TConnectionsGuard;
-
- TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
- IBusErrorHandler* handler,
- const TBusSessionConfig& config, const TString& name);
-
- ~TBusSessionImpl() override;
+ ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() {
+ return this;
+ }
+
+ ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() {
+ return this;
+ }
+
+ typedef TGuard<TMutex> TConnectionsGuard;
+
+ TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
+ IBusErrorHandler* handler,
+ const TBusSessionConfig& config, const TString& name);
+
+ ~TBusSessionImpl() override;
- void Shutdown() override;
- bool IsDown();
+ void Shutdown() override;
+ bool IsDown();
- size_t GetInFlightImpl(const TNetAddr& addr) const;
- size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;
+ size_t GetInFlightImpl(const TNetAddr& addr) const;
+ size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;
- void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
- void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
+ void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
+ void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
- virtual void FillStatus();
- TSessionDumpStatus GetStatusRecordInternal() override;
- TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override;
- TConnectionStatusMonRecord GetStatusProtobuf() override;
- TString GetStatusSingleLine() override;
+ virtual void FillStatus();
+ TSessionDumpStatus GetStatusRecordInternal() override;
+ TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override;
+ TConnectionStatusMonRecord GetStatusProtobuf() override;
+ TString GetStatusSingleLine() override;
- void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&);
- void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&);
- void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&);
- void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&);
- void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&);
- void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr);
- void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>);
- void StatusUpdateCachedDump();
- void StatusUpdateCachedDumpIfNecessary(TInstant now);
- void Act(TStatusTag);
- void Act(TConnectionTag);
+ void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&);
+ void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&);
+ void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&);
+ void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&);
+ void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&);
+ void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr);
+ void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>);
+ void StatusUpdateCachedDump();
+ void StatusUpdateCachedDumpIfNecessary(TInstant now);
+ void Act(TStatusTag);
+ void Act(TConnectionTag);
- TBusProtocol* GetProto() const noexcept override;
- const TBusSessionConfig* GetConfig() const noexcept override;
- TBusMessageQueue* GetQueue() const noexcept override;
- TString GetNameInternal() override;
-
- virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;
-
- void Listen(int port, TBusMessageQueue* q);
- void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q);
- TBusConnection* Accept(SOCKET listen);
-
- inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() {
- return this;
- }
- inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() {
- return this;
- }
-
- typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;
-
- void SendSnapshotToStatusActor();
+ TBusProtocol* GetProto() const noexcept override;
+ const TBusSessionConfig* GetConfig() const noexcept override;
+ TBusMessageQueue* GetQueue() const noexcept override;
+ TString GetNameInternal() override;
+
+ virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;
+
+ void Listen(int port, TBusMessageQueue* q);
+ void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q);
+ TBusConnection* Accept(SOCKET listen);
+
+ inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() {
+ return this;
+ }
+ inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() {
+ return this;
+ }
+
+ typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;
+
+ void SendSnapshotToStatusActor();
- void InsertConnectionLockAcquired(TRemoteConnection* connection);
- void InsertAcceptorLockAcquired(TAcceptor* acceptor);
-
- void GetConnections(TVector<TRemoteConnectionPtr>*);
- void GetAcceptors(TVector<TAcceptorPtr>*);
- void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*);
- void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*);
-
- TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create);
- TRemoteConnectionPtr GetConnectionById(ui64 id);
- TAcceptorPtr GetAcceptorById(ui64 id);
-
- void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);
-
- void Cron();
+ void InsertConnectionLockAcquired(TRemoteConnection* connection);
+ void InsertAcceptorLockAcquired(TAcceptor* acceptor);
+
+ void GetConnections(TVector<TRemoteConnectionPtr>*);
+ void GetAcceptors(TVector<TAcceptorPtr>*);
+ void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*);
+ void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*);
+
+ TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create);
+ TRemoteConnectionPtr GetConnectionById(ui64 id);
+ TAcceptorPtr GetAcceptorById(ui64 id);
+
+ void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);
+
+ void Cron();
- TBusSessionJobCount JobCount;
-
- // TODO: replace with actor
- TMutex ConnectionsLock;
+ TBusSessionJobCount JobCount;
+
+ // TODO: replace with actor
+ TMutex ConnectionsLock;
- struct TImpl;
- THolder<TImpl> Impl;
+ struct TImpl;
+ THolder<TImpl> Impl;
- const bool IsSource_;
+ const bool IsSource_;
- TBusMessageQueue* const Queue;
- TBusProtocol* const Proto;
- // copied to be available after Proto dies
- const TString ProtoName;
+ TBusMessageQueue* const Queue;
+ TBusProtocol* const Proto;
+ // copied to be available after Proto dies
+ const TString ProtoName;
- IBusErrorHandler* const ErrorHandler;
- TUseCountHolder HandlerUseCountHolder;
- TBusSessionConfig Config; // TODO: make const
+ IBusErrorHandler* const ErrorHandler;
+ TUseCountHolder HandlerUseCountHolder;
+ TBusSessionConfig Config; // TODO: make const
- NEventLoop::TEventLoop WriteEventLoop;
- NEventLoop::TEventLoop ReadEventLoop;
- THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread;
- THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread;
+ NEventLoop::TEventLoop WriteEventLoop;
+ NEventLoop::TEventLoop ReadEventLoop;
+ THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread;
+ THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread;
- THashMap<ui64, TRemoteConnectionPtr> ConnectionsById;
- TAddrRemoteConnections Connections;
- TAcceptorsPtrs Acceptors;
+ THashMap<ui64, TRemoteConnectionPtr> ConnectionsById;
+ TAddrRemoteConnections Connections;
+ TAcceptorsPtrs Acceptors;
- struct TStatusData {
- TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
- ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;
+ struct TStatusData {
+ TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
+ ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;
- TAtomicShutdownState ShutdownState;
+ TAtomicShutdownState ShutdownState;
- TBusSessionStatus Status;
+ TBusSessionStatus Status;
- TSessionDumpStatus StatusDumpCached;
- TMutex StatusDumpCachedMutex;
- TInstant StatusDumpCachedLastUpdate;
+ TSessionDumpStatus StatusDumpCached;
+ TMutex StatusDumpCachedMutex;
+ TInstant StatusDumpCachedLastUpdate;
- TStatusData();
- };
- TStatusData StatusData;
+ TStatusData();
+ };
+ TStatusData StatusData;
- struct TConnectionsData {
- TAtomicShutdownState ShutdownState;
+ struct TConnectionsData {
+ TAtomicShutdownState ShutdownState;
- TConnectionsData();
- };
- TConnectionsData ConnectionsData;
+ TConnectionsData();
+ };
+ TConnectionsData ConnectionsData;
- ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus,
- TStatusTag, TDeadConnectionTag>*
- GetDeadConnectionWriterStatusQueue() {
- return this;
- }
+ ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus,
+ TStatusTag, TDeadConnectionTag>*
+ GetDeadConnectionWriterStatusQueue() {
+ return this;
+ }
- ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus,
- TStatusTag, TDeadConnectionTag>*
- GetDeadConnectionReaderStatusQueue() {
- return this;
- }
+ ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus,
+ TStatusTag, TDeadConnectionTag>*
+ GetDeadConnectionReaderStatusQueue() {
+ return this;
+ }
- ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus,
- TStatusTag, TDeadConnectionTag>*
- GetDeadAcceptorStatusQueue() {
- return this;
- }
+ ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus,
+ TStatusTag, TDeadConnectionTag>*
+ GetDeadAcceptorStatusQueue() {
+ return this;
+ }
- template <typename TItem>
- ::NActor::IQueueInActor<TItem>* GetQueue() {
- return this;
- }
+ template <typename TItem>
+ ::NActor::IQueueInActor<TItem>* GetQueue() {
+ return this;
+ }
- ui64 LastAcceptorId;
- ui64 LastConnectionId;
+ ui64 LastAcceptorId;
+ ui64 LastConnectionId;
- TAtomic Down;
+ TAtomic Down;
TSystemEvent ShutdownCompleteEvent;
- };
+ };
- inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
- return Proto;
- }
+ inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
+ return Proto;
+ }
- inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept {
- return &Config;
- }
+ inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept {
+ return &Config;
+ }
- inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept {
- return Queue;
- }
+ inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept {
+ return Queue;
+ }
}
-}
+}