1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
#include "retry_queue.h"
#include <util/generic/utility.h>
namespace NYql::NDq {
// Stores the identifiers this queue works with.
//   txId         - transaction id; only used in this file for diagnostics.
//   senderId     - actor id put into the sender field of interconnect
//                  connect/unsubscribe requests.
//   selfId       - actor id that receives the scheduled retry wake-ups.
//   eventQueueId - id attached to retry wake-ups and to diagnostics.
void TRetryEventsQueue::Init(const TTxId& txId, const NActors::TActorId& senderId, const NActors::TActorId& selfId, ui64 eventQueueId) {
    TxId = txId;
    EventQueueId = eventQueueId;

    SenderId = senderId;
    SelfId = selfId;
    // Both actors are expected to be on the same node.
    Y_ASSERT(SenderId.NodeId() == SelfId.NodeId());
}
// Re-targets the queue at a new recipient and resets all per-recipient state.
//   recipientId - actor id of the new recipient.
//   unsubscribe - when true, Unsubscribe() is called before switching.
void TRetryEventsQueue::OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe) {
    // Must happen before RecipientId is overwritten: Unsubscribe() addresses
    // the interconnect proxy of the node of the current (old) recipient.
    if (unsubscribe) {
        Unsubscribe();
    }

    RecipientId = recipientId;
    LocalRecipient = SelfId.NodeId() == RecipientId.NodeId();

    // Drop everything queued so far and restart sequence numbering.
    Events.clear();
    NextSeqNo = 1;
    ReceivedEventsSeqNos.clear();
    MyConfirmedSeqNo = 0;

    // Start from a disconnected state with no retry backoff history.
    Connected = false;
    RetryState = Nothing();
}
// Reacts to a node-disconnected notification.
// Notifications about nodes other than the recipient's node are ignored;
// otherwise the queue is marked disconnected and a retry is scheduled.
void TRetryEventsQueue::HandleNodeDisconnected(ui32 nodeId) {
    if (nodeId != RecipientId.NodeId()) {
        return;
    }

    Connected = false;
    ScheduleRetry();
}
// Reacts to a node-connected notification.
// Does nothing if the notification is about a node other than the
// recipient's, or if the queue is already marked connected. Otherwise marks
// the queue connected, clears the retry backoff state and (re)sends every
// event currently held in Events.
void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
    if (nodeId != RecipientId.NodeId() || Connected) {
        return;
    }

    Connected = true;
    RetryState = Nothing();

    // (Re)send all events
    for (const IRetryableEvent::TPtr& pending : Events) {
        SendRetryable(pending);
    }
}
// Performs one retry attempt: clears the "retry scheduled" flag and, if the
// queue is still not connected, issues a new connect request.
void TRetryEventsQueue::Retry() {
    RetryScheduled = false;

    if (Connected) {
        return;
    }
    Connect();
}
void TRetryEventsQueue::Connect() {
auto connectEvent = MakeHolder<NActors::TEvInterconnect::TEvConnectNode>();
NActors::TActivationContext::Send(
new NActors::IEventHandle(NActors::TActivationContext::InterconnectProxy(RecipientId.NodeId()), SenderId, connectEvent.Release(), 0, 0));
}
void TRetryEventsQueue::Unsubscribe() {
if (Connected) {
Connected = false;
auto unsubscribeEvent = MakeHolder<NActors::TEvents::TEvUnsubscribe>();
NActors::TActivationContext::Send(
new NActors::IEventHandle(NActors::TActivationContext::InterconnectProxy(RecipientId.NodeId()), SenderId, unsubscribeEvent.Release(), 0, 0));
}
}
// Drops events from the front of Events whose sequence number is
// <= confirmedSeqNo, then enforces the limit on events still awaiting
// confirmation.
// Throws yexception if more than
// TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT events remain.
void TRetryEventsQueue::RemoveConfirmedEvents(ui64 confirmedSeqNo) {
    // Stop at the first event that has not been confirmed yet.
    while (!Events.empty()) {
        if (Events.front()->GetSeqNo() > confirmedSeqNo) {
            break;
        }
        Events.pop_front();
    }

    if (Events.size() <= TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT) {
        return;
    }

    throw yexception()
        << "Too many unconfirmed events: " << Events.size()
        << ". Confirmed SeqNo: " << confirmedSeqNo
        << ". Unconfirmed SeqNos: " << Events.front()->GetSeqNo() << "-" << Events.back()->GetSeqNo()
        << ". TxId: \"" << TxId << "\". EventQueueId: " << EventQueueId;
}
// Sends a fresh clone of the given retryable event. The stored event itself
// is not handed to Send, so the same event can be sent again later.
// The clone is created with the current MyConfirmedSeqNo.
void TRetryEventsQueue::SendRetryable(const IRetryableEvent::TPtr& ev) {
    auto cloned = ev->Clone(MyConfirmedSeqNo);
    NActors::TActivationContext::Send(cloned.Release());
}
void TRetryEventsQueue::ScheduleRetry() {
if (!RetryScheduled && !Events.empty()) {
RetryScheduled = true;
if (!RetryState) {
RetryState.ConstructInPlace();
}
auto ev = MakeHolder<TEvRetryQueuePrivate::TEvRetry>(EventQueueId);
NActors::TActivationContext::Schedule(RetryState->GetNextDelay(), new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
}
}
// Returns the delay to wait before the next retry and advances the backoff.
// The stored delay is doubled on every call and kept within
// [MinDelay, MaxDelay]. A zero delay is returned as-is; any other delay is
// randomized before being returned.
TDuration TRetryEventsQueue::TRetryState::GetNextDelay() {
    constexpr TDuration MinDelay = TDuration::MilliSeconds(100); // from second retry
    constexpr TDuration MaxDelay = TDuration::Seconds(10);

    const TDuration current = Delay; // The first delay is zero
    Delay = ClampVal(current * 2, MinDelay, MaxDelay);

    if (!current) {
        return current;
    }
    return RandomizeDelay(current);
}
// Adds jitter to a delay: returns half of baseDelay plus a random amount
// drawn from RandomNumber(half). Assuming RandomNumber(max) yields values in
// [0, max) (see util/random), the result lies in [half, 2 * half).
//
// If half of baseDelay rounds down to zero there is nothing to randomize, and
// RandomNumber would be asked for a value from an empty range; in that case
// baseDelay is returned unchanged.
TDuration TRetryEventsQueue::TRetryState::RandomizeDelay(TDuration baseDelay) {
    const TDuration::TValue half = baseDelay.GetValue() / 2;
    if (half == 0) {
        // Guard against RandomNumber(0): the range [0, 0) is empty.
        return baseDelay;
    }
    return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
}
} // namespace NYql::NDq
|