aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@ydb.tech>2024-01-31 11:54:31 +0300
committerGitHub <noreply@github.com>2024-01-31 11:54:31 +0300
commit03d8b00cb82919ad02a176fc7b6d32f678b7698f (patch)
tree3ed69787a4dfca83e5293289a23f23f956c31eb7
parent94651fc6c13f632932aaa1a57a1c790223452a98 (diff)
downloadydb-03d8b00cb82919ad02a176fc7b6d32f678b7698f.tar.gz
Extra test for patching logic (#1446)
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/patch.cpp299
-rw-r--r--ydb/core/util/testactorsys.h1
2 files changed, 300 insertions, 0 deletions
diff --git a/ydb/core/blobstorage/ut_blobstorage/patch.cpp b/ydb/core/blobstorage/ut_blobstorage/patch.cpp
index b35e3930df..c0246738ee 100644
--- a/ydb/core/blobstorage/ut_blobstorage/patch.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/patch.cpp
@@ -1,5 +1,8 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h>
+#include <ydb/core/util/lz4_data_generator.h>
+#include <library/cpp/digest/crc32c/crc32c.h>
Y_UNIT_TEST_SUITE(BlobPatching) {
@@ -201,5 +204,301 @@ Y_UNIT_TEST_SUITE(BlobPatching) {
}
});
}
+
+ Y_UNIT_TEST(PatchBlock42) {
+ TEnvironmentSetup env{{
+ .NodeCount = 8,
+ .Erasure = TBlobStorageGroupType::Erasure4Plus2Block
+ }};
+ auto& runtime = env.Runtime;
+ env.CreateBoxAndPool(1, 2);
+ env.Sim(TDuration::Seconds(60));
+
+ std::vector<ui32> groups = env.GetGroups();
+ UNIT_ASSERT_VALUES_EQUAL(groups.size(), 2);
+
+ std::unordered_set<ui32> stoppedNodes;
+ std::unordered_set<ui32> runningNodes;
+ std::vector<TLogoBlobID> putBlobs;
+ std::unordered_map<TLogoBlobID, ui32> digest;
+ ui64 tabletId = 1'000'000'000;
+ ui32 generation = 1;
+ ui32 step = 1;
+ ui8 channel = 5;
+
+ ui32 numPatchedBlobs = 0;
+
+ for (ui32 nodeId : runtime->GetNodes()) {
+ if (nodeId != 1) {
+ runningNodes.insert(nodeId);
+ }
+ }
+
+ auto stopNode = [&] {
+ UNIT_ASSERT(runningNodes.size());
+ auto it = runningNodes.begin();
+ std::advance(it, RandomNumber(runningNodes.size()));
+ env.StopNode(*it);
+ Cerr << "*** stopped node " << *it << Endl;
+ stoppedNodes.insert(runningNodes.extract(it));
+ };
+
+ auto startNode = [&] {
+ UNIT_ASSERT(stoppedNodes.size());
+ auto it = stoppedNodes.begin();
+ std::advance(it, RandomNumber(stoppedNodes.size()));
+ env.StartNode(*it);
+ Cerr << "*** started node " << *it << Endl;
+ runningNodes.insert(stoppedNodes.extract(it));
+ };
+
+ auto sim = [&](TActorId edgeId) {
+ std::unique_ptr<IEventHandle> res;
+ auto *edge = dynamic_cast<TTestActorSystem::TEdgeActor*>(runtime->GetActor(edgeId));
+ Y_VERIFY(edge);
+ edge->WaitForEvent(&res);
+
+ bool stopped = false;
+ bool started = false;
+
+ while (!res) {
+ bool iteration = true;
+ runtime->Sim([&] { return std::exchange(iteration, false); });
+ if (stoppedNodes.size() && RandomNumber(1000u) == 0 && !started) {
+ startNode();
+ started = true;
+ } else if (stoppedNodes.size() < 2 && RandomNumber(1000u) == 0 && !stopped) {
+ stopNode();
+ stopped = true;
+ }
+ }
+
+ edge->StopWaitingForEvent();
+ return res;
+ };
+
+ auto dumpContent = [&](std::string_view data) -> TString {
+ TStringStream s;
+ s << '[';
+
+ const ui32 step = 4096;
+ for (ui32 offset = 0; offset < data.size(); offset += step) {
+ auto fragment = data.substr(offset, step);
+ if (offset) {
+ s << ' ';
+ }
+ s << Sprintf("%08" PRIx32, Crc32c(fragment.data(), fragment.size()));
+ }
+ s << ']';
+ return s.Str();
+ };
+
+ auto putBlob = [&] {
+ const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
+ ui32 count = 1 + RandomNumber(3u);
+ std::unordered_map<TLogoBlobID, TString> content;
+ for (ui32 n = 0; n < count; ++n) {
+ const ui32 size = 16384;
+ const TLogoBlobID id(tabletId, generation, step, channel, size, 0);
+ ++step;
+ TString data = FastGenDataForLZ4(size, id.Hash());
+ content[id] = dumpContent(data);
+ digest.emplace(id, Crc32c(data.data(), data.size()));
+ runtime->WrapInActorContext(sender, [&] {
+ SendToBSProxy(sender, groups[0], new TEvBlobStorage::TEvPut(id, TRcBuf(data), TInstant::Max()));
+ });
+ }
+ for (ui32 n = 0; n < count; ++n) {
+ auto res = sim(sender);
+ auto *m = res->Get<TEvBlobStorage::TEvPutResult>();
+ if (m->Status == NKikimrProto::OK) {
+ Cerr << "*** put blob Id# " << m->Id << " content# " << content[m->Id] << Endl;
+ putBlobs.push_back(m->Id);
+ }
+ }
+ runtime->DestroyActor(sender);
+ };
+
+ auto restoreBlob = [&] {
+ const TLogoBlobID blobId = putBlobs[RandomNumber(putBlobs.size())];
+ const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
+ runtime->WrapInActorContext(sender, [&] {
+ SendToBSProxy(sender, groups[0], new TEvBlobStorage::TEvGet(blobId, 0, 0, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead, true, true));
+ });
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
+ if (res->Get()->Status == NKikimrProto::OK) {
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Status, NKikimrProto::OK);
+ Cerr << "*** restore blob Id# " << blobId << Endl;
+ }
+ };
+
+ auto patchBlob = [&] {
+ const TLogoBlobID originalId = putBlobs[RandomNumber(putBlobs.size())];
+ const ui32 targetGroup = groups[RandomNumber(groups.size())];
+ const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
+ TLogoBlobID patchedId(tabletId, generation, step, channel, originalId.BlobSize(), 0);
+ ++step;
+ const bool success = TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(originalId, &patchedId, 0xff,
+ groups[0], targetGroup);
+ UNIT_ASSERT(targetGroup != groups[0] || success);
+ using TDiff = TEvBlobStorage::TEvPatch::TDiff;
+ std::vector<TDiff> diffs;
+
+ TString data;
+ {
+ const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
+ runtime->WrapInActorContext(sender, [&] {
+ SendToBSProxy(sender, groups[0], new TEvBlobStorage::TEvGet(originalId, 0, 0, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead));
+ });
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
+ if (res->Get()->Status != NKikimrProto::OK) {
+ return;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Status, NKikimrProto::OK);
+ data = res->Get()->Responses[0].Buffer.ConvertToString();
+ Cerr << "*** got " << originalId << " content# " << dumpContent(data) << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(data.size(), originalId.BlobSize());
+ UNIT_ASSERT(digest.contains(originalId));
+ UNIT_ASSERT_VALUES_EQUAL(Crc32c(data.data(), data.size()), digest[originalId]);
+ }
+
+ for (ui32 offset = 0; offset < originalId.BlobSize(); offset += 4096) {
+ if (RandomNumber(2u) || (offset + 4096 == originalId.BlobSize() && diffs.empty())) {
+ TString chunk = FastGenDataForLZ4(4096, patchedId.Hash());
+ TDiff diff;
+ diff.Set(TRcBuf(chunk), offset);
+ diffs.push_back(diff);
+ memcpy(data.Detach() + offset, chunk.data(), chunk.size());
+ }
+ }
+ digest.emplace(patchedId, Crc32c(data.data(), data.size()));
+ runtime->WrapInActorContext(sender, [&] {
+ TArrayHolder<TDiff> ptr(new TDiff[diffs.size()]);
+ std::copy(diffs.begin(), diffs.end(), ptr.Get());
+ SendToBSProxy(sender, targetGroup, new TEvBlobStorage::TEvPatch(groups[0], originalId, patchedId,
+ 0xff, std::move(ptr), diffs.size(), TInstant::Max()));
+ });
+ auto res = sim(sender);
+ if (res->Get<TEvBlobStorage::TEvPatchResult>()->Status == NKikimrProto::OK) {
+ Cerr << "*** patched OriginalId# " << originalId << " to PatchedId# " << patchedId <<
+ " content# " << dumpContent(data) << Endl;
+ ++numPatchedBlobs;
+ if (targetGroup == groups[0]) {
+ putBlobs.push_back(patchedId);
+ }
+ }
+ };
+
+ while (putBlobs.size() < 1000 && numPatchedBlobs < 10000) {
+ const ui32 canStop = stoppedNodes.size() < 2 ? 10 : 0;
+ const ui32 canStart = stoppedNodes.size() ? 10 : 0;
+ const ui32 canPut = putBlobs.size() < 1000 ? 100 : 0;
+ const ui32 canRestore = putBlobs.size() ? 50 : 0;
+ const ui32 canPatch = putBlobs.size() ? 2000 : 0;
+ const ui32 canWait = 100;
+ i32 w = RandomNumber(canStop + canStart + canPut + canPatch + canWait);
+ if ((w -= canStop) < 0) {
+ stopNode();
+ env.Sim(TDuration::Seconds(5));
+ } else if ((w -= canStart) < 0) {
+ startNode();
+ env.Sim(TDuration::Seconds(5));
+ } else if ((w -= canPut) < 0) {
+ putBlob();
+ } else if ((w -= canRestore) < 0) {
+ restoreBlob();
+ } else if ((w -= canPatch) < 0) {
+ patchBlob();
+ } else if ((w -= canWait) < 0) {
+ env.Sim(TDuration::Seconds(20));
+ } else {
+ UNIT_FAIL("unexpected scenario");
+ }
+ }
+
+ for (const ui32 nodeId : stoppedNodes) {
+ env.StartNode(nodeId);
+ }
+
+ auto info = env.GetGroupInfo(groups[0]);
+ std::vector<TActorId> queues;
+ for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
+ queues.push_back(env.CreateQueueActor(info->GetVDiskId(i), NKikimrBlobStorage::EVDiskQueueId::GetFastRead, 1000));
+ }
+
+ const TActorId sender = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
+
+ auto checkBlob = [&](TLogoBlobID id) {
+ Cerr << "*** checking blob " << id << Endl;
+
+ std::vector<TString> parts(info->Type.TotalPartCount());
+ ui32 mask = 0;
+ TSubgroupPartLayout layout;
+ ui32 writtenParts = 0;
+
+ for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
+ const TVDiskID& vdiskId = info->GetVDiskId(i);
+ runtime->Send(new IEventHandle(queues[i], sender, TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId,
+ TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, TEvBlobStorage::TEvVGet::EFlags::None,
+ Nothing(), {{id}}).release()), sender.NodeId());
+
+ auto r = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender, false);
+ auto& record = r->Get()->Record;
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
+ for (auto& res : record.GetResult()) {
+ if (res.GetStatus() == NKikimrProto::OK) {
+ TString buffer = r->Get()->GetBlobData(res).ConvertToString();
+ const TLogoBlobID id(LogoBlobIDFromLogoBlobID(res.GetBlobID()));
+ const ui32 partIdx = id.PartId() - 1;
+ if (parts[partIdx]) {
+ UNIT_ASSERT_VALUES_EQUAL(parts[partIdx], buffer);
+ }
+ parts[partIdx] = std::move(buffer);
+ layout.AddItem(info->GetIdxInSubgroup(vdiskId, id.Hash()), partIdx, info->Type);
+ mask |= 1 << partIdx;
+ ++writtenParts;
+ }
+ }
+ }
+
+ Cerr << " writtenParts# " << writtenParts << Endl;
+
+ TDataPartSet ps;
+ ps.FullDataSize = id.BlobSize();
+ ps.PartsMask = mask;
+ ps.Parts.resize(parts.size());
+ for (ui32 i = 0; i < parts.size(); ++i) {
+ if (mask >> i & 1) {
+ ps.Parts[i].ResetToWhole(TRope(parts[i]));
+ }
+ }
+
+ TRope outBuffer;
+ info->Type.RestoreData(TErasureType::CrcModeNone, ps, outBuffer, false, true, false);
+ UNIT_ASSERT(digest.contains(id));
+ UNIT_ASSERT_VALUES_EQUAL(outBuffer.size(), id.BlobSize());
+ TString s = outBuffer.ConvertToString();
+ UNIT_ASSERT_VALUES_EQUAL(Crc32c(s.data(), s.size()), digest[id]);
+
+ ps = {};
+ info->Type.SplitData(TErasureType::CrcModeNone, outBuffer, ps);
+ for (ui32 i = 0; i < parts.size(); ++i) {
+ if (mask >> i & 1) {
+ auto& part = ps.Parts[i];
+ UNIT_ASSERT_VALUES_EQUAL(parts[i], TStringBuf(part.Bytes, part.Size));
+ }
+ }
+
+ UNIT_ASSERT_EQUAL(info->GetQuorumChecker().GetBlobState(layout, {&info->GetTopology()}), TBlobStorageGroupInfo::EBS_FULL);
+ };
+
+ for (const TLogoBlobID& id : putBlobs) {
+ checkBlob(id);
+ }
+ }
}
diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h
index f6531402e5..147a04a950 100644
--- a/ydb/core/util/testactorsys.h
+++ b/ydb/core/util/testactorsys.h
@@ -147,6 +147,7 @@ class TTestActorSystem {
std::unordered_map<TString, TActorStats> ActorStats;
std::unordered_map<IActor*, TString> ActorName;
+public:
class TEdgeActor : public TActor<TEdgeActor> {
std::unique_ptr<IEventHandle> *HandlePtr = nullptr;
TString Tag;