diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-03-14 18:10:41 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexvru@mail.ru> | 2022-03-14 18:10:41 +0300 |
commit | 831543961b2710a44df149e1d363638bc008d265 (patch) | |
tree | e12404d7f20fa9b4915c585e2a587cce1cb033cb | |
parent | 443255e68d604b2dcbb97c3bf6cf89db75fbbb4d (diff) | |
download | ydb-831543961b2710a44df149e1d363638bc008d265.tar.gz |
Fix defrag timer bug KIKIMR-14494
ref:ab0112fd5b6f5c7b6e2cad7ee770aef68a99e73c
-rw-r--r-- | ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp | 61 |
1 files changed, 35 insertions, 26 deletions
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp index f7393d4f9b..758adf5c4e 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_actor.cpp @@ -161,7 +161,7 @@ namespace NKikimr { Y_VERIFY(ev->Sender == PlannerId); PlannerId = {}; if (ev->Get()->ChunksToDefrag) { - ctx.Send(ev->Forward(DefragActorId)); + ctx.Send(new IEventHandle(DefragActorId, SelfId(), ev->ReleaseBase().Release())); } else { ctx.Schedule(GeneratePause(), new TEvents::TEvWakeup); } @@ -207,17 +207,13 @@ namespace NKikimr { // Task for database defrag struct TTask { - TEvBlobStorage::TEvVDefrag::TPtr Req; - TEvDefragStartQuantum::TPtr StartQuantumReq; + std::variant<TEvBlobStorage::TEvVDefrag::TPtr, TEvDefragStartQuantum::TPtr> Request; TStat Stat; bool FirstQuantum = true; // true, if we run a first quantum with this task - TTask(TEvDefragStartQuantum::TPtr req) - : StartQuantumReq(req) - {} - - TTask(TEvBlobStorage::TEvVDefrag::TPtr req) - : Req(req) + template<typename T> + TTask(T&& req) + : Request(std::forward<T>(req)) {} }; @@ -251,7 +247,15 @@ namespace NKikimr { InProgress = true; ActiveActors.Insert(RunInBatchPool(ctx, CreateDefragQuantumActor(DCtx, GInfo->GetVDiskId(DCtx->VCtx->ShortSelfVDisk), - task.StartQuantumReq ? std::make_optional(std::move(task.StartQuantumReq->Get()->ChunksToDefrag)) : std::nullopt))); + std::visit([](auto& r) { return GetChunksToDefrag(r); }, task.Request)))); + } + + static std::optional<TChunksToDefrag> GetChunksToDefrag(TEvBlobStorage::TEvVDefrag::TPtr& /*ev*/) { + return std::nullopt; + } + + static std::optional<TChunksToDefrag> GetChunksToDefrag(TEvDefragStartQuantum::TPtr& ev) { + return std::move(ev->Get()->ChunksToDefrag); } void Bootstrap(const TActorContext &ctx) { @@ -281,28 +285,33 @@ namespace NKikimr { task.Stat.Eof = msg->Stat.Eof; task.Stat.FreedChunks.insert(task.Stat.FreedChunks.end(), msg->Stat.FreedChunks.begin(), msg->Stat.FreedChunks.end()); - if (msg->Stat.Eof || !task.Req || !task.Req->Get()->Record.GetFull()) { - if (task.Req) { - // response to caller - auto vdisk = task.Req->Get()->Record.GetVDiskID(); - auto reply = std::make_unique<TEvBlobStorage::TEvVDefragResult>(NKikimrProto::OK, vdisk); - reply->Record.SetFoundChunksToDefrag(task.Stat.FoundChunksToDefrag); - reply->Record.SetRewrittenRecs(task.Stat.RewrittenRecs); - reply->Record.SetRewrittenBytes(task.Stat.RewrittenBytes); - reply->Record.SetEof(task.Stat.Eof); - for (const auto &x : task.Stat.FreedChunks) { - reply->Record.MutableFreedChunks()->Add(x.ChunkId); - } - ctx.Send(task.Req->Sender, reply.release()); - } - // and remove the task from active tasks + if (std::visit([&](auto& r) { return ProcessQuantumResult(r, task); }, task.Request)) { WaitQueue.pop_front(); Sublog.Log() << "=== Defrag Finished ===\n"; - } // otherwise continue with the same task + } RunDefragIfAny(ctx); } + bool ProcessQuantumResult(TEvBlobStorage::TEvVDefrag::TPtr& ev, TTask& task) { + const auto& record = ev->Get()->Record; + auto reply = std::make_unique<TEvBlobStorage::TEvVDefragResult>(NKikimrProto::OK, record.GetVDiskID()); + reply->Record.SetFoundChunksToDefrag(task.Stat.FoundChunksToDefrag); + reply->Record.SetRewrittenRecs(task.Stat.RewrittenRecs); + reply->Record.SetRewrittenBytes(task.Stat.RewrittenBytes); + reply->Record.SetEof(task.Stat.Eof); + for (const auto& x : task.Stat.FreedChunks) { + reply->Record.MutableFreedChunks()->Add(x.ChunkId); + } + Send(ev->Sender, reply.release()); + return task.Stat.Eof || !record.GetFull(); + } + + bool ProcessQuantumResult(TEvDefragStartQuantum::TPtr& ev, TTask& /*task*/) { + Send(ev->Sender, new TEvBlobStorage::TEvVDefragResult); + return true; // this is always final quantum + } + void Die(const TActorContext& ctx) override { ActiveActors.KillAndClear(ctx); TActorBootstrapped::Die(ctx); |