diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/session_impl.h | |
parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
download | ydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.h')
-rw-r--r-- | library/cpp/messagebus/session_impl.h | 98 |
1 files changed, 49 insertions, 49 deletions
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h index 0c6fec1af4..90ef246ff8 100644 --- a/library/cpp/messagebus/session_impl.h +++ b/library/cpp/messagebus/session_impl.h @@ -3,17 +3,17 @@ #include "acceptor_status.h" #include "async_result.h" #include "event_loop.h" -#include "netaddr.h" +#include "netaddr.h" #include "remote_connection.h" -#include "remote_connection_status.h" -#include "session_job_count.h" -#include "shutdown_state.h" +#include "remote_connection_status.h" +#include "session_job_count.h" +#include "shutdown_state.h" #include "ybus.h" #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/queue_in_actor.h> #include <library/cpp/messagebus/monitoring/mon_proto.pb.h> - + #include <library/cpp/threading/future/legacy_future.h> #include <util/generic/array_ref.h> @@ -23,12 +23,12 @@ namespace NBus { namespace NPrivate { typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr; typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr; - + typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr; typedef TIntrusivePtr<TAcceptor> TAcceptorPtr; typedef TVector<TAcceptorPtr> TAcceptorsPtrs; - + struct TConnectionsAcceptorsSnapshot { TVector<TRemoteConnectionPtr> Connections; TVector<TAcceptorPtr> Acceptors; @@ -37,31 +37,31 @@ namespace NBus { TConnectionsAcceptorsSnapshot(); }; - + typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr; - + struct TOnAccept { SOCKET s; TNetAddr addr; TInstant now; }; - + struct TStatusTag {}; struct TConnectionTag {}; - + struct TDeadConnectionTag {}; struct TRemoveTag {}; - + struct TBusSessionImpl : public virtual TBusSession, private ::NActor::TActor<TBusSessionImpl, TStatusTag>, private ::NActor::TActor<TBusSessionImpl, TConnectionTag> - + , private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>, private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>, private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag> - + , private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>, private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> { @@ -75,43 +75,43 @@ namespace NBus { friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>; friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>; friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>; - + public: ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() { return this; } - + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() { return this; } - + ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() { return this; } - + typedef TGuard<TMutex> TConnectionsGuard; - + TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto, IBusErrorHandler* handler, const TBusSessionConfig& config, const TString& name); - + ~TBusSessionImpl() override; void Shutdown() override; bool IsDown(); - + size_t GetInFlightImpl(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; - + virtual void FillStatus(); TSessionDumpStatus GetStatusRecordInternal() override; TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override; TConnectionStatusMonRecord GetStatusProtobuf() override; TString GetStatusSingleLine() override; - + void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&); void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&); void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&); @@ -128,7 +128,7 @@ namespace NBus { const TBusSessionConfig* GetConfig() const noexcept override; TBusMessageQueue* GetQueue() const noexcept override; TString GetNameInternal() override; - + virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0; void Listen(int port, TBusMessageQueue* q); @@ -143,106 +143,106 @@ namespace NBus { } typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections; - + void SendSnapshotToStatusActor(); void InsertConnectionLockAcquired(TRemoteConnection* connection); void InsertAcceptorLockAcquired(TAcceptor* acceptor); - + void GetConnections(TVector<TRemoteConnectionPtr>*); void GetAcceptors(TVector<TAcceptorPtr>*); void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*); void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*); - + TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create); TRemoteConnectionPtr GetConnectionById(ui64 id); TAcceptorPtr GetAcceptorById(ui64 id); - + void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus); void Cron(); - + TBusSessionJobCount JobCount; - + // TODO: replace with actor TMutex ConnectionsLock; - + struct TImpl; THolder<TImpl> Impl; - + const bool IsSource_; - + TBusMessageQueue* const Queue; TBusProtocol* const Proto; // copied to be available after Proto dies const TString ProtoName; - + IBusErrorHandler* const ErrorHandler; TUseCountHolder HandlerUseCountHolder; TBusSessionConfig Config; // TODO: make const - + NEventLoop::TEventLoop WriteEventLoop; NEventLoop::TEventLoop ReadEventLoop; THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread; THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread; - + THashMap<ui64, TRemoteConnectionPtr> ConnectionsById; TAddrRemoteConnections Connections; TAcceptorsPtrs Acceptors; - + struct TStatusData { TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot; ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue; TAtomicShutdownState ShutdownState; - + TBusSessionStatus Status; - + TSessionDumpStatus StatusDumpCached; TMutex StatusDumpCachedMutex; TInstant StatusDumpCachedLastUpdate; - + TStatusData(); }; TStatusData StatusData; - + struct TConnectionsData { TAtomicShutdownState ShutdownState; - + TConnectionsData(); }; TConnectionsData ConnectionsData; - + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>* GetDeadConnectionWriterStatusQueue() { return this; } - + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>* GetDeadConnectionReaderStatusQueue() { return this; } - + ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>* GetDeadAcceptorStatusQueue() { return this; } - + template <typename TItem> ::NActor::IQueueInActor<TItem>* GetQueue() { return this; } - + ui64 LastAcceptorId; ui64 LastConnectionId; - + TAtomic Down; TSystemEvent ShutdownCompleteEvent; }; - + inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { return Proto; } |