diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/session_impl.h | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.h')
-rw-r--r-- | library/cpp/messagebus/session_impl.h | 408 |
1 files changed, 204 insertions, 204 deletions
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h index 90ef246ff8..e099c798b9 100644 --- a/library/cpp/messagebus/session_impl.h +++ b/library/cpp/messagebus/session_impl.h @@ -19,241 +19,241 @@ #include <util/generic/array_ref.h> #include <util/generic/string.h> -namespace NBus { - namespace NPrivate { - typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr; - typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr; - - typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr; - - typedef TIntrusivePtr<TAcceptor> TAcceptorPtr; - typedef TVector<TAcceptorPtr> TAcceptorsPtrs; - - struct TConnectionsAcceptorsSnapshot { - TVector<TRemoteConnectionPtr> Connections; - TVector<TAcceptorPtr> Acceptors; - ui64 LastConnectionId; - ui64 LastAcceptorId; - - TConnectionsAcceptorsSnapshot(); - }; - - typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr; - - struct TOnAccept { - SOCKET s; - TNetAddr addr; - TInstant now; - }; - - struct TStatusTag {}; - struct TConnectionTag {}; - - struct TDeadConnectionTag {}; - struct TRemoveTag {}; - - struct TBusSessionImpl - : public virtual TBusSession, - private ::NActor::TActor<TBusSessionImpl, TStatusTag>, - private ::NActor::TActor<TBusSessionImpl, TConnectionTag> - - , - private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>, - private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>, - private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag> - - , - private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>, - private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> { - friend class TAcceptor; - friend class TRemoteConnection; - friend class TRemoteServerConnection; - friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>; - friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>; - friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>; - friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>; - friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>; - friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>; - friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>; - - public: - ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() { - return this; - } +namespace NBus { + namespace NPrivate { + typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr; + typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr; + + typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr; + + typedef TIntrusivePtr<TAcceptor> TAcceptorPtr; + typedef TVector<TAcceptorPtr> TAcceptorsPtrs; + + struct TConnectionsAcceptorsSnapshot { + TVector<TRemoteConnectionPtr> Connections; + TVector<TAcceptorPtr> Acceptors; + ui64 LastConnectionId; + ui64 LastAcceptorId; + + TConnectionsAcceptorsSnapshot(); + }; + + typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr; + + struct TOnAccept { + SOCKET s; + TNetAddr addr; + TInstant now; + }; + + struct TStatusTag {}; + struct TConnectionTag {}; + + struct TDeadConnectionTag {}; + struct TRemoveTag {}; + + struct TBusSessionImpl + : public virtual TBusSession, + private ::NActor::TActor<TBusSessionImpl, TStatusTag>, + private ::NActor::TActor<TBusSessionImpl, TConnectionTag> + + , + private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>, + private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>, + private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag> + + , + private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>, + private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> { + friend class TAcceptor; + friend class TRemoteConnection; + friend class TRemoteServerConnection; + friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>; + friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>; + + public: + ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() { + return this; + } - ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() { - return this; - } - - ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() { - return this; - } - - typedef TGuard<TMutex> TConnectionsGuard; - - TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto, - IBusErrorHandler* handler, - const TBusSessionConfig& config, const TString& name); - - ~TBusSessionImpl() override; + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() { + return this; + } + + ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() { + return this; + } + + typedef TGuard<TMutex> TConnectionsGuard; + + TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto, + IBusErrorHandler* handler, + const TBusSessionConfig& config, const TString& name); + + ~TBusSessionImpl() override; - void Shutdown() override; - bool IsDown(); + void Shutdown() override; + bool IsDown(); - size_t GetInFlightImpl(const TNetAddr& addr) const; - size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; + size_t GetInFlightImpl(const TNetAddr& addr) const; + size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; - void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; - void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; + void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; + void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; - virtual void FillStatus(); - TSessionDumpStatus GetStatusRecordInternal() override; - TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override; - TConnectionStatusMonRecord GetStatusProtobuf() override; - TString GetStatusSingleLine() override; + virtual void FillStatus(); + TSessionDumpStatus GetStatusRecordInternal() override; + TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override; + TConnectionStatusMonRecord GetStatusProtobuf() override; + TString GetStatusSingleLine() override; - void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&); - void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&); - void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&); - void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&); - void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&); - void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr); - void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>); - void StatusUpdateCachedDump(); - void StatusUpdateCachedDumpIfNecessary(TInstant now); - void Act(TStatusTag); - void Act(TConnectionTag); + void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&); + void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&); + void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&); + void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&); + void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&); + void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr); + void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>); + void StatusUpdateCachedDump(); + void StatusUpdateCachedDumpIfNecessary(TInstant now); + void Act(TStatusTag); + void Act(TConnectionTag); - TBusProtocol* GetProto() const noexcept override; - const TBusSessionConfig* GetConfig() const noexcept override; - TBusMessageQueue* GetQueue() const noexcept override; - TString GetNameInternal() override; - - virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0; - - void Listen(int port, TBusMessageQueue* q); - void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q); - TBusConnection* Accept(SOCKET listen); - - inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() { - return this; - } - inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() { - return this; - } - - typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections; - - void SendSnapshotToStatusActor(); + TBusProtocol* GetProto() const noexcept override; + const TBusSessionConfig* GetConfig() const noexcept override; + TBusMessageQueue* GetQueue() const noexcept override; + TString GetNameInternal() override; + + virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0; + + void Listen(int port, TBusMessageQueue* q); + void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q); + TBusConnection* Accept(SOCKET listen); + + inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() { + return this; + } + inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() { + return this; + } + + typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections; + + void SendSnapshotToStatusActor(); - void InsertConnectionLockAcquired(TRemoteConnection* connection); - void InsertAcceptorLockAcquired(TAcceptor* acceptor); - - void GetConnections(TVector<TRemoteConnectionPtr>*); - void GetAcceptors(TVector<TAcceptorPtr>*); - void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*); - void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*); - - TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create); - TRemoteConnectionPtr GetConnectionById(ui64 id); - TAcceptorPtr GetAcceptorById(ui64 id); - - void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus); - - void Cron(); + void InsertConnectionLockAcquired(TRemoteConnection* connection); + void InsertAcceptorLockAcquired(TAcceptor* acceptor); + + void GetConnections(TVector<TRemoteConnectionPtr>*); + void GetAcceptors(TVector<TAcceptorPtr>*); + void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*); + void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*); + + TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create); + TRemoteConnectionPtr GetConnectionById(ui64 id); + TAcceptorPtr GetAcceptorById(ui64 id); + + void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus); + + void Cron(); - TBusSessionJobCount JobCount; - - // TODO: replace with actor - TMutex ConnectionsLock; + TBusSessionJobCount JobCount; + + // TODO: replace with actor + TMutex ConnectionsLock; - struct TImpl; - THolder<TImpl> Impl; + struct TImpl; + THolder<TImpl> Impl; - const bool IsSource_; + const bool IsSource_; - TBusMessageQueue* const Queue; - TBusProtocol* const Proto; - // copied to be available after Proto dies - const TString ProtoName; + TBusMessageQueue* const Queue; + TBusProtocol* const Proto; + // copied to be available after Proto dies + const TString ProtoName; - IBusErrorHandler* const ErrorHandler; - TUseCountHolder HandlerUseCountHolder; - TBusSessionConfig Config; // TODO: make const + IBusErrorHandler* const ErrorHandler; + TUseCountHolder HandlerUseCountHolder; + TBusSessionConfig Config; // TODO: make const - NEventLoop::TEventLoop WriteEventLoop; - NEventLoop::TEventLoop ReadEventLoop; - THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread; - THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread; + NEventLoop::TEventLoop WriteEventLoop; + NEventLoop::TEventLoop ReadEventLoop; + THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread; + THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread; - THashMap<ui64, TRemoteConnectionPtr> ConnectionsById; - TAddrRemoteConnections Connections; - TAcceptorsPtrs Acceptors; + THashMap<ui64, TRemoteConnectionPtr> ConnectionsById; + TAddrRemoteConnections Connections; + TAcceptorsPtrs Acceptors; - struct TStatusData { - TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot; - ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue; + struct TStatusData { + TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot; + ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue; - TAtomicShutdownState ShutdownState; + TAtomicShutdownState ShutdownState; - TBusSessionStatus Status; + TBusSessionStatus Status; - TSessionDumpStatus StatusDumpCached; - TMutex StatusDumpCachedMutex; - TInstant StatusDumpCachedLastUpdate; + TSessionDumpStatus StatusDumpCached; + TMutex StatusDumpCachedMutex; + TInstant StatusDumpCachedLastUpdate; - TStatusData(); - }; - TStatusData StatusData; + TStatusData(); + }; + TStatusData StatusData; - struct TConnectionsData { - TAtomicShutdownState ShutdownState; + struct TConnectionsData { + TAtomicShutdownState ShutdownState; - TConnectionsData(); - }; - TConnectionsData ConnectionsData; + TConnectionsData(); + }; + TConnectionsData ConnectionsData; - ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, - TStatusTag, TDeadConnectionTag>* - GetDeadConnectionWriterStatusQueue() { - return this; - } + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, + TStatusTag, TDeadConnectionTag>* + GetDeadConnectionWriterStatusQueue() { + return this; + } - ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, - TStatusTag, TDeadConnectionTag>* - GetDeadConnectionReaderStatusQueue() { - return this; - } + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, + TStatusTag, TDeadConnectionTag>* + GetDeadConnectionReaderStatusQueue() { + return this; + } - ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, - TStatusTag, TDeadConnectionTag>* - GetDeadAcceptorStatusQueue() { - return this; - } + ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, + TStatusTag, TDeadConnectionTag>* + GetDeadAcceptorStatusQueue() { + return this; + } - template <typename TItem> - ::NActor::IQueueInActor<TItem>* GetQueue() { - return this; - } + template <typename TItem> + ::NActor::IQueueInActor<TItem>* GetQueue() { + return this; + } - ui64 LastAcceptorId; - ui64 LastConnectionId; + ui64 LastAcceptorId; + ui64 LastConnectionId; - TAtomic Down; + TAtomic Down; TSystemEvent ShutdownCompleteEvent; - }; + }; - inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { - return Proto; - } + inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { + return Proto; + } - inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept { - return &Config; - } + inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept { + return &Config; + } - inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept { - return Queue; - } + inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept { + return Queue; + } } -} +} |