diff options
author | Robert Drynkin <rob.drynkin@gmail.com> | 2023-12-22 16:23:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-22 16:23:19 +0100 |
commit | 7116d46ae7c0259b5f9d489de263f8701e432b1c (patch) | |
tree | 94bea84b0f1de413ac3d2fde52502acb78090e10 | |
parent | 86194b9ee88b97e6442d2af9060ff874fc7216d9 (diff) | |
download | ydb-7116d46ae7c0259b5f9d489de263f8701e432b1c.tar.gz |
KIKIMR-20522: Tests for vdisks balancing (#531)
Co-authored-by: robdrynkin <robdrynkin@nebius.com>
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/balancing.cpp | 305 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make | 15 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/util/testactorsys.h | 2 |
4 files changed, 322 insertions, 1 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/balancing.cpp b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp new file mode 100644 index 0000000000..dfab97742d --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/balancing.cpp @@ -0,0 +1,305 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> + +#include <library/cpp/iterator/enumerate.h> + +#include <util/random/entropy.h> + + +using TPartsLocations = TVector<TVector<ui8>>; + + +struct TTestEnv { + TTestEnv(ui32 nodeCount, TBlobStorageGroupType erasure) + : Env({ + .NodeCount = nodeCount, + .VDiskReplPausedAtStart = false, + .Erasure = erasure, + }) + { + Env.CreateBoxAndPool(1, 1); + Env.Sim(TDuration::Minutes(1)); + + auto groups = Env.GetGroups(); + UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1); + GroupInfo = Env.GetGroupInfo(groups.front()); + + for (ui32 i = 0; i < Env.Settings.NodeCount; ++i) { + RunningNodes.insert(i); + } + } + + static TString PrepareData(const ui32 dataLen, const ui32 start) { + TString data(Reserve(dataLen)); + for (ui32 i = 0; i < dataLen; ++i) { + data.push_back('a' + (start + i) % 26); + } + return data; + }; + + void SendPut(ui32 step, const TString& data, NKikimrProto::EReplyStatus expectedStatus) { + const TLogoBlobID id(1, 1, step, 0, data.size(), 0); + Cerr << "SEND TEvPut with key " << id.ToString() << Endl; + const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__); + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, data, TInstant::Max()); + Env.Runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, GroupInfo->GroupID, ev.release()); + }); + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, expectedStatus); + Cerr << "TEvPutResult: " << res->Get()->ToString() << Endl; + }; + + auto SendGet(ui32 step, ui32 dataSize, bool mustRestoreFirst=false) { + const TLogoBlobID blobId(1, 1, step, 0, dataSize, 0); + Cerr << "SEND TEvGet with key " << blobId.ToString() << Endl; + const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__); + auto ev = std::make_unique<TEvBlobStorage::TEvGet>( + blobId, + /* shift */ 0, + /* size */ dataSize, + TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead, + mustRestoreFirst + ); + Env.Runtime->WrapInActorContext(sender, [&] () { + SendToBSProxy(sender, GroupInfo->GroupID, ev.release()); + }); + TInstant getDeadline = Env.Now() + TDuration::Seconds(30); + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, /* termOnCapture */ false, getDeadline); + Cerr << "TEvGetResult: " << res->Get()->ToString() << Endl; + return res; + }; + + TActorId GetQueue(const TVDiskID& vDiskId) { + if (!Queues.contains(vDiskId)) { + Queues[vDiskId] = Env.CreateQueueActor(vDiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, 1000); + } + return Queues[vDiskId]; + } + + TVector<ui32> GetParts(ui32 position, const TLogoBlobID& blobId) { + if (!RunningNodes.contains(position)) { + return {}; + } + auto vDiskId = GroupInfo->GetVDiskId(position); + auto ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery( + vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead, + TEvBlobStorage::TEvVGet::EFlags::None, 0, + {{blobId, 0, 0}} + ); + const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__); + TVector<ui32> partsRes; + + Cerr << "Get request for vdisk " << position << Endl; + auto queueId = GetQueue(vDiskId); + Env.Runtime->WrapInActorContext(sender, [&] { + Env.Runtime->Send(new IEventHandle(queueId, sender, ev.release())); + }); + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender, false); + auto parts = res->Get()->Record.GetResult().at(0).GetParts(); + partsRes = TVector<ui32>(parts.begin(), parts.end()); + return partsRes; + } + + TPartsLocations GetExpectedPartsLocations(const TLogoBlobID& blobId) { + TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize()); + TBlobStorageGroupInfo::TOrderNums orderNums; + GroupInfo->GetTopology().PickSubgroup(blobId.Hash(), orderNums); + for (ui32 i = 0; i < GroupInfo->GetTopology().GType.TotalPartCount(); ++i) { + result[orderNums[i]].push_back(i + 1); + } + return result; + } + + TPartsLocations GetActualPartsLocations(const TLogoBlobID& blobId) { + TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize()); + for (ui32 i = 0; i < result.size(); ++i) { + for (ui32 part: GetParts(i, blobId)) { + result[i].push_back(part); + } + Sort(result[i].begin(), result[i].end()); + } + return result; + } + + bool CheckPartsLocations(const TLogoBlobID& blobId) { + auto expectedParts = GetExpectedPartsLocations(blobId); + auto actualParts = GetActualPartsLocations(blobId); + UNIT_ASSERT_VALUES_EQUAL(expectedParts.size(), actualParts.size()); + + for (ui32 i = 0; i < expectedParts.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(expectedParts[i].size(), actualParts[i].size()); + for (ui32 j = 0; j < expectedParts[i].size(); ++j) { + UNIT_ASSERT_VALUES_EQUAL(expectedParts[i][j], actualParts[i][j]); + } + } + + return true; + } + + void StopNode(ui32 position) { + if (!RunningNodes.contains(position)) { + return; + } + Env.StopNode(GroupInfo->GetActorId(position).NodeId()); + RunningNodes.erase(position); + } + + void StartNode(ui32 position) { + if (RunningNodes.contains(position)) { + return; + } + Env.StartNode(GroupInfo->GetActorId(position).NodeId()); + RunningNodes.insert(position); + for (auto [_, queueId]: Queues) { + Env.Runtime->Send(new IEventHandle(TEvents::TSystem::Poison, 0, queueId, {}, nullptr, 0), queueId.NodeId()); + } + Queues.clear(); + } + + TEnvironmentSetup* operator->() { + return &Env; + } + + TEnvironmentSetup Env; + TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo; + THashSet<ui32> RunningNodes; + THashMap<TVDiskID, TActorId> Queues; +}; + +TLogoBlobID MakeLogoBlobId(ui32 step, ui32 dataSize) { + return TLogoBlobID(1, 1, step, 0, dataSize, 0); +} + + +TString GenData(ui32 len) { + TString res = TString::Uninitialized(len); + EntropyPool().Read(res.Detach(), res.size()); + return res; +} + + +struct TStopOneNodeTest { + TTestEnv Env; + TString data; + + void RunTest() { + ui32 step = 0; + + { // Check just a normal put works + Env.SendPut(++step, data, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data); + Env.CheckPartsLocations(MakeLogoBlobId(step, data.size())); + } + + + { // Stop one node that should have a part, make put, start it and check that blob would be moved from handoff on main + auto blobId = MakeLogoBlobId(++step, data.size()); + auto locations = Env.GetExpectedPartsLocations(blobId); + ui32 nodeIdWithBlob = 0; + while (locations[nodeIdWithBlob].size() == 0) ++nodeIdWithBlob; + + Env.StopNode(nodeIdWithBlob); + Env.SendPut(step, data, NKikimrProto::OK); + Env->Sim(TDuration::Seconds(10)); + Env.StartNode(nodeIdWithBlob); + Env->Sim(TDuration::Seconds(10)); + Env.CheckPartsLocations(MakeLogoBlobId(step, data.size())); + UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data); + } + } +}; + +struct TRandomTest { + TTestEnv Env; + ui32 NumIters; + + void RunTest() { + TVector<TString> data(Reserve(NumIters)); + + for (ui32 step = 0; step < NumIters; ++step) { + Cerr << step << Endl; + data.push_back(GenData(16 + random() % 4096)); + auto blobId = MakeLogoBlobId(step, data.back().size()); + auto locations = Env.GetExpectedPartsLocations(blobId); + + if (random() % 10 == 1 && Env.RunningNodes.size() + 2 > Env->Settings.NodeCount) { + ui32 nodeId = random() % Env->Settings.NodeCount; + Cerr << "Stop node " << nodeId << Endl; + Env.StopNode(nodeId); + Env->Sim(TDuration::Seconds(10)); + } + + Env.SendPut(step, data.back(), NKikimrProto::OK); + + if (random() % 10 == 1) { + for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { + if (!Env.RunningNodes.contains(pos)) { + Cerr << "Start node " << pos << Endl; + Env.StartNode(pos); + Env->Sim(TDuration::Seconds(10)); + break; + } + } + } + + if (random() % 50 == 1) { + ui32 pos = random() % Env->Settings.NodeCount; + if (Env.RunningNodes.contains(pos)) { + Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); + Env->Sim(TDuration::Seconds(10)); + } + } + + // Wipe random node + if (random() % 100 == 1) { + ui32 pos = random() % Env->Settings.NodeCount; + if (Env.RunningNodes.contains(pos)) { + auto baseConfig = Env->FetchBaseConfig(); + const auto& somePDisk = baseConfig.GetPDisk(pos); + const auto& someVSlot = baseConfig.GetVSlot(pos); + Env->Wipe(somePDisk.GetNodeId(), somePDisk.GetPDiskId(), someVSlot.GetVSlotId().GetVSlotId()); + Env->Sim(TDuration::Seconds(10)); + } + } + } + + for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { + Env.StartNode(pos); + } + + Env->Sim(TDuration::Seconds(300)); + Cerr << "Start checking" << Endl; + for (ui32 step = 0; step < NumIters; ++step) { + Cerr << step << Endl; + Env.CheckPartsLocations(MakeLogoBlobId(step, data[step].size())); + UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step]); + } + } +}; + + + +Y_UNIT_TEST_SUITE(VDiskBalancing) { + + Y_UNIT_TEST(TestStopOneNode_Block42) { + TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(100)}.RunTest(); + } + Y_UNIT_TEST(TestStopOneNode_Mirror3dc) { + TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(100)}.RunTest(); + } + Y_UNIT_TEST(TestStopOneNode_Block42_HugeBlob) { + TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(521_KB)}.RunTest(); + } + Y_UNIT_TEST(TestStopOneNode_Mirror3dc_HugeBlob) { + TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(521_KB)}.RunTest(); + } + + Y_UNIT_TEST(TestRandom_Block42) { + TRandomTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), 1000}.RunTest(); + } + Y_UNIT_TEST(TestRandom_Mirror3dc) { + TRandomTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), 1000}.RunTest(); + } + +} diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make new file mode 100644 index 0000000000..d5c45d86d1 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make @@ -0,0 +1,15 @@ +UNITTEST_FOR(ydb/core/blobstorage/ut_blobstorage) + + SIZE(MEDIUM) + + TIMEOUT(600) + + SRCS( + balancing.cpp + ) + + PEERDIR( + ydb/core/blobstorage/ut_blobstorage/lib + ) + +END() diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make index 179e060cd7..ca53b4d87d 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ya.make @@ -60,6 +60,7 @@ REQUIREMENTS(ram:32) END() RECURSE_FOR_TESTS( + ut_balancing ut_blob_depot ut_blob_depot_fat ut_donor diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h index cb58762db5..7a0b9f93fe 100644 --- a/ydb/core/util/testactorsys.h +++ b/ydb/core/util/testactorsys.h @@ -161,7 +161,7 @@ class TTestActorSystem { } void StateFunc(TAutoPtr<IEventHandle>& ev) { - Y_ABORT_UNLESS(HandlePtr, "event is not being captured by this actor Tag# %s", Tag.data()); + Y_ABORT_UNLESS(HandlePtr, "event %s is not being captured by this actor Tag# %s", ev->GetTypeName().data(), Tag.data()); Y_ABORT_UNLESS(!*HandlePtr); HandlePtr->reset(ev.Release()); } |