diff options
author | Robert Drynkin <robdrynkin@ydb.tech> | 2024-07-12 17:03:49 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-12 17:03:49 +0400 |
commit | a2040b49a51a07727674bcf03ed098cc65afdf22 (patch) | |
tree | b66549ba4d680f3030151bbb93d6ff9e1d63e549 | |
parent | e615634d6e3dd02e6cce3bd0fc8b59739419f48a (diff) | |
download | ydb-a2040b49a51a07727674bcf03ed098cc65afdf22.tar.gz |
Refactor balancing + add batch requests (#5904)
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/balancing.cpp | 25 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp | 201 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/balancing_actor.h | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/defs.h | 29 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/deleter.cpp | 275 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/sender.cpp | 313 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h | 27 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/common/vdisk_private_events.h | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp | 5 |
10 files changed, 594 insertions, 292 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/balancing.cpp b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp index 2937892b59f..b83dfd4bd81 100644 --- a/ydb/core/blobstorage/ut_blobstorage/balancing.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp @@ -79,7 +79,6 @@ struct TTestEnv { auto SendGet(ui32 step, ui32 dataSize, bool mustRestoreFirst=false) { const TLogoBlobID blobId(1, 1, step, 0, dataSize, 0); - Cerr << "SEND TEvGet with key " << blobId.ToString() << Endl; const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__); auto ev = std::make_unique<TEvBlobStorage::TEvGet>( blobId, @@ -94,7 +93,6 @@ struct TTestEnv { }); TInstant getDeadline = Env.Now() + TDuration::Seconds(30); auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, /* termOnCapture */ false, getDeadline); - Cerr << "TEvGetResult: " << res->Get()->ToString() << Endl; return res; }; @@ -152,7 +150,7 @@ struct TTestEnv { bool CheckPartsLocations(const TLogoBlobID& blobId) { auto expectedParts = GetExpectedPartsLocations(blobId); auto actualParts = GetActualPartsLocations(blobId); - TString errMsg = ToString(expectedParts) + " != " + ToString(actualParts); + TString errMsg = ToString(expectedParts) + " != " + ToString(actualParts) + " Key: " + blobId.ToString(); UNIT_ASSERT_VALUES_EQUAL_C(expectedParts.size(), actualParts.size(), errMsg); for (ui32 i = 0; i < expectedParts.size(); ++i) { @@ -231,20 +229,20 @@ struct TStopOneNodeTest { Env.SendPut(step, data, NKikimrProto::OK); Env->Sim(TDuration::Seconds(10)); Env.StartNode(nodeIdWithBlob); - Env->Sim(TDuration::Seconds(10)); + Env->Sim(TDuration::Seconds(30)); Cerr << "Start compaction 1" << Endl; for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); } - Env->Sim(TDuration::Seconds(10)); + Env->Sim(TDuration::Seconds(30)); Cerr << "Finish compaction 1" << Endl; Cerr << "Start compaction 2" << Endl; for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); } - Env->Sim(TDuration::Seconds(10)); + Env->Sim(TDuration::Seconds(30)); Cerr << "Finish compaction 2" << Endl; Env.CheckPartsLocations(MakeLogoBlobId(step, data.size())); @@ -259,7 +257,7 @@ struct TRandomTest { ui32 MaxBlobSize; void RunTest() { - srand(123); + srand(123456); TVector<TString> data(Reserve(NumIters)); TVector<ui32> successfulSteps; @@ -302,7 +300,7 @@ struct TRandomTest { // Wipe random node if (random() % 100 == 1) { ui32 pos = random() % Env->Settings.NodeCount; - if (Env.RunningNodes.contains(pos)) { + if (Env.RunningNodes.contains(pos) && Env.RunningNodes.contains(0)) { Cerr << "Wipe node " << pos << Endl; auto baseConfig = Env->FetchBaseConfig(); const auto& someVSlot = baseConfig.GetVSlot(pos); @@ -321,7 +319,7 @@ struct TRandomTest { for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { Env.StartNode(pos); } - Env->Sim(TDuration::Seconds(10)); + Env->Sim(TDuration::Seconds(60)); Cerr << "Start compaction 1" << Endl; for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { @@ -337,9 +335,8 @@ struct TRandomTest { Cerr << "Start checking" << Endl; for (ui32 step: successfulSteps) { - Cerr << "step = " << step << Endl; Env.CheckPartsLocations(MakeLogoBlobId(step, data[step].size())); - UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step]); + UNIT_ASSERT_VALUES_EQUAL_C(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step], MakeLogoBlobId(step, data[step].size()).ToString()); } } }; @@ -401,21 +398,21 @@ struct TTwoPartsOnOneNodeTest { // start all stopped nodes Env.StartNode(partIdxToNodeId[1]); Env.StartNode(handoffNodeIds[1]); - Env->Sim(TDuration::Seconds(10)); + Env->Sim(TDuration::Seconds(30)); // run compactions Cerr << "Start compaction 1" << Endl; for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); } - Env->Sim(TDuration::Seconds(10)); + Env->Sim(TDuration::Seconds(30)); Cerr << "Finish compaction 1" << Endl; Cerr << "Start compaction 2" << Endl; for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); } - Env->Sim(TDuration::Seconds(10)); + Env->Sim(TDuration::Seconds(30)); Cerr << "Finish compaction 2" << Endl; Env.CheckPartsLocations(MakeLogoBlobId(step, data.size())); diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp index ff14300f9b8..de0c1790347 100644 --- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp +++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp @@ -9,6 +9,63 @@ namespace NKikimr { namespace NBalancing { + template<class T> + struct TBatchedQueue { + TVector<T> Data; + ui32 CurPos = 0; + const ui32 BatchSize = 0; + + TBatchedQueue(ui32 batchSize) + : BatchSize(batchSize) + {} + + TConstArrayRef<T> GetNextBatch() { + if (Empty()) { + return {}; + } + + ui32 begin = CurPos; + ui32 end = Min(begin + BatchSize, static_cast<ui32>(Data.size())); + CurPos = end; + return TConstArrayRef<T>(Data.data() + begin, end - begin); + } + + bool Empty() const { + return CurPos >= Data.size(); + } + + ui32 Size() const { + return Data.size() - CurPos; + } + }; + + struct TBatchManager { + TActorId SenderId; + TActorId DeleterId; + bool IsSendCompleted = false; + bool IsDeleteCompleted = false; + + void Handle(NActors::TEvents::TEvCompleted::TPtr ev) { + if (ev->Sender == SenderId) { + IsSendCompleted = true; + } else if (ev->Sender == DeleterId) { + IsDeleteCompleted = true; + } else { + STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected id", (Id, ev->Sender)); + } + } + + bool IsBatchCompleted() const { + return IsSendCompleted && IsDeleteCompleted; + } + + TBatchManager() = default; + TBatchManager(IActor* sender, IActor* deleter) + : SenderId(TlsActivationContext->Register(sender)) + , DeleterId(TlsActivationContext->Register(deleter)) + {} + }; + class TBalancingActor : public TActorBootstrapped<TBalancingActor> { private: std::shared_ptr<TBalancingCtx> Ctx; @@ -17,27 +74,24 @@ namespace NBalancing { TQueueActorMapPtr QueueActorMapPtr; THashSet<TVDiskID> ConnectedVDisks; - TQueue<TPartInfo> SendOnMainParts; - TQueue<TLogoBlobID> TryDeleteParts; - - TActorId SenderId; - TActorId DeleterId; - - struct TStats { - bool SendCompleted = false; - ui32 SendPartsLeft = 0; - bool DeleteCompleted = false; - ui32 DeletePartsLeft = 0; - }; - - TStats Stats; + TBatchedQueue<TPartInfo> SendOnMainParts; + TBatchedQueue<TLogoBlobID> TryDeleteParts; + TBatchManager BatchManager; /////////////////////////////////////////////////////////////////////////////////////////// // Main logic /////////////////////////////////////////////////////////////////////////////////////////// void ContinueBalancing() { + if (SendOnMainParts.Empty() && TryDeleteParts.Empty()) { + // no more parts to send or delete + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed")); + Send(Ctx->SkeletonId, new TEvStartBalancing()); + PassAway(); + return; + } + // ask for repl token to continue balancing STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB01, VDISKP(Ctx->VCtx, "Ask repl token to continue balancing")); Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(Ctx->VDiskCfg->BaseInfo.PDiskId), NActors::IEventHandle::FlagTrackDelivery); @@ -46,55 +100,23 @@ namespace NBalancing { void ScheduleJobQuant() { // once repl token received, start balancing - waking up sender and deleter STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"), - (SendPartsLeft, Stats.SendPartsLeft), (DeletePartsLeft, Stats.DeletePartsLeft), + (SendPartsLeft, SendOnMainParts.Size()), (DeletePartsLeft, TryDeleteParts.Size()), (ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum())); - Send(SenderId, new NActors::TEvents::TEvWakeup()); - Send(DeleterId, new NActors::TEvents::TEvWakeup()); - } - - void Handle(NActors::TEvents::TEvCompleted::TPtr ev) { - // sender or deleter completed job quant - switch (ev->Get()->Id) { - case SENDER_ID: - Stats.SendCompleted = true; - Stats.SendPartsLeft = ev->Get()->Status; - break; - case DELETER_ID: - Stats.DeleteCompleted = true; - Stats.DeletePartsLeft = ev->Get()->Status; - break; - default: - Y_ABORT("Unexpected id"); - } - - if (Stats.SendCompleted && Stats.DeleteCompleted) { - // balancing job quant completed, release token - Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); - - if (Stats.SendPartsLeft != 0 || Stats.DeletePartsLeft != 0) { - // sender or deleter has not finished yet - Stats.SendCompleted = Stats.SendPartsLeft == 0; - Stats.DeleteCompleted = Stats.DeletePartsLeft == 0; - ContinueBalancing(); - return; - } - // balancing completed - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed")); - Send(Ctx->SkeletonId, new TEvStartBalancing()); - PassAway(); - } + // register sender and deleter actors + BatchManager = TBatchManager( + CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx), + CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx) + ); } - constexpr static TDuration JOB_GRANULARITY = TDuration::MilliSeconds(1); - void CollectKeys() { THPTimer timer; for (ui32 cnt = 0; It.Valid(); It.Next(), ++cnt) { - if (cnt % 1000 == 999 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) { + if (cnt % 100 == 99 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) { // actor should not block the thread for a long time, so we should yield - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt)); + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed())); Send(SelfId(), new NActors::TEvents::TEvWakeup()); return; } @@ -106,42 +128,48 @@ namespace NBalancing { It.PutToMerger(&merger); auto [moveMask, delMask] = merger.Ingress.HandoffParts(&top, Ctx->VCtx->ShortSelfVDisk, key); - auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; - auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; - - // collect parts to send on main - for (const auto& [parts, data]: merger.Parts) { - if (!(partsToSend & parts).Empty()) { - SendOnMainParts.push(TPartInfo{ - .Key=It.GetCurKey().LogoBlobID(), - .PartsMask=parts, - .PartData=data - }); + + if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty()) { + // collect parts to send on main + for (const auto& [parts, data]: merger.Parts) { + if (!(partsToSend & parts).Empty()) { + SendOnMainParts.Data.emplace_back(TPartInfo{ + .Key=It.GetCurKey().LogoBlobID(), + .PartsMask=parts, + .PartData=data + }); + } } } - // collect parts to delete - for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) { - TryDeleteParts.push(TLogoBlobID(It.GetCurKey().LogoBlobID(), partIdx + 1)); - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB07, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.back().ToString())); + if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty()) { + // collect parts to delete + for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) { + TryDeleteParts.Data.emplace_back(TLogoBlobID(It.GetCurKey().LogoBlobID(), partIdx + 1)); + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.Data.back().ToString())); + } } merger.Clear(); } STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB08, VDISKP(Ctx->VCtx, "Keys collected"), - (SendOnMainParts, SendOnMainParts.size()), (TryDeleteParts, TryDeleteParts.size())); - Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.size(); - Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.size(); - - // register sender and deleter actors - SenderId = TlsActivationContext->Register(CreateSenderActor(SelfId(), std::move(SendOnMainParts), QueueActorMapPtr, Ctx)); - DeleterId = TlsActivationContext->Register(CreateDeleterActor(SelfId(), std::move(TryDeleteParts), QueueActorMapPtr, Ctx)); + (SendOnMainParts, SendOnMainParts.Data.size()), (TryDeleteParts, TryDeleteParts.Data.size())); + Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size(); + Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size(); // start balancing ContinueBalancing(); } + void Handle(NActors::TEvents::TEvCompleted::TPtr ev) { + BatchManager.Handle(ev); + if (BatchManager.IsBatchCompleted()) { + Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); + + ContinueBalancing(); + } + } /////////////////////////////////////////////////////////////////////////////////////////// // Helper functions @@ -162,7 +190,7 @@ namespace NBalancing { void Handle(NActors::TEvents::TEvUndelivered::TPtr ev) { if (ev.Get()->Type == TEvReplToken::EventType) { - STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB09, VDISKP(Ctx->VCtx, "Aske repl token msg not delivered")); + STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB06, VDISKP(Ctx->VCtx, "Ask repl token msg not delivered")); ScheduleJobQuant(); } } @@ -170,8 +198,10 @@ namespace NBalancing { void Handle(TEvProxyQueueState::TPtr ev) { const TVDiskID& vdiskId = ev->Get()->VDiskId; if (ev->Get()->IsConnected) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB07, VDISKP(Ctx->VCtx, "VDisk connected"), (VDiskId, vdiskId.ToString())); ConnectedVDisks.insert(vdiskId); } else { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB09, VDISKP(Ctx->VCtx, "VDisk disconnected"), (VDiskId, vdiskId.ToString())); ConnectedVDisks.erase(vdiskId); } } @@ -184,13 +214,13 @@ namespace NBalancing { } GInfo = msg->NewInfo; - Send(SenderId, msg->Clone()); - Send(DeleterId, msg->Clone()); + Send(BatchManager.SenderId, msg->Clone()); + Send(BatchManager.DeleterId, msg->Clone()); } void PassAway() override { - Send(SenderId, new NActors::TEvents::TEvPoison); - Send(DeleterId, new NActors::TEvents::TEvPoison); + Send(BatchManager.SenderId, new NActors::TEvents::TEvPoison); + Send(BatchManager.DeleterId, new NActors::TEvents::TEvPoison); for (const auto& kv : *QueueActorMapPtr) { Send(kv.second, new TEvents::TEvPoison); } @@ -198,10 +228,12 @@ namespace NBalancing { } STRICT_STFUNC(StateFunc, + // Logic events cFunc(NActors::TEvents::TEvWakeup::EventType, CollectKeys) cFunc(TEvReplToken::EventType, ScheduleJobQuant) hFunc(NActors::TEvents::TEvCompleted, Handle) + // System events hFunc(NActors::TEvents::TEvUndelivered, Handle) cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) @@ -216,14 +248,17 @@ namespace NBalancing { , Ctx(ctx) , GInfo(ctx->GInfo) , It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap) + , SendOnMainParts(BATCH_SIZE) + , TryDeleteParts(BATCH_SIZE) { } void Bootstrap() { + Become(&TThis::StateFunc); CreateVDisksQueues(); It.SeekToFirst(); - Become(&TThis::StateFunc); - Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup()); + ++Ctx->MonGroup.BalancingIterations(); + Schedule(TDuration::Seconds(10), new NActors::TEvents::TEvWakeup()); } }; diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h index f96debd8ee7..9e35865a537 100644 --- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h +++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h @@ -10,13 +10,13 @@ namespace NKikimr { namespace NBalancing { IActor* CreateSenderActor( TActorId notifyId, - TQueue<TPartInfo> parts, + TConstArrayRef<TPartInfo> parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ); IActor* CreateDeleterActor( TActorId notifyId, - TQueue<TLogoBlobID> parts, + TConstArrayRef<TLogoBlobID> parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ); diff --git a/ydb/core/blobstorage/vdisk/balance/defs.h b/ydb/core/blobstorage/vdisk/balance/defs.h index 45445dd8028..6206e24565c 100644 --- a/ydb/core/blobstorage/vdisk/balance/defs.h +++ b/ydb/core/blobstorage/vdisk/balance/defs.h @@ -3,6 +3,7 @@ #include <ydb/core/blobstorage/vdisk/common/vdisk_context.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h> #include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all_snap.h> +#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_hulllogctx.h> @@ -10,6 +11,7 @@ namespace NKikimr { struct TBalancingCtx { TIntrusivePtr<TVDiskContext> VCtx; TPDiskCtxPtr PDiskCtx; + THugeBlobCtxPtr HugeBlobCtx; TActorId SkeletonId; NMonGroup::TBalancingGroup MonGroup; @@ -18,21 +20,27 @@ namespace NKikimr { TIntrusivePtr<TVDiskConfig> VDiskCfg; TIntrusivePtr<TBlobStorageGroupInfo> GInfo; + ui32 MinREALHugeBlobInBytes; + TBalancingCtx( TIntrusivePtr<TVDiskContext> vCtx, TPDiskCtxPtr pDiskCtx, + THugeBlobCtxPtr hugeBlobCtx, TActorId skeletonId, NKikimr::THullDsSnap snap, TIntrusivePtr<TVDiskConfig> vDiskCfg, - TIntrusivePtr<TBlobStorageGroupInfo> gInfo + TIntrusivePtr<TBlobStorageGroupInfo> gInfo, + ui32 minREALHugeBlobInBytes ) : VCtx(std::move(vCtx)) , PDiskCtx(std::move(pDiskCtx)) + , HugeBlobCtx(std::move(hugeBlobCtx)) , SkeletonId(skeletonId) , MonGroup(VCtx->VDiskCounters, "subsystem", "balancing") , Snap(std::move(snap)) , VDiskCfg(std::move(vDiskCfg)) , GInfo(std::move(gInfo)) + , MinREALHugeBlobInBytes(minREALHugeBlobInBytes) { } }; @@ -47,7 +55,22 @@ namespace NBalancing { std::variant<TDiskPart, TRope> PartData; }; - constexpr ui32 SENDER_ID = 0; - constexpr ui32 DELETER_ID = 1; + static constexpr ui32 SENDER_ID = 0; + static constexpr ui32 DELETER_ID = 1; + + static constexpr TDuration JOB_GRANULARITY = TDuration::MilliSeconds(1); + + static constexpr TDuration READ_BATCH_TIMEOUT = TDuration::Seconds(10); + static constexpr TDuration SEND_BATCH_TIMEOUT = TDuration::Seconds(10); + static constexpr TDuration REQUEST_BLOBS_ON_MAIN_BATCH_TIMEOUT = TDuration::Seconds(10); + static constexpr TDuration DELETE_BATCH_TIMEOUT = TDuration::Seconds(10); + + static constexpr ui64 READ_TIMEOUT_TAG = 0; + static constexpr ui64 SEND_TIMEOUT_TAG = 1; + static constexpr ui64 REQUEST_TIMEOUT_TAG = 2; + static constexpr ui64 DELETE_TIMEOUT_TAG = 3; + + static constexpr ui32 BATCH_SIZE = 32; + } // NBalancing } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/balance/deleter.cpp b/ydb/core/blobstorage/vdisk/balance/deleter.cpp index 8b842cfcf99..e189f3c6988 100644 --- a/ydb/core/blobstorage/vdisk/balance/deleter.cpp +++ b/ydb/core/blobstorage/vdisk/balance/deleter.cpp @@ -21,35 +21,33 @@ namespace { class TPartsRequester { private: const TActorId NotifyId; - const size_t BatchSize; - TQueue<TLogoBlobID> Parts; + TConstArrayRef<TLogoBlobID> Parts; TReplQuoter::TPtr Quoter; TIntrusivePtr<TBlobStorageGroupInfo> GInfo; TQueueActorMapPtr QueueActorMapPtr; + NMonGroup::TBalancingGroup& MonGroup; TVector<TPartOnMain> Result; - ui32 Responses; - ui32 ExpectedResponses; + ui32 RequestsSent = 0; + ui32 Responses = 0; public: - TPartsRequester(TActorId notifyId, size_t batchSize, TQueue<TLogoBlobID> parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr) + TPartsRequester(TActorId notifyId, TConstArrayRef<TLogoBlobID> parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup) : NotifyId(notifyId) - , BatchSize(batchSize) - , Parts(std::move(parts)) + , Parts(parts) , Quoter(quoter) , GInfo(gInfo) , QueueActorMapPtr(queueActorMapPtr) - , Result(Reserve(BatchSize)) - , Responses(0) - , ExpectedResponses(0) - {} + , MonGroup(monGroup) + , Result(parts.size()) + { + } + + void SendRequestsToCheckPartsOnMain(const TActorId& selfId) { + THashMap<TVDiskID, std::unique_ptr<TEvBlobStorage::TEvVGet>> vDiskToQueries; - void ScheduleJobQuant(const TActorId& selfId) { - Result.resize(Min(Parts.size(), BatchSize)); - ExpectedResponses = 0; - for (ui64 i = 0; i < BatchSize && !Parts.empty(); ++i) { - auto key = Parts.front(); - Parts.pop(); + for (ui64 i = 0; i < Parts.size(); ++i) { + auto key = Parts[i]; Result[i] = TPartOnMain{ .Key=key, .HasOnMain=false @@ -57,29 +55,27 @@ namespace { auto vDiskId = GetMainReplicaVDiskId(*GInfo, key); - // query which would tell us which parts are realy on main (not by ingress) - auto ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery( - vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead, - TEvBlobStorage::TEvVGet::EFlags::None, i, - {{key.FullID(), 0, 0}} - ); + auto& ev = vDiskToQueries[vDiskId]; + if (!ev) { + ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery( + vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead, + TEvBlobStorage::TEvVGet::EFlags::None, 0 + ); + } + + ev->AddExtremeQuery(key.FullID(), 0, 0, &i); + ++MonGroup.CandidatesToDeleteAskedFromMain(); + } + + for (auto& [vDiskId, ev]: vDiskToQueries) { ui32 msgSize = ev->CalculateSerializedSize(); TReplQuoter::QuoteMessage( Quoter, std::make_unique<IEventHandle>(QueueActorMapPtr->at(TVDiskIdShort(vDiskId)), selfId, ev.release()), msgSize ); - ++ExpectedResponses; - } - } - - std::pair<std::optional<TVector<TPartOnMain>>, ui32> TryGetResults() { - if (ExpectedResponses == Responses) { - ExpectedResponses = 0; - Responses = 0; - return {std::move(Result), Parts.size()}; + ++RequestsSent; } - return {std::nullopt, Parts.size()}; } void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev) { @@ -88,14 +84,81 @@ namespace { if (msg.GetStatus() != NKikimrProto::EReplyStatus::OK) { return; } - ui64 i = msg.GetCookie(); - auto res = msg.GetResult().at(0); - for (ui32 partId: res.GetParts()) { - if (partId == Result[i].Key.PartId()) { - Result[i].HasOnMain = true; + + for (const auto& res: msg.GetResult()) { + auto i = res.GetCookie(); + Y_DEBUG_ABORT_UNLESS(i < Result.size()); + for (ui32 partId: res.GetParts()) { + if (partId == Result[i].Key.PartId()) { + Result[i].HasOnMain = true; + } } + ++MonGroup.CandidatesToDeleteAskedFromMainResponse(); } } + + ui32 GetPartsSize() const { + return Parts.size(); + } + + bool IsDone() const { + return Responses == RequestsSent; + } + + const TVector<TPartOnMain>& GetResult() const { + return Result; + } + }; + + class TPartsDeleter { + private: + std::shared_ptr<TBalancingCtx> Ctx; + TIntrusivePtr<TBlobStorageGroupInfo> GInfo; + ui32 OrderId = 0; + + ui32 RequestsSent = 0; + ui32 Responses = 0; + public: + TPartsDeleter(std::shared_ptr<TBalancingCtx> ctx, TIntrusivePtr<TBlobStorageGroupInfo> gInfo) + : Ctx(ctx) + , GInfo(gInfo) + {} + + void DeleteParts(TActorId selfId, const TVector<TPartOnMain>& parts) { + for (const auto& part: parts) { + if (part.HasOnMain) { + DeleteLocal(selfId, part.Key); + } + } + } + + void DeleteLocal(TActorId selfId, const TLogoBlobID& key) { + TLogoBlobID keyWithoutPartId(key, 0); + + TIngress ingress; + ingress.DeleteHandoff(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, key); + + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB20, VDISKP(Ctx->VCtx, "Deleting local"), (LogoBlobID, key.ToString()), + (Ingress, ingress.ToString(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, keyWithoutPartId))); + + TlsActivationContext->Send( + new IEventHandle(Ctx->SkeletonId, selfId, new TEvDelLogoBlobDataSyncLog(keyWithoutPartId, ingress, OrderId++))); + + ++Ctx->MonGroup.MarkedReadyToDelete(); + Ctx->MonGroup.MarkedReadyToDeleteBytes() += GInfo->GetTopology().GType.PartSize(key); + ++RequestsSent; + } + + void Handle(TEvDelLogoBlobDataSyncLogResult::TPtr ev) { + ++Responses; + ++Ctx->MonGroup.MarkedReadyToDeleteResponse(); + Ctx->MonGroup.MarkedReadyToDeleteWithResponseBytes() += GInfo->GetTopology().GType.PartSize(ev->Get()->Id); + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id)); + } + + bool IsDone() const { + return Responses == RequestsSent; + } }; @@ -104,56 +167,92 @@ namespace { std::shared_ptr<TBalancingCtx> Ctx; TIntrusivePtr<TBlobStorageGroupInfo> GInfo; TPartsRequester PartsRequester; - ui32 OrderId = 0; + TPartsDeleter PartsDeleter; + + /////////////////////////////////////////////////////////////////////////////////////////// + // RequestState + /////////////////////////////////////////////////////////////////////////////////////////// + + void SendRequestsToCheckPartsOnMain() { + Become(&TThis::RequestState); - struct TStats { - ui32 PartsRequested = 0; - ui32 PartsDecidedToDelete = 0; - ui32 PartsMarkedDeleted = 0; - }; - TStats Stats; + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB22, VDISKP(Ctx->VCtx, "SendRequestsToCheckPartsOnMain"), (Parts, PartsRequester.GetPartsSize())); + + if (PartsRequester.GetPartsSize() == 0) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB23, VDISKP(Ctx->VCtx, "Nothing to request. PassAway")); + PassAway(); + return; + } - void ScheduleJobQuant() { - PartsRequester.ScheduleJobQuant(SelfId()); - TryProcessResults(); + PartsRequester.SendRequestsToCheckPartsOnMain(SelfId()); + + Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(REQUEST_TIMEOUT_TAG)); // read timeout } void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev) { PartsRequester.Handle(ev); - TryProcessResults(); + if (PartsRequester.IsDone()) { + DeleteLocalParts(); + } } - void TryProcessResults() { - if (auto [batch, partsLeft] = PartsRequester.TryGetResults(); batch.has_value()) { - Stats.PartsRequested += batch->size(); - for (auto& part: *batch) { - if (part.HasOnMain) { - ++Stats.PartsDecidedToDelete; - DeleteLocal(part.Key); - } - } - Send(NotifyId, new NActors::TEvents::TEvCompleted(DELETER_ID, partsLeft)); - if (partsLeft == 0) { - PassAway(); - } + void TimeoutRequest(NActors::TEvents::TEvWakeup::TPtr ev) { + if (ev->Get()->Tag != REQUEST_TIMEOUT_TAG) { + return; } + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "SendRequestsToCheckPartsOnMain timeout")); + DeleteLocalParts(); } - void DeleteLocal(const TLogoBlobID& key) { - TLogoBlobID keyWithoutPartId(key, 0); + STRICT_STFUNC(RequestState, + hFunc(TEvBlobStorage::TEvVGetResult, Handle) + hFunc(NActors::TEvents::TEvWakeup, TimeoutRequest) - TIngress ingress; - ingress.DeleteHandoff(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, key); + hFunc(TEvVGenerationChange, Handle) + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + ); - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Deleting local"), (LogoBlobID, key.ToString()), - (Ingress, ingress.ToString(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, keyWithoutPartId))); - Send(Ctx->SkeletonId, new TEvDelLogoBlobDataSyncLog(keyWithoutPartId, ingress, OrderId++)); + /////////////////////////////////////////////////////////////////////////////////////////// + // DeleteState + /////////////////////////////////////////////////////////////////////////////////////////// + + void DeleteLocalParts() { + Become(&TThis::DeleteState); + + if (PartsRequester.GetResult().empty()) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB25, VDISKP(Ctx->VCtx, "Nothing to delete. PassAway")); + PassAway(); + return; + } + + { + ui32 partsOnMain = 0; + for (const auto& part: PartsRequester.GetResult()) { + partsOnMain += part.HasOnMain; + } + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB26, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, PartsRequester.GetResult().size()), (PartsOnMain, partsOnMain)); + } + + PartsDeleter.DeleteParts(SelfId(), PartsRequester.GetResult()); + + Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(DELETE_TIMEOUT_TAG)); // delete timeout } - void Handle(TEvDelLogoBlobDataSyncLogResult::TPtr ev) { - Y_VERIFY(ev->Get()->OrderId == Stats.PartsMarkedDeleted++); - ++Ctx->MonGroup.MarkedReadyToDelete(); + void HandleDelLogoBlobResult(TEvDelLogoBlobDataSyncLogResult::TPtr ev) { + PartsDeleter.Handle(ev); + if (PartsDeleter.IsDone()) { + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB27, VDISKP(Ctx->VCtx, "DeleteLocalParts done")); + PassAway(); + } + } + + void TimeoutDelete(NActors::TEvents::TEvWakeup::TPtr ev) { + if (ev->Get()->Tag != DELETE_TIMEOUT_TAG) { + return; + } + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "DeleteLocalParts timeout")); + PassAway(); } void PassAway() override { @@ -161,47 +260,51 @@ namespace { TActorBootstrapped::PassAway(); } - void Handle(TEvVGenerationChange::TPtr ev) { - GInfo = ev->Get()->NewInfo; - } - - STRICT_STFUNC(StateFunc, - cFunc(NActors::TEvents::TEvWakeup::EventType, ScheduleJobQuant) - hFunc(TEvBlobStorage::TEvVGetResult, Handle) - hFunc(TEvDelLogoBlobDataSyncLogResult, Handle) - cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + STRICT_STFUNC(DeleteState, + hFunc(TEvDelLogoBlobDataSyncLogResult, HandleDelLogoBlobResult) + hFunc(NActors::TEvents::TEvWakeup, TimeoutDelete) hFunc(TEvVGenerationChange, Handle) + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) ); + /////////////////////////////////////////////////////////////////////////////////////////// + // Helper functions + /////////////////////////////////////////////////////////////////////////////////////////// + + void Handle(TEvVGenerationChange::TPtr ev) { + GInfo = ev->Get()->NewInfo; + } + public: TDeleter() = default; TDeleter( TActorId notifyId, - TQueue<TLogoBlobID> parts, + TConstArrayRef<TLogoBlobID> parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) : NotifyId(notifyId) , Ctx(ctx) , GInfo(ctx->GInfo) - , PartsRequester(SelfId(), 32, std::move(parts), Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr) + , PartsRequester(SelfId(), parts, Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup) + , PartsDeleter(ctx, GInfo) { } void Bootstrap() { - Become(&TThis::StateFunc); + SendRequestsToCheckPartsOnMain(); } }; } IActor* CreateDeleterActor( TActorId notifyId, - TQueue<TLogoBlobID> parts, + TConstArrayRef<TLogoBlobID> parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) { - return new TDeleter(notifyId, std::move(parts), queueActorMapPtr, ctx); + return new TDeleter(notifyId, parts, queueActorMapPtr, ctx); } } // NBalancing diff --git a/ydb/core/blobstorage/vdisk/balance/sender.cpp b/ydb/core/blobstorage/vdisk/balance/sender.cpp index c68b5509433..35bdfcdb3ec 100644 --- a/ydb/core/blobstorage/vdisk/balance/sender.cpp +++ b/ydb/core/blobstorage/vdisk/balance/sender.cpp @@ -18,38 +18,29 @@ namespace { class TReader { private: - const size_t BatchSize; TPDiskCtxPtr PDiskCtx; - TQueue<TPartInfo> Parts; + TConstArrayRef<TPartInfo> Parts; TReplQuoter::TPtr Quoter; const TBlobStorageGroupType GType; + NMonGroup::TBalancingGroup& MonGroup; TVector<TPart> Result; - ui32 Responses; - ui32 ExpectedResponses; + ui32 Responses = 0; public: - TReader(size_t batchSize, TPDiskCtxPtr pDiskCtx, TQueue<TPartInfo> parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType) - : BatchSize(batchSize) - , PDiskCtx(pDiskCtx) - , Parts(std::move(parts)) + TReader(TPDiskCtxPtr pDiskCtx, TConstArrayRef<TPartInfo> parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup) + : PDiskCtx(pDiskCtx) + , Parts(parts) , Quoter(replPDiskReadQuoter) , GType(gType) - , Result(Reserve(BatchSize)) - , Responses(0) - , ExpectedResponses(0) + , MonGroup(monGroup) + , Result(parts.size()) {} - void ScheduleJobQuant(const TActorId& selfId) { - Result.resize(Min(Parts.size(), BatchSize)); - ExpectedResponses = 0; - for (ui64 i = 0; i < BatchSize && !Parts.empty(); ++i) { - auto item = Parts.front(); - Parts.pop(); - Result[i] = TPart{ - .Key=item.Key, - .PartsMask=item.PartsMask, - }; + void SendReadRequests(const TActorId& selfId) { + for (ui32 i = 0; i < Parts.size(); ++i) { + const auto& item = Parts[i]; + Result[i] = TPart{.Key=item.Key, .PartsMask=item.PartsMask}; std::visit(TOverloaded{ [&](const TRope& data) { // part is already in memory, no need to read it from disk @@ -73,21 +64,12 @@ namespace { std::make_unique<IEventHandle>(PDiskCtx->PDiskId, selfId, ev.release()), diskPart.Size ); + MonGroup.ReadFromHandoffBytes() += diskPart.Size; } }, item.PartData); - ++ExpectedResponses; } } - std::pair<std::optional<TVector<TPart>>, ui32> TryGetResults() { - if (ExpectedResponses == Responses) { - ExpectedResponses = 0; - Responses = 0; - return {std::move(Result), Parts.size()}; - } - return {std::nullopt, Parts.size()}; - } - void Handle(NPDisk::TEvChunkReadResult::TPtr ev) { ++Responses; auto *msg = ev->Get(); @@ -99,15 +81,138 @@ namespace { auto data = TRope(msg->Data.ToString()); auto localParts = Result[i].PartsMask; auto diskBlob = TDiskBlob(&data, localParts, GType, key); + ui32 readSize = 0; for (ui8 partIdx = localParts.FirstPosition(); partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx)) { TRope result; result = diskBlob.GetPart(partIdx, &result); + readSize += result.size(); Result[i].PartsData.emplace_back(std::move(result)); } + + MonGroup.ReadFromHandoffResponseBytes() += readSize; + } + + ui32 GetPartsSize() const { + return Parts.size(); + } + + ui32 GetResponses() const { + return Responses; + } + + bool IsDone() const { + return Responses == Parts.size(); + } + + TVector<TPart>& GetResult() { + return Result; } }; + class TPartsSender { + private: + std::shared_ptr<TBalancingCtx> Ctx; + TIntrusivePtr<TBlobStorageGroupInfo> GInfo; + TQueueActorMapPtr QueueActorMapPtr; + + ui32 RequestsSent = 0; + ui32 Responses = 0; + public: + + TPartsSender( + std::shared_ptr<TBalancingCtx> ctx, + TIntrusivePtr<TBlobStorageGroupInfo> gInfo, + TQueueActorMapPtr queueActorMapPtr + ) + : Ctx(ctx) + , GInfo(gInfo) + , QueueActorMapPtr(queueActorMapPtr) + {} + + void SendRequest(const TVDiskIdShort& vDiskId, const TActorId& selfId, IEventBase* ev, ui32 dataSize) { + auto& queue = (*QueueActorMapPtr)[vDiskId]; + TReplQuoter::QuoteMessage( + Ctx->VCtx->ReplNodeRequestQuoter, + std::make_unique<IEventHandle>(queue, selfId, ev), + dataSize + ); + RequestsSent++; + Ctx->MonGroup.SentOnMainBytes() += dataSize; + } + + void SendPartsOnMain(const TActorId& selfId, TVector<TPart>& parts) { + THashMap<TVDiskID, std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vDiskToEv; + for (auto& part: parts) { + auto localParts = part.PartsMask; + for (ui8 partIdx = localParts.FirstPosition(), i = 0; partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx), ++i) { + auto key = TLogoBlobID(part.Key, partIdx + 1); + auto& data = part.PartsData[i]; + auto vDiskId = GetMainReplicaVDiskId(*GInfo, key); + + if (Ctx->HugeBlobCtx->IsHugeBlob(GInfo->GetTopology().GType, part.Key, Ctx->MinREALHugeBlobInBytes)) { + auto ev = std::make_unique<TEvBlobStorage::TEvVPut>( + key, data, vDiskId, + true, nullptr, + TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob + ); + SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), data.size()); + } else { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Add in multiput"), (LogoBlobId, key.ToString()), + (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, data.size())); + + auto& ev = vDiskToEv[vDiskId]; + if (!ev) { + ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob, true, nullptr); + } + + ev->AddVPut(key, TRcBuf(data), nullptr, {}, NWilson::TTraceId()); + } + } + } + + for (auto& [vDiskId, ev]: vDiskToEv) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB12, VDISKP(Ctx->VCtx, "Send multiput"), (VDisk, vDiskId.ToString())); + + ui32 blobsSize = 0; + for (const auto& item: ev->Record.GetItems()) { + blobsSize += item.GetBuffer().size(); + } + + SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), blobsSize); + } + } + + void Handle(TEvBlobStorage::TEvVPutResult::TPtr ev) { + ++Responses; + if (ev->Get()->Record.GetStatus() != NKikimrProto::OK) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB13, VDISKP(Ctx->VCtx, "Put failed"), (Msg, ev->Get()->ToString())); + } else { + ++Ctx->MonGroup.SentOnMain(); + Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID())); + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString())); + } + } + + void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr ev) { + ++Responses; + const auto& items = ev->Get()->Record.GetItems(); + for (const auto& item: items) { + if (item.GetStatus() != NKikimrProto::OK) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB15, VDISKP(Ctx->VCtx, "MultiPut failed"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString())); + continue; + } + ++Ctx->MonGroup.SentOnMain(); + Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(item.GetBlobID())); + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString())); + } + } + + bool IsDone() const { + return Responses == RequestsSent; + } + + }; class TSender : public TActorBootstrapped<TSender> { TActorId NotifyId; @@ -115,76 +220,85 @@ namespace { std::shared_ptr<TBalancingCtx> Ctx; TIntrusivePtr<TBlobStorageGroupInfo> GInfo; TReader Reader; + TPartsSender Sender; - struct TStats { - ui32 PartsRead = 0; - ui32 PartsSent = 0; - ui32 PartsSentUnsuccsesfull = 0; - }; - TStats Stats; + /////////////////////////////////////////////////////////////////////////////////////////// + // StateRead + /////////////////////////////////////////////////////////////////////////////////////////// - void ScheduleJobQuant() { - Reader.ScheduleJobQuant(SelfId()); - // if all parts are already in memory, we could process results right away - TryProcessResults(); - } + void ReadPartsFromDisk() { + Become(&TThis::StateRead); - void TryProcessResults() { - if (auto [batch, partsLeft] = Reader.TryGetResults(); batch.has_value()) { - Stats.PartsRead += batch->size(); - SendParts(*batch); - - // notify about job quant completion - Send(NotifyId, new NActors::TEvents::TEvCompleted(SENDER_ID, partsLeft)); + if (Reader.GetPartsSize() == 0) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Nothing to read. PassAway")); + PassAway(); + return; + } - if (partsLeft == 0) { - // no more parts to send - PassAway(); - } + Reader.SendReadRequests(SelfId()); + if (Reader.IsDone()) { + SendPartsOnMain(); + return; } + + Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(READ_TIMEOUT_TAG)); // read timeout } void Handle(NPDisk::TEvChunkReadResult::TPtr ev) { Reader.Handle(ev); - TryProcessResults(); + if (Reader.IsDone()) { + SendPartsOnMain(); + } } - void SendParts(const TVector<TPart>& batch) { - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Sending parts"), (BatchSize, batch.size())); + void TimeoutRead(NActors::TEvents::TEvWakeup::TPtr ev) { + if (ev->Get()->Tag != READ_TIMEOUT_TAG) { + return; + } + STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "TimeoutRead"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses())); + SendPartsOnMain(); + } - for (const auto& part: batch) { - auto localParts = part.PartsMask; - for (ui8 partIdx = localParts.FirstPosition(), i = 0; partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx), ++i) { - auto key = TLogoBlobID(part.Key, partIdx + 1); - const auto& data = part.PartsData[i]; - auto vDiskId = GetMainReplicaVDiskId(*GInfo, key); - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB12, VDISKP(Ctx->VCtx, "Sending"), (LogoBlobId, key.ToString()), - (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, data.size())); - - auto& queue = (*QueueActorMapPtr)[TVDiskIdShort(vDiskId)]; - auto ev = std::make_unique<TEvBlobStorage::TEvVPut>( - key, data, vDiskId, - true, nullptr, - TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob - ); - TReplQuoter::QuoteMessage( - Ctx->VCtx->ReplNodeRequestQuoter, - std::make_unique<IEventHandle>(queue, SelfId(), ev.release()), - data.size() - ); - } + STRICT_STFUNC(StateRead, + hFunc(NPDisk::TEvChunkReadResult, Handle) + hFunc(NActors::TEvents::TEvWakeup, TimeoutRead) + + hFunc(TEvVGenerationChange, Handle) + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + ); + + /////////////////////////////////////////////////////////////////////////////////////////// + // StateSend + /////////////////////////////////////////////////////////////////////////////////////////// + + void SendPartsOnMain() { + Become(&TThis::StateSend); + + if (Reader.GetResult().empty()) { + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB18, VDISKP(Ctx->VCtx, "Nothing to send. PassAway")); + PassAway(); + return; } + + Sender.SendPartsOnMain(SelfId(), Reader.GetResult()); + + Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(SEND_TIMEOUT_TAG)); // send timeout } - void Handle(TEvBlobStorage::TEvVPutResult::TPtr ev) { - ++Stats.PartsSent; - if (ev->Get()->Record.GetStatus() != NKikimrProto::OK) { - ++Stats.PartsSentUnsuccsesfull; - STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB13, VDISKP(Ctx->VCtx, "Put failed"), (Msg, ev->Get()->ToString())); + template<class TEvPutResult> + void HandlePutResult(TEvPutResult ev) { + Sender.Handle(ev); + if (Sender.IsDone()) { + PassAway(); + } + } + + void TimeoutSend(NActors::TEvents::TEvWakeup::TPtr ev) { + if (ev->Get()->Tag != SEND_TIMEOUT_TAG) { return; } - ++Ctx->MonGroup.SentOnMain(); - STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString())); + STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "TimeoutSend")); + PassAway(); } void PassAway() override { @@ -192,23 +306,27 @@ namespace { TActorBootstrapped::PassAway(); } - void Handle(TEvVGenerationChange::TPtr ev) { - GInfo = ev->Get()->NewInfo; - } - - STRICT_STFUNC(StateFunc, - cFunc(NActors::TEvents::TEvWakeup::EventType, ScheduleJobQuant) - hFunc(NPDisk::TEvChunkReadResult, Handle) - hFunc(TEvBlobStorage::TEvVPutResult, Handle) - cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) + STRICT_STFUNC(StateSend, + hFunc(NActors::TEvents::TEvWakeup, TimeoutSend) + hFunc(TEvBlobStorage::TEvVPutResult, HandlePutResult) + hFunc(TEvBlobStorage::TEvVMultiPutResult, HandlePutResult) hFunc(TEvVGenerationChange, Handle) + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway) ); + /////////////////////////////////////////////////////////////////////////////////////////// + // Helper functions + /////////////////////////////////////////////////////////////////////////////////////////// + + void Handle(TEvVGenerationChange::TPtr ev) { + GInfo = ev->Get()->NewInfo; + } + public: TSender( TActorId notifyId, - TQueue<TPartInfo> parts, + TConstArrayRef<TPartInfo> parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) @@ -216,18 +334,19 @@ namespace { , QueueActorMapPtr(queueActorMapPtr) , Ctx(ctx) , GInfo(ctx->GInfo) - , Reader(32, Ctx->PDiskCtx, std::move(parts), ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType) + , Reader(Ctx->PDiskCtx, parts, ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup) + , Sender(ctx, GInfo, queueActorMapPtr) {} void Bootstrap() { - Become(&TThis::StateFunc); + ReadPartsFromDisk(); } }; } IActor* CreateSenderActor( TActorId notifyId, - TQueue<TPartInfo> parts, + TConstArrayRef<TPartInfo> parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) { diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h index 28fa9e6f37d..93b476cbc20 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h @@ -592,16 +592,39 @@ public: public: GROUP_CONSTRUCTOR(TBalancingGroup) { + COUNTER_INIT(BalancingIterations, true); + COUNTER_INIT(PlannedToSendOnMain, false); - COUNTER_INIT(SentOnMain, false); COUNTER_INIT(CandidatesToDelete, false); - COUNTER_INIT(MarkedReadyToDelete, false); + + COUNTER_INIT(ReadFromHandoffBytes, true); + COUNTER_INIT(ReadFromHandoffResponseBytes, true); + COUNTER_INIT(SentOnMain, true); + COUNTER_INIT(SentOnMainBytes, true); + COUNTER_INIT(SentOnMainWithResponseBytes, true); + + COUNTER_INIT(CandidatesToDeleteAskedFromMain, true); + COUNTER_INIT(CandidatesToDeleteAskedFromMainResponse, true); + COUNTER_INIT(MarkedReadyToDelete, true); + COUNTER_INIT(MarkedReadyToDeleteBytes, true); + COUNTER_INIT(MarkedReadyToDeleteResponse, true); + COUNTER_INIT(MarkedReadyToDeleteWithResponseBytes, true); } + COUNTER_DEF(BalancingIterations); COUNTER_DEF(PlannedToSendOnMain); + COUNTER_DEF(ReadFromHandoffBytes); + COUNTER_DEF(ReadFromHandoffResponseBytes); COUNTER_DEF(SentOnMain); + COUNTER_DEF(SentOnMainBytes); + COUNTER_DEF(SentOnMainWithResponseBytes); COUNTER_DEF(CandidatesToDelete); + COUNTER_DEF(CandidatesToDeleteAskedFromMain); + COUNTER_DEF(CandidatesToDeleteAskedFromMainResponse); COUNTER_DEF(MarkedReadyToDelete); + COUNTER_DEF(MarkedReadyToDeleteBytes); + COUNTER_DEF(MarkedReadyToDeleteResponse); + COUNTER_DEF(MarkedReadyToDeleteWithResponseBytes); }; /////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h index 64c92463539..3f73fa30ae5 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h @@ -58,12 +58,14 @@ namespace NKikimr { : public TEventLocal<TEvDelLogoBlobDataSyncLogResult, TEvBlobStorage::EvDelLogoBlobDataSyncLogResult> , public TEvVResultBase { + const TLogoBlobID Id; const ui64 OrderId; - TEvDelLogoBlobDataSyncLogResult(ui64 orderId, const TInstant &now, + TEvDelLogoBlobDataSyncLogResult(const TLogoBlobID &id, ui64 orderId, const TInstant &now, ::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, NVDiskMon::TLtcHistoPtr histoPtr) : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr) + , Id(id) , OrderId(orderId) {} }; diff --git a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp index 17f7e36482a..7aa332bad0d 100644 --- a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp +++ b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp @@ -233,7 +233,8 @@ namespace NKikimr { TVectorType m = handoff[handoffNodeId].ToVector(); // map of handoff replicas on this node TVectorType mainVec = main.ToVector(); TVectorType toMove = m - mainVec; // what we can send to main replicas - TVectorType toDel = m & mainVec; // what we can delete + TVectorType deleted = GetVDiskHandoffDeletedVec(top, vdisk, id); + TVectorType toDel = m & mainVec & ~deleted; // not deleted, what we can to delete return TPairOfVectors(toMove, toDel); } } diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 043b5300e30..69975e9b2fb 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -853,7 +853,7 @@ namespace NKikimr { std::unique_ptr<NSyncLog::TEvSyncLogPut> syncLogMsg( new NSyncLog::TEvSyncLogPut(Db->GType, seg.Point(), msg->Id, msg->Ingress)); - std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result(new TEvDelLogoBlobDataSyncLogResult(msg->OrderId, now, + std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result(new TEvDelLogoBlobDataSyncLogResult(msg->Id, msg->OrderId, now, nullptr, nullptr)); bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg); @@ -2539,8 +2539,7 @@ namespace NKikimr { ActiveActors.Erase(BalancingId); } auto balancingCtx = std::make_shared<TBalancingCtx>( - VCtx, PDiskCtx, SelfId(), Hull->GetSnapshot(), Config, GInfo - ); + VCtx, PDiskCtx, HugeBlobCtx, SelfId(), Hull->GetSnapshot(), Config, GInfo, MinREALHugeBlobInBytes); BalancingId = ctx.Register(CreateBalancingActor(balancingCtx)); ActiveActors.Insert(BalancingId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); } |