#pragma once #include "acceptor_status.h" #include "async_result.h" #include "event_loop.h" #include "netaddr.h" #include "remote_connection.h" #include "remote_connection_status.h" #include "session_job_count.h" #include "shutdown_state.h" #include "ybus.h" #include <library/cpp/messagebus/actor/actor.h> #include <library/cpp/messagebus/actor/queue_in_actor.h> #include <library/cpp/messagebus/monitoring/mon_proto.pb.h> #include <library/cpp/threading/future/legacy_future.h> #include <util/generic/array_ref.h> #include <util/generic/string.h> namespace NBus { namespace NPrivate { typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr; typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr; typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr; typedef TIntrusivePtr<TAcceptor> TAcceptorPtr; typedef TVector<TAcceptorPtr> TAcceptorsPtrs; struct TConnectionsAcceptorsSnapshot { TVector<TRemoteConnectionPtr> Connections; TVector<TAcceptorPtr> Acceptors; ui64 LastConnectionId; ui64 LastAcceptorId; TConnectionsAcceptorsSnapshot(); }; typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr; struct TOnAccept { SOCKET s; TNetAddr addr; TInstant now; }; struct TStatusTag {}; struct TConnectionTag {}; struct TDeadConnectionTag {}; struct TRemoveTag {}; struct TBusSessionImpl : public virtual TBusSession, private ::NActor::TActor<TBusSessionImpl, TStatusTag>, private ::NActor::TActor<TBusSessionImpl, TConnectionTag> , private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>, private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>, private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag> , private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>, private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> { friend class TAcceptor; friend class TRemoteConnection; friend class TRemoteServerConnection; friend class ::NActor::TActor<TBusSessionImpl, TStatusTag>; friend class ::NActor::TActor<TBusSessionImpl, TConnectionTag>; friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>; friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>; friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>; friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>; friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>; public: ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() { return this; } ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() { return this; } ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() { return this; } typedef TGuard<TMutex> TConnectionsGuard; TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto, IBusErrorHandler* handler, const TBusSessionConfig& config, const TString& name); ~TBusSessionImpl() override; void Shutdown() override; bool IsDown(); size_t GetInFlightImpl(const TNetAddr& addr) const; size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const; void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override; virtual void FillStatus(); TSessionDumpStatus GetStatusRecordInternal() override; TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override; TConnectionStatusMonRecord GetStatusProtobuf() override; TString GetStatusSingleLine() override; void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&); void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&); void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&); void ProcessItem(TStatusTag, ::NActor::TDefaultTag, const TAcceptorStatus&); void ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept&); void ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr); void ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>); void StatusUpdateCachedDump(); void StatusUpdateCachedDumpIfNecessary(TInstant now); void Act(TStatusTag); void Act(TConnectionTag); TBusProtocol* GetProto() const noexcept override; const TBusSessionConfig* GetConfig() const noexcept override; TBusMessageQueue* GetQueue() const noexcept override; TString GetNameInternal() override; virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0; void Listen(int port, TBusMessageQueue* q); void Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q); TBusConnection* Accept(SOCKET listen); inline ::NActor::TActor<TBusSessionImpl, TStatusTag>* GetStatusActor() { return this; } inline ::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionsActor() { return this; } typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections; void SendSnapshotToStatusActor(); void InsertConnectionLockAcquired(TRemoteConnection* connection); void InsertAcceptorLockAcquired(TAcceptor* acceptor); void GetConnections(TVector<TRemoteConnectionPtr>*); void GetAcceptors(TVector<TAcceptorPtr>*); void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*); void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*); TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create); TRemoteConnectionPtr GetConnectionById(ui64 id); TAcceptorPtr GetAcceptorById(ui64 id); void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus); void Cron(); TBusSessionJobCount JobCount; // TODO: replace with actor TMutex ConnectionsLock; struct TImpl; THolder<TImpl> Impl; const bool IsSource_; TBusMessageQueue* const Queue; TBusProtocol* const Proto; // copied to be available after Proto dies const TString ProtoName; IBusErrorHandler* const ErrorHandler; TUseCountHolder HandlerUseCountHolder; TBusSessionConfig Config; // TODO: make const NEventLoop::TEventLoop WriteEventLoop; NEventLoop::TEventLoop ReadEventLoop; THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread; THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread; THashMap<ui64, TRemoteConnectionPtr> ConnectionsById; TAddrRemoteConnections Connections; TAcceptorsPtrs Acceptors; struct TStatusData { TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot; ::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue; TAtomicShutdownState ShutdownState; TBusSessionStatus Status; TSessionDumpStatus StatusDumpCached; TMutex StatusDumpCachedMutex; TInstant StatusDumpCachedLastUpdate; TStatusData(); }; TStatusData StatusData; struct TConnectionsData { TAtomicShutdownState ShutdownState; TConnectionsData(); }; TConnectionsData ConnectionsData; ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>* GetDeadConnectionWriterStatusQueue() { return this; } ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>* GetDeadConnectionReaderStatusQueue() { return this; } ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>* GetDeadAcceptorStatusQueue() { return this; } template <typename TItem> ::NActor::IQueueInActor<TItem>* GetQueue() { return this; } ui64 LastAcceptorId; ui64 LastConnectionId; TAtomic Down; TSystemEvent ShutdownCompleteEvent; }; inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept { return Proto; } inline const TBusSessionConfig* TBusSessionImpl::GetConfig() const noexcept { return &Config; } inline TBusMessageQueue* TBusSessionImpl::GetQueue() const noexcept { return Queue; } } }