aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/library/yql/dq/actors/compute/retry_queue.cpp
blob: b4e9b9a4f90c8f7ffff48d61bc61fe09bfda9b4e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include "retry_queue.h"

#include <util/generic/utility.h>

namespace NYql::NDq {

// Stores the identifiers this queue works with.
//   txId         - transaction id; reported in the "too many unconfirmed events" error.
//   senderId     - actor id used as the sender of connect/unsubscribe requests.
//   selfId       - actor id that receives the scheduled retry events.
//   eventQueueId - id of this queue; carried by retry events and error messages.
// senderId and selfId are asserted to belong to the same node.
void TRetryEventsQueue::Init(const TTxId& txId, const NActors::TActorId& senderId, const NActors::TActorId& selfId, ui64 eventQueueId) {
    EventQueueId = eventQueueId;
    TxId = txId;

    SelfId = selfId;
    SenderId = senderId;
    Y_ASSERT(SenderId.NodeId() == SelfId.NodeId());
}

// Switches the queue to a new recipient and resets all per-recipient state:
// sequence numbers, stored events, confirmation bookkeeping, connection flag
// and retry backoff state.
//   recipientId - actor id of the new recipient.
//   unsubscribe - when true, Unsubscribe() is called for the previous recipient first.
void TRetryEventsQueue::OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe) {
    // Unsubscribe() addresses the node of the current RecipientId,
    // so it has to run before RecipientId is overwritten below.
    if (unsubscribe) {
        Unsubscribe();
    }

    RecipientId = recipientId;
    LocalRecipient = SelfId.NodeId() == RecipientId.NodeId();

    // Connection and backoff state start from scratch.
    Connected = false;
    RetryState = Nothing();

    // Outgoing side: drop stored events and restart numbering.
    Events.clear();
    NextSeqNo = 1;

    // Incoming side: forget what has been received and confirmed.
    ReceivedEventsSeqNos.clear();
    MyConfirmedSeqNo = 0;
}

// Reacts to a node disconnect notification.
// Notifications about nodes other than the recipient's node are ignored;
// otherwise the queue is marked disconnected and a retry is scheduled.
void TRetryEventsQueue::HandleNodeDisconnected(ui32 nodeId) {
    if (nodeId != RecipientId.NodeId()) {
        return;
    }

    Connected = false;
    ScheduleRetry();
}

// Reacts to a node connect notification.
// Only the transition "not connected -> connected" for the recipient's node
// has an effect: the retry backoff state is dropped and every event still
// stored in Events is sent again.
void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
    if (nodeId != RecipientId.NodeId()) {
        return;
    }
    if (Connected) {
        return;
    }

    Connected = true;
    RetryState = Nothing();

    // (Re)send everything that is still stored in the queue.
    for (const IRetryableEvent::TPtr& pending : Events) {
        SendRetryable(pending);
    }
}

// Handles a fired retry: clears the "retry scheduled" flag and,
// if the queue is still not connected, requests a connection.
void TRetryEventsQueue::Retry() {
    RetryScheduled = false;

    if (Connected) {
        return;
    }
    Connect();
}

void TRetryEventsQueue::Connect() {
    auto connectEvent = MakeHolder<NActors::TEvInterconnect::TEvConnectNode>();
    NActors::TActivationContext::Send(
        new NActors::IEventHandle(NActors::TActivationContext::InterconnectProxy(RecipientId.NodeId()), SenderId, connectEvent.Release(), 0, 0));
}

void TRetryEventsQueue::Unsubscribe() {
    if (Connected) {
        Connected = false;
        auto unsubscribeEvent = MakeHolder<NActors::TEvents::TEvUnsubscribe>();
        NActors::TActivationContext::Send(
            new NActors::IEventHandle(NActors::TActivationContext::InterconnectProxy(RecipientId.NodeId()), SenderId, unsubscribeEvent.Release(), 0, 0));
    }
}

// Drops from the front of Events every event whose sequence number is
// less than or equal to confirmedSeqNo.
// Throws yexception when, after the cleanup, more than
// TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT events remain stored.
void TRetryEventsQueue::RemoveConfirmedEvents(ui64 confirmedSeqNo) {
    while (!Events.empty()) {
        if (Events.front()->GetSeqNo() > confirmedSeqNo) {
            break; // the front event is not confirmed yet
        }
        Events.pop_front();
    }

    if (Events.size() <= TEvRetryQueuePrivate::UNCONFIRMED_EVENTS_COUNT_LIMIT) {
        return;
    }

    throw yexception()
        << "Too many unconfirmed events: " << Events.size()
        << ". Confirmed SeqNo: " << confirmedSeqNo
        << ". Unconfirmed SeqNos: " << Events.front()->GetSeqNo() << "-" << Events.back()->GetSeqNo()
        << ". TxId: \"" << TxId << "\". EventQueueId: " << EventQueueId;
}

// Sends a clone of the stored event; the clone is created with the current
// MyConfirmedSeqNo, and the stored event itself stays in the queue.
void TRetryEventsQueue::SendRetryable(const IRetryableEvent::TPtr& ev) {
    auto clone = ev->Clone(MyConfirmedSeqNo);
    NActors::TActivationContext::Send(clone.Release());
}

void TRetryEventsQueue::ScheduleRetry() {
    if (!RetryScheduled && !Events.empty()) {
        RetryScheduled = true;
        if (!RetryState) {
            RetryState.ConstructInPlace();
        }
        auto ev = MakeHolder<TEvRetryQueuePrivate::TEvRetry>(EventQueueId);
        NActors::TActivationContext::Schedule(RetryState->GetNextDelay(), new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
    }
}

// Returns the delay to wait before the next retry and advances the backoff:
// the stored delay is doubled and clamped to [100 ms, 10 s].
// A zero delay is returned as is; any other delay is randomized
// via RandomizeDelay() before being returned.
TDuration TRetryEventsQueue::TRetryState::GetNextDelay() {
    constexpr TDuration LowerBound = TDuration::MilliSeconds(100); // applies from the second retry
    constexpr TDuration UpperBound = TDuration::Seconds(10);

    const TDuration current = Delay;
    Delay = ClampVal(Delay * 2, LowerBound, UpperBound);

    if (current) {
        return RandomizeDelay(current);
    }
    return current;
}

// Adds jitter to a delay: the result is half of baseDelay plus a random
// addition produced by RandomNumber(half), both taken in the raw units of
// TDuration::TValue.
//   baseDelay - the nominal delay to randomize.
// Returns the randomized delay. When baseDelay is so small that its half
// is zero, baseDelay is returned unchanged.
TDuration TRetryEventsQueue::TRetryState::RandomizeDelay(TDuration baseDelay) {
    const TDuration::TValue half = baseDelay.GetValue() / 2;
    if (half == 0) {
        // RandomNumber(0) would be asked for a value from an empty range,
        // which is not a valid request; there is nothing to randomize anyway.
        return baseDelay;
    }
    return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
}

} // namespace NYql::NDq