1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
#include "fifo_cleanup.h"
#include "cfg.h"
#include "log.h"
#include "executor.h"
#include <ydb/public/lib/value/value.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/ymq/base/debug_info.h>
#include <library/cpp/actors/core/hfunc.h>
#include <util/random/random.h>
namespace NKikimr::NSQS {
TCleanupActor::TCleanupActor(const TQueuePath& queuePath, const TActorId& queueLeader, ECleanupType cleanupType)
: QueuePath_(queuePath)
, RequestId_(CreateGuidAsString())
, QueueLeader_(queueLeader)
, CleanupType(cleanupType)
{
DebugInfo->QueueCleanupActors.emplace(TStringBuilder() << TLogQueueName(QueuePath_), this);
}
TCleanupActor::~TCleanupActor() {
DebugInfo->QueueCleanupActors.EraseKeyValue(TStringBuilder() << TLogQueueName(QueuePath_), this);
}
void TCleanupActor::Bootstrap() {
RLOG_SQS_INFO("Bootstrap cleanup actor for queue " << TLogQueueName(QueuePath_));
Become(&TThis::StateFunc);
Schedule(RandomCleanupPeriod(), new TEvWakeup());
}
TDuration TCleanupActor::RandomCleanupPeriod() {
const ui64 cleanupPeriodMs = Cfg().GetCleanupPeriodMs();
Y_VERIFY(cleanupPeriodMs > 0);
return TDuration::MilliSeconds(cleanupPeriodMs) +
TDuration::MilliSeconds(RandomNumber<ui64>(cleanupPeriodMs / 4));
}
void TCleanupActor::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) {
const auto& record = ev->Get()->Record;
const ui32 status = record.GetStatus();
if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
using NKikimr::NClient::TValue;
const TValue val(TValue::Create(record.GetExecutionEngineEvaluatedResponse()));
const bool shouldContinue = val["moreData"];
const TValue lastProcessedKey = val["lastProcessedKey"];
if (lastProcessedKey.HaveValue()) {
KeyRangeStart = lastProcessedKey;
}
if (shouldContinue) {
RunCleanupQuery();
} else {
Schedule(RandomCleanupPeriod(), new TEvWakeup());
}
} else {
RLOG_SQS_ERROR("Cleanup query failed. Queue: " << TLogQueueName(QueuePath_));
Schedule(RandomCleanupPeriod(), new TEvWakeup());
}
}
void TCleanupActor::HandlePoisonPill(TEvPoisonPill::TPtr&) {
PassAway();
}
void TCleanupActor::HandleWakeup() {
KeyRangeStart = "";
RunCleanupQuery();
}
void TCleanupActor::RunCleanupQuery() {
TExecutorBuilder builder(SelfId(), RequestId_);
builder
.User(QueuePath_.UserName)
.Queue(QueuePath_.QueueName)
.QueueLeader(QueueLeader_)
.QueryId(GetCleanupQueryId())
.RetryOnTimeout()
.Params()
.Uint64("NOW", Now().MilliSeconds())
.Uint64("BATCH_SIZE", Cfg().GetCleanupBatchSize());
switch (CleanupType) {
case ECleanupType::Deduplication:
builder.Params().String("KEY_RANGE_START", KeyRangeStart);
break;
case ECleanupType::Reads:
builder.Params().Utf8("KEY_RANGE_START", KeyRangeStart);
break;
}
builder.Start();
RLOG_SQS_DEBUG("Executing cleanup request for queue " << TLogQueueName(QueuePath_));
}
STATEFN(TCleanupActor::StateFunc) {
switch (ev->GetTypeRewrite()) {
cFunc(TEvWakeup::EventType, HandleWakeup);
hFunc(TSqsEvents::TEvExecuted, HandleExecuted);
hFunc(TEvPoisonPill, HandlePoisonPill);
}
}
EQueryId TCleanupActor::GetCleanupQueryId() const {
switch (CleanupType) {
case ECleanupType::Deduplication:
return CLEANUP_DEDUPLICATION_ID;
case ECleanupType::Reads:
return CLEANUP_READS_ID;
}
}
} // namespace NKikimr::NSQS
|