1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
#pragma once
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/event_load.h>
#include <library/cpp/actors/util/rope.h>
#include <util/generic/deque.h>
#include <util/generic/vector.h>
#include <util/generic/map.h>
#include <util/stream/walk.h>
#include <library/cpp/actors/wilson/wilson_event.h>
#include "interconnect_common.h"
#include "interconnect_counters.h"
#include "packet.h"
#include "event_holder_pool.h"
namespace NActors {
#pragma pack(push, 1)
// Two-field record describing one part of a channel's data: the channel it
// belongs to and the part's size. Presumably this is the per-part header placed
// ahead of the payload in a packet -- confirm against the serialization code.
// It is declared between `#pragma pack(push, 1)` / `#pragma pack(pop)`.
struct TChannelPart {
    // Channel number in the low 15 bits; the most significant bit is LastPartFlag.
    ui16 Channel;
    // Size of the part (units are not visible here; presumably bytes).
    ui16 Size;

    // Mask selecting the flag bit stored in the top bit of Channel.
    static constexpr ui16 LastPartFlag = ui16(1) << 15;

    // Builds a human-readable dump of the record, splitting Channel into the
    // channel number and the last-part flag.
    TString ToString() const {
        // Integer promotion makes this an int, exactly as in a direct `<<` of the expression.
        const auto channelNumber = Channel & ~LastPartFlag;
        const char* const lastPartText = (Channel & LastPartFlag) ? "true" : "false";
        return TStringBuilder()
            << "{Channel# " << channelNumber
            << " LastPartFlag# " << lastPartText
            << " Size# " << Size
            << "}";
    }
};
#pragma pack(pop)
struct TExSerializedEventTooLarge : std::exception {
const ui32 Type;
TExSerializedEventTooLarge(ui32 type)
: Type(type)
{}
};
class TEventOutputChannel : public TInterconnectLoggingBase {
public:
TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
: TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
, Pool(pool)
, PeerNodeId(peerNodeId)
, ChannelId(id)
, Metrics(std::move(metrics))
, Params(std::move(params))
, MaxSerializedEventSize(maxSerializedEventSize)
{}
~TEventOutputChannel() {
}
std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr);
OutputQueueSize += bytes;
return std::make_pair(bytes, &event);
}
void DropConfirmed(ui64 confirm);
bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed);
bool IsEmpty() const {
return Queue.empty();
}
bool IsWorking() const {
return !IsEmpty();
}
ui32 GetQueueSize() const {
return (ui32)Queue.size();
}
ui64 GetBufferedAmountOfData() const {
return OutputQueueSize;
}
void NotifyUndelivered();
TEventHolderPool& Pool;
const ui32 PeerNodeId;
const ui16 ChannelId;
std::shared_ptr<IInterconnectMetrics> Metrics;
const TSessionParams Params;
const ui32 MaxSerializedEventSize;
ui64 UnaccountedTraffic = 0;
ui64 EqualizeCounterOnPause = 0;
ui64 WeightConsumedOnPause = 0;
enum class EState {
INITIAL,
CHUNKER,
BUFFER,
DESCRIPTOR,
};
EState State = EState::INITIAL;
static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr);
protected:
ui64 OutputQueueSize = 0;
std::list<TEventHolder> Queue;
std::list<TEventHolder> NotYetConfirmed;
TRope::TConstIterator Iter;
TCoroutineChunkSerializer Chunker;
bool ExtendedFormat = false;
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
void AccountTraffic() {
if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) {
Metrics->UpdateOutputChannelTraffic(ChannelId, amount);
}
}
friend class TInterconnectSessionTCP;
};
}
|