aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-03-14 18:10:41 +0300
committerAlexander Rutkovsky <alexvru@mail.ru>2022-03-14 18:10:41 +0300
commit831543961b2710a44df149e1d363638bc008d265 (patch)
treee12404d7f20fa9b4915c585e2a587cce1cb033cb
parent443255e68d604b2dcbb97c3bf6cf89db75fbbb4d (diff)
downloadydb-831543961b2710a44df149e1d363638bc008d265.tar.gz
Fix defrag timer bug KIKIMR-14494
ref:ab0112fd5b6f5c7b6e2cad7ee770aef68a99e73c
-rw-r--r--ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp61
1 files changed, 35 insertions, 26 deletions
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
index f7393d4f9b..758adf5c4e 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp
@@ -161,7 +161,7 @@ namespace NKikimr {
Y_VERIFY(ev->Sender == PlannerId);
PlannerId = {};
if (ev->Get()->ChunksToDefrag) {
- ctx.Send(ev->Forward(DefragActorId));
+ ctx.Send(new IEventHandle(DefragActorId, SelfId(), ev->ReleaseBase().Release()));
} else {
ctx.Schedule(GeneratePause(), new TEvents::TEvWakeup);
}
@@ -207,17 +207,13 @@ namespace NKikimr {
// Task for database defrag
struct TTask {
- TEvBlobStorage::TEvVDefrag::TPtr Req;
- TEvDefragStartQuantum::TPtr StartQuantumReq;
+ std::variant<TEvBlobStorage::TEvVDefrag::TPtr, TEvDefragStartQuantum::TPtr> Request;
TStat Stat;
bool FirstQuantum = true; // true, if we run a first quantum with this task
- TTask(TEvDefragStartQuantum::TPtr req)
- : StartQuantumReq(req)
- {}
-
- TTask(TEvBlobStorage::TEvVDefrag::TPtr req)
- : Req(req)
+ template<typename T>
+ TTask(T&& req)
+ : Request(std::forward<T>(req))
{}
};
@@ -251,7 +247,15 @@ namespace NKikimr {
InProgress = true;
ActiveActors.Insert(RunInBatchPool(ctx, CreateDefragQuantumActor(DCtx,
GInfo->GetVDiskId(DCtx->VCtx->ShortSelfVDisk),
- task.StartQuantumReq ? std::make_optional(std::move(task.StartQuantumReq->Get()->ChunksToDefrag)) : std::nullopt)));
+ std::visit([](auto& r) { return GetChunksToDefrag(r); }, task.Request))));
+ }
+
+ static std::optional<TChunksToDefrag> GetChunksToDefrag(TEvBlobStorage::TEvVDefrag::TPtr& /*ev*/) {
+ return std::nullopt;
+ }
+
+ static std::optional<TChunksToDefrag> GetChunksToDefrag(TEvDefragStartQuantum::TPtr& ev) {
+ return std::move(ev->Get()->ChunksToDefrag);
}
void Bootstrap(const TActorContext &ctx) {
@@ -281,28 +285,33 @@ namespace NKikimr {
task.Stat.Eof = msg->Stat.Eof;
task.Stat.FreedChunks.insert(task.Stat.FreedChunks.end(), msg->Stat.FreedChunks.begin(), msg->Stat.FreedChunks.end());
- if (msg->Stat.Eof || !task.Req || !task.Req->Get()->Record.GetFull()) {
- if (task.Req) {
- // response to caller
- auto vdisk = task.Req->Get()->Record.GetVDiskID();
- auto reply = std::make_unique<TEvBlobStorage::TEvVDefragResult>(NKikimrProto::OK, vdisk);
- reply->Record.SetFoundChunksToDefrag(task.Stat.FoundChunksToDefrag);
- reply->Record.SetRewrittenRecs(task.Stat.RewrittenRecs);
- reply->Record.SetRewrittenBytes(task.Stat.RewrittenBytes);
- reply->Record.SetEof(task.Stat.Eof);
- for (const auto &x : task.Stat.FreedChunks) {
- reply->Record.MutableFreedChunks()->Add(x.ChunkId);
- }
- ctx.Send(task.Req->Sender, reply.release());
- }
- // and remove the task from active tasks
+ if (std::visit([&](auto& r) { return ProcessQuantumResult(r, task); }, task.Request)) {
WaitQueue.pop_front();
Sublog.Log() << "=== Defrag Finished ===\n";
- } // otherwise continue with the same task
+ }
RunDefragIfAny(ctx);
}
+ bool ProcessQuantumResult(TEvBlobStorage::TEvVDefrag::TPtr& ev, TTask& task) {
+ const auto& record = ev->Get()->Record;
+ auto reply = std::make_unique<TEvBlobStorage::TEvVDefragResult>(NKikimrProto::OK, record.GetVDiskID());
+ reply->Record.SetFoundChunksToDefrag(task.Stat.FoundChunksToDefrag);
+ reply->Record.SetRewrittenRecs(task.Stat.RewrittenRecs);
+ reply->Record.SetRewrittenBytes(task.Stat.RewrittenBytes);
+ reply->Record.SetEof(task.Stat.Eof);
+ for (const auto& x : task.Stat.FreedChunks) {
+ reply->Record.MutableFreedChunks()->Add(x.ChunkId);
+ }
+ Send(ev->Sender, reply.release());
+ return task.Stat.Eof || !record.GetFull();
+ }
+
+ bool ProcessQuantumResult(TEvDefragStartQuantum::TPtr& ev, TTask& /*task*/) {
+ Send(ev->Sender, new TEvBlobStorage::TEvVDefragResult);
+ return true; // this is always final quantum
+ }
+
void Die(const TActorContext& ctx) override {
ActiveActors.KillAndClear(ctx);
TActorBootstrapped::Die(ctx);