aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_common.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/interconnect/interconnect_common.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_common.h')
-rw-r--r--library/cpp/actors/interconnect/interconnect_common.h106
1 files changed, 106 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h
new file mode 100644
index 0000000000..285709a00c
--- /dev/null
+++ b/library/cpp/actors/interconnect/interconnect_common.h
@@ -0,0 +1,106 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <util/generic/map.h>
+#include <util/generic/set.h>
+#include <util/system/datetime.h>
+
+#include "poller_tcp.h"
+#include "logging.h"
+#include "event_filter.h"
+
+#include <atomic>
+
+namespace NActors {
+ enum class EEncryptionMode {
+ DISABLED, // no encryption is required at all
+ OPTIONAL, // encryption is enabled when supported by both peers
+ REQUIRED, // encryption is mandatory
+ };
+
+ struct TInterconnectSettings {
+ TDuration Handshake;
+ TDuration DeadPeer;
+ TDuration CloseOnIdle;
+ ui32 SendBufferDieLimitInMB = 0;
+ ui64 OutputBuffersTotalSizeLimitInMB = 0;
+ ui32 TotalInflightAmountOfData = 0;
+ bool MergePerPeerCounters = false;
+ bool MergePerDataCenterCounters = false;
+ ui32 TCPSocketBufferSize = 0;
+ TDuration PingPeriod = TDuration::Seconds(3);
+ TDuration ForceConfirmPeriod = TDuration::Seconds(1);
+ TDuration LostConnection;
+ TDuration BatchPeriod;
+ bool BindOnAllAddresses = true;
+ EEncryptionMode EncryptionMode = EEncryptionMode::DISABLED;
+ bool TlsAuthOnly = false;
+ TString Certificate; // certificate data in PEM format
+ TString PrivateKey; // private key for the certificate in PEM format
+ TString CaFilePath; // path to certificate authority file
+ TString CipherList; // encryption algorithms
+ TDuration MessagePendingTimeout = TDuration::Seconds(1); // timeout for which messages are queued while in PendingConnection state
+ ui64 MessagePendingSize = Max<ui64>(); // size of the queue
+ ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;
+
+ ui32 GetSendBufferSize() const {
+ ui32 res = 512 * 1024; // 512 kb is the default value for send buffer
+ if (TCPSocketBufferSize) {
+ res = TCPSocketBufferSize;
+ }
+ return res;
+ }
+ };
+
+ struct TChannelSettings {
+ ui16 Weight;
+ };
+
+ typedef TMap<ui16, TChannelSettings> TChannelsConfig;
+
+ using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title,
+ TActorSystem* actorSystem, const TActorId& actorId)>;
+
+ using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>;
+
+ using TUpdateWhiteboardCallback = std::function<void(const TString& peer, bool connected, bool green, bool yellow,
+ bool orange, bool red, TActorSystem* actorSystem)>;
+
+ struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> {
+ TActorId NameserviceId;
+ NMonitoring::TDynamicCounterPtr MonCounters;
+ std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
+ TChannelsConfig ChannelsConfig;
+ TInterconnectSettings Settings;
+ TRegisterMonPageCallback RegisterMonPage;
+ TActorId DestructorId;
+ std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize;
+ TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024;
+ TString ClusterUUID;
+ TVector<TString> AcceptUUID;
+ ui64 StartTime = GetCycleCountFast();
+ TString TechnicalSelfHostName;
+ TInitWhiteboardCallback InitWhiteboard;
+ TUpdateWhiteboardCallback UpdateWhiteboard;
+ ui32 HandshakeBallastSize = 0;
+ TAtomic StartedSessionKiller = 0;
+ TScopeId LocalScopeId;
+ std::shared_ptr<TEventFilter> EventFilter;
+ TString Cookie; // unique random identifier of a node instance (generated randomly at every start)
+ std::unordered_map<ui16, TString> ChannelName;
+
+ struct TVersionInfo {
+ TString Tag; // version tag for this node
+ TSet<TString> AcceptedTags; // we accept all enlisted version tags of peer nodes, but no others; empty = accept all
+ };
+
+ TMaybe<TVersionInfo> VersionInfo;
+
+ using TPtr = TIntrusivePtr<TInterconnectProxyCommon>;
+ };
+
+}