aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_channel.h
blob: 56d6e31ba7218b0b2d7403d8200bf90a75d3c3ea (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#pragma once 
 
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/event_load.h>
#include <library/cpp/actors/util/rope.h>
#include <util/generic/deque.h> 
#include <util/generic/vector.h> 
#include <util/generic/map.h> 
#include <util/stream/walk.h> 
#include <library/cpp/actors/wilson/wilson_event.h>
#include <library/cpp/actors/helpers/mon_histogram_helper.h>
 
#include "interconnect_common.h" 
#include "interconnect_counters.h" 
#include "packet.h" 
#include "event_holder_pool.h" 
 
namespace NActors { 
#pragma pack(push, 1) 
    // Two 16-bit fields describing one part of channel data. The struct is
    // declared inside the surrounding #pragma pack(push, 1) region.
    // NOTE(review): presumably this is the on-wire per-part header — confirm
    // against the packet encoder/decoder.
    struct TChannelPart {
        ui16 Channel; // ToString() treats the top bit as LastPartFlag and the rest as the channel number
        ui16 Size;

        // Mask selecting the highest bit of Channel.
        static constexpr ui16 LastPartFlag = ui16(1) << 15;

        // Builds a human-readable dump of the form
        // "{Channel# <n> LastPartFlag# <true|false> Size# <size>}".
        TString ToString() const {
            // Integral promotion makes this an int, exactly as in a direct
            // "Channel & ~LastPartFlag" expression; only the flag bit is cleared.
            const auto channelNumber = Channel & ~LastPartFlag;
            const char* const lastPartText = (Channel & LastPartFlag) ? "true" : "false";

            return TStringBuilder()
                << "{Channel# " << channelNumber
                << " LastPartFlag# " << lastPartText
                << " Size# " << Size
                << "}";
        }
    };
#pragma pack(pop) 
 
    struct TExSerializedEventTooLarge : std::exception { 
        const ui32 Type; 
 
        TExSerializedEventTooLarge(ui32 type) 
            : Type(type) 
        {} 
    }; 
 
    class TEventOutputChannel : public TInterconnectLoggingBase { 
    public:
        TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
                std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
            : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
            , Pool(pool)
            , PeerNodeId(peerNodeId)
            , ChannelId(id)
            , Metrics(std::move(metrics))
            , Params(std::move(params)) 
            , MaxSerializedEventSize(maxSerializedEventSize) 
        {} 
 
        ~TEventOutputChannel() { 
        } 
 
        std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
            TEventHolder& event = Pool.Allocate(Queue); 
            const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr); 
            OutputQueueSize += bytes; 
            return std::make_pair(bytes, &event);
        }
 
        void DropConfirmed(ui64 confirm); 
 
        bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed); 
 
        bool IsEmpty() const {
            return Queue.empty(); 
        }
 
        bool IsWorking() const { 
            return !IsEmpty(); 
        } 
 
        ui32 GetQueueSize() const {
            return (ui32)Queue.size();
        }
 
        ui64 GetBufferedAmountOfData() const { 
            return OutputQueueSize;
        }
 
        void NotifyUndelivered(); 
 
        TEventHolderPool& Pool;
        const ui32 PeerNodeId;
        const ui16 ChannelId;
        std::shared_ptr<IInterconnectMetrics> Metrics;
        const TSessionParams Params; 
        const ui32 MaxSerializedEventSize; 
        ui64 UnaccountedTraffic = 0; 
        ui64 EqualizeCounterOnPause = 0; 
        ui64 WeightConsumedOnPause = 0; 
 
        enum class EState { 
            INITIAL, 
            CHUNKER, 
            BUFFER, 
            DESCRIPTOR, 
        }; 
        EState State = EState::INITIAL; 
 
        static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr); 
 
    protected:
        ui64 OutputQueueSize = 0; 
 
        std::list<TEventHolder> Queue; 
        std::list<TEventHolder> NotYetConfirmed; 
        TRope::TConstIterator Iter; 
        TCoroutineChunkSerializer Chunker; 
        bool ExtendedFormat = false; 
 
        bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); 
 
        void AccountTraffic() { 
            if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) { 
                Metrics->UpdateOutputChannelTraffic(ChannelId, amount);
            } 
        }
 
        friend class TInterconnectSessionTCP;
    };
}