diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/session_impl.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/session_impl.h')
-rw-r--r-- | library/cpp/messagebus/session_impl.h | 259 |
1 files changed, 259 insertions, 0 deletions
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h new file mode 100644 index 0000000000..90ef246ff8 --- /dev/null +++ b/library/cpp/messagebus/session_impl.h @@ -0,0 +1,259 @@ +#pragma once + +#include "acceptor_status.h" +#include "async_result.h" +#include "event_loop.h" +#include "netaddr.h" +#include "remote_connection.h" +#include "remote_connection_status.h" +#include "session_job_count.h" +#include "shutdown_state.h" +#include "ybus.h" + +#include <library/cpp/messagebus/actor/actor.h> +#include <library/cpp/messagebus/actor/queue_in_actor.h> +#include <library/cpp/messagebus/monitoring/mon_proto.pb.h> + +#include <library/cpp/threading/future/legacy_future.h> + +#include <util/generic/array_ref.h> +#include <util/generic/string.h> + +namespace NBus { + namespace NPrivate { + typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr; + typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr; + + typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr; + + typedef TIntrusivePtr<TAcceptor> TAcceptorPtr; + typedef TVector<TAcceptorPtr> TAcceptorsPtrs; + + struct TConnectionsAcceptorsSnapshot { + TVector<TRemoteConnectionPtr> Connections; + TVector<TAcceptorPtr> Acceptors; + ui64 LastConnectionId; + ui64 LastAcceptorId; + + TConnectionsAcceptorsSnapshot(); + }; + + typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr; + + struct TOnAccept { + SOCKET s; + TNetAddr addr; + TInstant now; + }; + + struct TStatusTag {}; + struct TConnectionTag {}; + + struct TDeadConnectionTag {}; + struct TRemoveTag {}; + + struct TBusSessionImpl + : public virtual TBusSession, + private ::NActor::TActor<TBusSessionImpl, TStatusTag>, + private ::NActor::TActor<TBusSessionImpl, TConnectionTag> + + , + private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>, + private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>, + private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag> + + , + private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>, + private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> { + friend class TAcceptor; + friend class TRemoteConnection; + friend class TRemoteServerConnection; + friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>; + friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>; + friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>; + + public: + ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() { + return this; + } + + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() { + return this; + } + + ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() { + return this; + } + + typedef TGuard<TMutex> TConnectionsGuard; + + TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto, + IBusErrorHandler* handler, + const TBusSessionConfig& config, const TString& name); + + ~TBusSessionImpl() override; + + void Shutdown() override; + bool IsDown(); + + size_t GetInFlightImpl(const TNetAddr& addr) const; + size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; + + void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; + void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; + + virtual void FillStatus(); + TSessionDumpStatus GetStatusRecordInternal() override; + TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override; + TConnectionStatusMonRecord GetStatusProtobuf() override; + TString GetStatusSingleLine() override; + + void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&); + void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&); + void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&); + void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&); + void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&); + void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr); + void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>); + void StatusUpdateCachedDump(); + void StatusUpdateCachedDumpIfNecessary(TInstant now); + void Act(TStatusTag); + void Act(TConnectionTag); + + TBusProtocol* GetProto() const noexcept override; + const TBusSessionConfig* GetConfig() const noexcept override; + TBusMessageQueue* GetQueue() const noexcept override; + TString GetNameInternal() override; + + virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0; + + void Listen(int port, TBusMessageQueue* q); + void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q); + TBusConnection* Accept(SOCKET listen); + + inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() { + return this; + } + inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() { + return this; + } + + typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections; + + void SendSnapshotToStatusActor(); + + void InsertConnectionLockAcquired(TRemoteConnection* connection); + void InsertAcceptorLockAcquired(TAcceptor* acceptor); + + void GetConnections(TVector<TRemoteConnectionPtr>*); + void GetAcceptors(TVector<TAcceptorPtr>*); + void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*); + void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*); + + TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create); + TRemoteConnectionPtr GetConnectionById(ui64 id); + TAcceptorPtr GetAcceptorById(ui64 id); + + void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus); + + void Cron(); + + TBusSessionJobCount JobCount; + + // TODO: replace with actor + TMutex ConnectionsLock; + + struct TImpl; + THolder<TImpl> Impl; + + const bool IsSource_; + + TBusMessageQueue* const Queue; + TBusProtocol* const Proto; + // copied to be available after Proto dies + const TString ProtoName; + + IBusErrorHandler* const ErrorHandler; + TUseCountHolder HandlerUseCountHolder; + TBusSessionConfig Config; // TODO: make const + + NEventLoop::TEventLoop WriteEventLoop; + NEventLoop::TEventLoop ReadEventLoop; + THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread; + THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread; + + THashMap<ui64, TRemoteConnectionPtr> ConnectionsById; + TAddrRemoteConnections Connections; + TAcceptorsPtrs Acceptors; + + struct TStatusData { + TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot; + ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue; + + TAtomicShutdownState ShutdownState; + + TBusSessionStatus Status; + + TSessionDumpStatus StatusDumpCached; + TMutex StatusDumpCachedMutex; + TInstant StatusDumpCachedLastUpdate; + + TStatusData(); + }; + TStatusData StatusData; + + struct TConnectionsData { + TAtomicShutdownState ShutdownState; + + TConnectionsData(); + }; + TConnectionsData ConnectionsData; + + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, + TStatusTag, TDeadConnectionTag>* + GetDeadConnectionWriterStatusQueue() { + return this; + } + + ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, + TStatusTag, TDeadConnectionTag>* + GetDeadConnectionReaderStatusQueue() { + return this; + } + + ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, + TStatusTag, TDeadConnectionTag>* + GetDeadAcceptorStatusQueue() { + return this; + } + + template <typename TItem> + ::NActor::IQueueInActor<TItem>* GetQueue() { + return this; + } + + ui64 LastAcceptorId; + ui64 LastConnectionId; + + TAtomic Down; + TSystemEvent ShutdownCompleteEvent; + }; + + inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { + return Proto; + } + + inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept { + return &Config; + } + + inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept { + return Queue; + } + + } +} |