aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Robert Drynkin <robdrynkin@ydb.tech> 2024-07-12 17:03:49 +0400
committer GitHub <noreply@github.com> 2024-07-12 17:03:49 +0400
commit a2040b49a51a07727674bcf03ed098cc65afdf22 (patch)
tree b66549ba4d680f3030151bbb93d6ff9e1d63e549
parent e615634d6e3dd02e6cce3bd0fc8b59739419f48a (diff)
download ydb-a2040b49a51a07727674bcf03ed098cc65afdf22.tar.gz
Refactor balancing + add batch requests (#5904)
-rw-r--r-- ydb/core/blobstorage/ut_blobstorage/balancing.cpp 25
-rw-r--r-- ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp 201
-rw-r--r-- ydb/core/blobstorage/vdisk/balance/balancing_actor.h 4
-rw-r--r-- ydb/core/blobstorage/vdisk/balance/defs.h 29
-rw-r--r-- ydb/core/blobstorage/vdisk/balance/deleter.cpp 275
-rw-r--r-- ydb/core/blobstorage/vdisk/balance/sender.cpp 313
-rw-r--r-- ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h 27
-rw-r--r-- ydb/core/blobstorage/vdisk/common/vdisk_private_events.h 4
-rw-r--r-- ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp 3
-rw-r--r-- ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp 5
10 files changed, 594 insertions, 292 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/balancing.cpp b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp
index 2937892b59f..b83dfd4bd81 100644
--- a/ydb/core/blobstorage/ut_blobstorage/balancing.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp
@@ -79,7 +79,6 @@ struct TTestEnv {
auto SendGet(ui32 step, ui32 dataSize, bool mustRestoreFirst=false) {
const TLogoBlobID blobId(1, 1, step, 0, dataSize, 0);
- Cerr << "SEND TEvGet with key " << blobId.ToString() << Endl;
const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
auto ev = std::make_unique<TEvBlobStorage::TEvGet>(
blobId,
@@ -94,7 +93,6 @@ struct TTestEnv {
});
TInstant getDeadline = Env.Now() + TDuration::Seconds(30);
auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, /* termOnCapture */ false, getDeadline);
- Cerr << "TEvGetResult: " << res->Get()->ToString() << Endl;
return res;
};
@@ -152,7 +150,7 @@ struct TTestEnv {
bool CheckPartsLocations(const TLogoBlobID& blobId) {
auto expectedParts = GetExpectedPartsLocations(blobId);
auto actualParts = GetActualPartsLocations(blobId);
- TString errMsg = ToString(expectedParts) + " != " + ToString(actualParts);
+ TString errMsg = ToString(expectedParts) + " != " + ToString(actualParts) + " Key: " + blobId.ToString();
UNIT_ASSERT_VALUES_EQUAL_C(expectedParts.size(), actualParts.size(), errMsg);
for (ui32 i = 0; i < expectedParts.size(); ++i) {
@@ -231,20 +229,20 @@ struct TStopOneNodeTest {
Env.SendPut(step, data, NKikimrProto::OK);
Env->Sim(TDuration::Seconds(10));
Env.StartNode(nodeIdWithBlob);
- Env->Sim(TDuration::Seconds(10));
+ Env->Sim(TDuration::Seconds(30));
Cerr << "Start compaction 1" << Endl;
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
}
- Env->Sim(TDuration::Seconds(10));
+ Env->Sim(TDuration::Seconds(30));
Cerr << "Finish compaction 1" << Endl;
Cerr << "Start compaction 2" << Endl;
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
}
- Env->Sim(TDuration::Seconds(10));
+ Env->Sim(TDuration::Seconds(30));
Cerr << "Finish compaction 2" << Endl;
Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
@@ -259,7 +257,7 @@ struct TRandomTest {
ui32 MaxBlobSize;
void RunTest() {
- srand(123);
+ srand(123456);
TVector<TString> data(Reserve(NumIters));
TVector<ui32> successfulSteps;
@@ -302,7 +300,7 @@ struct TRandomTest {
// Wipe random node
if (random() % 100 == 1) {
ui32 pos = random() % Env->Settings.NodeCount;
- if (Env.RunningNodes.contains(pos)) {
+ if (Env.RunningNodes.contains(pos) && Env.RunningNodes.contains(0)) {
Cerr << "Wipe node " << pos << Endl;
auto baseConfig = Env->FetchBaseConfig();
const auto& someVSlot = baseConfig.GetVSlot(pos);
@@ -321,7 +319,7 @@ struct TRandomTest {
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
Env.StartNode(pos);
}
- Env->Sim(TDuration::Seconds(10));
+ Env->Sim(TDuration::Seconds(60));
Cerr << "Start compaction 1" << Endl;
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
@@ -337,9 +335,8 @@ struct TRandomTest {
Cerr << "Start checking" << Endl;
for (ui32 step: successfulSteps) {
- Cerr << "step = " << step << Endl;
Env.CheckPartsLocations(MakeLogoBlobId(step, data[step].size()));
- UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step]);
+ UNIT_ASSERT_VALUES_EQUAL_C(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step], MakeLogoBlobId(step, data[step].size()).ToString());
}
}
};
@@ -401,21 +398,21 @@ struct TTwoPartsOnOneNodeTest {
// start all stopped nodes
Env.StartNode(partIdxToNodeId[1]);
Env.StartNode(handoffNodeIds[1]);
- Env->Sim(TDuration::Seconds(10));
+ Env->Sim(TDuration::Seconds(30));
// run compactions
Cerr << "Start compaction 1" << Endl;
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
}
- Env->Sim(TDuration::Seconds(10));
+ Env->Sim(TDuration::Seconds(30));
Cerr << "Finish compaction 1" << Endl;
Cerr << "Start compaction 2" << Endl;
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
}
- Env->Sim(TDuration::Seconds(10));
+ Env->Sim(TDuration::Seconds(30));
Cerr << "Finish compaction 2" << Endl;
Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
index ff14300f9b8..de0c1790347 100644
--- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
@@ -9,6 +9,63 @@
namespace NKikimr {
namespace NBalancing {
+    // Queue of work items that is filled once and then consumed in batches of at most
+    // BatchSize items.
+    //
+    // Items are appended directly to Data. GetNextBatch() hands out consecutive windows
+    // over Data and advances the read cursor CurPos. A returned batch is a view into
+    // Data's storage (no copy is made), so Data should not be appended to while a
+    // previously returned batch is still in use.
+    template<class T>
+    struct TBatchedQueue {
+        TVector<T> Data;           // all collected items, consumed front to back
+        ui32 CurPos = 0;           // index of the first item that was not handed out yet
+        const ui32 BatchSize = 0;  // upper bound on the batch length, at least 1 after construction
+
+        // A zero batch size would make GetNextBatch() return empty batches without ever
+        // advancing CurPos, so Empty() would never become true; clamp it to 1 instead.
+        TBatchedQueue(ui32 batchSize)
+            : BatchSize(batchSize == 0 ? 1 : batchSize)
+        {}
+
+        // Returns the next window of up to BatchSize items and moves the cursor past it.
+        // Returns an empty view once every item has been handed out.
+        TConstArrayRef<T> GetNextBatch() {
+            if (Empty()) {
+                return {};
+            }
+
+            ui32 begin = CurPos;
+            ui32 end = Min(begin + BatchSize, static_cast<ui32>(Data.size()));
+            CurPos = end;
+            return TConstArrayRef<T>(Data.data() + begin, end - begin);
+        }
+
+        // True when there is nothing left to hand out.
+        bool Empty() const {
+            return CurPos >= Data.size();
+        }
+
+        // Number of items that were not handed out yet.
+        ui32 Size() const {
+            return static_cast<ui32>(Data.size() - CurPos);
+        }
+    };
+
+    // Bookkeeping for one in-flight batch: the ids of the sender and deleter actors
+    // registered for it, and whether each of them has already reported completion.
+    struct TBatchManager {
+        TActorId SenderId;
+        TActorId DeleterId;
+        bool IsSendCompleted = false;
+        bool IsDeleteCompleted = false;
+
+        TBatchManager() = default;
+
+        // Registers both actors in the current activation context
+        // (the sender is registered first, then the deleter).
+        TBatchManager(IActor* sender, IActor* deleter)
+            : SenderId(TlsActivationContext->Register(sender))
+            , DeleterId(TlsActivationContext->Register(deleter))
+        {}
+
+        // Records which half of the batch the completion event came from.
+        void Handle(NActors::TEvents::TEvCompleted::TPtr ev) {
+            if (ev->Sender == SenderId) {
+                IsSendCompleted = true;
+                return;
+            }
+            if (ev->Sender == DeleterId) {
+                IsDeleteCompleted = true;
+                return;
+            }
+            // completion from an actor that is neither the sender nor the deleter of this batch
+            STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected id", (Id, ev->Sender));
+        }
+
+        // The batch is finished only when both the sender and the deleter have reported.
+        bool IsBatchCompleted() const {
+            return IsSendCompleted && IsDeleteCompleted;
+        }
+    };
+
class TBalancingActor : public TActorBootstrapped<TBalancingActor> {
private:
std::shared_ptr<TBalancingCtx> Ctx;
@@ -17,27 +74,24 @@ namespace NBalancing {
TQueueActorMapPtr QueueActorMapPtr;
THashSet<TVDiskID> ConnectedVDisks;
- TQueue<TPartInfo> SendOnMainParts;
- TQueue<TLogoBlobID> TryDeleteParts;
-
- TActorId SenderId;
- TActorId DeleterId;
-
- struct TStats {
- bool SendCompleted = false;
- ui32 SendPartsLeft = 0;
- bool DeleteCompleted = false;
- ui32 DeletePartsLeft = 0;
- };
-
- TStats Stats;
+ TBatchedQueue<TPartInfo> SendOnMainParts;
+ TBatchedQueue<TLogoBlobID> TryDeleteParts;
+ TBatchManager BatchManager;
///////////////////////////////////////////////////////////////////////////////////////////
// Main logic
///////////////////////////////////////////////////////////////////////////////////////////
void ContinueBalancing() {
+ if (SendOnMainParts.Empty() && TryDeleteParts.Empty()) {
+ // no more parts to send or delete
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed"));
+ Send(Ctx->SkeletonId, new TEvStartBalancing());
+ PassAway();
+ return;
+ }
+
// ask for repl token to continue balancing
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB01, VDISKP(Ctx->VCtx, "Ask repl token to continue balancing"));
Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(Ctx->VDiskCfg->BaseInfo.PDiskId), NActors::IEventHandle::FlagTrackDelivery);
@@ -46,55 +100,23 @@ namespace NBalancing {
void ScheduleJobQuant() {
// once repl token received, start balancing - waking up sender and deleter
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
- (SendPartsLeft, Stats.SendPartsLeft), (DeletePartsLeft, Stats.DeletePartsLeft),
+ (SendPartsLeft, SendOnMainParts.Size()), (DeletePartsLeft, TryDeleteParts.Size()),
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum()));
- Send(SenderId, new NActors::TEvents::TEvWakeup());
- Send(DeleterId, new NActors::TEvents::TEvWakeup());
- }
-
- void Handle(NActors::TEvents::TEvCompleted::TPtr ev) {
- // sender or deleter completed job quant
- switch (ev->Get()->Id) {
- case SENDER_ID:
- Stats.SendCompleted = true;
- Stats.SendPartsLeft = ev->Get()->Status;
- break;
- case DELETER_ID:
- Stats.DeleteCompleted = true;
- Stats.DeletePartsLeft = ev->Get()->Status;
- break;
- default:
- Y_ABORT("Unexpected id");
- }
-
- if (Stats.SendCompleted && Stats.DeleteCompleted) {
- // balancing job quant completed, release token
- Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
-
- if (Stats.SendPartsLeft != 0 || Stats.DeletePartsLeft != 0) {
- // sender or deleter has not finished yet
- Stats.SendCompleted = Stats.SendPartsLeft == 0;
- Stats.DeleteCompleted = Stats.DeletePartsLeft == 0;
- ContinueBalancing();
- return;
- }
- // balancing completed
- STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed"));
- Send(Ctx->SkeletonId, new TEvStartBalancing());
- PassAway();
- }
+ // register sender and deleter actors
+ BatchManager = TBatchManager(
+ CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx),
+ CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx)
+ );
}
- constexpr static TDuration JOB_GRANULARITY = TDuration::MilliSeconds(1);
-
void CollectKeys() {
THPTimer timer;
for (ui32 cnt = 0; It.Valid(); It.Next(), ++cnt) {
- if (cnt % 1000 == 999 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) {
+ if (cnt % 100 == 99 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) {
// actor should not block the thread for a long time, so we should yield
- STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt));
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
Send(SelfId(), new NActors::TEvents::TEvWakeup());
return;
}
@@ -106,42 +128,48 @@ namespace NBalancing {
It.PutToMerger(&merger);
auto [moveMask, delMask] = merger.Ingress.HandoffParts(&top, Ctx->VCtx->ShortSelfVDisk, key);
- auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask;
- auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask;
-
- // collect parts to send on main
- for (const auto& [parts, data]: merger.Parts) {
- if (!(partsToSend & parts).Empty()) {
- SendOnMainParts.push(TPartInfo{
- .Key=It.GetCurKey().LogoBlobID(),
- .PartsMask=parts,
- .PartData=data
- });
+
+ if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty()) {
+ // collect parts to send on main
+ for (const auto& [parts, data]: merger.Parts) {
+ if (!(partsToSend & parts).Empty()) {
+ SendOnMainParts.Data.emplace_back(TPartInfo{
+ .Key=It.GetCurKey().LogoBlobID(),
+ .PartsMask=parts,
+ .PartData=data
+ });
+ }
}
}
- // collect parts to delete
- for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) {
- TryDeleteParts.push(TLogoBlobID(It.GetCurKey().LogoBlobID(), partIdx + 1));
- STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB07, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.back().ToString()));
+ if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty()) {
+ // collect parts to delete
+ for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) {
+ TryDeleteParts.Data.emplace_back(TLogoBlobID(It.GetCurKey().LogoBlobID(), partIdx + 1));
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.Data.back().ToString()));
+ }
}
merger.Clear();
}
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB08, VDISKP(Ctx->VCtx, "Keys collected"),
- (SendOnMainParts, SendOnMainParts.size()), (TryDeleteParts, TryDeleteParts.size()));
- Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.size();
- Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.size();
-
- // register sender and deleter actors
- SenderId = TlsActivationContext->Register(CreateSenderActor(SelfId(), std::move(SendOnMainParts), QueueActorMapPtr, Ctx));
- DeleterId = TlsActivationContext->Register(CreateDeleterActor(SelfId(), std::move(TryDeleteParts), QueueActorMapPtr, Ctx));
+ (SendOnMainParts, SendOnMainParts.Data.size()), (TryDeleteParts, TryDeleteParts.Data.size()));
+ Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
+ Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();
// start balancing
ContinueBalancing();
}
+        // Completion report from the sender or the deleter of the current batch.
+        // When both have reported, the repl token is released and balancing continues.
+        void Handle(NActors::TEvents::TEvCompleted::TPtr ev) {
+            BatchManager.Handle(ev);
+            if (!BatchManager.IsBatchCompleted()) {
+                // the other actor of this batch has not reported yet
+                return;
+            }
+
+            Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
+            ContinueBalancing();
+        }
///////////////////////////////////////////////////////////////////////////////////////////
// Helper functions
@@ -162,7 +190,7 @@ namespace NBalancing {
void Handle(NActors::TEvents::TEvUndelivered::TPtr ev) {
if (ev.Get()->Type == TEvReplToken::EventType) {
- STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB09, VDISKP(Ctx->VCtx, "Aske repl token msg not delivered"));
+ STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB06, VDISKP(Ctx->VCtx, "Ask repl token msg not delivered"));
ScheduleJobQuant();
}
}
@@ -170,8 +198,10 @@ namespace NBalancing {
void Handle(TEvProxyQueueState::TPtr ev) {
const TVDiskID& vdiskId = ev->Get()->VDiskId;
if (ev->Get()->IsConnected) {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB07, VDISKP(Ctx->VCtx, "VDisk connected"), (VDiskId, vdiskId.ToString()));
ConnectedVDisks.insert(vdiskId);
} else {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB09, VDISKP(Ctx->VCtx, "VDisk disconnected"), (VDiskId, vdiskId.ToString()));
ConnectedVDisks.erase(vdiskId);
}
}
@@ -184,13 +214,13 @@ namespace NBalancing {
}
GInfo = msg->NewInfo;
- Send(SenderId, msg->Clone());
- Send(DeleterId, msg->Clone());
+ Send(BatchManager.SenderId, msg->Clone());
+ Send(BatchManager.DeleterId, msg->Clone());
}
void PassAway() override {
- Send(SenderId, new NActors::TEvents::TEvPoison);
- Send(DeleterId, new NActors::TEvents::TEvPoison);
+ Send(BatchManager.SenderId, new NActors::TEvents::TEvPoison);
+ Send(BatchManager.DeleterId, new NActors::TEvents::TEvPoison);
for (const auto& kv : *QueueActorMapPtr) {
Send(kv.second, new TEvents::TEvPoison);
}
@@ -198,10 +228,12 @@ namespace NBalancing {
}
STRICT_STFUNC(StateFunc,
+ // Logic events
cFunc(NActors::TEvents::TEvWakeup::EventType, CollectKeys)
cFunc(TEvReplToken::EventType, ScheduleJobQuant)
hFunc(NActors::TEvents::TEvCompleted, Handle)
+ // System events
hFunc(NActors::TEvents::TEvUndelivered, Handle)
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
@@ -216,14 +248,17 @@ namespace NBalancing {
, Ctx(ctx)
, GInfo(ctx->GInfo)
, It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap)
+ , SendOnMainParts(BATCH_SIZE)
+ , TryDeleteParts(BATCH_SIZE)
{
}
void Bootstrap() {
+ Become(&TThis::StateFunc);
CreateVDisksQueues();
It.SeekToFirst();
- Become(&TThis::StateFunc);
- Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup());
+ ++Ctx->MonGroup.BalancingIterations();
+ Schedule(TDuration::Seconds(10), new NActors::TEvents::TEvWakeup());
}
};
diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h
index f96debd8ee7..9e35865a537 100644
--- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h
+++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h
@@ -10,13 +10,13 @@ namespace NKikimr {
namespace NBalancing {
IActor* CreateSenderActor(
TActorId notifyId,
- TQueue<TPartInfo> parts,
+ TConstArrayRef<TPartInfo> parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
);
IActor* CreateDeleterActor(
TActorId notifyId,
- TQueue<TLogoBlobID> parts,
+ TConstArrayRef<TLogoBlobID> parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
);
diff --git a/ydb/core/blobstorage/vdisk/balance/defs.h b/ydb/core/blobstorage/vdisk/balance/defs.h
index 45445dd8028..6206e24565c 100644
--- a/ydb/core/blobstorage/vdisk/balance/defs.h
+++ b/ydb/core/blobstorage/vdisk/balance/defs.h
@@ -3,6 +3,7 @@
#include <ydb/core/blobstorage/vdisk/common/vdisk_context.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h>
#include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all_snap.h>
+#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_hulllogctx.h>
@@ -10,6 +11,7 @@ namespace NKikimr {
struct TBalancingCtx {
TIntrusivePtr<TVDiskContext> VCtx;
TPDiskCtxPtr PDiskCtx;
+ THugeBlobCtxPtr HugeBlobCtx;
TActorId SkeletonId;
NMonGroup::TBalancingGroup MonGroup;
@@ -18,21 +20,27 @@ namespace NKikimr {
TIntrusivePtr<TVDiskConfig> VDiskCfg;
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
+ ui32 MinREALHugeBlobInBytes;
+
TBalancingCtx(
TIntrusivePtr<TVDiskContext> vCtx,
TPDiskCtxPtr pDiskCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
TActorId skeletonId,
NKikimr::THullDsSnap snap,
TIntrusivePtr<TVDiskConfig> vDiskCfg,
- TIntrusivePtr<TBlobStorageGroupInfo> gInfo
+ TIntrusivePtr<TBlobStorageGroupInfo> gInfo,
+ ui32 minREALHugeBlobInBytes
)
: VCtx(std::move(vCtx))
, PDiskCtx(std::move(pDiskCtx))
+ , HugeBlobCtx(std::move(hugeBlobCtx))
, SkeletonId(skeletonId)
, MonGroup(VCtx->VDiskCounters, "subsystem", "balancing")
, Snap(std::move(snap))
, VDiskCfg(std::move(vDiskCfg))
, GInfo(std::move(gInfo))
+ , MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
{
}
};
@@ -47,7 +55,22 @@ namespace NBalancing {
std::variant<TDiskPart, TRope> PartData;
};
- constexpr ui32 SENDER_ID = 0;
- constexpr ui32 DELETER_ID = 1;
+ static constexpr ui32 SENDER_ID = 0;
+ static constexpr ui32 DELETER_ID = 1;
+
+ static constexpr TDuration JOB_GRANULARITY = TDuration::MilliSeconds(1);
+
+ static constexpr TDuration READ_BATCH_TIMEOUT = TDuration::Seconds(10);
+ static constexpr TDuration SEND_BATCH_TIMEOUT = TDuration::Seconds(10);
+ static constexpr TDuration REQUEST_BLOBS_ON_MAIN_BATCH_TIMEOUT = TDuration::Seconds(10);
+ static constexpr TDuration DELETE_BATCH_TIMEOUT = TDuration::Seconds(10);
+
+ static constexpr ui64 READ_TIMEOUT_TAG = 0;
+ static constexpr ui64 SEND_TIMEOUT_TAG = 1;
+ static constexpr ui64 REQUEST_TIMEOUT_TAG = 2;
+ static constexpr ui64 DELETE_TIMEOUT_TAG = 3;
+
+ static constexpr ui32 BATCH_SIZE = 32;
+
} // NBalancing
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/balance/deleter.cpp b/ydb/core/blobstorage/vdisk/balance/deleter.cpp
index 8b842cfcf99..e189f3c6988 100644
--- a/ydb/core/blobstorage/vdisk/balance/deleter.cpp
+++ b/ydb/core/blobstorage/vdisk/balance/deleter.cpp
@@ -21,35 +21,33 @@ namespace {
class TPartsRequester {
private:
const TActorId NotifyId;
- const size_t BatchSize;
- TQueue<TLogoBlobID> Parts;
+ TConstArrayRef<TLogoBlobID> Parts;
TReplQuoter::TPtr Quoter;
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
TQueueActorMapPtr QueueActorMapPtr;
+ NMonGroup::TBalancingGroup& MonGroup;
TVector<TPartOnMain> Result;
- ui32 Responses;
- ui32 ExpectedResponses;
+ ui32 RequestsSent = 0;
+ ui32 Responses = 0;
public:
- TPartsRequester(TActorId notifyId, size_t batchSize, TQueue<TLogoBlobID> parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr)
+ TPartsRequester(TActorId notifyId, TConstArrayRef<TLogoBlobID> parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup)
: NotifyId(notifyId)
- , BatchSize(batchSize)
- , Parts(std::move(parts))
+ , Parts(parts)
, Quoter(quoter)
, GInfo(gInfo)
, QueueActorMapPtr(queueActorMapPtr)
- , Result(Reserve(BatchSize))
- , Responses(0)
- , ExpectedResponses(0)
- {}
+ , MonGroup(monGroup)
+ , Result(parts.size())
+ {
+ }
+
+ void SendRequestsToCheckPartsOnMain(const TActorId& selfId) {
+ THashMap<TVDiskID, std::unique_ptr<TEvBlobStorage::TEvVGet>> vDiskToQueries;
- void ScheduleJobQuant(const TActorId& selfId) {
- Result.resize(Min(Parts.size(), BatchSize));
- ExpectedResponses = 0;
- for (ui64 i = 0; i < BatchSize && !Parts.empty(); ++i) {
- auto key = Parts.front();
- Parts.pop();
+ for (ui64 i = 0; i < Parts.size(); ++i) {
+ auto key = Parts[i];
Result[i] = TPartOnMain{
.Key=key,
.HasOnMain=false
@@ -57,29 +55,27 @@ namespace {
auto vDiskId = GetMainReplicaVDiskId(*GInfo, key);
- // query which would tell us which parts are realy on main (not by ingress)
- auto ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery(
- vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead,
- TEvBlobStorage::TEvVGet::EFlags::None, i,
- {{key.FullID(), 0, 0}}
- );
+ auto& ev = vDiskToQueries[vDiskId];
+ if (!ev) {
+ ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery(
+ vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead,
+ TEvBlobStorage::TEvVGet::EFlags::None, 0
+ );
+ }
+
+ ev->AddExtremeQuery(key.FullID(), 0, 0, &i);
+ ++MonGroup.CandidatesToDeleteAskedFromMain();
+ }
+
+ for (auto& [vDiskId, ev]: vDiskToQueries) {
ui32 msgSize = ev->CalculateSerializedSize();
TReplQuoter::QuoteMessage(
Quoter,
std::make_unique<IEventHandle>(QueueActorMapPtr->at(TVDiskIdShort(vDiskId)), selfId, ev.release()),
msgSize
);
- ++ExpectedResponses;
- }
- }
-
- std::pair<std::optional<TVector<TPartOnMain>>, ui32> TryGetResults() {
- if (ExpectedResponses == Responses) {
- ExpectedResponses = 0;
- Responses = 0;
- return {std::move(Result), Parts.size()};
+ ++RequestsSent;
}
- return {std::nullopt, Parts.size()};
}
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev) {
@@ -88,14 +84,81 @@ namespace {
if (msg.GetStatus() != NKikimrProto::EReplyStatus::OK) {
return;
}
- ui64 i = msg.GetCookie();
- auto res = msg.GetResult().at(0);
- for (ui32 partId: res.GetParts()) {
- if (partId == Result[i].Key.PartId()) {
- Result[i].HasOnMain = true;
+
+ for (const auto& res: msg.GetResult()) {
+ auto i = res.GetCookie();
+ Y_DEBUG_ABORT_UNLESS(i < Result.size());
+ for (ui32 partId: res.GetParts()) {
+ if (partId == Result[i].Key.PartId()) {
+ Result[i].HasOnMain = true;
+ }
}
+ ++MonGroup.CandidatesToDeleteAskedFromMainResponse();
}
}
+
+ ui32 GetPartsSize() const {
+ return Parts.size();
+ }
+
+ bool IsDone() const {
+ return Responses == RequestsSent;
+ }
+
+ const TVector<TPartOnMain>& GetResult() const {
+ return Result;
+ }
+ };
+
+    // Sends local delete requests to the skeleton for parts that are flagged as present
+    // on the main replica, and counts sent requests against received results.
+    class TPartsDeleter {
+    private:
+        std::shared_ptr<TBalancingCtx> Ctx;
+        TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
+        ui32 OrderId = 0;       // incremented for every delete request that is sent
+
+        ui32 RequestsSent = 0;  // delete requests sent so far
+        ui32 Responses = 0;     // delete results received so far
+    public:
+        TPartsDeleter(std::shared_ptr<TBalancingCtx> ctx, TIntrusivePtr<TBlobStorageGroupInfo> gInfo)
+            : Ctx(ctx)
+            , GInfo(gInfo)
+        {}
+
+        // Requests deletion of every part with HasOnMain set; the rest are skipped.
+        void DeleteParts(TActorId selfId, const TVector<TPartOnMain>& parts) {
+            for (const auto& candidate: parts) {
+                if (!candidate.HasOnMain) {
+                    continue;
+                }
+                DeleteLocal(selfId, candidate.Key);
+            }
+        }
+
+        // Sends one TEvDelLogoBlobDataSyncLog for the given part to the skeleton and
+        // updates the "marked ready to delete" counters.
+        void DeleteLocal(TActorId selfId, const TLogoBlobID& key) {
+            // the request is keyed by the blob id with part id 0
+            TLogoBlobID blobIdNoPart(key, 0);
+
+            TIngress delIngress;
+            delIngress.DeleteHandoff(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, key);
+
+            STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB20, VDISKP(Ctx->VCtx, "Deleting local"), (LogoBlobID, key.ToString()),
+                (Ingress, delIngress.ToString(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, blobIdNoPart)));
+
+            auto* request = new TEvDelLogoBlobDataSyncLog(blobIdNoPart, delIngress, OrderId++);
+            TlsActivationContext->Send(new IEventHandle(Ctx->SkeletonId, selfId, request));
+            ++RequestsSent;
+
+            ++Ctx->MonGroup.MarkedReadyToDelete();
+            Ctx->MonGroup.MarkedReadyToDeleteBytes() += GInfo->GetTopology().GType.PartSize(key);
+        }
+
+        // Accounts for one delete result.
+        void Handle(TEvDelLogoBlobDataSyncLogResult::TPtr ev) {
+            ++Responses;
+            ++Ctx->MonGroup.MarkedReadyToDeleteResponse();
+            Ctx->MonGroup.MarkedReadyToDeleteWithResponseBytes() += GInfo->GetTopology().GType.PartSize(ev->Get()->Id);
+            STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id));
+        }
+
+        // True when every request sent so far has been answered (also true when none was sent).
+        bool IsDone() const {
+            return RequestsSent == Responses;
+        }
+    };
@@ -104,56 +167,92 @@ namespace {
std::shared_ptr<TBalancingCtx> Ctx;
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
TPartsRequester PartsRequester;
- ui32 OrderId = 0;
+ TPartsDeleter PartsDeleter;
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // RequestState
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ void SendRequestsToCheckPartsOnMain() {
+ Become(&TThis::RequestState);
- struct TStats {
- ui32 PartsRequested = 0;
- ui32 PartsDecidedToDelete = 0;
- ui32 PartsMarkedDeleted = 0;
- };
- TStats Stats;
+ STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB22, VDISKP(Ctx->VCtx, "SendRequestsToCheckPartsOnMain"), (Parts, PartsRequester.GetPartsSize()));
+
+ if (PartsRequester.GetPartsSize() == 0) {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB23, VDISKP(Ctx->VCtx, "Nothing to request. PassAway"));
+ PassAway();
+ return;
+ }
- void ScheduleJobQuant() {
- PartsRequester.ScheduleJobQuant(SelfId());
- TryProcessResults();
+ PartsRequester.SendRequestsToCheckPartsOnMain(SelfId());
+
+ Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(REQUEST_TIMEOUT_TAG)); // read timeout
}
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev) {
PartsRequester.Handle(ev);
- TryProcessResults();
+ if (PartsRequester.IsDone()) {
+ DeleteLocalParts();
+ }
}
- void TryProcessResults() {
- if (auto [batch, partsLeft] = PartsRequester.TryGetResults(); batch.has_value()) {
- Stats.PartsRequested += batch->size();
- for (auto& part: *batch) {
- if (part.HasOnMain) {
- ++Stats.PartsDecidedToDelete;
- DeleteLocal(part.Key);
- }
- }
- Send(NotifyId, new NActors::TEvents::TEvCompleted(DELETER_ID, partsLeft));
- if (partsLeft == 0) {
- PassAway();
- }
+ void TimeoutRequest(NActors::TEvents::TEvWakeup::TPtr ev) {
+ if (ev->Get()->Tag != REQUEST_TIMEOUT_TAG) {
+ return;
}
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "SendRequestsToCheckPartsOnMain timeout"));
+ DeleteLocalParts();
}
- void DeleteLocal(const TLogoBlobID& key) {
- TLogoBlobID keyWithoutPartId(key, 0);
+ STRICT_STFUNC(RequestState,
+ hFunc(TEvBlobStorage::TEvVGetResult, Handle)
+ hFunc(NActors::TEvents::TEvWakeup, TimeoutRequest)
- TIngress ingress;
- ingress.DeleteHandoff(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, key);
+ hFunc(TEvVGenerationChange, Handle)
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ );
- STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Deleting local"), (LogoBlobID, key.ToString()),
- (Ingress, ingress.ToString(&GInfo->GetTopology(), Ctx->VCtx->ShortSelfVDisk, keyWithoutPartId)));
- Send(Ctx->SkeletonId, new TEvDelLogoBlobDataSyncLog(keyWithoutPartId, ingress, OrderId++));
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // DeleteState
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+        // Switches to DeleteState and requests deletion of every part that was reported
+        // as present on the main replica. The actor finishes right away when there is
+        // nothing to wait for; otherwise a delete timeout is armed.
+        void DeleteLocalParts() {
+            Become(&TThis::DeleteState);
+
+            if (PartsRequester.GetResult().empty()) {
+                STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB25, VDISKP(Ctx->VCtx, "Nothing to delete. PassAway"));
+                PassAway();
+                return;
+            }
+
+            {
+                ui32 partsOnMain = 0;
+                for (const auto& part: PartsRequester.GetResult()) {
+                    partsOnMain += part.HasOnMain;
+                }
+                STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB26, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, PartsRequester.GetResult().size()), (PartsOnMain, partsOnMain));
+            }
+
+            PartsDeleter.DeleteParts(SelfId(), PartsRequester.GetResult());
+
+            if (PartsDeleter.IsDone()) {
+                // No delete request was sent (no candidate is present on main), so no
+                // TEvDelLogoBlobDataSyncLogResult can arrive. Finish now instead of
+                // idling until the delete timeout fires.
+                PassAway();
+                return;
+            }
+
+            Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(DELETE_TIMEOUT_TAG)); // delete timeout
+        }
- void Handle(TEvDelLogoBlobDataSyncLogResult::TPtr ev) {
- Y_VERIFY(ev->Get()->OrderId == Stats.PartsMarkedDeleted++);
- ++Ctx->MonGroup.MarkedReadyToDelete();
+        // One delete result arrived; the actor finishes once all sent requests are answered.
+        void HandleDelLogoBlobResult(TEvDelLogoBlobDataSyncLogResult::TPtr ev) {
+            PartsDeleter.Handle(ev);
+            if (!PartsDeleter.IsDone()) {
+                return; // still waiting for the remaining results
+            }
+            STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB27, VDISKP(Ctx->VCtx, "DeleteLocalParts done"));
+            PassAway();
+        }
+
+        // Wakeup handler of DeleteState. Only the delete timeout terminates the actor;
+        // wakeups carrying any other tag (such as the request timeout scheduled in the
+        // previous state) are ignored.
+        void TimeoutDelete(NActors::TEvents::TEvWakeup::TPtr ev) {
+            if (ev->Get()->Tag == DELETE_TIMEOUT_TAG) {
+                STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "DeleteLocalParts timeout"));
+                PassAway();
+            }
+        }
void PassAway() override {
@@ -161,47 +260,51 @@ namespace {
TActorBootstrapped::PassAway();
}
- void Handle(TEvVGenerationChange::TPtr ev) {
- GInfo = ev->Get()->NewInfo;
- }
-
- STRICT_STFUNC(StateFunc,
- cFunc(NActors::TEvents::TEvWakeup::EventType, ScheduleJobQuant)
- hFunc(TEvBlobStorage::TEvVGetResult, Handle)
- hFunc(TEvDelLogoBlobDataSyncLogResult, Handle)
- cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ STRICT_STFUNC(DeleteState,
+ hFunc(TEvDelLogoBlobDataSyncLogResult, HandleDelLogoBlobResult)
+ hFunc(NActors::TEvents::TEvWakeup, TimeoutDelete)
hFunc(TEvVGenerationChange, Handle)
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
);
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // Helper functions
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ void Handle(TEvVGenerationChange::TPtr ev) {
+ GInfo = ev->Get()->NewInfo;
+ }
+
public:
TDeleter() = default;
TDeleter(
TActorId notifyId,
- TQueue<TLogoBlobID> parts,
+ TConstArrayRef<TLogoBlobID> parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
)
: NotifyId(notifyId)
, Ctx(ctx)
, GInfo(ctx->GInfo)
- , PartsRequester(SelfId(), 32, std::move(parts), Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr)
+ , PartsRequester(SelfId(), parts, Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup)
+ , PartsDeleter(ctx, GInfo)
{
}
void Bootstrap() {
- Become(&TThis::StateFunc);
+ SendRequestsToCheckPartsOnMain();
}
};
}
IActor* CreateDeleterActor(
TActorId notifyId,
- TQueue<TLogoBlobID> parts,
+ TConstArrayRef<TLogoBlobID> parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
) {
- return new TDeleter(notifyId, std::move(parts), queueActorMapPtr, ctx);
+ return new TDeleter(notifyId, parts, queueActorMapPtr, ctx);
}
} // NBalancing
diff --git a/ydb/core/blobstorage/vdisk/balance/sender.cpp b/ydb/core/blobstorage/vdisk/balance/sender.cpp
index c68b5509433..35bdfcdb3ec 100644
--- a/ydb/core/blobstorage/vdisk/balance/sender.cpp
+++ b/ydb/core/blobstorage/vdisk/balance/sender.cpp
@@ -18,38 +18,29 @@ namespace {
class TReader {
private:
- const size_t BatchSize;
TPDiskCtxPtr PDiskCtx;
- TQueue<TPartInfo> Parts;
+ TConstArrayRef<TPartInfo> Parts;
TReplQuoter::TPtr Quoter;
const TBlobStorageGroupType GType;
+ NMonGroup::TBalancingGroup& MonGroup;
TVector<TPart> Result;
- ui32 Responses;
- ui32 ExpectedResponses;
+ ui32 Responses = 0;
public:
- TReader(size_t batchSize, TPDiskCtxPtr pDiskCtx, TQueue<TPartInfo> parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType)
- : BatchSize(batchSize)
- , PDiskCtx(pDiskCtx)
- , Parts(std::move(parts))
+ TReader(TPDiskCtxPtr pDiskCtx, TConstArrayRef<TPartInfo> parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup)
+ : PDiskCtx(pDiskCtx)
+ , Parts(parts)
, Quoter(replPDiskReadQuoter)
, GType(gType)
- , Result(Reserve(BatchSize))
- , Responses(0)
- , ExpectedResponses(0)
+ , MonGroup(monGroup)
+ , Result(parts.size())
{}
- void ScheduleJobQuant(const TActorId& selfId) {
- Result.resize(Min(Parts.size(), BatchSize));
- ExpectedResponses = 0;
- for (ui64 i = 0; i < BatchSize && !Parts.empty(); ++i) {
- auto item = Parts.front();
- Parts.pop();
- Result[i] = TPart{
- .Key=item.Key,
- .PartsMask=item.PartsMask,
- };
+ void SendReadRequests(const TActorId& selfId) {
+ for (ui32 i = 0; i < Parts.size(); ++i) {
+ const auto& item = Parts[i];
+ Result[i] = TPart{.Key=item.Key, .PartsMask=item.PartsMask};
std::visit(TOverloaded{
[&](const TRope& data) {
// part is already in memory, no need to read it from disk
@@ -73,21 +64,12 @@ namespace {
std::make_unique<IEventHandle>(PDiskCtx->PDiskId, selfId, ev.release()),
diskPart.Size
);
+ MonGroup.ReadFromHandoffBytes() += diskPart.Size;
}
}, item.PartData);
- ++ExpectedResponses;
}
}
- std::pair<std::optional<TVector<TPart>>, ui32> TryGetResults() {
- if (ExpectedResponses == Responses) {
- ExpectedResponses = 0;
- Responses = 0;
- return {std::move(Result), Parts.size()};
- }
- return {std::nullopt, Parts.size()};
- }
-
void Handle(NPDisk::TEvChunkReadResult::TPtr ev) {
++Responses;
auto *msg = ev->Get();
@@ -99,15 +81,138 @@ namespace {
auto data = TRope(msg->Data.ToString());
auto localParts = Result[i].PartsMask;
auto diskBlob = TDiskBlob(&data, localParts, GType, key);
+ ui32 readSize = 0;
for (ui8 partIdx = localParts.FirstPosition(); partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx)) {
TRope result;
result = diskBlob.GetPart(partIdx, &result);
+ readSize += result.size();
Result[i].PartsData.emplace_back(std::move(result));
}
+
+ MonGroup.ReadFromHandoffResponseBytes() += readSize;
+ }
+
+ ui32 GetPartsSize() const {
+ return Parts.size();
+ }
+
+ ui32 GetResponses() const {
+ return Responses;
+ }
+
+ bool IsDone() const {
+ return Responses == Parts.size();
+ }
+
+ TVector<TPart>& GetResult() {
+ return Result;
}
};
+ // Sends already-read parts to the VDisk returned by GetMainReplicaVDiskId() for each part and
+ // counts requests/replies so the owning actor can tell when every reply has arrived.
+ // Huge blobs go out as individual TEvVPut events; all other parts are grouped into one
+ // TEvVMultiPut per destination VDisk.
+ // NOTE(review): GInfo is captured at construction; nothing in this class refreshes it when the
+ // owning actor handles TEvVGenerationChange — confirm whether that is intended.
+ class TPartsSender {
+ private:
+ std::shared_ptr<TBalancingCtx> Ctx;
+ TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
+ TQueueActorMapPtr QueueActorMapPtr;
+
+ ui32 RequestsSent = 0; // number of events passed to the quoter by SendRequest()
+ ui32 Responses = 0; // number of TEvVPutResult / TEvVMultiPutResult events handled
+ public:
+
+ TPartsSender(
+ std::shared_ptr<TBalancingCtx> ctx,
+ TIntrusivePtr<TBlobStorageGroupInfo> gInfo,
+ TQueueActorMapPtr queueActorMapPtr
+ )
+ : Ctx(ctx)
+ , GInfo(gInfo)
+ , QueueActorMapPtr(queueActorMapPtr)
+ {}
+
+ // Passes `ev` (ownership is taken) to the queue actor registered for `vDiskId` through the
+ // node request quoter, charging `dataSize` bytes, and accounts the request in the counters.
+ void SendRequest(const TVDiskIdShort& vDiskId, const TActorId& selfId, IEventBase* ev, ui32 dataSize) {
+ auto& queue = (*QueueActorMapPtr)[vDiskId];
+ TReplQuoter::QuoteMessage(
+ Ctx->VCtx->ReplNodeRequestQuoter,
+ std::make_unique<IEventHandle>(queue, selfId, ev),
+ dataSize
+ );
+ RequestsSent++;
+ Ctx->MonGroup.SentOnMainBytes() += dataSize;
+ }
+
+ // Builds and sends put requests for every part in `parts`.
+ // `parts[k].PartsData` is expected to hold one buffer per bit set in `parts[k].PartsMask`,
+ // in mask order. Entries with fewer buffers (a read that never completed, e.g. when the
+ // caller proceeds after a read timeout) are sent only as far as data is available.
+ void SendPartsOnMain(const TActorId& selfId, TVector<TPart>& parts) {
+ THashMap<TVDiskID, std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vDiskToEv;
+ for (auto& part: parts) {
+ auto localParts = part.PartsMask;
+ for (ui8 partIdx = localParts.FirstPosition(), i = 0; partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx), ++i) {
+ if (i >= part.PartsData.size()) {
+ // no data was read for the remaining parts of this blob: indexing PartsData
+ // here would be out of range, so there is nothing more to send for it
+ break;
+ }
+ auto key = TLogoBlobID(part.Key, partIdx + 1);
+ auto& data = part.PartsData[i];
+ auto vDiskId = GetMainReplicaVDiskId(*GInfo, key);
+
+ if (Ctx->HugeBlobCtx->IsHugeBlob(GInfo->GetTopology().GType, part.Key, Ctx->MinREALHugeBlobInBytes)) {
+ auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(
+ key, data, vDiskId,
+ true, nullptr,
+ TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob
+ );
+ SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), data.size());
+ } else {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Add in multiput"), (LogoBlobId, key.ToString()),
+ (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, data.size()));
+
+ auto& ev = vDiskToEv[vDiskId];
+ if (!ev) {
+ ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob, true, nullptr);
+ }
+
+ ev->AddVPut(key, TRcBuf(data), nullptr, {}, NWilson::TTraceId());
+ }
+ }
+ }
+
+ for (auto& [vDiskId, ev]: vDiskToEv) {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB12, VDISKP(Ctx->VCtx, "Send multiput"), (VDisk, vDiskId.ToString()));
+
+ // total payload of the batch, used as the quota charge for this request
+ ui32 blobsSize = 0;
+ for (const auto& item: ev->Record.GetItems()) {
+ blobsSize += item.GetBuffer().size();
+ }
+
+ SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), blobsSize);
+ }
+ }
+
+ // Accounts one single-put reply; only OK replies update the "sent on main" counters.
+ void Handle(TEvBlobStorage::TEvVPutResult::TPtr ev) {
+ ++Responses;
+ if (ev->Get()->Record.GetStatus() != NKikimrProto::OK) {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB13, VDISKP(Ctx->VCtx, "Put failed"), (Msg, ev->Get()->ToString()));
+ } else {
+ ++Ctx->MonGroup.SentOnMain();
+ Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID()));
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString()));
+ }
+ }
+
+ // Accounts one multi-put reply (one response per batch); counters are updated per OK item.
+ void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr ev) {
+ ++Responses;
+ const auto& items = ev->Get()->Record.GetItems();
+ for (const auto& item: items) {
+ if (item.GetStatus() != NKikimrProto::OK) {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB15, VDISKP(Ctx->VCtx, "MultiPut failed"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString()));
+ continue;
+ }
+ ++Ctx->MonGroup.SentOnMain();
+ Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(item.GetBlobID()));
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString()));
+ }
+ }
+
+ // True once as many replies have been handled as requests were sent.
+ bool IsDone() const {
+ return Responses == RequestsSent;
+ }
+
+ };
class TSender : public TActorBootstrapped<TSender> {
TActorId NotifyId;
@@ -115,76 +220,85 @@ namespace {
std::shared_ptr<TBalancingCtx> Ctx;
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
TReader Reader;
+ TPartsSender Sender;
- struct TStats {
- ui32 PartsRead = 0;
- ui32 PartsSent = 0;
- ui32 PartsSentUnsuccsesfull = 0;
- };
- TStats Stats;
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // StateRead
+ ///////////////////////////////////////////////////////////////////////////////////////////
- void ScheduleJobQuant() {
- Reader.ScheduleJobQuant(SelfId());
- // if all parts are already in memory, we could process results right away
- TryProcessResults();
- }
+ void ReadPartsFromDisk() {
+ Become(&TThis::StateRead);
- void TryProcessResults() {
- if (auto [batch, partsLeft] = Reader.TryGetResults(); batch.has_value()) {
- Stats.PartsRead += batch->size();
- SendParts(*batch);
-
- // notify about job quant completion
- Send(NotifyId, new NActors::TEvents::TEvCompleted(SENDER_ID, partsLeft));
+ if (Reader.GetPartsSize() == 0) {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Nothing to read. PassAway"));
+ PassAway();
+ return;
+ }
- if (partsLeft == 0) {
- // no more parts to send
- PassAway();
- }
+ Reader.SendReadRequests(SelfId());
+ if (Reader.IsDone()) {
+ SendPartsOnMain();
+ return;
}
+
+ Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(READ_TIMEOUT_TAG)); // read timeout
}
void Handle(NPDisk::TEvChunkReadResult::TPtr ev) {
Reader.Handle(ev);
- TryProcessResults();
+ if (Reader.IsDone()) {
+ SendPartsOnMain();
+ }
}
- void SendParts(const TVector<TPart>& batch) {
- STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Sending parts"), (BatchSize, batch.size()));
+ // Wakeup handler of the read state. Only the wakeup tagged READ_TIMEOUT_TAG is acted on:
+ // it logs how many reads were requested and how many were answered, then proceeds to the
+ // send phase with whatever has been read so far. Wakeups with any other tag are dropped.
+ void TimeoutRead(NActors::TEvents::TEvWakeup::TPtr ev) {
+ const bool isReadTimeout = (ev->Get()->Tag == READ_TIMEOUT_TAG);
+ if (isReadTimeout) {
+ STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "TimeoutRead"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses()));
+ SendPartsOnMain();
+ }
+ }
- for (const auto& part: batch) {
- auto localParts = part.PartsMask;
- for (ui8 partIdx = localParts.FirstPosition(), i = 0; partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx), ++i) {
- auto key = TLogoBlobID(part.Key, partIdx + 1);
- const auto& data = part.PartsData[i];
- auto vDiskId = GetMainReplicaVDiskId(*GInfo, key);
- STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB12, VDISKP(Ctx->VCtx, "Sending"), (LogoBlobId, key.ToString()),
- (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, data.size()));
-
- auto& queue = (*QueueActorMapPtr)[TVDiskIdShort(vDiskId)];
- auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(
- key, data, vDiskId,
- true, nullptr,
- TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob
- );
- TReplQuoter::QuoteMessage(
- Ctx->VCtx->ReplNodeRequestQuoter,
- std::make_unique<IEventHandle>(queue, SelfId(), ev.release()),
- data.size()
- );
- }
+ STRICT_STFUNC(StateRead,
+ hFunc(NPDisk::TEvChunkReadResult, Handle)
+ hFunc(NActors::TEvents::TEvWakeup, TimeoutRead)
+
+ hFunc(TEvVGenerationChange, Handle)
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ );
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // StateSend
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ void SendPartsOnMain() {
+ Become(&TThis::StateSend);
+
+ if (Reader.GetResult().empty()) {
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB18, VDISKP(Ctx->VCtx, "Nothing to send. PassAway"));
+ PassAway();
+ return;
}
+
+ Sender.SendPartsOnMain(SelfId(), Reader.GetResult());
+
+ Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(SEND_TIMEOUT_TAG)); // send timeout
}
- void Handle(TEvBlobStorage::TEvVPutResult::TPtr ev) {
- ++Stats.PartsSent;
- if (ev->Get()->Record.GetStatus() != NKikimrProto::OK) {
- ++Stats.PartsSentUnsuccsesfull;
- STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB13, VDISKP(Ctx->VCtx, "Put failed"), (Msg, ev->Get()->ToString()));
+ // Common handler for TEvVPutResult and TEvVMultiPutResult: forwards the reply to Sender for
+ // accounting and terminates the actor once Sender reports that all replies have arrived.
+ template<class TEvPutResult>
+ void HandlePutResult(TEvPutResult ev) {
+ Sender.Handle(ev);
+ if (!Sender.IsDone()) {
+ return; // still waiting for outstanding replies
+ }
+ PassAway();
+ }
+
+ void TimeoutSend(NActors::TEvents::TEvWakeup::TPtr ev) {
+ if (ev->Get()->Tag != SEND_TIMEOUT_TAG) {
return;
}
- ++Ctx->MonGroup.SentOnMain();
- STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString()));
+ STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "TimeoutSend"));
+ PassAway();
}
void PassAway() override {
@@ -192,23 +306,27 @@ namespace {
TActorBootstrapped::PassAway();
}
- void Handle(TEvVGenerationChange::TPtr ev) {
- GInfo = ev->Get()->NewInfo;
- }
-
- STRICT_STFUNC(StateFunc,
- cFunc(NActors::TEvents::TEvWakeup::EventType, ScheduleJobQuant)
- hFunc(NPDisk::TEvChunkReadResult, Handle)
- hFunc(TEvBlobStorage::TEvVPutResult, Handle)
- cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
+ // Events accepted while waiting for put replies.
+ // TimeoutRead() can switch to this state before every NPDisk::TEvChunkReadResult has arrived,
+ // so late read replies are listed here and explicitly ignored.
+ // NOTE(review): STRICT_STFUNC is expected to treat unlisted events as an error — confirm.
+ STRICT_STFUNC(StateSend,
+ hFunc(NActors::TEvents::TEvWakeup, TimeoutSend)
+ hFunc(TEvBlobStorage::TEvVPutResult, HandlePutResult)
+ hFunc(TEvBlobStorage::TEvVMultiPutResult, HandlePutResult)
+ IgnoreFunc(NPDisk::TEvChunkReadResult)
hFunc(TEvVGenerationChange, Handle)
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
);
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // Helper functions
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ void Handle(TEvVGenerationChange::TPtr ev) {
+ GInfo = ev->Get()->NewInfo;
+ }
+
public:
TSender(
TActorId notifyId,
- TQueue<TPartInfo> parts,
+ TConstArrayRef<TPartInfo> parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
)
@@ -216,18 +334,19 @@ namespace {
, QueueActorMapPtr(queueActorMapPtr)
, Ctx(ctx)
, GInfo(ctx->GInfo)
- , Reader(32, Ctx->PDiskCtx, std::move(parts), ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType)
+ , Reader(Ctx->PDiskCtx, parts, ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup)
+ , Sender(ctx, GInfo, queueActorMapPtr)
{}
void Bootstrap() {
- Become(&TThis::StateFunc);
+ ReadPartsFromDisk();
}
};
}
IActor* CreateSenderActor(
TActorId notifyId,
- TQueue<TPartInfo> parts,
+ TConstArrayRef<TPartInfo> parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
) {
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
index 28fa9e6f37d..93b476cbc20 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
@@ -592,16 +592,39 @@ public:
public:
GROUP_CONSTRUCTOR(TBalancingGroup)
{
+ COUNTER_INIT(BalancingIterations, true);
+
COUNTER_INIT(PlannedToSendOnMain, false);
- COUNTER_INIT(SentOnMain, false);
COUNTER_INIT(CandidatesToDelete, false);
- COUNTER_INIT(MarkedReadyToDelete, false);
+
+ COUNTER_INIT(ReadFromHandoffBytes, true);
+ COUNTER_INIT(ReadFromHandoffResponseBytes, true);
+ COUNTER_INIT(SentOnMain, true);
+ COUNTER_INIT(SentOnMainBytes, true);
+ COUNTER_INIT(SentOnMainWithResponseBytes, true);
+
+ COUNTER_INIT(CandidatesToDeleteAskedFromMain, true);
+ COUNTER_INIT(CandidatesToDeleteAskedFromMainResponse, true);
+ COUNTER_INIT(MarkedReadyToDelete, true);
+ COUNTER_INIT(MarkedReadyToDeleteBytes, true);
+ COUNTER_INIT(MarkedReadyToDeleteResponse, true);
+ COUNTER_INIT(MarkedReadyToDeleteWithResponseBytes, true);
}
+ COUNTER_DEF(BalancingIterations);
COUNTER_DEF(PlannedToSendOnMain);
+ COUNTER_DEF(ReadFromHandoffBytes);
+ COUNTER_DEF(ReadFromHandoffResponseBytes);
COUNTER_DEF(SentOnMain);
+ COUNTER_DEF(SentOnMainBytes);
+ COUNTER_DEF(SentOnMainWithResponseBytes);
COUNTER_DEF(CandidatesToDelete);
+ COUNTER_DEF(CandidatesToDeleteAskedFromMain);
+ COUNTER_DEF(CandidatesToDeleteAskedFromMainResponse);
COUNTER_DEF(MarkedReadyToDelete);
+ COUNTER_DEF(MarkedReadyToDeleteBytes);
+ COUNTER_DEF(MarkedReadyToDeleteResponse);
+ COUNTER_DEF(MarkedReadyToDeleteWithResponseBytes);
};
///////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h
index 64c92463539..3f73fa30ae5 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h
@@ -58,12 +58,14 @@ namespace NKikimr {
: public TEventLocal<TEvDelLogoBlobDataSyncLogResult, TEvBlobStorage::EvDelLogoBlobDataSyncLogResult>
, public TEvVResultBase
{
+ const TLogoBlobID Id;
const ui64 OrderId;
- TEvDelLogoBlobDataSyncLogResult(ui64 orderId, const TInstant &now,
+ TEvDelLogoBlobDataSyncLogResult(const TLogoBlobID &id, ui64 orderId, const TInstant &now,
::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, NVDiskMon::TLtcHistoPtr histoPtr)
: TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr,
histoPtr)
+ , Id(id)
, OrderId(orderId)
{}
};
diff --git a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp
index 17f7e36482a..7aa332bad0d 100644
--- a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp
+++ b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress.cpp
@@ -233,7 +233,8 @@ namespace NKikimr {
TVectorType m = handoff[handoffNodeId].ToVector(); // map of handoff replicas on this node
TVectorType mainVec = main.ToVector();
TVectorType toMove = m - mainVec; // what we can send to main replicas
- TVectorType toDel = m & mainVec; // what we can delete
+ TVectorType deleted = GetVDiskHandoffDeletedVec(top, vdisk, id);
+ TVectorType toDel = m & mainVec & ~deleted; // parts not yet marked deleted: what we can delete
return TPairOfVectors(toMove, toDel);
}
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 043b5300e30..69975e9b2fb 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -853,7 +853,7 @@ namespace NKikimr {
std::unique_ptr<NSyncLog::TEvSyncLogPut> syncLogMsg(
new NSyncLog::TEvSyncLogPut(Db->GType, seg.Point(), msg->Id, msg->Ingress));
- std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result(new TEvDelLogoBlobDataSyncLogResult(msg->OrderId, now,
+ std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result(new TEvDelLogoBlobDataSyncLogResult(msg->Id, msg->OrderId, now,
nullptr, nullptr));
bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg);
@@ -2539,8 +2539,7 @@ namespace NKikimr {
ActiveActors.Erase(BalancingId);
}
auto balancingCtx = std::make_shared<TBalancingCtx>(
- VCtx, PDiskCtx, SelfId(), Hull->GetSnapshot(), Config, GInfo
- );
+ VCtx, PDiskCtx, HugeBlobCtx, SelfId(), Hull->GetSnapshot(), Config, GInfo, MinREALHugeBlobInBytes);
BalancingId = ctx.Register(CreateBalancingActor(balancingCtx));
ActiveActors.Insert(BalancingId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
}