aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/ymq/actor/fifo_cleanup.cpp
blob: 218deb0fcc0c103404456e29eb92677e39154fef (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include "fifo_cleanup.h"
#include "cfg.h"
#include "log.h"
#include "executor.h"

#include <ydb/public/lib/value/value.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/ymq/base/debug_info.h>

#include <library/cpp/actors/core/hfunc.h>

#include <util/random/random.h>

namespace NKikimr::NSQS {

// Creates a cleanup actor for the queue identified by `queuePath`.
//   queuePath    - path of the queue being cleaned (copied into QueuePath_).
//   queueLeader  - actor id stored in QueueLeader_ and later handed to the query executor.
//   cleanupType  - selects which cleanup query this actor runs.
// A fresh GUID string is generated as the request id, and the actor registers
// itself in DebugInfo->QueueCleanupActors under the queue's log name.
TCleanupActor::TCleanupActor(const TQueuePath& queuePath, const TActorId& queueLeader, ECleanupType cleanupType)
    : QueuePath_(queuePath)
    , RequestId_(CreateGuidAsString())
    , QueueLeader_(queueLeader)
    , CleanupType(cleanupType)
{
    // The destructor must build exactly the same key to remove this entry.
    const TString debugKey = TStringBuilder() << TLogQueueName(QueuePath_);
    DebugInfo->QueueCleanupActors.emplace(debugKey, this);
}

// Removes this actor's entry from DebugInfo->QueueCleanupActors.
// The key is rebuilt the same way the constructor built it (the queue's log name).
TCleanupActor::~TCleanupActor() {
    const TString debugKey = TStringBuilder() << TLogQueueName(QueuePath_);
    DebugInfo->QueueCleanupActors.EraseKeyValue(debugKey, this);
}

void TCleanupActor::Bootstrap() {
    RLOG_SQS_INFO("Bootstrap cleanup actor for queue " << TLogQueueName(QueuePath_));
    Become(&TThis::StateFunc);
    Schedule(RandomCleanupPeriod(), new TEvWakeup());
}

// Returns the delay before the next cleanup pass: the configured cleanup period
// plus a random jitter taken from [0, period / 4) milliseconds.
// The configured period must be positive (enforced with Y_VERIFY).
TDuration TCleanupActor::RandomCleanupPeriod() {
    const ui64 cleanupPeriodMs = Cfg().GetCleanupPeriodMs();
    Y_VERIFY(cleanupPeriodMs > 0);

    // For periods of 1..3 ms the jitter bound rounds down to zero. The range
    // [0, 0) is empty, so RandomNumber must not be asked for a value from it;
    // such tiny periods simply get no jitter.
    const ui64 maxJitterMs = cleanupPeriodMs / 4;
    const ui64 jitterMs = maxJitterMs > 0 ? RandomNumber<ui64>(maxJitterMs) : 0;

    return TDuration::MilliSeconds(cleanupPeriodMs) +
        TDuration::MilliSeconds(jitterMs);
}

// Handles the result of a cleanup query.
// On failure: logs an error and schedules the next wakeup.
// On success: remembers the last processed key (when the response carries one)
// and either runs the next query immediately, if the response reports more data,
// or schedules the next wakeup.
void TCleanupActor::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) {
    const auto& record = ev->Get()->Record;
    const ui32 status = record.GetStatus();

    if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
        RLOG_SQS_ERROR("Cleanup query failed. Queue: " << TLogQueueName(QueuePath_));
        Schedule(RandomCleanupPeriod(), new TEvWakeup());
        return;
    }

    using NKikimr::NClient::TValue;
    const TValue response(TValue::Create(record.GetExecutionEngineEvaluatedResponse()));
    const bool hasMoreData = response["moreData"];
    const TValue lastKey = response["lastProcessedKey"];

    // Keep the previous range start when the response carries no key.
    if (lastKey.HaveValue()) {
        KeyRangeStart = lastKey;
    }

    if (!hasMoreData) {
        Schedule(RandomCleanupPeriod(), new TEvWakeup());
        return;
    }

    // More rows remain: continue right away from the updated range start.
    RunCleanupQuery();
}

// Reacts to a poison pill by calling PassAway(); the event itself is not inspected.
void TCleanupActor::HandlePoisonPill(TEvPoisonPill::TPtr&) {
    PassAway();
}

// Starts a new cleanup pass: resets the key range start to the empty string
// and issues the first cleanup query of the pass.
void TCleanupActor::HandleWakeup() {
    KeyRangeStart.clear();
    RunCleanupQuery();
}

// Builds and starts one cleanup query for this queue.
// Query parameters:
//   NOW             - current time in milliseconds,
//   BATCH_SIZE      - configured cleanup batch size,
//   KEY_RANGE_START - current KeyRangeStart, passed as String for deduplication
//                     cleanup and as Utf8 for reads cleanup.
// The executor is created with SelfId() and RequestId_.
void TCleanupActor::RunCleanupQuery() {
    TExecutorBuilder builder(SelfId(), RequestId_);

    // Query routing and retry policy.
    builder.User(QueuePath_.UserName);
    builder.Queue(QueuePath_.QueueName);
    builder.QueueLeader(QueueLeader_);
    builder.QueryId(GetCleanupQueryId());
    builder.RetryOnTimeout();

    // Parameters common to both cleanup types.
    builder.Params().Uint64("NOW", Now().MilliSeconds());
    builder.Params().Uint64("BATCH_SIZE", Cfg().GetCleanupBatchSize());

    // The range start parameter type depends on the cleanup type.
    switch (CleanupType) {
    case ECleanupType::Deduplication:
        builder.Params().String("KEY_RANGE_START", KeyRangeStart);
        break;
    case ECleanupType::Reads:
        builder.Params().Utf8("KEY_RANGE_START", KeyRangeStart);
        break;
    }

    builder.Start();

    RLOG_SQS_DEBUG("Executing cleanup request for queue " << TLogQueueName(QueuePath_));
}

// Event dispatch for the actor's state (installed by Bootstrap() via Become).
// Only the three event types listed below have handlers; there is no default branch.
STATEFN(TCleanupActor::StateFunc) {
    switch (ev->GetTypeRewrite()) {
        // Timer tick: start a new cleanup pass.
        cFunc(TEvWakeup::EventType, HandleWakeup);
        // Result of a previously started cleanup query.
        hFunc(TSqsEvents::TEvExecuted, HandleExecuted);
        // Request to stop the actor.
        hFunc(TEvPoisonPill, HandlePoisonPill);
    }
}

// Returns the query id matching this actor's cleanup type:
//   Deduplication -> CLEANUP_DEDUPLICATION_ID
//   Reads         -> CLEANUP_READS_ID
// Any other value of CleanupType is a programming error and aborts.
EQueryId TCleanupActor::GetCleanupQueryId() const {
    // No `default:` label on purpose: the compiler can then warn when a new
    // ECleanupType enumerator is added without being handled here.
    switch (CleanupType) {
    case ECleanupType::Deduplication:
        return CLEANUP_DEDUPLICATION_ID;
    case ECleanupType::Reads:
        return CLEANUP_READS_ID;
    }

    // Flowing off the end of a non-void function is undefined behaviour,
    // so fail loudly if CleanupType holds an unexpected value.
    Y_FAIL("Unexpected cleanup type");
}

} // namespace NKikimr::NSQS