diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2024-01-31 11:54:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 11:54:31 +0300 |
commit | 03d8b00cb82919ad02a176fc7b6d32f678b7698f (patch) | |
tree | 3ed69787a4dfca83e5293289a23f23f956c31eb7 | |
parent | 94651fc6c13f632932aaa1a57a1c790223452a98 (diff) | |
download | ydb-03d8b00cb82919ad02a176fc7b6d32f678b7698f.tar.gz |
Extra test for patching logic (#1446)
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/patch.cpp | 299 | ||||
-rw-r--r-- | ydb/core/util/testactorsys.h | 1 |
2 files changed, 300 insertions, 0 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/patch.cpp b/ydb/core/blobstorage/ut_blobstorage/patch.cpp index b35e3930df..c0246738ee 100644 --- a/ydb/core/blobstorage/ut_blobstorage/patch.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/patch.cpp @@ -1,5 +1,8 @@ #include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> #include <ydb/core/blobstorage/ut_blobstorage/lib/common.h> +#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h> +#include <ydb/core/util/lz4_data_generator.h> +#include <library/cpp/digest/crc32c/crc32c.h> Y_UNIT_TEST_SUITE(BlobPatching) { @@ -201,5 +204,301 @@ Y_UNIT_TEST_SUITE(BlobPatching) { } }); } + + Y_UNIT_TEST(PatchBlock42) { + TEnvironmentSetup env{{ + .NodeCount = 8, + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block + }}; + auto& runtime = env.Runtime; + env.CreateBoxAndPool(1, 2); + env.Sim(TDuration::Seconds(60)); + + std::vector<ui32> groups = env.GetGroups(); + UNIT_ASSERT_VALUES_EQUAL(groups.size(), 2); + + std::unordered_set<ui32> stoppedNodes; + std::unordered_set<ui32> runningNodes; + std::vector<TLogoBlobID> putBlobs; + std::unordered_map<TLogoBlobID, ui32> digest; + ui64 tabletId = 1'000'000'000; + ui32 generation = 1; + ui32 step = 1; + ui8 channel = 5; + + ui32 numPatchedBlobs = 0; + + for (ui32 nodeId : runtime->GetNodes()) { + if (nodeId != 1) { + runningNodes.insert(nodeId); + } + } + + auto stopNode = [&] { + UNIT_ASSERT(runningNodes.size()); + auto it = runningNodes.begin(); + std::advance(it, RandomNumber(runningNodes.size())); + env.StopNode(*it); + Cerr << "*** stopped node " << *it << Endl; + stoppedNodes.insert(runningNodes.extract(it)); + }; + + auto startNode = [&] { + UNIT_ASSERT(stoppedNodes.size()); + auto it = stoppedNodes.begin(); + std::advance(it, RandomNumber(stoppedNodes.size())); + env.StartNode(*it); + Cerr << "*** started node " << *it << Endl; + runningNodes.insert(stoppedNodes.extract(it)); + }; + + auto sim = [&](TActorId edgeId) { + std::unique_ptr<IEventHandle> res; + auto *edge = dynamic_cast<TTestActorSystem::TEdgeActor*>(runtime->GetActor(edgeId)); + Y_VERIFY(edge); + edge->WaitForEvent(&res); + + bool stopped = false; + bool started = false; + + while (!res) { + bool iteration = true; + runtime->Sim([&] { return std::exchange(iteration, false); }); + if (stoppedNodes.size() && RandomNumber(1000u) == 0 && !started) { + startNode(); + started = true; + } else if (stoppedNodes.size() < 2 && RandomNumber(1000u) == 0 && !stopped) { + stopNode(); + stopped = true; + } + } + + edge->StopWaitingForEvent(); + return res; + }; + + auto dumpContent = [&](std::string_view data) -> TString { + TStringStream s; + s << '['; + + const ui32 step = 4096; + for (ui32 offset = 0; offset < data.size(); offset += step) { + auto fragment = data.substr(offset, step); + if (offset) { + s << ' '; + } + s << Sprintf("%08" PRIx32, Crc32c(fragment.data(), fragment.size())); + } + s << ']'; + return s.Str(); + }; + + auto putBlob = [&] { + const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + ui32 count = 1 + RandomNumber(3u); + std::unordered_map<TLogoBlobID, TString> content; + for (ui32 n = 0; n < count; ++n) { + const ui32 size = 16384; + const TLogoBlobID id(tabletId, generation, step, channel, size, 0); + ++step; + TString data = FastGenDataForLZ4(size, id.Hash()); + content[id] = dumpContent(data); + digest.emplace(id, Crc32c(data.data(), data.size())); + runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, groups[0], new TEvBlobStorage::TEvPut(id, TRcBuf(data), TInstant::Max())); + }); + } + for (ui32 n = 0; n < count; ++n) { + auto res = sim(sender); + auto *m = res->Get<TEvBlobStorage::TEvPutResult>(); + if (m->Status == NKikimrProto::OK) { + Cerr << "*** put blob Id# " << m->Id << " content# " << content[m->Id] << Endl; + putBlobs.push_back(m->Id); + } + } + runtime->DestroyActor(sender); + }; + + auto restoreBlob = [&] { + const TLogoBlobID blobId = putBlobs[RandomNumber(putBlobs.size())]; + const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, groups[0], new TEvBlobStorage::TEvGet(blobId, 0, 0, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead, true, true)); + }); + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender); + if (res->Get()->Status == NKikimrProto::OK) { + UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Status, NKikimrProto::OK); + Cerr << "*** restore blob Id# " << blobId << Endl; + } + }; + + auto patchBlob = [&] { + const TLogoBlobID originalId = putBlobs[RandomNumber(putBlobs.size())]; + const ui32 targetGroup = groups[RandomNumber(groups.size())]; + const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + TLogoBlobID patchedId(tabletId, generation, step, channel, originalId.BlobSize(), 0); + ++step; + const bool success = TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(originalId, &patchedId, 0xff, + groups[0], targetGroup); + UNIT_ASSERT(targetGroup != groups[0] || success); + using TDiff = TEvBlobStorage::TEvPatch::TDiff; + std::vector<TDiff> diffs; + + TString data; + { + const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, groups[0], new TEvBlobStorage::TEvGet(originalId, 0, 0, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead)); + }); + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender); + if (res->Get()->Status != NKikimrProto::OK) { + return; + } + UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Status, NKikimrProto::OK); + data = res->Get()->Responses[0].Buffer.ConvertToString(); + Cerr << "*** got " << originalId << " content# " << dumpContent(data) << Endl; + UNIT_ASSERT_VALUES_EQUAL(data.size(), originalId.BlobSize()); + UNIT_ASSERT(digest.contains(originalId)); + UNIT_ASSERT_VALUES_EQUAL(Crc32c(data.data(), data.size()), digest[originalId]); + } + + for (ui32 offset = 0; offset < originalId.BlobSize(); offset += 4096) { + if (RandomNumber(2u) || (offset + 4096 == originalId.BlobSize() && diffs.empty())) { + TString chunk = FastGenDataForLZ4(4096, patchedId.Hash()); + TDiff diff; + diff.Set(TRcBuf(chunk), offset); + diffs.push_back(diff); + memcpy(data.Detach() + offset, chunk.data(), chunk.size()); + } + } + digest.emplace(patchedId, Crc32c(data.data(), data.size())); + runtime->WrapInActorContext(sender, [&] { + TArrayHolder<TDiff> ptr(new TDiff[diffs.size()]); + std::copy(diffs.begin(), diffs.end(), ptr.Get()); + SendToBSProxy(sender, targetGroup, new TEvBlobStorage::TEvPatch(groups[0], originalId, patchedId, + 0xff, std::move(ptr), diffs.size(), TInstant::Max())); + }); + auto res = sim(sender); + if (res->Get<TEvBlobStorage::TEvPatchResult>()->Status == NKikimrProto::OK) { + Cerr << "*** patched OriginalId# " << originalId << " to PatchedId# " << patchedId << + " content# " << dumpContent(data) << Endl; + ++numPatchedBlobs; + if (targetGroup == groups[0]) { + putBlobs.push_back(patchedId); + } + } + }; + + while (putBlobs.size() < 1000 && numPatchedBlobs < 10000) { + const ui32 canStop = stoppedNodes.size() < 2 ? 10 : 0; + const ui32 canStart = stoppedNodes.size() ? 10 : 0; + const ui32 canPut = putBlobs.size() < 1000 ? 100 : 0; + const ui32 canRestore = putBlobs.size() ? 50 : 0; + const ui32 canPatch = putBlobs.size() ? 2000 : 0; + const ui32 canWait = 100; + i32 w = RandomNumber(canStop + canStart + canPut + canPatch + canWait); + if ((w -= canStop) < 0) { + stopNode(); + env.Sim(TDuration::Seconds(5)); + } else if ((w -= canStart) < 0) { + startNode(); + env.Sim(TDuration::Seconds(5)); + } else if ((w -= canPut) < 0) { + putBlob(); + } else if ((w -= canRestore) < 0) { + restoreBlob(); + } else if ((w -= canPatch) < 0) { + patchBlob(); + } else if ((w -= canWait) < 0) { + env.Sim(TDuration::Seconds(20)); + } else { + UNIT_FAIL("unexpected scenario"); + } + } + + for (const ui32 nodeId : stoppedNodes) { + env.StartNode(nodeId); + } + + auto info = env.GetGroupInfo(groups[0]); + std::vector<TActorId> queues; + for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) { + queues.push_back(env.CreateQueueActor(info->GetVDiskId(i), NKikimrBlobStorage::EVDiskQueueId::GetFastRead, 1000)); + } + + const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + + auto checkBlob = [&](TLogoBlobID id) { + Cerr << "*** checking blob " << id << Endl; + + std::vector<TString> parts(info->Type.TotalPartCount()); + ui32 mask = 0; + TSubgroupPartLayout layout; + ui32 writtenParts = 0; + + for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) { + const TVDiskID& vdiskId = info->GetVDiskId(i); + runtime->Send(new IEventHandle(queues[i], sender, TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, + TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, TEvBlobStorage::TEvVGet::EFlags::None, + Nothing(), {{id}}).release()), sender.NodeId()); + + auto r = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender, false); + auto& record = r->Get()->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK); + for (auto& res : record.GetResult()) { + if (res.GetStatus() == NKikimrProto::OK) { + TString buffer = r->Get()->GetBlobData(res).ConvertToString(); + const TLogoBlobID id(LogoBlobIDFromLogoBlobID(res.GetBlobID())); + const ui32 partIdx = id.PartId() - 1; + if (parts[partIdx]) { + UNIT_ASSERT_VALUES_EQUAL(parts[partIdx], buffer); + } + parts[partIdx] = std::move(buffer); + layout.AddItem(info->GetIdxInSubgroup(vdiskId, id.Hash()), partIdx, info->Type); + mask |= 1 << partIdx; + ++writtenParts; + } + } + } + + Cerr << " writtenParts# " << writtenParts << Endl; + + TDataPartSet ps; + ps.FullDataSize = id.BlobSize(); + ps.PartsMask = mask; + ps.Parts.resize(parts.size()); + for (ui32 i = 0; i < parts.size(); ++i) { + if (mask >> i & 1) { + ps.Parts[i].ResetToWhole(TRope(parts[i])); + } + } + + TRope outBuffer; + info->Type.RestoreData(TErasureType::CrcModeNone, ps, outBuffer, false, true, false); + UNIT_ASSERT(digest.contains(id)); + UNIT_ASSERT_VALUES_EQUAL(outBuffer.size(), id.BlobSize()); + TString s = outBuffer.ConvertToString(); + UNIT_ASSERT_VALUES_EQUAL(Crc32c(s.data(), s.size()), digest[id]); + + ps = {}; + info->Type.SplitData(TErasureType::CrcModeNone, outBuffer, ps); + for (ui32 i = 0; i < parts.size(); ++i) { + if (mask >> i & 1) { + auto& part = ps.Parts[i]; + UNIT_ASSERT_VALUES_EQUAL(parts[i], TStringBuf(part.Bytes, part.Size)); + } + } + + UNIT_ASSERT_EQUAL(info->GetQuorumChecker().GetBlobState(layout, {&info->GetTopology()}), TBlobStorageGroupInfo::EBS_FULL); + }; + + for (const TLogoBlobID& id : putBlobs) { + checkBlob(id); + } + } } diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h index f6531402e5..147a04a950 100644 --- a/ydb/core/util/testactorsys.h +++ b/ydb/core/util/testactorsys.h @@ -147,6 +147,7 @@ class TTestActorSystem { std::unordered_map<TString, TActorStats> ActorStats; std::unordered_map<IActor*, TString> ActorName; +public: class TEdgeActor : public TActor<TEdgeActor> { std::unique_ptr<IEventHandle> *HandlePtr = nullptr; TString Tag; |