1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
#pragma once
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/generic/map.h>
#include <util/generic/set.h>
#include <util/system/datetime.h>
#include "poller_tcp.h"
#include "logging.h"
#include "event_filter.h"
#include <atomic>
namespace NActors {
// Encryption policy selectable through TInterconnectSettings::EncryptionMode.
enum class EEncryptionMode {
    // Encryption is not required at all.
    DISABLED,
    // Encryption is turned on when both peers support it.
    OPTIONAL,
    // Encryption is mandatory.
    REQUIRED,
};
// Plain bag of interconnect tunables: durations, size limits, TLS material and feature switches.
// Members without an explicit initializer are default-constructed.
struct TInterconnectSettings {
    // --- durations and limits ---
    TDuration Handshake;
    TDuration DeadPeer;
    TDuration CloseOnIdle;
    ui32 SendBufferDieLimitInMB = 0;
    ui64 OutputBuffersTotalSizeLimitInMB = 0;
    ui32 TotalInflightAmountOfData = 0;
    bool MergePerPeerCounters = false;
    bool MergePerDataCenterCounters = false;
    ui32 TCPSocketBufferSize = 0; // zero makes GetSendBufferSize() fall back to its built-in default
    TDuration PingPeriod = TDuration::Seconds(3);
    TDuration ForceConfirmPeriod = TDuration::Seconds(1);
    TDuration LostConnection;
    TDuration BatchPeriod;
    bool BindOnAllAddresses = true;

    // --- encryption ---
    EEncryptionMode EncryptionMode = EEncryptionMode::DISABLED;
    bool TlsAuthOnly = false;
    TString Certificate; // PEM-encoded certificate data
    TString PrivateKey;  // PEM-encoded private key belonging to the certificate
    TString CaFilePath;  // location of the certificate authority file
    TString CipherList;  // encryption algorithms

    // --- queueing and buffers ---
    // How long messages stay queued while in PendingConnection state.
    TDuration MessagePendingTimeout = TDuration::Seconds(1);
    // Size of that queue.
    ui64 MessagePendingSize = Max<ui64>();
    ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;
    ui32 PreallocatedBufferSize = 8 << 10; // 8 KB
    ui32 NumPreallocatedBuffers = 16;
    bool EnableExternalDataChannel = false;
    bool ValidateIncomingPeerViaDirectLookup = false;
    ui32 SocketBacklogSize = 0; // zero means SOMAXCONN

    // Yields TCPSocketBufferSize when it is non-zero; otherwise the default
    // send buffer size of 512 * 1024 bytes.
    ui32 GetSendBufferSize() const {
        constexpr ui32 defaultSendBufferSize = 512 * 1024;
        return TCPSocketBufferSize ? TCPSocketBufferSize : defaultSendBufferSize;
    }
};
// Value object handed to TUpdateWhiteboardCallback; it simply stores the
// arguments given to its constructor.
struct TWhiteboardSessionStatus {
    TActorSystem* ActorSystem; // stored as a raw pointer
    ui32 PeerId;
    TString Peer;
    bool Connected;
    // Colour flags; each one is copied verbatim from the matching constructor argument.
    bool Green;
    bool Yellow;
    bool Orange;
    bool Red;
    i64 ClockSkew;

    // Copies every argument into the member of the same name.
    TWhiteboardSessionStatus(
        TActorSystem* actorSystem,
        ui32 peerId,
        const TString& peer,
        bool connected,
        bool green,
        bool yellow,
        bool orange,
        bool red,
        i64 clockSkew)
        : ActorSystem(actorSystem)
        , PeerId(peerId)
        , Peer(peer)
        , Connected(connected)
        , Green(green)
        , Yellow(yellow)
        , Orange(orange)
        , Red(red)
        , ClockSkew(clockSkew)
    {
    }
};
// Settings attached to a single channel.
struct TChannelSettings {
    ui16 Weight; // no default initializer: value is indeterminate unless the object is value-initialized
};

// Ordered map from a ui16 key to that channel's settings.
using TChannelsConfig = TMap<ui16, TChannelSettings>;
// Signature of the hook stored in TInterconnectProxyCommon::RegisterMonPage.
using TRegisterMonPageCallback = std::function<
    void(const TString& path, const TString& title, TActorSystem* actorSystem, const TActorId& actorId)>;

// Signature of the hook stored in TInterconnectProxyCommon::InitWhiteboard.
using TInitWhiteboardCallback = std::function<
    void(ui16 icPort, TActorSystem* actorSystem)>;

// Signature of the hook stored in TInterconnectProxyCommon::UpdateWhiteboard;
// it receives a TWhiteboardSessionStatus by const reference.
using TUpdateWhiteboardCallback = std::function<
    void(const TWhiteboardSessionStatus& data)>;
// Reference-counted (TAtomicRefCount) bundle of configuration, callbacks and
// shared state; held through TInterconnectProxyCommon::TPtr.
struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> {
    // ---- nested types ----
    using TPtr = TIntrusivePtr<TInterconnectProxyCommon>;

    struct TVersionInfo {
        // Version tag for this node.
        TString Tag;
        // Peer version tags we accept; any tag not listed is rejected.
        // An empty set means every tag is accepted.
        TSet<TString> AcceptedTags;
    };

    // ---- data members (declaration order is unchanged from the original) ----
    TActorId NameserviceId;
    NMonitoring::TDynamicCounterPtr MonCounters;
    std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
    TChannelsConfig ChannelsConfig;
    TInterconnectSettings Settings;
    TRegisterMonPageCallback RegisterMonPage;
    TActorId DestructorId;
    std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize;
    TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024;
    TString ClusterUUID;
    TVector<TString> AcceptUUID;
    ui64 StartTime = GetCycleCountFast(); // cycle counter sampled when this object is constructed
    TString TechnicalSelfHostName;
    TInitWhiteboardCallback InitWhiteboard;
    TUpdateWhiteboardCallback UpdateWhiteboard;
    ui32 HandshakeBallastSize = 0;
    TAtomic StartedSessionKiller = 0;
    TScopeId LocalScopeId;
    std::shared_ptr<TEventFilter> EventFilter;
    // Unique random identifier of a node instance, generated randomly at every start.
    TString Cookie;
    std::unordered_map<ui16, TString> ChannelName;
    std::optional<ui32> OutgoingHandshakeInflightLimit;

    // Obsolete compatibility control.
    TMaybe<TVersionInfo> VersionInfo;

    std::optional<TString> CompatibilityInfo;
    std::function<bool(const TString&, TString&)> ValidateCompatibilityInfo;
    std::function<bool(const TVersionInfo&, TString&)> ValidateCompatibilityOldFormat;
};
}
|