aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_common.h
blob: 28c40f46ffa6ead50fd747834a1a65b687ccd1f4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#pragma once

#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
#include <util/generic/map.h>
#include <util/generic/set.h>
#include <util/system/datetime.h>

#include "poller_tcp.h"
#include "logging.h"
#include "event_filter.h"

#include <atomic>

namespace NActors {
    enum class EEncryptionMode {
        DISABLED, // no encryption is required at all
        OPTIONAL, // encryption is enabled when supported by both peers
        REQUIRED, // encryption is mandatory
    };

    struct TInterconnectSettings {
        TDuration Handshake; 
        TDuration DeadPeer; 
        TDuration CloseOnIdle; 
        ui32 SendBufferDieLimitInMB = 0; 
        ui64 OutputBuffersTotalSizeLimitInMB = 0; 
        ui32 TotalInflightAmountOfData = 0; 
        bool MergePerPeerCounters = false; 
        bool MergePerDataCenterCounters = false;
        ui32 TCPSocketBufferSize = 0; 
        TDuration PingPeriod = TDuration::Seconds(3); 
        TDuration ForceConfirmPeriod = TDuration::Seconds(1); 
        TDuration LostConnection;
        TDuration BatchPeriod;
        bool BindOnAllAddresses = true;
        EEncryptionMode EncryptionMode = EEncryptionMode::DISABLED;
        bool TlsAuthOnly = false;
        TString Certificate; // certificate data in PEM format
        TString PrivateKey; // private key for the certificate in PEM format
        TString CaFilePath; // path to certificate authority file
        TString CipherList; // encryption algorithms
        TDuration MessagePendingTimeout = TDuration::Seconds(1); // timeout for which messages are queued while in PendingConnection state
        ui64 MessagePendingSize = Max<ui64>(); // size of the queue
        ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;

        ui32 GetSendBufferSize() const {
            ui32 res = 512 * 1024; // 512 kb is the default value for send buffer
            if (TCPSocketBufferSize) {
                res = TCPSocketBufferSize;
            }
            return res;
        }
    }; 

    struct TChannelSettings { 
        ui16 Weight; 
    }; 

    typedef TMap<ui16, TChannelSettings> TChannelsConfig; 

    using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title, 
                                                        TActorSystem* actorSystem, const TActorId& actorId)>;

    using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>;

    using TUpdateWhiteboardCallback = std::function<void(const TString& peer, bool connected, bool green, bool yellow,
                                                         bool orange, bool red, TActorSystem* actorSystem)>; 

    struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> { 
        TActorId NameserviceId;
        NMonitoring::TDynamicCounterPtr MonCounters; 
        std::shared_ptr<NMonitoring::IMetricRegistry> Metrics;
        TChannelsConfig ChannelsConfig;
        TInterconnectSettings Settings;
        TRegisterMonPageCallback RegisterMonPage; 
        TActorId DestructorId;
        std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize;
        TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024;
        TString ClusterUUID; 
        TVector<TString> AcceptUUID; 
        ui64 StartTime = GetCycleCountFast();
        TString TechnicalSelfHostName; 
        TInitWhiteboardCallback InitWhiteboard;
        TUpdateWhiteboardCallback UpdateWhiteboard; 
        ui32 HandshakeBallastSize = 0; 
        TAtomic StartedSessionKiller = 0; 
        TScopeId LocalScopeId;
        std::shared_ptr<TEventFilter> EventFilter;
        TString Cookie; // unique random identifier of a node instance (generated randomly at every start)
        std::unordered_map<ui16, TString> ChannelName;

        struct TVersionInfo {
            TString Tag; // version tag for this node
            TSet<TString> AcceptedTags; // we accept all enlisted version tags of peer nodes, but no others; empty = accept all
        };

        TMaybe<TVersionInfo> VersionInfo;

        using TPtr = TIntrusivePtr<TInterconnectProxyCommon>;
    }; 

}