diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_connection.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_connection.h')
-rw-r--r-- | library/cpp/messagebus/remote_connection.h | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h new file mode 100644 index 0000000000..4538947368 --- /dev/null +++ b/library/cpp/messagebus/remote_connection.h @@ -0,0 +1,294 @@ +#pragma once + +#include "async_result.h" +#include "defs.h" +#include "event_loop.h" +#include "left_right_buffer.h" +#include "lfqueue_batch.h" +#include "message_ptr_and_header.h" +#include "nondestroying_holder.h" +#include "remote_connection_status.h" +#include "scheduler_actor.h" +#include "socket_addr.h" +#include "storage.h" +#include "vector_swaps.h" +#include "ybus.h" +#include "misc/granup.h" +#include "misc/tokenquota.h" + +#include <library/cpp/messagebus/actor/actor.h> +#include <library/cpp/messagebus/actor/executor.h> +#include <library/cpp/messagebus/actor/queue_for_actor.h> +#include <library/cpp/messagebus/actor/queue_in_actor.h> +#include <library/cpp/messagebus/scheduler/scheduler.h> + +#include <util/system/atomic.h> +#include <util/system/event.h> +#include <util/thread/lfstack.h> + +namespace NBus { + namespace NPrivate { + class TRemoteConnection; + + typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr; + typedef TIntrusivePtr<TBusSessionImpl> TRemoteSessionPtr; + + static void* const WriteCookie = (void*)1; + static void* const ReadCookie = (void*)2; + + enum { + WAKE_QUOTA_MSG = 0x01, + WAKE_QUOTA_BYTES = 0x02 + }; + + struct TWriterTag {}; + struct TReaderTag {}; + struct TReconnectTag {}; + struct TWakeReaderTag {}; + + struct TWriterToReaderSocketMessage { + TSocket Socket; + ui32 SocketVersion; + + TWriterToReaderSocketMessage(TSocket socket, ui32 socketVersion) + : Socket(socket) + , SocketVersion(socketVersion) + { + } + }; + + class TRemoteConnection + : public NEventLoop::IEventHandler, + public ::NActor::TActor<TRemoteConnection, TWriterTag>, + public ::NActor::TActor<TRemoteConnection, TReaderTag>, + private ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>, + private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>, + private ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>, + public TScheduleActor<TRemoteConnection, TWriterTag> { + friend struct TBusSessionImpl; + friend class TRemoteClientSession; + friend class TRemoteServerSession; + friend class ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>; + friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>; + friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>; + + protected: + ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>* ReaderGetSocketQueue() { + return this; + } + + ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>* WriterGetReconnectQueue() { + return this; + } + + ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>* WriterGetWakeQueue() { + return this; + } + + protected: + TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr); + ~TRemoteConnection() override; + + virtual void ClearOutgoingQueue(TMessagesPtrs&, bool reconnect /* or shutdown */); + + public: + void Send(TNonDestroyingAutoPtr<TBusMessage> msg); + void Shutdown(EMessageStatus status); + + inline const TNetAddr& GetAddr() const noexcept; + + private: + friend class TScheduleConnect; + friend class TWorkIO; + + protected: + static size_t MessageSize(TArrayRef<TBusMessagePtrAndHeader>); + bool QuotaAcquire(size_t msg, size_t bytes); + void QuotaConsume(size_t msg, size_t bytes); + void QuotaReturnSelf(size_t items, size_t bytes); + bool QuotaReturnValues(size_t items, size_t bytes); + + bool ReaderProcessBuffer(); + bool ReaderFillBuffer(); + void ReaderFlushMessages(); + + void ReadQuotaWakeup(); + ui32 WriteWakeFlags() const; + + virtual bool NeedInterruptRead() { + return false; + } + + public: + virtual void TryConnect(); + void ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage); + void ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion); + void ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags); + void Act(TReaderTag); + inline void WriterBeforeWriteErrorMessage(TBusMessage*, EMessageStatus); + void ClearBeforeSendQueue(EMessageStatus reasonForQueues); + void ClearReplyQueue(EMessageStatus reasonForQueues); + inline void ProcessBeforeSendQueueMessage(TBusMessage*, TInstant now); + void ProcessBeforeSendQueue(TInstant now); + void WriterProcessStatusDown(); + void ReaderProcessStatusDown(); + void ProcessWriterDown(); + void DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues); + const TRemoteConnectionWriterStatus& WriterGetStatus(); + virtual void WriterFillStatus(); + void WriterFillInFlight(); + virtual void BeforeTryWrite(); + void Act(TWriterTag); + void ScheduleRead(); + void ScheduleWrite(); + void ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer); + void ScheduleShutdown(EMessageStatus status); + void WriterFlushBuffer(); + void WriterFillBuffer(); + void ReaderSendStatus(TInstant now, bool force = false); + const TRemoteConnectionReaderStatus& ReaderFillStatus(); + void WriterRotateCounters(); + void WriterSendStatus(TInstant now, bool force = false); + void WriterSendStatusIfNecessary(TInstant now); + void QuotaReturnAside(size_t items, size_t bytes); + virtual void ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) = 0; + bool MessageRead(TArrayRef<const char> dataRef, TInstant now); + virtual void MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) = 0; + + void CallSerialize(TBusMessage* msg, TBuffer& buffer) const; + void SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const; + TBusMessage* DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const; + + void ResetOneWayFlag(TArrayRef<TBusMessage*>); + + inline ::NActor::TActor<TRemoteConnection, TWriterTag>* GetWriterActor() { + return this; + } + inline ::NActor::TActor<TRemoteConnection, TReaderTag>* GetReaderActor() { + return this; + } + inline TScheduleActor<TRemoteConnection, TWriterTag>* GetWriterSchedulerActor() { + return this; + } + + void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status); + // takes ownership of ms + void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status); + + void FireClientConnectionEvent(TClientConnectionEvent::EType); + + size_t GetInFlight(); + size_t GetConnectSyscallsNumForTest(); + + bool IsReturnConnectFailedImmediately() { + return (bool)AtomicGet(ReturnConnectFailedImmediately); + } + + bool IsAlive() const; + + TRemoteSessionPtr Session; + TBusProtocol* const Proto; + TBusSessionConfig const Config; + bool RemovedFromSession; + const ui64 ConnectionId; + const TNetAddr PeerAddr; + const TBusSocketAddr PeerAddrSocketAddr; + + const TInstant CreatedTime; + TInstant LastConnectAttempt; + TAtomic ReturnConnectFailedImmediately; + + protected: + ::NActor::TQueueForActor<TBusMessage*> BeforeSendQueue; + TLockFreeStack<TBusHeader> WrongVersionRequests; + + struct TWriterData { + TAtomic Down; + + NEventLoop::TChannelPtr Channel; + ui32 SocketVersion; + + TRemoteConnectionWriterStatus Status; + TInstant StatusLastSendTime; + + TLocalTasks TimeToRotateCounters; + + TAtomic InFlight; + + TTimedMessages SendQueue; + ui32 AwakeFlags; + EWriterState State; + TLeftRightBuffer Buffer; + TInstant CorkUntil; + + TSystemEvent ShutdownComplete; + + void SetChannel(NEventLoop::TChannelPtr channel); + void DropChannel(); + + TWriterData(); + ~TWriterData(); + }; + + struct TReaderData { + TAtomic Down; + + NEventLoop::TChannelPtr Channel; + ui32 SocketVersion; + + TRemoteConnectionReaderStatus Status; + TInstant StatusLastSendTime; + + TBuffer Buffer; + size_t Offset; /* offset in read buffer */ + size_t MoreBytes; /* more bytes required from socket */ + TVectorSwaps<TBusMessagePtrAndHeader> ReadMessages; + + TSystemEvent ShutdownComplete; + + bool BufferMore() const noexcept { + return MoreBytes > 0; + } + + bool HasBytesInBuf(size_t bytes) noexcept; + void SetChannel(NEventLoop::TChannelPtr channel); + void DropChannel(); + + TReaderData(); + ~TReaderData(); + }; + + // owned by session status actor + struct TGranStatus { + TGranStatus(TDuration gran) + : Writer(gran) + , Reader(gran) + { + } + + TGranUp<TRemoteConnectionWriterStatus> Writer; + TGranUp<TRemoteConnectionReaderStatus> Reader; + }; + + TWriterData WriterData; + TReaderData ReaderData; + TGranStatus GranStatus; + TTokenQuota QuotaMsg; + TTokenQuota QuotaBytes; + + size_t MaxBufferSize; + + // client connection only + TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue; + + EMessageStatus ShutdownReason; + }; + + inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept { + return PeerAddr; + } + + typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr; + + } +} |