aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_connection.h
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/remote_connection.h
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.h')
-rw-r--r--library/cpp/messagebus/remote_connection.h94
1 files changed, 47 insertions, 47 deletions
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h
index 4538947368..5481b2c6ed 100644
--- a/library/cpp/messagebus/remote_connection.h
+++ b/library/cpp/messagebus/remote_connection.h
@@ -1,17 +1,17 @@
#pragma once
-#include "async_result.h"
+#include "async_result.h"
#include "defs.h"
#include "event_loop.h"
#include "left_right_buffer.h"
#include "lfqueue_batch.h"
#include "message_ptr_and_header.h"
-#include "nondestroying_holder.h"
-#include "remote_connection_status.h"
-#include "scheduler_actor.h"
-#include "socket_addr.h"
+#include "nondestroying_holder.h"
+#include "remote_connection_status.h"
+#include "scheduler_actor.h"
+#include "socket_addr.h"
#include "storage.h"
-#include "vector_swaps.h"
+#include "vector_swaps.h"
#include "ybus.h"
#include "misc/granup.h"
#include "misc/tokenquota.h"
@@ -29,23 +29,23 @@
namespace NBus {
namespace NPrivate {
class TRemoteConnection;
-
+
typedef TIntrusivePtr<TRemoteConnection> TRemoteConnectionPtr;
typedef TIntrusivePtr<TBusSessionImpl> TRemoteSessionPtr;
-
+
static void* const WriteCookie = (void*)1;
static void* const ReadCookie = (void*)2;
-
+
enum {
WAKE_QUOTA_MSG = 0x01,
WAKE_QUOTA_BYTES = 0x02
};
-
+
struct TWriterTag {};
struct TReaderTag {};
struct TReconnectTag {};
struct TWakeReaderTag {};
-
+
struct TWriterToReaderSocketMessage {
TSocket Socket;
ui32 SocketVersion;
@@ -56,7 +56,7 @@ namespace NBus {
{
}
};
-
+
class TRemoteConnection
: public NEventLoop::IEventHandler,
public ::NActor::TActor<TRemoteConnection, TWriterTag>,
@@ -71,43 +71,43 @@ namespace NBus {
friend class ::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>;
friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>;
friend class ::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>;
-
+
protected:
::NActor::TQueueInActor<TRemoteConnection, TWriterToReaderSocketMessage, TReaderTag>* ReaderGetSocketQueue() {
return this;
}
-
+
::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TReconnectTag>* WriterGetReconnectQueue() {
return this;
}
-
+
::NActor::TQueueInActor<TRemoteConnection, ui32, TWriterTag, TWakeReaderTag>* WriterGetWakeQueue() {
return this;
}
-
+
protected:
TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr);
~TRemoteConnection() override;
-
+
virtual void ClearOutgoingQueue(TMessagesPtrs&, bool reconnect /* or shutdown */);
-
+
public:
void Send(TNonDestroyingAutoPtr<TBusMessage> msg);
void Shutdown(EMessageStatus status);
-
+
inline const TNetAddr& GetAddr() const noexcept;
private:
friend class TScheduleConnect;
friend class TWorkIO;
-
+
protected:
static size_t MessageSize(TArrayRef<TBusMessagePtrAndHeader>);
bool QuotaAcquire(size_t msg, size_t bytes);
void QuotaConsume(size_t msg, size_t bytes);
void QuotaReturnSelf(size_t items, size_t bytes);
bool QuotaReturnValues(size_t items, size_t bytes);
-
+
bool ReaderProcessBuffer();
bool ReaderFillBuffer();
void ReaderFlushMessages();
@@ -170,22 +170,22 @@ namespace NBus {
inline TScheduleActor<TRemoteConnection, TWriterTag>* GetWriterSchedulerActor() {
return this;
}
-
+
void WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status);
// takes ownership of ms
void WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status);
void FireClientConnectionEvent(TClientConnectionEvent::EType);
-
+
size_t GetInFlight();
size_t GetConnectSyscallsNumForTest();
-
+
bool IsReturnConnectFailedImmediately() {
return (bool)AtomicGet(ReturnConnectFailedImmediately);
}
-
+
bool IsAlive() const;
-
+
TRemoteSessionPtr Session;
TBusProtocol* const Proto;
TBusSessionConfig const Config;
@@ -193,15 +193,15 @@ namespace NBus {
const ui64 ConnectionId;
const TNetAddr PeerAddr;
const TBusSocketAddr PeerAddrSocketAddr;
-
+
const TInstant CreatedTime;
TInstant LastConnectAttempt;
TAtomic ReturnConnectFailedImmediately;
-
+
protected:
::NActor::TQueueForActor<TBusMessage*> BeforeSendQueue;
TLockFreeStack<TBusHeader> WrongVersionRequests;
-
+
struct TWriterData {
TAtomic Down;
@@ -212,7 +212,7 @@ namespace NBus {
TInstant StatusLastSendTime;
TLocalTasks TimeToRotateCounters;
-
+
TAtomic InFlight;
TTimedMessages SendQueue;
@@ -220,44 +220,44 @@ namespace NBus {
EWriterState State;
TLeftRightBuffer Buffer;
TInstant CorkUntil;
-
+
TSystemEvent ShutdownComplete;
-
+
void SetChannel(NEventLoop::TChannelPtr channel);
void DropChannel();
-
+
TWriterData();
~TWriterData();
};
-
+
struct TReaderData {
TAtomic Down;
-
+
NEventLoop::TChannelPtr Channel;
ui32 SocketVersion;
-
+
TRemoteConnectionReaderStatus Status;
TInstant StatusLastSendTime;
-
+
TBuffer Buffer;
size_t Offset; /* offset in read buffer */
size_t MoreBytes; /* more bytes required from socket */
TVectorSwaps<TBusMessagePtrAndHeader> ReadMessages;
-
+
TSystemEvent ShutdownComplete;
-
+
bool BufferMore() const noexcept {
return MoreBytes > 0;
}
-
+
bool HasBytesInBuf(size_t bytes) noexcept;
void SetChannel(NEventLoop::TChannelPtr channel);
void DropChannel();
-
+
TReaderData();
~TReaderData();
};
-
+
// owned by session status actor
struct TGranStatus {
TGranStatus(TDuration gran)
@@ -265,7 +265,7 @@ namespace NBus {
, Reader(gran)
{
}
-
+
TGranUp<TRemoteConnectionWriterStatus> Writer;
TGranUp<TRemoteConnectionReaderStatus> Reader;
};
@@ -275,15 +275,15 @@ namespace NBus {
TGranStatus GranStatus;
TTokenQuota QuotaMsg;
TTokenQuota QuotaBytes;
-
+
size_t MaxBufferSize;
-
+
// client connection only
TLockFreeQueueBatch<TBusMessagePtrAndHeader, TVectorSwaps> ReplyQueue;
EMessageStatus ShutdownReason;
- };
-
+ };
+
inline const TNetAddr& TRemoteConnection::GetAddr() const noexcept {
return PeerAddr;
}