aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.h
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/session_impl.h
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.h')
-rw-r--r--library/cpp/messagebus/session_impl.h98
1 files changed, 49 insertions, 49 deletions
diff --git a/library/cpp/messagebus/session_impl.h b/library/cpp/messagebus/session_impl.h
index 0c6fec1af4..90ef246ff8 100644
--- a/library/cpp/messagebus/session_impl.h
+++ b/library/cpp/messagebus/session_impl.h
@@ -3,17 +3,17 @@
#include "acceptor_status.h"
#include "async_result.h"
#include "event_loop.h"
-#include "netaddr.h"
+#include "netaddr.h"
#include "remote_connection.h"
-#include "remote_connection_status.h"
-#include "session_job_count.h"
-#include "shutdown_state.h"
+#include "remote_connection_status.h"
+#include "session_job_count.h"
+#include "shutdown_state.h"
#include "ybus.h"
#include <library/cpp/messagebus/actor/actor.h>
#include <library/cpp/messagebus/actor/queue_in_actor.h>
#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
-
+
#include <library/cpp/threading/future/legacy_future.h>
#include <util/generic/array_ref.h>
@@ -23,12 +23,12 @@ namespace NBus {
namespace NPrivate {
typedef TIntrusivePtr<TRemoteClientConnection> TRemoteClientConnectionPtr;
typedef TIntrusivePtr<TRemoteServerConnection> TRemoteServerConnectionPtr;
-
+
typedef TIntrusivePtr<TRemoteServerSession> TRemoteServerSessionPtr;
typedef TIntrusivePtr<TAcceptor> TAcceptorPtr;
typedef TVector<TAcceptorPtr> TAcceptorsPtrs;
-
+
struct TConnectionsAcceptorsSnapshot {
TVector<TRemoteConnectionPtr> Connections;
TVector<TAcceptorPtr> Acceptors;
@@ -37,31 +37,31 @@ namespace NBus {
TConnectionsAcceptorsSnapshot();
};
-
+
typedef TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> TConnectionsAcceptorsSnapshotPtr;
-
+
struct TOnAccept {
SOCKET s;
TNetAddr addr;
TInstant now;
};
-
+
struct TStatusTag {};
struct TConnectionTag {};
-
+
struct TDeadConnectionTag {};
struct TRemoveTag {};
-
+
struct TBusSessionImpl
: public virtual TBusSession,
private ::NActor::TActor<TBusSessionImpl, TStatusTag>,
private ::NActor::TActor<TBusSessionImpl, TConnectionTag>
-
+
,
private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus, TStatusTag, TDeadConnectionTag>,
private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus, TStatusTag, TDeadConnectionTag>,
private ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>
-
+
,
private ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>,
private ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag> {
@@ -75,43 +75,43 @@ namespace NBus {
friend class ::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus, TStatusTag, TDeadConnectionTag>;
friend class ::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>;
friend class ::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>;
-
+
public:
::NActor::TQueueInActor<TBusSessionImpl, TOnAccept, TConnectionTag>* GetOnAcceptQueue() {
return this;
}
-
+
::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionPtr, TConnectionTag, TRemoveTag>* GetRemoveConnectionQueue() {
return this;
}
-
+
::NActor::TActor<TBusSessionImpl, TConnectionTag>* GetConnectionActor() {
return this;
}
-
+
typedef TGuard<TMutex> TConnectionsGuard;
-
+
TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
IBusErrorHandler* handler,
const TBusSessionConfig& config, const TString& name);
-
+
~TBusSessionImpl() override;
void Shutdown() override;
bool IsDown();
-
+
size_t GetInFlightImpl(const TNetAddr& addr) const;
size_t GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const;
void GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
void GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const override;
-
+
virtual void FillStatus();
TSessionDumpStatus GetStatusRecordInternal() override;
TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) override;
TConnectionStatusMonRecord GetStatusProtobuf() override;
TString GetStatusSingleLine() override;
-
+
void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus&);
void ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus&);
void ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus&);
@@ -128,7 +128,7 @@ namespace NBus {
const TBusSessionConfig* GetConfig() const noexcept override;
TBusMessageQueue* GetQueue() const noexcept override;
TString GetNameInternal() override;
-
+
virtual void OnMessageReceived(TRemoteConnection* c, TVectorSwaps<TBusMessagePtrAndHeader>& newMsg) = 0;
void Listen(int port, TBusMessageQueue* q);
@@ -143,106 +143,106 @@ namespace NBus {
}
typedef THashMap<TBusSocketAddr, TRemoteConnectionPtr> TAddrRemoteConnections;
-
+
void SendSnapshotToStatusActor();
void InsertConnectionLockAcquired(TRemoteConnection* connection);
void InsertAcceptorLockAcquired(TAcceptor* acceptor);
-
+
void GetConnections(TVector<TRemoteConnectionPtr>*);
void GetAcceptors(TVector<TAcceptorPtr>*);
void GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>*);
void GetAcceptorsLockAquired(TVector<TAcceptorPtr>*);
-
+
TRemoteConnectionPtr GetConnection(const TBusSocketAddr& addr, bool create);
TRemoteConnectionPtr GetConnectionById(ui64 id);
TAcceptorPtr GetAcceptorById(ui64 id);
-
+
void InvokeOnError(TNonDestroyingAutoPtr<TBusMessage>, EMessageStatus);
void Cron();
-
+
TBusSessionJobCount JobCount;
-
+
// TODO: replace with actor
TMutex ConnectionsLock;
-
+
struct TImpl;
THolder<TImpl> Impl;
-
+
const bool IsSource_;
-
+
TBusMessageQueue* const Queue;
TBusProtocol* const Proto;
// copied to be available after Proto dies
const TString ProtoName;
-
+
IBusErrorHandler* const ErrorHandler;
TUseCountHolder HandlerUseCountHolder;
TBusSessionConfig Config; // TODO: make const
-
+
NEventLoop::TEventLoop WriteEventLoop;
NEventLoop::TEventLoop ReadEventLoop;
THolder<NThreading::TLegacyFuture<void, false>> ReadEventLoopThread;
THolder<NThreading::TLegacyFuture<void, false>> WriteEventLoopThread;
-
+
THashMap<ui64, TRemoteConnectionPtr> ConnectionsById;
TAddrRemoteConnections Connections;
TAcceptorsPtrs Acceptors;
-
+
struct TStatusData {
TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> ConnectionsAcceptorsSnapshot;
::NActor::TQueueForActor<TAtomicSharedPtr<TConnectionsAcceptorsSnapshot>> ConnectionsAcceptorsSnapshotsQueue;
TAtomicShutdownState ShutdownState;
-
+
TBusSessionStatus Status;
-
+
TSessionDumpStatus StatusDumpCached;
TMutex StatusDumpCachedMutex;
TInstant StatusDumpCachedLastUpdate;
-
+
TStatusData();
};
TStatusData StatusData;
-
+
struct TConnectionsData {
TAtomicShutdownState ShutdownState;
-
+
TConnectionsData();
};
TConnectionsData ConnectionsData;
-
+
::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionWriterIncrementalStatus,
TStatusTag, TDeadConnectionTag>*
GetDeadConnectionWriterStatusQueue() {
return this;
}
-
+
::NActor::TQueueInActor<TBusSessionImpl, TRemoteConnectionReaderIncrementalStatus,
TStatusTag, TDeadConnectionTag>*
GetDeadConnectionReaderStatusQueue() {
return this;
}
-
+
::NActor::TQueueInActor<TBusSessionImpl, TAcceptorStatus,
TStatusTag, TDeadConnectionTag>*
GetDeadAcceptorStatusQueue() {
return this;
}
-
+
template <typename TItem>
::NActor::IQueueInActor<TItem>* GetQueue() {
return this;
}
-
+
ui64 LastAcceptorId;
ui64 LastConnectionId;
-
+
TAtomic Down;
TSystemEvent ShutdownCompleteEvent;
};
-
+
inline TBusProtocol* TBusSessionImpl::GetProto() const noexcept {
return Proto;
}