aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRobert Drynkin <rob.drynkin@gmail.com>2023-12-22 16:23:19 +0100
committerGitHub <noreply@github.com>2023-12-22 16:23:19 +0100
commit7116d46ae7c0259b5f9d489de263f8701e432b1c (patch)
tree94bea84b0f1de413ac3d2fde52502acb78090e10
parent86194b9ee88b97e6442d2af9060ff874fc7216d9 (diff)
downloadydb-7116d46ae7c0259b5f9d489de263f8701e432b1c.tar.gz
KIKIMR-20522: Tests for vdisks balancing (#531)
Co-authored-by: robdrynkin <robdrynkin@nebius.com>
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/balancing.cpp305
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make15
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/ya.make1
-rw-r--r--ydb/core/util/testactorsys.h2
4 files changed, 322 insertions, 1 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/balancing.cpp b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp
new file mode 100644
index 0000000000..dfab97742d
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp
@@ -0,0 +1,305 @@
+#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
+
+#include <library/cpp/iterator/enumerate.h>
+
+#include <util/random/entropy.h>
+
+
+using TPartsLocations = TVector<TVector<ui8>>;
+
+
+struct TTestEnv {
+ TTestEnv(ui32 nodeCount, TBlobStorageGroupType erasure)
+ : Env({
+ .NodeCount = nodeCount,
+ .VDiskReplPausedAtStart = false,
+ .Erasure = erasure,
+ })
+ {
+ Env.CreateBoxAndPool(1, 1);
+ Env.Sim(TDuration::Minutes(1));
+
+ auto groups = Env.GetGroups();
+ UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
+ GroupInfo = Env.GetGroupInfo(groups.front());
+
+ for (ui32 i = 0; i < Env.Settings.NodeCount; ++i) {
+ RunningNodes.insert(i);
+ }
+ }
+
+ static TString PrepareData(const ui32 dataLen, const ui32 start) {
+ TString data(Reserve(dataLen));
+ for (ui32 i = 0; i < dataLen; ++i) {
+ data.push_back('a' + (start + i) % 26);
+ }
+ return data;
+ };
+
+ void SendPut(ui32 step, const TString& data, NKikimrProto::EReplyStatus expectedStatus) {
+ const TLogoBlobID id(1, 1, step, 0, data.size(), 0);
+ Cerr << "SEND TEvPut with key " << id.ToString() << Endl;
+ const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
+ auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, data, TInstant::Max());
+ Env.Runtime->WrapInActorContext(sender, [&] {
+ SendToBSProxy(sender, GroupInfo->GroupID, ev.release());
+ });
+ auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender, false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, expectedStatus);
+ Cerr << "TEvPutResult: " << res->Get()->ToString() << Endl;
+ };
+
+ auto SendGet(ui32 step, ui32 dataSize, bool mustRestoreFirst=false) {
+ const TLogoBlobID blobId(1, 1, step, 0, dataSize, 0);
+ Cerr << "SEND TEvGet with key " << blobId.ToString() << Endl;
+ const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
+ auto ev = std::make_unique<TEvBlobStorage::TEvGet>(
+ blobId,
+ /* shift */ 0,
+ /* size */ dataSize,
+ TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead,
+ mustRestoreFirst
+ );
+ Env.Runtime->WrapInActorContext(sender, [&] () {
+ SendToBSProxy(sender, GroupInfo->GroupID, ev.release());
+ });
+ TInstant getDeadline = Env.Now() + TDuration::Seconds(30);
+ auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, /* termOnCapture */ false, getDeadline);
+ Cerr << "TEvGetResult: " << res->Get()->ToString() << Endl;
+ return res;
+ };
+
+ TActorId GetQueue(const TVDiskID& vDiskId) {
+ if (!Queues.contains(vDiskId)) {
+ Queues[vDiskId] = Env.CreateQueueActor(vDiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, 1000);
+ }
+ return Queues[vDiskId];
+ }
+
+ TVector<ui32> GetParts(ui32 position, const TLogoBlobID& blobId) {
+ if (!RunningNodes.contains(position)) {
+ return {};
+ }
+ auto vDiskId = GroupInfo->GetVDiskId(position);
+ auto ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery(
+ vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead,
+ TEvBlobStorage::TEvVGet::EFlags::None, 0,
+ {{blobId, 0, 0}}
+ );
+ const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
+ TVector<ui32> partsRes;
+
+ Cerr << "Get request for vdisk " << position << Endl;
+ auto queueId = GetQueue(vDiskId);
+ Env.Runtime->WrapInActorContext(sender, [&] {
+ Env.Runtime->Send(new IEventHandle(queueId, sender, ev.release()));
+ });
+ auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender, false);
+ auto parts = res->Get()->Record.GetResult().at(0).GetParts();
+ partsRes = TVector<ui32>(parts.begin(), parts.end());
+ return partsRes;
+ }
+
+ TPartsLocations GetExpectedPartsLocations(const TLogoBlobID& blobId) {
+ TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize());
+ TBlobStorageGroupInfo::TOrderNums orderNums;
+ GroupInfo->GetTopology().PickSubgroup(blobId.Hash(), orderNums);
+ for (ui32 i = 0; i < GroupInfo->GetTopology().GType.TotalPartCount(); ++i) {
+ result[orderNums[i]].push_back(i + 1);
+ }
+ return result;
+ }
+
+ TPartsLocations GetActualPartsLocations(const TLogoBlobID& blobId) {
+ TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize());
+ for (ui32 i = 0; i < result.size(); ++i) {
+ for (ui32 part: GetParts(i, blobId)) {
+ result[i].push_back(part);
+ }
+ Sort(result[i].begin(), result[i].end());
+ }
+ return result;
+ }
+
+ bool CheckPartsLocations(const TLogoBlobID& blobId) {
+ auto expectedParts = GetExpectedPartsLocations(blobId);
+ auto actualParts = GetActualPartsLocations(blobId);
+ UNIT_ASSERT_VALUES_EQUAL(expectedParts.size(), actualParts.size());
+
+ for (ui32 i = 0; i < expectedParts.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(expectedParts[i].size(), actualParts[i].size());
+ for (ui32 j = 0; j < expectedParts[i].size(); ++j) {
+ UNIT_ASSERT_VALUES_EQUAL(expectedParts[i][j], actualParts[i][j]);
+ }
+ }
+
+ return true;
+ }
+
+ void StopNode(ui32 position) {
+ if (!RunningNodes.contains(position)) {
+ return;
+ }
+ Env.StopNode(GroupInfo->GetActorId(position).NodeId());
+ RunningNodes.erase(position);
+ }
+
+ void StartNode(ui32 position) {
+ if (RunningNodes.contains(position)) {
+ return;
+ }
+ Env.StartNode(GroupInfo->GetActorId(position).NodeId());
+ RunningNodes.insert(position);
+ for (auto [_, queueId]: Queues) {
+ Env.Runtime->Send(new IEventHandle(TEvents::TSystem::Poison, 0, queueId, {}, nullptr, 0), queueId.NodeId());
+ }
+ Queues.clear();
+ }
+
+ TEnvironmentSetup* operator->() {
+ return &Env;
+ }
+
+ TEnvironmentSetup Env;
+ TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo;
+ THashSet<ui32> RunningNodes;
+ THashMap<TVDiskID, TActorId> Queues;
+};
+
+TLogoBlobID MakeLogoBlobId(ui32 step, ui32 dataSize) {
+ return TLogoBlobID(1, 1, step, 0, dataSize, 0);
+}
+
+
+TString GenData(ui32 len) {
+ TString res = TString::Uninitialized(len);
+ EntropyPool().Read(res.Detach(), res.size());
+ return res;
+}
+
+
+struct TStopOneNodeTest {
+ TTestEnv Env;
+ TString data;
+
+ void RunTest() {
+ ui32 step = 0;
+
+ { // Check just a normal put works
+ Env.SendPut(++step, data, NKikimrProto::OK);
+ UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data);
+ Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
+ }
+
+
+ { // Stop one node that should have a part, make put, start it and check that blob would be moved from handoff on main
+ auto blobId = MakeLogoBlobId(++step, data.size());
+ auto locations = Env.GetExpectedPartsLocations(blobId);
+ ui32 nodeIdWithBlob = 0;
+ while (locations[nodeIdWithBlob].size() == 0) ++nodeIdWithBlob;
+
+ Env.StopNode(nodeIdWithBlob);
+ Env.SendPut(step, data, NKikimrProto::OK);
+ Env->Sim(TDuration::Seconds(10));
+ Env.StartNode(nodeIdWithBlob);
+ Env->Sim(TDuration::Seconds(10));
+ Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
+ UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data);
+ }
+ }
+};
+
+struct TRandomTest {
+ TTestEnv Env;
+ ui32 NumIters;
+
+ void RunTest() {
+ TVector<TString> data(Reserve(NumIters));
+
+ for (ui32 step = 0; step < NumIters; ++step) {
+ Cerr << step << Endl;
+ data.push_back(GenData(16 + random() % 4096));
+ auto blobId = MakeLogoBlobId(step, data.back().size());
+ auto locations = Env.GetExpectedPartsLocations(blobId);
+
+ if (random() % 10 == 1 && Env.RunningNodes.size() + 2 > Env->Settings.NodeCount) {
+ ui32 nodeId = random() % Env->Settings.NodeCount;
+ Cerr << "Stop node " << nodeId << Endl;
+ Env.StopNode(nodeId);
+ Env->Sim(TDuration::Seconds(10));
+ }
+
+ Env.SendPut(step, data.back(), NKikimrProto::OK);
+
+ if (random() % 10 == 1) {
+ for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
+ if (!Env.RunningNodes.contains(pos)) {
+ Cerr << "Start node " << pos << Endl;
+ Env.StartNode(pos);
+ Env->Sim(TDuration::Seconds(10));
+ break;
+ }
+ }
+ }
+
+ if (random() % 50 == 1) {
+ ui32 pos = random() % Env->Settings.NodeCount;
+ if (Env.RunningNodes.contains(pos)) {
+ Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
+ Env->Sim(TDuration::Seconds(10));
+ }
+ }
+
+ // Wipe random node
+ if (random() % 100 == 1) {
+ ui32 pos = random() % Env->Settings.NodeCount;
+ if (Env.RunningNodes.contains(pos)) {
+ auto baseConfig = Env->FetchBaseConfig();
+ const auto& somePDisk = baseConfig.GetPDisk(pos);
+ const auto& someVSlot = baseConfig.GetVSlot(pos);
+ Env->Wipe(somePDisk.GetNodeId(), somePDisk.GetPDiskId(), someVSlot.GetVSlotId().GetVSlotId());
+ Env->Sim(TDuration::Seconds(10));
+ }
+ }
+ }
+
+ for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
+ Env.StartNode(pos);
+ }
+
+ Env->Sim(TDuration::Seconds(300));
+ Cerr << "Start checking" << Endl;
+ for (ui32 step = 0; step < NumIters; ++step) {
+ Cerr << step << Endl;
+ Env.CheckPartsLocations(MakeLogoBlobId(step, data[step].size()));
+ UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step]);
+ }
+ }
+};
+
+
+
+Y_UNIT_TEST_SUITE(VDiskBalancing) {
+
+ Y_UNIT_TEST(TestStopOneNode_Block42) {
+ TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(100)}.RunTest();
+ }
+ Y_UNIT_TEST(TestStopOneNode_Mirror3dc) {
+ TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(100)}.RunTest();
+ }
+ Y_UNIT_TEST(TestStopOneNode_Block42_HugeBlob) {
+ TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(521_KB)}.RunTest();
+ }
+ Y_UNIT_TEST(TestStopOneNode_Mirror3dc_HugeBlob) {
+ TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(521_KB)}.RunTest();
+ }
+
+ Y_UNIT_TEST(TestRandom_Block42) {
+ TRandomTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), 1000}.RunTest();
+ }
+ Y_UNIT_TEST(TestRandom_Mirror3dc) {
+ TRandomTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), 1000}.RunTest();
+ }
+
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make
new file mode 100644
index 0000000000..d5c45d86d1
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make
@@ -0,0 +1,15 @@
+UNITTEST_FOR(ydb/core/blobstorage/ut_blobstorage)
+
+ SIZE(MEDIUM)
+
+ TIMEOUT(600)
+
+ SRCS(
+ balancing.cpp
+ )
+
+ PEERDIR(
+ ydb/core/blobstorage/ut_blobstorage/lib
+ )
+
+END()
diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make
index 179e060cd7..ca53b4d87d 100644
--- a/ydb/core/blobstorage/ut_blobstorage/ya.make
+++ b/ydb/core/blobstorage/ut_blobstorage/ya.make
@@ -60,6 +60,7 @@ REQUIREMENTS(ram:32)
END()
RECURSE_FOR_TESTS(
+ ut_balancing
ut_blob_depot
ut_blob_depot_fat
ut_donor
diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h
index cb58762db5..7a0b9f93fe 100644
--- a/ydb/core/util/testactorsys.h
+++ b/ydb/core/util/testactorsys.h
@@ -161,7 +161,7 @@ class TTestActorSystem {
}
void StateFunc(TAutoPtr<IEventHandle>& ev) {
- Y_ABORT_UNLESS(HandlePtr, "event is not being captured by this actor Tag# %s", Tag.data());
+ Y_ABORT_UNLESS(HandlePtr, "event %s is not being captured by this actor Tag# %s", ev->GetTypeName().data(), Tag.data());
Y_ABORT_UNLESS(!*HandlePtr);
HandlePtr->reset(ev.Release());
}