diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/remote_connection.h | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.h')
-rw-r--r-- | library/cpp/messagebus/remote_connection.h | 94 |
1 files changed, 47 insertions, 47 deletions
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h index 4538947368..5481b2c6ed 100644 --- a/library/cpp/messagebus/remote_connection.h +++ b/library/cpp/messagebus/remote_connection.h @@ -1,17 +1,17 @@ #pragma once -#include "async_result.h" +#include "async_result.h" #include "defs.h" #include "event_loop.h" #include "left_right_buffer.h" #include "lfqueue_batch.h" #include "message_ptr_and_header.h" -#include "nondestroying_holder.h" -#include "remote_connection_status.h" -#include "scheduler_actor.h" -#include "socket_addr.h" +#include "nondestroying_holder.h" +#include "remote_connection_status.h" +#include "scheduler_actor.h" +#include "socket_addr.h" #include "storage.h" -#include "vector_swaps.h" +#include "vector_swaps.h" #include "ybus.h" #include "misc/granup.h" #include "misc/tokenquota.h" @@ -29,23 +29,23 @@ namespace NBus { namespace NPrivate { class TRemoteConnection; - + typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr; typedef TIntrusivePtr<TBusSessionImpl> TRemoteSessionPtr; - + static void* const WriteCookie = (void*)1; static void* const ReadCookie = (void*)2; - + enum { WAKE_QUOTA_MSG = 0x01, WAKE_QUOTA_BYTES = 0x02 }; - + struct TWriterTag {}; struct TReaderTag {}; struct TReconnectTag {}; struct TWakeReaderTag {}; - + struct TWriterToReaderSocketMessage { TSocket Socket; ui32 SocketVersion; @@ -56,7 +56,7 @@ namespace NBus { { } }; - + class TRemoteConnection : public NEventLoop::IEventHandler, public ::NActor::TActor<TRemoteConnection, TWriterTag>, @@ -71,43 +71,43 @@ namespace NBus { friend class ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>; friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>; friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>; - + protected: ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>* ReaderGetSocketQueue() { return this; } - + ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>* WriterGetReconnectQueue() { return this; } - + ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>* WriterGetWakeQueue() { return this; } - + protected: TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr); ~TRemoteConnection() override; - + virtual void ClearOutgoingQueue(TMessagesPtrs&, bool reconnect /* or shutdown */); - + public: void Send(TNonDestroyingAutoPtr<TBusMessage> msg); void Shutdown(EMessageStatus status); - + inline const TNetAddr& GetAddr() const noexcept; private: friend class TScheduleConnect; friend class TWorkIO; - + protected: static size_t MessageSize(TArrayRef<TBusMessagePtrAndHeader>); bool QuotaAcquire(size_t msg, size_t bytes); void QuotaConsume(size_t msg, size_t bytes); void QuotaReturnSelf(size_t items, size_t bytes); bool QuotaReturnValues(size_t items, size_t bytes); - + bool ReaderProcessBuffer(); bool ReaderFillBuffer(); void ReaderFlushMessages(); @@ -170,22 +170,22 @@ namespace NBus { inline TScheduleActor<TRemoteConnection, TWriterTag>* GetWriterSchedulerActor() { return this; } - + void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status); // takes ownership of ms void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status); void FireClientConnectionEvent(TClientConnectionEvent::EType); - + size_t GetInFlight(); size_t GetConnectSyscallsNumForTest(); - + bool IsReturnConnectFailedImmediately() { return (bool)AtomicGet(ReturnConnectFailedImmediately); } - + bool IsAlive() const; - + TRemoteSessionPtr Session; TBusProtocol* const Proto; TBusSessionConfig const Config; @@ -193,15 +193,15 @@ namespace NBus { const ui64 ConnectionId; const TNetAddr PeerAddr; const TBusSocketAddr PeerAddrSocketAddr; - + const TInstant CreatedTime; TInstant LastConnectAttempt; TAtomic ReturnConnectFailedImmediately; - + protected: ::NActor::TQueueForActor<TBusMessage*> BeforeSendQueue; TLockFreeStack<TBusHeader> WrongVersionRequests; - + struct TWriterData { TAtomic Down; @@ -212,7 +212,7 @@ namespace NBus { TInstant StatusLastSendTime; TLocalTasks TimeToRotateCounters; - + TAtomic InFlight; TTimedMessages SendQueue; @@ -220,44 +220,44 @@ namespace NBus { EWriterState State; TLeftRightBuffer Buffer; TInstant CorkUntil; - + TSystemEvent ShutdownComplete; - + void SetChannel(NEventLoop::TChannelPtr channel); void DropChannel(); - + TWriterData(); ~TWriterData(); }; - + struct TReaderData { TAtomic Down; - + NEventLoop::TChannelPtr Channel; ui32 SocketVersion; - + TRemoteConnectionReaderStatus Status; TInstant StatusLastSendTime; - + TBuffer Buffer; size_t Offset; /* offset in read buffer */ size_t MoreBytes; /* more bytes required from socket */ TVectorSwaps<TBusMessagePtrAndHeader> ReadMessages; - + TSystemEvent ShutdownComplete; - + bool BufferMore() const noexcept { return MoreBytes > 0; } - + bool HasBytesInBuf(size_t bytes) noexcept; void SetChannel(NEventLoop::TChannelPtr channel); void DropChannel(); - + TReaderData(); ~TReaderData(); }; - + // owned by session status actor struct TGranStatus { TGranStatus(TDuration gran) @@ -265,7 +265,7 @@ namespace NBus { , Reader(gran) { } - + TGranUp<TRemoteConnectionWriterStatus> Writer; TGranUp<TRemoteConnectionReaderStatus> Reader; }; @@ -275,15 +275,15 @@ namespace NBus { TGranStatus GranStatus; TTokenQuota QuotaMsg; TTokenQuota QuotaBytes; - + size_t MaxBufferSize; - + // client connection only TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue; EMessageStatus ShutdownReason; - }; - + }; + inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept { return PeerAddr; } |