aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_connection.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/remote_connection.h
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.h')
-rw-r--r--library/cpp/messagebus/remote_connection.h484
1 files changed, 242 insertions, 242 deletions
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h
index 575fbff32e..4538947368 100644
--- a/library/cpp/messagebus/remote_connection.h
+++ b/library/cpp/messagebus/remote_connection.h
@@ -26,269 +26,269 @@
#include <util/system/event.h>
#include <util/thread/lfstack.h>
-namespace NBus {
- namespace NPrivate {
- class TRemoteConnection;
-
- typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
- typedef TIntrusivePtr<TBusSessionImpl> TRemoteSessionPtr;
-
- static void* const WriteCookie = (void*)1;
- static void* const ReadCookie = (void*)2;
-
- enum {
- WAKE_QUOTA_MSG = 0x01,
- WAKE_QUOTA_BYTES = 0x02
- };
-
- struct TWriterTag {};
- struct TReaderTag {};
- struct TReconnectTag {};
- struct TWakeReaderTag {};
-
- struct TWriterToReaderSocketMessage {
- TSocket Socket;
- ui32 SocketVersion;
-
- TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion)
- : Socket(socket)
- , SocketVersion(socketVersion)
- {
- }
- };
-
- class TRemoteConnection
- : public NEventLoop::IEventHandler,
- public ::NActor::TActor<TRemoteConnection, TWriterTag>,
- public ::NActor::TActor<TRemoteConnection, TReaderTag>,
- private ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>,
- private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>,
- private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>,
- public TScheduleActor<TRemoteConnection, TWriterTag> {
- friend struct TBusSessionImpl;
- friend class TRemoteClientSession;
- friend class TRemoteServerSession;
- friend class ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>;
- friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>;
- friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>;
-
- protected:
- ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>* ReaderGetSocketQueue() {
- return this;
- }
-
- ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>* WriterGetReconnectQueue() {
- return this;
- }
-
- ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>* WriterGetWakeQueue() {
- return this;
- }
-
- protected:
- TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr);
- ~TRemoteConnection() override;
-
- virtual void ClearOutgoingQueue(TMessagesPtrs&, bool reconnect /* or shutdown */);
-
- public:
- void Send(TNonDestroyingAutoPtr<TBusMessage> msg);
- void Shutdown(EMessageStatus status);
-
- inline const TNetAddr& GetAddr() const noexcept;
-
- private:
- friend class TScheduleConnect;
- friend class TWorkIO;
-
- protected:
- static size_t MessageSize(TArrayRef<TBusMessagePtrAndHeader>);
- bool QuotaAcquire(size_t msg, size_t bytes);
- void QuotaConsume(size_t msg, size_t bytes);
- void QuotaReturnSelf(size_t items, size_t bytes);
- bool QuotaReturnValues(size_t items, size_t bytes);
-
- bool ReaderProcessBuffer();
- bool ReaderFillBuffer();
- void ReaderFlushMessages();
-
- void ReadQuotaWakeup();
- ui32 WriteWakeFlags() const;
-
- virtual bool NeedInterruptRead() {
- return false;
- }
-
- public:
- virtual void TryConnect();
- void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage);
- void ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion);
- void ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags);
- void Act(TReaderTag);
- inline void WriterBeforeWriteErrorMessage(TBusMessage*, EMessageStatus);
- void ClearBeforeSendQueue(EMessageStatus reasonForQueues);
- void ClearReplyQueue(EMessageStatus reasonForQueues);
- inline void ProcessBeforeSendQueueMessage(TBusMessage*, TInstant now);
- void ProcessBeforeSendQueue(TInstant now);
- void WriterProcessStatusDown();
- void ReaderProcessStatusDown();
- void ProcessWriterDown();
- void DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues);
- const TRemoteConnectionWriterStatus& WriterGetStatus();
- virtual void WriterFillStatus();
- void WriterFillInFlight();
- virtual void BeforeTryWrite();
- void Act(TWriterTag);
- void ScheduleRead();
- void ScheduleWrite();
- void ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer);
- void ScheduleShutdown(EMessageStatus status);
- void WriterFlushBuffer();
- void WriterFillBuffer();
- void ReaderSendStatus(TInstant now, bool force = false);
- const TRemoteConnectionReaderStatus& ReaderFillStatus();
- void WriterRotateCounters();
- void WriterSendStatus(TInstant now, bool force = false);
- void WriterSendStatusIfNecessary(TInstant now);
- void QuotaReturnAside(size_t items, size_t bytes);
- virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0;
- bool MessageRead(TArrayRef<const char> dataRef, TInstant now);
- virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0;
-
- void CallSerialize(TBusMessage* msg, TBuffer& buffer) const;
- void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const;
- TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const;
-
- void ResetOneWayFlag(TArrayRef<TBusMessage*>);
-
- inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() {
- return this;
- }
- inline ::NActor::TActor<TRemoteConnection, TReaderTag>* GetReaderActor() {
- return this;
- }
- inline TScheduleActor<TRemoteConnection, TWriterTag>* GetWriterSchedulerActor() {
- return this;
- }
-
- void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status);
- // takes ownership of ms
- void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status);
-
- void FireClientConnectionEvent(TClientConnectionEvent::EType);
-
- size_t GetInFlight();
- size_t GetConnectSyscallsNumForTest();
-
- bool IsReturnConnectFailedImmediately() {
- return (bool)AtomicGet(ReturnConnectFailedImmediately);
- }
-
- bool IsAlive() const;
-
- TRemoteSessionPtr Session;
- TBusProtocol* const Proto;
- TBusSessionConfig const Config;
- bool RemovedFromSession;
- const ui64 ConnectionId;
- const TNetAddr PeerAddr;
- const TBusSocketAddr PeerAddrSocketAddr;
-
- const TInstant CreatedTime;
- TInstant LastConnectAttempt;
- TAtomic ReturnConnectFailedImmediately;
-
- protected:
- ::NActor::TQueueForActor<TBusMessage*> BeforeSendQueue;
- TLockFreeStack<TBusHeader> WrongVersionRequests;
-
- struct TWriterData {
- TAtomic Down;
-
- NEventLoop::TChannelPtr Channel;
- ui32 SocketVersion;
-
- TRemoteConnectionWriterStatus Status;
- TInstant StatusLastSendTime;
-
- TLocalTasks TimeToRotateCounters;
-
- TAtomic InFlight;
-
- TTimedMessages SendQueue;
- ui32 AwakeFlags;
- EWriterState State;
- TLeftRightBuffer Buffer;
- TInstant CorkUntil;
+namespace NBus {
+ namespace NPrivate {
+ class TRemoteConnection;
+
+ typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
+ typedef TIntrusivePtr<TBusSessionImpl> TRemoteSessionPtr;
+
+ static void* const WriteCookie = (void*)1;
+ static void* const ReadCookie = (void*)2;
+
+ enum {
+ WAKE_QUOTA_MSG = 0x01,
+ WAKE_QUOTA_BYTES = 0x02
+ };
+
+ struct TWriterTag {};
+ struct TReaderTag {};
+ struct TReconnectTag {};
+ struct TWakeReaderTag {};
+
+ struct TWriterToReaderSocketMessage {
+ TSocket Socket;
+ ui32 SocketVersion;
+
+ TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion)
+ : Socket(socket)
+ , SocketVersion(socketVersion)
+ {
+ }
+ };
+
+ class TRemoteConnection
+ : public NEventLoop::IEventHandler,
+ public ::NActor::TActor<TRemoteConnection, TWriterTag>,
+ public ::NActor::TActor<TRemoteConnection, TReaderTag>,
+ private ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>,
+ private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>,
+ private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>,
+ public TScheduleActor<TRemoteConnection, TWriterTag> {
+ friend struct TBusSessionImpl;
+ friend class TRemoteClientSession;
+ friend class TRemoteServerSession;
+ friend class ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>;
+ friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>;
+ friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>;
+
+ protected:
+ ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>* ReaderGetSocketQueue() {
+ return this;
+ }
+
+ ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>* WriterGetReconnectQueue() {
+ return this;
+ }
+
+ ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>* WriterGetWakeQueue() {
+ return this;
+ }
+
+ protected:
+ TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr);
+ ~TRemoteConnection() override;
+
+ virtual void ClearOutgoingQueue(TMessagesPtrs&, bool reconnect /* or shutdown */);
+
+ public:
+ void Send(TNonDestroyingAutoPtr<TBusMessage> msg);
+ void Shutdown(EMessageStatus status);
+
+ inline const TNetAddr& GetAddr() const noexcept;
+
+ private:
+ friend class TScheduleConnect;
+ friend class TWorkIO;
+
+ protected:
+ static size_t MessageSize(TArrayRef<TBusMessagePtrAndHeader>);
+ bool QuotaAcquire(size_t msg, size_t bytes);
+ void QuotaConsume(size_t msg, size_t bytes);
+ void QuotaReturnSelf(size_t items, size_t bytes);
+ bool QuotaReturnValues(size_t items, size_t bytes);
+
+ bool ReaderProcessBuffer();
+ bool ReaderFillBuffer();
+ void ReaderFlushMessages();
+
+ void ReadQuotaWakeup();
+ ui32 WriteWakeFlags() const;
+
+ virtual bool NeedInterruptRead() {
+ return false;
+ }
+
+ public:
+ virtual void TryConnect();
+ void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage);
+ void ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion);
+ void ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags);
+ void Act(TReaderTag);
+ inline void WriterBeforeWriteErrorMessage(TBusMessage*, EMessageStatus);
+ void ClearBeforeSendQueue(EMessageStatus reasonForQueues);
+ void ClearReplyQueue(EMessageStatus reasonForQueues);
+ inline void ProcessBeforeSendQueueMessage(TBusMessage*, TInstant now);
+ void ProcessBeforeSendQueue(TInstant now);
+ void WriterProcessStatusDown();
+ void ReaderProcessStatusDown();
+ void ProcessWriterDown();
+ void DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues);
+ const TRemoteConnectionWriterStatus& WriterGetStatus();
+ virtual void WriterFillStatus();
+ void WriterFillInFlight();
+ virtual void BeforeTryWrite();
+ void Act(TWriterTag);
+ void ScheduleRead();
+ void ScheduleWrite();
+ void ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer);
+ void ScheduleShutdown(EMessageStatus status);
+ void WriterFlushBuffer();
+ void WriterFillBuffer();
+ void ReaderSendStatus(TInstant now, bool force = false);
+ const TRemoteConnectionReaderStatus& ReaderFillStatus();
+ void WriterRotateCounters();
+ void WriterSendStatus(TInstant now, bool force = false);
+ void WriterSendStatusIfNecessary(TInstant now);
+ void QuotaReturnAside(size_t items, size_t bytes);
+ virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0;
+ bool MessageRead(TArrayRef<const char> dataRef, TInstant now);
+ virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0;
+
+ void CallSerialize(TBusMessage* msg, TBuffer& buffer) const;
+ void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const;
+ TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const;
+
+ void ResetOneWayFlag(TArrayRef<TBusMessage*>);
+
+ inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() {
+ return this;
+ }
+ inline ::NActor::TActor<TRemoteConnection, TReaderTag>* GetReaderActor() {
+ return this;
+ }
+ inline TScheduleActor<TRemoteConnection, TWriterTag>* GetWriterSchedulerActor() {
+ return this;
+ }
+
+ void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status);
+ // takes ownership of ms
+ void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status);
+
+ void FireClientConnectionEvent(TClientConnectionEvent::EType);
+
+ size_t GetInFlight();
+ size_t GetConnectSyscallsNumForTest();
+
+ bool IsReturnConnectFailedImmediately() {
+ return (bool)AtomicGet(ReturnConnectFailedImmediately);
+ }
+
+ bool IsAlive() const;
+
+ TRemoteSessionPtr Session;
+ TBusProtocol* const Proto;
+ TBusSessionConfig const Config;
+ bool RemovedFromSession;
+ const ui64 ConnectionId;
+ const TNetAddr PeerAddr;
+ const TBusSocketAddr PeerAddrSocketAddr;
+
+ const TInstant CreatedTime;
+ TInstant LastConnectAttempt;
+ TAtomic ReturnConnectFailedImmediately;
+
+ protected:
+ ::NActor::TQueueForActor<TBusMessage*> BeforeSendQueue;
+ TLockFreeStack<TBusHeader> WrongVersionRequests;
+
+ struct TWriterData {
+ TAtomic Down;
+
+ NEventLoop::TChannelPtr Channel;
+ ui32 SocketVersion;
+
+ TRemoteConnectionWriterStatus Status;
+ TInstant StatusLastSendTime;
+
+ TLocalTasks TimeToRotateCounters;
+
+ TAtomic InFlight;
+
+ TTimedMessages SendQueue;
+ ui32 AwakeFlags;
+ EWriterState State;
+ TLeftRightBuffer Buffer;
+ TInstant CorkUntil;
TSystemEvent ShutdownComplete;
- void SetChannel(NEventLoop::TChannelPtr channel);
- void DropChannel();
+ void SetChannel(NEventLoop::TChannelPtr channel);
+ void DropChannel();
- TWriterData();
- ~TWriterData();
- };
+ TWriterData();
+ ~TWriterData();
+ };
- struct TReaderData {
- TAtomic Down;
+ struct TReaderData {
+ TAtomic Down;
- NEventLoop::TChannelPtr Channel;
- ui32 SocketVersion;
+ NEventLoop::TChannelPtr Channel;
+ ui32 SocketVersion;
- TRemoteConnectionReaderStatus Status;
- TInstant StatusLastSendTime;
+ TRemoteConnectionReaderStatus Status;
+ TInstant StatusLastSendTime;
- TBuffer Buffer;
- size_t Offset; /* offset in read buffer */
- size_t MoreBytes; /* more bytes required from socket */
- TVectorSwaps<TBusMessagePtrAndHeader> ReadMessages;
+ TBuffer Buffer;
+ size_t Offset; /* offset in read buffer */
+ size_t MoreBytes; /* more bytes required from socket */
+ TVectorSwaps<TBusMessagePtrAndHeader> ReadMessages;
TSystemEvent ShutdownComplete;
- bool BufferMore() const noexcept {
- return MoreBytes > 0;
- }
+ bool BufferMore() const noexcept {
+ return MoreBytes > 0;
+ }
- bool HasBytesInBuf(size_t bytes) noexcept;
- void SetChannel(NEventLoop::TChannelPtr channel);
- void DropChannel();
+ bool HasBytesInBuf(size_t bytes) noexcept;
+ void SetChannel(NEventLoop::TChannelPtr channel);
+ void DropChannel();
- TReaderData();
- ~TReaderData();
- };
+ TReaderData();
+ ~TReaderData();
+ };
- // owned by session status actor
- struct TGranStatus {
- TGranStatus(TDuration gran)
- : Writer(gran)
- , Reader(gran)
- {
- }
+ // owned by session status actor
+ struct TGranStatus {
+ TGranStatus(TDuration gran)
+ : Writer(gran)
+ , Reader(gran)
+ {
+ }
- TGranUp<TRemoteConnectionWriterStatus> Writer;
- TGranUp<TRemoteConnectionReaderStatus> Reader;
- };
+ TGranUp<TRemoteConnectionWriterStatus> Writer;
+ TGranUp<TRemoteConnectionReaderStatus> Reader;
+ };
- TWriterData WriterData;
- TReaderData ReaderData;
- TGranStatus GranStatus;
- TTokenQuota QuotaMsg;
- TTokenQuota QuotaBytes;
+ TWriterData WriterData;
+ TReaderData ReaderData;
+ TGranStatus GranStatus;
+ TTokenQuota QuotaMsg;
+ TTokenQuota QuotaBytes;
- size_t MaxBufferSize;
+ size_t MaxBufferSize;
- // client connection only
- TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue;
+ // client connection only
+ TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue;
- EMessageStatus ShutdownReason;
+ EMessageStatus ShutdownReason;
};
- inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept {
- return PeerAddr;
- }
+ inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept {
+ return PeerAddr;
+ }
- typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
+ typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
}
-}
+}