diff options
| author | Sergey Belyakov <[email protected]> | 2026-04-24 10:19:34 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-04-24 10:19:34 +0300 |
| commit | 0070166bd0cebfc656d8a8d95c27cce91ceda2c6 (patch) | |
| tree | d9cffa3ff5d67348052e25ace8dc26d5511f659d | |
| parent | a8f115547d190dd2c4e7ab3ca59a4507e7c4dd77 (diff) | |
Introduce persistent PhantomFlagStorage (#37626)
37 files changed, 1154 insertions, 67 deletions
diff --git a/intelliboba.vsix b/intelliboba.vsix Binary files differnew file mode 100644 index 00000000000..0d95ac75f47 --- /dev/null +++ b/intelliboba.vsix diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 173e3173545..99810da17d4 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -783,6 +783,9 @@ struct TEvBlobStorage { EvChunkWriteRaw, EvStartCompactionFromDefrag, EvSyncerFullSyncFinished, + EvPhantomFlagStorageWriteItems, + EvPhantomFlagStorageCommitData, + EvPhantomFlagStorageDrop, EvYardInitResult = EvPut + 9 * 512, /// 268 636 672 EvLogResult, diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index ff189bf15e5..aae34d3be0d 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -61,6 +61,7 @@ TNodeWarden::TNodeWarden(const TIntrusivePtr<TNodeWardenConfig> &cfg) , ThrottlingMaxLogChunkCount(130, 1, 100000) , MaxInProgressSyncCount(0, 0, 1000) , EnablePhantomFlagStorage(1, 0, 1) + , EnablePersistentPhantomFlagStorage(0, 0, 1) , PhantomFlagStorageLimitPerVDiskBytes(10'000'000, 0, 100'000'000'000) , EnableChunkKeeper(0, 0, 1) , MaxCommonLogChunksHDD(NPDisk::MaxCommonLogChunks, 1, 1'000'000) @@ -432,6 +433,7 @@ void TNodeWarden::Bootstrap() { TControlBoard::RegisterSharedControl(MaxInProgressSyncCount, icb->VDiskControls.MaxInProgressSyncCount); TControlBoard::RegisterSharedControl(EnablePhantomFlagStorage, icb->VDiskControls.EnablePhantomFlagStorage); + TControlBoard::RegisterSharedControl(EnablePersistentPhantomFlagStorage, icb->VDiskControls.EnablePersistentPhantomFlagStorage); TControlBoard::RegisterSharedControl(PhantomFlagStorageLimitPerVDiskBytes, icb->VDiskControls.PhantomFlagStorageLimitPerVDiskBytes); TControlBoard::RegisterSharedControl(EnableChunkKeeper, icb->VDiskControls.EnableChunkKeeper); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 29c37d801aa..34a73ab097a 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -242,6 +242,7 @@ namespace NKikimr::NStorage { TControlWrapper MaxInProgressSyncCount; TControlWrapper EnablePhantomFlagStorage; + TControlWrapper EnablePersistentPhantomFlagStorage; TControlWrapper PhantomFlagStorageLimitPerVDiskBytes; TControlWrapper EnableChunkKeeper; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp index fed226301bf..89d3251dc33 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp @@ -276,6 +276,7 @@ namespace NKikimr::NStorage { vdiskConfig->MaxInProgressSyncCount = MaxInProgressSyncCount; vdiskConfig->EnablePhantomFlagStorage = EnablePhantomFlagStorage; + vdiskConfig->EnablePersistentPhantomFlagStorage = EnablePersistentPhantomFlagStorage; vdiskConfig->PhantomFlagStorageLimit = PhantomFlagStorageLimitPerVDiskBytes; vdiskConfig->EnableChunkKeeper = EnableChunkKeeper; diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp b/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp index 9bf79e1b42f..dbf84c3260e 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp @@ -35,4 +35,5 @@ const std::initializer_list<ui32> TEnvironmentSetup::DebugLogComponents{ // NKikimrServices::BS_VDISK_BALANCING, // NKikimrServices::BS_PHANTOM_FLAG_STORAGE, // NKikimrServices::BS_CHUNK_KEEPER, +// NKikimrServices::BS_PHANTOM_FLAG_PROCESSOR, }; diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h index 5aff48ff4b3..59630bf6493 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h @@ -76,6 +76,7 @@ struct TEnvironmentSetup { const TDuration MaxPutTimeoutDSProxy = TDuration::Seconds(60); const bool StartFakeWilsonCollectors = false; const bool EnableChunkKeeper = true; + const bool EnablePersistentPhantomFlagStorage = false; }; const TSettings Settings; @@ -572,6 +573,7 @@ config: ADD_ICB_CONTROL(PDiskControls.MaxActiveCompactionsPerPDisk, 0, 0, 1'000'000, 0); ADD_ICB_CONTROL(VDiskControls.GarbageThresholdToRunFullCompactionPerMille, 0, 0, 300, 0); ADD_ICB_CONTROL(VDiskControls.EnablePhantomFlagStorage, true, false, true, Settings.EnablePhantomFlagStorage); + ADD_ICB_CONTROL(VDiskControls.EnablePersistentPhantomFlagStorage, false, false, true, Settings.EnablePersistentPhantomFlagStorage); ADD_ICB_CONTROL(VDiskControls.PhantomFlagStorageLimitPerVDiskBytes, 10'000'000, 0, 100'000'000'000, Settings.PhantomFlagStorageLimitPerVDiskBytes); ADD_ICB_CONTROL(VDiskControls.EnableChunkKeeper, true, false, true, Settings.EnableChunkKeeper); ADD_ICB_CONTROL(VDiskControls.HullCompFreeSpaceThresholdPerMille, 2000, 0, 100'000, 2000); diff --git a/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp b/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp index 75d5c347951..2e87a68581c 100644 --- a/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp @@ -17,6 +17,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { EOnline Online; bool PhantomFlagStorageEnabled; ui64 MemoryLimit = 10_MB; + bool PersistentPhantomFlagStorageEnabled = false; }; struct TTestCtx : public TTestCtxBase { @@ -38,6 +39,8 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { NodeStates[nodeId - 1].PhantomFlagStorageEnabled); Env->SetIcbControl(nodeId, "VDiskControls.PhantomFlagStorageLimitPerVDiskBytes", NodeStates[nodeId - 1].MemoryLimit); + Env->SetIcbControl(nodeId, "VDiskControls.EnablePersistentPhantomFlagStorage", + NodeStates[nodeId - 1].PersistentPhantomFlagStorageEnabled); } } @@ -306,27 +309,28 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { }; std::vector<TNodeState> GetStates(TBlobStorageGroupType erasure, EOnline online, - bool phantomFlagStorageEnabled, ui64 memoryLimit) { + bool phantomFlagStorageEnabled, bool isPersistent, ui64 memoryLimit) { return std::vector<TNodeState>(erasure.BlobSubgroupSize(), TNodeState{ .Online = online, .PhantomFlagStorageEnabled = phantomFlagStorageEnabled, - .MemoryLimit = memoryLimit + .MemoryLimit = memoryLimit, + .PersistentPhantomFlagStorageEnabled = isPersistent, }); } std::vector<TNodeState> GetStatesAllAlive(TBlobStorageGroupType erasure, ui64 memoryLimit) { - return GetStates(erasure, EOnline::Alive, true, memoryLimit); + return GetStates(erasure, EOnline::Alive, true, false, memoryLimit); } std::vector<TNodeState> GetStatesOneDead(TBlobStorageGroupType erasure, ui64 memoryLimit) { - std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit); + std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit); states[0].Online = EOnline::Dead; return states; } std::vector<TNodeState> GetStatesTwoDead(TBlobStorageGroupType erasure, ui64 memoryLimit) { - std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit); + std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit); states[0].Online = EOnline::Dead; states[4].Online = EOnline::Dead; return states; @@ -334,14 +338,14 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { std::vector<TNodeState> GetStatesOneDeadAllRestart(TBlobStorageGroupType erasure, ui64 memoryLimit) { - std::vector<TNodeState> states = GetStates(erasure, EOnline::Restart, true, memoryLimit); + std::vector<TNodeState> states = GetStates(erasure, EOnline::Restart, true, true, memoryLimit); states[0].Online = EOnline::Dead; return states; } std::vector<TNodeState> GetStatesOneDeadActiveOneDeadInactive(TBlobStorageGroupType erasure, ui64 memoryLimit) { - std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit); + std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit); states[0].Online = EOnline::Dead; states[0].PhantomFlagStorageEnabled = false; states[4].Online = EOnline::Dead; @@ -351,7 +355,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { std::vector<TNodeState> GetStatesTwoDeadInactive(TBlobStorageGroupType erasure, ui64 memoryLimit) { - std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit); + std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit); states[0].Online = EOnline::Dead; states[0].PhantomFlagStorageEnabled = false; states[4].Online = EOnline::Dead; @@ -361,7 +365,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { std::vector<TNodeState> GetStatesTwoDeadAllAliveInactive(TBlobStorageGroupType erasure, ui64 memoryLimit) { - std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, memoryLimit); + std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, false, memoryLimit); states[0].Online = EOnline::Dead; states[0].PhantomFlagStorageEnabled = true; states[4].Online = EOnline::Dead; @@ -371,7 +375,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { std::vector<TNodeState> GetStatesTwoDeadSomeAliveInactive(TBlobStorageGroupType erasure, ui64 memoryLimit) { - std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, memoryLimit); + std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, false, memoryLimit); states[0].Online = EOnline::Dead; states[0].PhantomFlagStorageEnabled = true; states[1].PhantomFlagStorageEnabled = true; @@ -393,6 +397,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { .PDiskChunkSize = 32_MB, .EnablePhantomFlagStorage = false, .TinySyncLog = true, + .EnablePersistentPhantomFlagStorage = false, }, 100, 10000, nodeStates, expectPhantoms); if (nodeStates2) { ctx.RunTestWithUpdate(*nodeStates2); @@ -415,10 +420,9 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { TEST_PHANTOM_BLOBS(TwoDead, Mirror3dc, false, 10_KB); - // TODO (serg-belyakov@): persistent phantom flag storage - // TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3dc, false); - // TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3of4, false); - // TEST_PHANTOM_BLOBS(OneDeadAllRestart, 4Plus2Block, false); + TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3dc, false, 10_KB); + TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3of4, false, 10_KB); + TEST_PHANTOM_BLOBS(OneDeadAllRestart, 4Plus2Block, false, 10_KB); TEST_PHANTOM_BLOBS(TwoDeadInactive, Mirror3dc, false, 10_KB); TEST_PHANTOM_BLOBS(OneDeadActiveOneDeadInactive, Mirror3dc, false, 10_KB); @@ -429,26 +433,26 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) { Y_UNIT_TEST(TestDisabling) { auto erasure = TBlobStorageGroupType::ErasureMirror3dc; - auto states1 = GetStates(erasure, EOnline::Alive, true, 10_KB); + auto states1 = GetStates(erasure, EOnline::Alive, true, false, 10_KB); states1[0].Online = EOnline::Dead; - auto states2 = GetStates(erasure, EOnline::Alive, false, 10_KB); + auto states2 = GetStates(erasure, EOnline::Alive, false, false, 10_KB); states2[0].Online = EOnline::Dead; Test(erasure, states1, states2, true); } Y_UNIT_TEST(TestEnabling) { auto erasure = TBlobStorageGroupType::ErasureMirror3dc; - auto states1 = GetStates(erasure, EOnline::Alive, false, 10_KB); + auto states1 = GetStates(erasure, EOnline::Alive, false, false, 10_KB); states1[0].Online = EOnline::Dead; - auto states2 = GetStates(erasure, EOnline::Alive, true, 10_KB); + auto states2 = GetStates(erasure, EOnline::Alive, true, false, 10_KB); states2[0].Online = EOnline::Dead; Test(erasure, states1, states2, true); } Y_UNIT_TEST(TestLoweringMemoryLimit) { auto erasure = TBlobStorageGroupType::ErasureMirror3dc; - auto states1 = GetStates(erasure, EOnline::Alive, true, 10_KB); + auto states1 = GetStates(erasure, EOnline::Alive, true, false, 10_KB); states1[0].Online = EOnline::Dead; - auto states2 = GetStates(erasure, EOnline::Alive, true, 100_B); + auto states2 = GetStates(erasure, EOnline::Alive, true, false, 100_B); states2[0].Online = EOnline::Dead; Test(erasure, states1, states2, true); } diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp index a65a23a669d..b0d0ffc87a9 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp @@ -223,6 +223,7 @@ class TSyncLogTestWriteActor : public TActorBootstrapped<TSyncLogTestWriteActor> TestCtx->LoggerId, LogCutterId, TActorId{}, + TActorId{}, VDiskConfig->SyncLogMaxDiskAmount, VDiskConfig->SyncLogMaxEntryPointSize, VDiskConfig->SyncLogMaxMemAmount, @@ -230,6 +231,7 @@ class TSyncLogTestWriteActor : public TActorBootstrapped<TSyncLogTestWriteActor> Db->SyncLogFirstLsnToKeep, false, TControlWrapper(0, 0, 1), + false, TControlWrapper(20'000'000, 1, 100'000'000'000)); TestCtx->SyncLogId = ctx.Register(CreateSyncLogActor(slCtx, Conf->GroupInfo, TestCtx->SelfVDiskId, std::move(repaired))); // Send Db birth lsn diff --git a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp index a7514929a86..242e050a8b6 100644 --- a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp +++ b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp @@ -13,6 +13,23 @@ TEvChunkKeeperAllocateResult::TEvChunkKeeperAllocateResult(std::optional<ui32> c , ErrorReason(errorReason) {} +TString TEvChunkKeeperAllocateResult::ToString() const { + TStringStream str; + str << "TEvChunkKeeperAllocateResult{"; + str << " ChunkIdx# "; + if (ChunkIdx) { + str << *ChunkIdx; + } else { + str << "<nullopt>"; + } + str << " Status# " << NKikimrProto::EReplyStatus_Name(Status); + if (!ErrorReason.empty()) { + str << " ErrorReason# " << ErrorReason; + } + str << " }"; + return str.Str(); +} + TEvChunkKeeperFree::TEvChunkKeeperFree(ui32 chunkIdx, TSubsystem subsystem) : ChunkIdx(chunkIdx) , Subsystem(subsystem) @@ -26,6 +43,18 @@ TEvChunkKeeperFreeResult::TEvChunkKeeperFreeResult(ui32 chunkIdx, NKikimrProto:: , ErrorReason(errorReason) {} +TString TEvChunkKeeperFreeResult::ToString() const { + TStringStream str; + str << "TEvChunkKeeperFreeResult{"; + str << " ChunkIdx# " << ChunkIdx; + str << " Status# " << NKikimrProto::EReplyStatus_Name(Status); + if (!ErrorReason.empty()) { + str << " ErrorReason# " << ErrorReason; + } + str << " }"; + return str.Str(); +} + TEvChunkKeeperDiscover::TEvChunkKeeperDiscover(TSubsystem subsystem) : Subsystem(subsystem) {} @@ -38,4 +67,21 @@ TEvChunkKeeperDiscoverResult::TEvChunkKeeperDiscoverResult(std::vector<TChunkInf , ErrorReason(errorReason) {} +TString TEvChunkKeeperDiscoverResult::ToString() const { + TStringStream str; + str << "TEvChunkKeeperDiscoverResult{"; + str << " Status# " << NKikimrProto::EReplyStatus_Name(Status); + if (!ErrorReason.empty()) { + str << " ErrorReason# " << ErrorReason; + } + str << " ChunkCount# " << Chunks.size(); + str << " Chunks# ["; + for (const TChunkInfo& chunk : Chunks) { + str << " { ChunkIdx# " << chunk.ChunkIdx; + str << " ShredRequested# " << chunk.ShredRequested << " }"; + } + str << " ] }"; + return str.Str(); +} + } // namespace NKikimr diff --git a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h index 7e27de8f2f0..03ca7197601 100644 --- a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h +++ b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h @@ -49,6 +49,8 @@ struct TEvChunkKeeperAllocateResult : TEventLocal<TEvChunkKeeperAllocateResult, TEvChunkKeeperAllocateResult(std::optional<ui32> chunkIdx, NKikimrProto::EReplyStatus status, TString errorReason = ""); + + TString ToString() const; }; struct TEvChunkKeeperFree : TEventLocal<TEvChunkKeeperFree, TEvBlobStorage::EvChunkKeeperFree> { @@ -66,6 +68,8 @@ struct TEvChunkKeeperFreeResult : TEventLocal<TEvChunkKeeperFreeResult, TEvBlobS TString ErrorReason; TEvChunkKeeperFreeResult(ui32 chunkIdx, NKikimrProto::EReplyStatus status, TString errorReason = ""); + + TString ToString() const; }; struct TEvChunkKeeperDiscover : TEventLocal<TEvChunkKeeperDiscover, TEvBlobStorage::EvChunkKeeperDiscover> { @@ -90,6 +94,8 @@ struct TEvChunkKeeperDiscoverResult : TEventLocal<TEvChunkKeeperDiscoverResult, TEvChunkKeeperDiscoverResult(std::vector<TChunkInfo>&& chunks, NKikimrProto::EReplyStatus status, TString errorReason = ""); + + TString ToString() const; }; } // namespace NKikimr diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.h b/ydb/core/blobstorage/vdisk/common/vdisk_config.h index 330cabb7378..3b6e7f7fc13 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_config.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.h @@ -278,6 +278,7 @@ namespace NKikimr { ///////////// SYNC SETTINGS ////////////////// TControlWrapper MaxInProgressSyncCount; TControlWrapper EnablePhantomFlagStorage; + bool EnablePersistentPhantomFlagStorage = false; TControlWrapper PhantomFlagStorageLimit; ///////////// CHUNK Keeper ////////////////// diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 83eac9226e5..c4632041986 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -2038,6 +2038,7 @@ namespace NKikimr { Db->LoggerID, Db->LogCutterID, Db->SkeletonID, + Db->ChunkKeeperActorID, Config->SyncLogMaxDiskAmount, Config->SyncLogMaxEntryPointSize, Config->SyncLogMaxMemAmount, @@ -2045,6 +2046,7 @@ namespace NKikimr { Db->SyncLogFirstLsnToKeep, Config->BaseInfo.ReadOnly, Config->EnablePhantomFlagStorage, + Config->EnablePersistentPhantomFlagStorage, Config->PhantomFlagStorageLimit); Db->SyncLogID.Set(ctx.Register(CreateSyncLogActor(slCtx, GInfo, SelfVDiskId, std::move(repairedSyncLog)))); ActiveActors.Insert(Db->SyncLogID, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); // keep forever diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h index 56a7d8156f1..0e5ba1062fe 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h @@ -45,6 +45,7 @@ public: const TActorId LoggerID; const TActorId LogCutterID; const TActorId SkeletonId; + const TActorId ChunkKeeperId; const ui64 SyncLogMaxDiskAmount; const ui64 SyncLogMaxEntryPointSize; @@ -59,6 +60,7 @@ public: const bool IsReadOnlyVDisk; TControlWrapper EnablePhantomFlagStorage; + bool EnablePersistentPhantomFlagStorage; TControlWrapper PhantomFlagStorageLimit; TSyncLogCtx(TIntrusivePtr<TVDiskContext> vctx, @@ -67,6 +69,7 @@ public: const TActorId &loggerId, const TActorId &logCutterId, const TActorId& skeletonId, + const TActorId& chunkKeeperId, ui64 syncLogMaxDiskAmount, ui64 syncLogMaxEntryPointSize, ui64 syncLogMaxMemAmount, @@ -74,6 +77,7 @@ public: std::shared_ptr<TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep, bool isReadOnlyVDisk, const TControlWrapper& enablePhantomFlagStorage, + bool enablePersistentPhantomFlagStorage, const TControlWrapper& phantomFlagStorageLimit) : VCtx(std::move(vctx)) , LsnMngr(std::move(lsnMngr)) @@ -81,6 +85,7 @@ public: , LoggerID(loggerId) , LogCutterID(logCutterId) , SkeletonId(skeletonId) + , ChunkKeeperId(chunkKeeperId) , SyncLogMaxDiskAmount(syncLogMaxDiskAmount) , SyncLogMaxEntryPointSize(syncLogMaxEntryPointSize) , SyncLogMaxMemAmount(syncLogMaxMemAmount) @@ -91,6 +96,7 @@ public: , PhantomFlagStorageGroup(VCtx->VDiskCounters, "subsystem", "phantomflagstorage") , IsReadOnlyVDisk(isReadOnlyVDisk) , EnablePhantomFlagStorage(enablePhantomFlagStorage) + , EnablePersistentPhantomFlagStorage(enablePersistentPhantomFlagStorage) , PhantomFlagStorageLimit(phantomFlagStorageLimit) {} }; diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp index bafc233992c..31a1386349e 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp @@ -13,6 +13,16 @@ namespace NKikimr { TEvSyncLogSnapshotResult::~TEvSyncLogSnapshotResult() = default; + TEvPhantomFlagStorageWriteItems::TEvPhantomFlagStorageWriteItems( + std::vector<TPhantomFlagStorageItem>&& items) + : Items(std::move(items)) + {} + + TEvPhantomFlagStorageCommitData::TEvPhantomFlagStorageCommitData( + const std::optional<TPhantomFlagStorageData>& data) + : Data(data) + {} + } // NSyncLog } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h index e64631d00db..8c7995e976a 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h @@ -3,6 +3,7 @@ #include "defs.h" #include "blobstorage_synclogdata.h" #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h> +#include <ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h> #include <ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h> #include <ydb/core/base/blobstorage.h> @@ -113,5 +114,28 @@ namespace NKikimr { , SyncedLsn(syncedLsn) {} }; + + struct TEvPhantomFlagStorageWriteItems + : public TEventLocal<TEvPhantomFlagStorageWriteItems, + TEvBlobStorage::EvPhantomFlagStorageWriteItems> + { + TEvPhantomFlagStorageWriteItems(std::vector<TPhantomFlagStorageItem>&& items); + + std::vector<TPhantomFlagStorageItem> Items; + }; + + struct TEvPhantomFlagStorageCommitData + : public TEventLocal<TEvPhantomFlagStorageCommitData, + TEvBlobStorage::EvPhantomFlagStorageCommitData> + { + TEvPhantomFlagStorageCommitData(const std::optional<TPhantomFlagStorageData>& data); + + std::optional<TPhantomFlagStorageData> Data; + }; + + struct TEvPhantomFlagStorageDrop + : public TEventLocal<TEvPhantomFlagStorageDrop, + TEvBlobStorage::EvPhantomFlagStorageDrop> + {}; } // NSyncLog } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp index 9a86377809e..06c2506f407 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp @@ -111,13 +111,15 @@ namespace NKikimr { ui64 logStartLsn, ui32 appendBlockSize, const TEntryPointDbgInfo &lastEntryPointDbgInfo, - const TSyncLogHeader &header) + const TSyncLogHeader &header, + const std::optional<TPhantomFlagStorageData>& phantomFlagStorageData) : DiskSnapPtr(diskSnapPtr) , MemSnapPtr(memSnapPtr) , LogStartLsn(logStartLsn) , AppendBlockSize(appendBlockSize) , LastEntryPointDbgInfo(lastEntryPointDbgInfo) , Header(header) + , PhantomFlagStorageData(phantomFlagStorageData) { CheckSnapshotConsistency(); // For debug } @@ -146,6 +148,10 @@ namespace NKikimr { pb.SetPDiskGuid(Header.PDiskGuid); pb.SetVDiskIncarnationGuid(Header.VDiskIncarnationGuid); pb.SetLogStartLsn(LogStartLsn); + if (PhantomFlagStorageData) { + auto* data = pb.MutablePhantomFlagStorageData(); + PhantomFlagStorageData->Serialize(data); + } // DiskRecLog TStringStream s; ui32 indexRecsNum = DiskSnapPtr->Serialize(s, delta); @@ -276,7 +282,8 @@ namespace NKikimr { LogStartLsn, DiskRecLog.AppendBlockSize, LastEntryPointDbgInfo, - Header)); + Header, + PhantomFlagStorageData)); } bool TSyncLog::CheckMemAndDiskRecLogsDoNotIntersect() const { @@ -398,6 +405,23 @@ namespace NKikimr { } //////////////////////////////////////////////////////////////////////// + // TSyncLog: PhantomFlagStorageData + //////////////////////////////////////////////////////////////////////// + void TSyncLog::UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data) { + PhantomFlagStorageData = std::move(data); + } + + TPhantomFlagStorageData TSyncLog::GetPhantomFlagStorageData() const { + TPhantomFlagStorageData res; + if (PhantomFlagStorageData) { + res = *PhantomFlagStorageData; + } else { + res.ChunkSize = GetChunkSize(); + } + return res; + } + + //////////////////////////////////////////////////////////////////////// // TSyncLog: PRIVATE //////////////////////////////////////////////////////////////////////// TSyncLog::TSyncLog(const TSyncLogHeader &header, @@ -505,6 +529,14 @@ namespace NKikimr { ChunksToDelete.push_back(pb.GetChunksToDeleteDelayed(i)); } + if (pb.HasPhantomFlagStorageData()) { + TPhantomFlagStorageData data; + data.Deserialize(pb.GetPhantomFlagStorageData()); + SyncLogPtr->UpdatePhantomFlagStorageData(std::move(data)); + } else { + SyncLogPtr->UpdatePhantomFlagStorageData(std::nullopt); + } + return true; } diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h index de296a09418..baa24686792 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h @@ -7,6 +7,7 @@ #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h> #include <ydb/core/blobstorage/vdisk/common/blobstorage_vdisk_guids.h> +#include <ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h> #include <ydb/core/protos/blobstorage_vdisk_internal.pb.h> #include <ydb/core/base/blobstorage.h> @@ -105,6 +106,7 @@ namespace NKikimr { const ui32 AppendBlockSize; const TEntryPointDbgInfo LastEntryPointDbgInfo; const TSyncLogHeader Header; + const std::optional<TPhantomFlagStorageData> PhantomFlagStorageData; private: TSyncLogSnapshot(TDiskRecLogSnapshotPtr diskSnapPtr, @@ -112,7 +114,8 @@ namespace NKikimr { ui64 logStartLsn, ui32 appendBlockSize, const TEntryPointDbgInfo &lastEntryPointDbgInfo, - const TSyncLogHeader &header); + const TSyncLogHeader &header, + const std::optional<TPhantomFlagStorageData>& phantomFlagStorageData); friend class TSyncLog; }; @@ -239,6 +242,12 @@ namespace NKikimr { // returns chunks owned by DiskRecLog void GetOwnedChunks(TSet<TChunkIdx>& chunks) const; + //////////////////////////////////////////////////////////////////////// + // PhantomFlagStorage + //////////////////////////////////////////////////////////////////////// + void UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data); + TPhantomFlagStorageData GetPhantomFlagStorageData() const; + private: // part of the log on disk TDiskRecLog DiskRecLog; @@ -249,6 +258,7 @@ namespace NKikimr { ui64 LogStartLsn; // info about last serialized data written to the log as an entry point TEntryPointDbgInfo LastEntryPointDbgInfo; + std::optional<TPhantomFlagStorageData> PhantomFlagStorageData; TSyncLog(const TSyncLogHeader &header, TDiskRecLog &&diskRecLog, diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h index 65371506729..3609d3b508d 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h @@ -20,6 +20,8 @@ namespace NKikimr { ui64 Raw[3]; // TLogoBlobID TIngress Ingress; + TLogoBlobRec() : TLogoBlobRec(TLogoBlobID(0, 0, 0), 0) {} + explicit TLogoBlobRec(const TLogoBlobID &id, ui64 ingressRaw) : Ingress(ingressRaw) { diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp index 38d2e72cadf..69af7875837 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp @@ -230,6 +230,7 @@ namespace NKikimr { if (CommitterId) { ctx.Send(CommitterId, new TEvents::TEvPoisonPill()); } + KeepState.Terminate(); Die(ctx); } @@ -255,7 +256,13 @@ namespace NKikimr { } void Handle(const TEvPhantomFlagStorageGetSnapshot::TPtr& ev) { - Send(ev->Sender, new TEvPhantomFlagStorageGetSnapshotResult(KeepState.GetPhantomFlagStorageSnapshot())); + KeepState.RequestPhantomFlagStorageSnapshot(ev); + } + + void Handle(const TEvPhantomFlagStorageGetSnapshotResult::TPtr& ev) { + // This actor only requests PhantomFlagStorage snapshot on restart + // to rebuild ThresholdsStructure + KeepState.RecoverPhantomFlagStorage(std::move(ev->Get()->Snapshot)); } void Handle(const TEvLocalSyncData::TPtr& ev) { @@ -268,8 +275,13 @@ namespace NKikimr { KeepState.UpdateNeighbourSyncedLsn(ev->Get()->OrderNumber, ev->Get()->SyncedLsn); } + void Handle(TEvPhantomFlagStorageCommitData::TPtr ev) { + KeepState.UpdatePhantomFlagStorageData(std::move(ev->Get()->Data)); + } + void UpdateCounters() { KeepState.UpdateMetrics(); + KeepState.FlushPhantomFlagStorageWriteBufferIfNeeded(); Schedule(TDuration::Seconds(15), new TEvents::TEvWakeup); } @@ -287,6 +299,7 @@ namespace NKikimr { HFunc(TEvListChunks, Handle) hFunc(TEvPhantomFlagStorageFinishBuilder, Handle) hFunc(TEvPhantomFlagStorageGetSnapshot, Handle) + hFunc(TEvPhantomFlagStorageCommitData, Handle) hFunc(TEvLocalSyncData, Handle) hFunc(TEvSyncLogUpdateNeighbourSyncedLsn, Handle) cFunc(TEvents::TEvWakeup::EventType, UpdateCounters) diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp index f9351a7fd27..cd679920046 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp @@ -45,6 +45,7 @@ namespace NKikimr { , NeedsInitialCommit(repaired->NeedsInitialCommit) , PhantomFlagStorageState(SlCtx) , EnablePhantomFlagStorage(SlCtx->EnablePhantomFlagStorage) + , EnablePersistentPhantomFlagStorage(SlCtx->EnablePersistentPhantomFlagStorage) , PhantomFlagStorageLimit(SlCtx->PhantomFlagStorageLimit) , SelfOrderNumber(SlCtx->VCtx->Top->GetOrderNumber(SlCtx->VCtx->ShortSelfVDisk)) { @@ -56,6 +57,19 @@ namespace NKikimr { SyncedMask.set(SelfOrderNumber, true); } + void TSyncLogKeeperState::Init(std::shared_ptr<IActorNotify> notifier, + std::shared_ptr<ILoggerCtx> loggerCtx, const TActorId& selfId) { + Notifier = std::move(notifier); + LoggerCtx = std::move(loggerCtx); + SelfId = selfId; + + if (EnablePersistentPhantomFlagStorage) { + TPhantomFlagStorageData phantomFlagStorageData = SyncLogPtr->GetPhantomFlagStorageData(); + PhantomFlagStorageState.InitializePersistent(std::move(phantomFlagStorageData), SelfId, + SlCtx->ChunkKeeperId, SyncLogPtr->GetAppendBlockSize()); + } + } + // Calculate first lsn in recovery log we must keep ui64 TSyncLogKeeperState::CalculateFirstLsnToKeep() const { // calculate first lsn for data @@ -460,11 +474,15 @@ namespace NKikimr { } void TSyncLogKeeperState::FinishPhantomFlagStorageBuilder(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds) { - PhantomFlagStorageState.FinishBuilding(std::move(flags), std::move(thresholds), PhantomFlagStorageLimit); + PhantomFlagStorageState.FinishInitialBuilding(std::move(flags), std::move(thresholds), PhantomFlagStorageLimit); + } + + void TSyncLogKeeperState::RecoverPhantomFlagStorage(TPhantomFlagStorageSnapshot&& snapshot) { + PhantomFlagStorageState.Recover(std::move(snapshot)); } - TPhantomFlagStorageSnapshot TSyncLogKeeperState::GetPhantomFlagStorageSnapshot() const { - return PhantomFlagStorageState.GetSnapshot(); + void TSyncLogKeeperState::RequestPhantomFlagStorageSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr request) const { + PhantomFlagStorageState.RequestSnapshot(request); } void TSyncLogKeeperState::ProcessLocalSyncData(ui32 orderNumber, const TString& data) { @@ -480,7 +498,9 @@ namespace NKikimr { if (EnablePhantomFlagStorage) { PhantomFlagStorageState.UpdateSyncedMask(SyncedMask); - if (!chunks.empty() && !PhantomFlagStorageState.IsActive() && SelfId != TActorId{}) { + if (PhantomFlagStorageState.IsActive()) { + PhantomFlagStorageState.SyncLogIsCut(); + } else if (!chunks.empty() && SelfId != TActorId{}) { PhantomFlagStorageState.StartBuilding(); TActivationContext::Register(CreatePhantomFlagStorageBuilderActor(SlCtx, SelfId, snapshot)); } @@ -498,5 +518,17 @@ namespace NKikimr { SlCtx->PhantomFlagStorageGroup.SyncedMask() = SyncedMask.to_ullong(); } + void TSyncLogKeeperState::UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data) { + SyncLogPtr->UpdatePhantomFlagStorageData(std::move(data)); + } + + void TSyncLogKeeperState::FlushPhantomFlagStorageWriteBufferIfNeeded() { + PhantomFlagStorageState.FlushWriteBufferIfNeeded(); + } + + void TSyncLogKeeperState::Terminate() { + PhantomFlagStorageState.Terminate(); + } + } // NSyncLog } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h index f538111a84f..f9a0f829a06 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h @@ -42,11 +42,7 @@ namespace NKikimr { ui64 syncLogMaxEntryPointSize); void Init(std::shared_ptr<IActorNotify> notifier, std::shared_ptr<ILoggerCtx> loggerCtx, - const TActorId& selfId) { - Notifier = std::move(notifier); - LoggerCtx = std::move(loggerCtx); - SelfId = selfId; - } + const TActorId& selfId); bool HasDelayedActions() const { return DelayedActions.HasActions(); } bool GetDeleteChunkAndClear() { return std::exchange(DelayedActions.DeleteChunk, false); } @@ -92,15 +88,20 @@ namespace NKikimr { // Add flags from cut sync log snapshot void FinishPhantomFlagStorageBuilder(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds); - TPhantomFlagStorageSnapshot GetPhantomFlagStorageSnapshot() const; + void RecoverPhantomFlagStorage(TPhantomFlagStorageSnapshot&& snapshot); + void RequestPhantomFlagStorageSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr request) const; + void UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data); void ProcessLocalSyncData(ui32 orderNumber, const TString& data); void UpdateMetrics(); + void FlushPhantomFlagStorageWriteBufferIfNeeded(); TVector<ui32> GetChunksToForget() { return std::exchange(ChunksToForget, {}); } + void Terminate(); + private: // VDisk Context TIntrusivePtr<TSyncLogCtx> SlCtx; @@ -138,6 +139,7 @@ namespace NKikimr { // phantom flag storage TPhantomFlagStorageState PhantomFlagStorageState; TMemorizableControlWrapper EnablePhantomFlagStorage; + bool EnablePersistentPhantomFlagStorage; TMemorizableControlWrapper PhantomFlagStorageLimit; ui32 SelfOrderNumber; diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp index a03bc4970d9..97b136c2c48 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp @@ -110,6 +110,7 @@ namespace NKikimr { TActorId{}, TActorId{}, TActorId{}, + TActorId{}, syncLogMaxDiskAmount, syncLogMaxEntryPointSize, syncLogMaxMemAmount, @@ -117,6 +118,7 @@ namespace NKikimr { nullptr, false, TControlWrapper(0, 0, 1), + false, TControlWrapper(20'000'000, 1, 100'000'000'000)); State = std::make_unique<TSyncLogKeeperState>(slCtx, std::move(repaired), syncLogMaxMemAmount, syncLogMaxDiskAmount, diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.cpp new file mode 100644 index 00000000000..5d2fa7418e4 --- /dev/null +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.cpp @@ -0,0 +1,161 @@ +#include "phantom_flag_storage_data.h" + +#include <util/generic/overloaded.h> + +namespace NKikimr::NSyncLog { + +TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateSkip(ui32 skipSize) { + TPhantomFlagStorageItem res; + res.Data.emplace<TSkip>(skipSize); + return res; +} + +TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateFlag(const TLogoBlobRec* blobRec) { + TPhantomFlagStorageItem res; + res.Data.emplace<TFlag>(*blobRec); + return res; +} + +TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateThreshold(ui32 orderNumber, + ui64 tabletId, ui8 channel, ui32 generation, ui32 step) { + TPhantomFlagStorageItem res; + res.Data.emplace<TThreshold>(tabletId, channel, generation, step, orderNumber); + return res; +} + +TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateThreshold(ui32 orderNumber, + const TLogoBlobID& blobId) { + TPhantomFlagStorageItem res; + res.Data.emplace<TThreshold>(blobId.TabletID(), blobId.Channel(), blobId.Generation(), + blobId.Step(), orderNumber); + return res; +} + +EPhantomFlagStorageItem TPhantomFlagStorageItem::GetType() const { + EPhantomFlagStorageItem res = EPhantomFlagStorageItem::Skip; + std::visit(TOverloaded{ + [&](const std::monostate&) {}, + [&](const TSkip&) {}, + [&](const TFlag&) { res = EPhantomFlagStorageItem::Flag; }, + [&](const TThreshold&) { res = EPhantomFlagStorageItem::Threshold; } + }, Data); + return res; +} + +TPhantomFlagStorageItem::TSkip TPhantomFlagStorageItem::GetSkip() const { + return std::get<TSkip>(Data); +} + +TPhantomFlagStorageItem::TThreshold TPhantomFlagStorageItem::GetThreshold() const { + return std::get<TThreshold>(Data); +} + +TPhantomFlagStorageItem::TFlag TPhantomFlagStorageItem::GetFlag() const { + return std::get<TFlag>(Data); +} + +void TPhantomFlagStorageItem::Serialize(TString* buffer) const { + std::visit(TOverloaded{ + [&](const std::monostate&) {}, + [&](const TSkip& skip) { + constexpr static EPhantomFlagStorageItem type = EPhantomFlagStorageItem::Skip; + if (skip.Size >= sizeof(type) + sizeof(skip.Size)) { + buffer->append(reinterpret_cast<const char*>(&type), sizeof(type)); + buffer->append(reinterpret_cast<const char*>(&skip), sizeof(skip)); + } else { + TString zeros(skip.Size, '\0'); + buffer->append(zeros.data(), skip.Size); + } + }, + [&](const TFlag& flag) { + constexpr static EPhantomFlagStorageItem type = EPhantomFlagStorageItem::Flag; + buffer->append(reinterpret_cast<const char*>(&type), sizeof(type)); + buffer->append(reinterpret_cast<const char*>(&flag), sizeof(flag)); + }, + [&](const TThreshold& threshold) { + constexpr static EPhantomFlagStorageItem type = EPhantomFlagStorageItem::Threshold; + buffer->append(reinterpret_cast<const char*>(&type), sizeof(type)); + buffer->append(reinterpret_cast<const char*>(&threshold), sizeof(threshold)); + }, + }, Data); +} + +TPhantomFlagStorageItem TPhantomFlagStorageItem::DeserializeFromRaw(const char* data) { + const EPhantomFlagStorageItem type = *reinterpret_cast<const EPhantomFlagStorageItem*>(data); + data += sizeof(type); + switch (type) { + case EPhantomFlagStorageItem::SkipOneByte: { + return TPhantomFlagStorageItem::CreateSkip(1); + } + case EPhantomFlagStorageItem::Flag: { + const TLogoBlobRec rec = ReadUnaligned<TLogoBlobRec>(data); + return TPhantomFlagStorageItem::CreateFlag(&rec); + } + case EPhantomFlagStorageItem::Threshold: { + const TThreshold threshold = ReadUnaligned<TThreshold>(data); + return TPhantomFlagStorageItem::CreateThreshold(threshold.OrderNumber, threshold.TabletId, + threshold.Channel, threshold.Generation, threshold.Step); + } + case EPhantomFlagStorageItem::Skip: + default: { + const ui32 size = ReadUnaligned<ui32>(data); + return TPhantomFlagStorageItem::CreateSkip(size); + } + } +} + +ui32 TPhantomFlagStorageItem::SerializedSize() const { + ui32 res = 0; + std::visit(TOverloaded{ + [&](const std::monostate&) { }, + [&](const TSkip& skip) { res = skip.Size; }, + [&](const TFlag&) { res = sizeof(EPhantomFlagStorageItem) + sizeof(TFlag); }, + [&](const TThreshold&) { res = sizeof(EPhantomFlagStorageItem) + sizeof(TThreshold); }, + }, Data); + return res; +} + +void TPhantomFlagStorageItem::AlignWriteBlock(TString* buffer, ui32 appendBlockSize, ui32 sizeLimit) { + Y_ABORT_UNLESS(buffer); + ui32 bufferSize = buffer->size(); + Y_VERIFY_S(bufferSize <= sizeLimit, "BufferSize# " << bufferSize << " SizeLimit# " << sizeLimit); + if (buffer->size() % appendBlockSize == 0) { + // write block is already aligned + return; + } + + ui32 fillSize = std::min(sizeLimit - bufferSize, appendBlockSize - bufferSize % appendBlockSize); + ui32 skipRecSize = sizeof(EPhantomFlagStorageItem) + sizeof(TSkip); + if (fillSize >= skipRecSize) { + // add one Skip record + TPhantomFlagStorageItem skip = CreateSkip(fillSize); + skip.Serialize(buffer); + fillSize -= skipRecSize; + } + + // fill the rest with zeros + if (fillSize > 0) { + TString zeros(fillSize, '\0'); + buffer->append(zeros.data(), fillSize); + } +} + +void TPhantomFlagStorageData::Deserialize(const TPhantomFlagStorageDataProto& proto) { + ChunkSize = proto.GetChunkSize(); + for (const auto& chunk : proto.GetChunks()) { + Chunks[chunk.GetChunkIdx()] = TChunk{ + .DataSize = chunk.GetDataSize(), + }; + } +} + +void TPhantomFlagStorageData::Serialize(TPhantomFlagStorageDataProto* proto) const { + for (const auto& [chunkIdx, chunk] : Chunks) { + auto* chunkProto = proto->AddChunks(); + chunkProto->SetChunkIdx(chunkIdx); + chunkProto->SetDataSize(chunk.DataSize); + } + proto->SetChunkSize(ChunkSize); +} + +} diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h new file mode 100644 index 00000000000..b23ca228fe1 --- /dev/null +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h @@ -0,0 +1,73 @@ +#pragma once + +#include "phantom_flag_thresholds.h" + +#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h> + +#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h> + +namespace NKikimr::NSyncLog { + +using TPhantomFlagStorageDataProto = NKikimrVDiskData::TPhantomFlagStorageData; + +enum EPhantomFlagStorageItem : ui8 { + SkipOneByte = 0b00, + Flag = 0b01, + Threshold = 0b10, + Skip = 0b11, + // All types other than Flag and Threshold must contain ui32 field + // equal to the total serialized size of structure + // (including Type and Size) fields + // in the beginning of byte serialization, for compatibility reasons: + // when attempting to read unsupported field it fill be interpreted + // as Skip{ Size } and skipped + // SkipOneByte records are used to skip alignment blocks +}; + +class TPhantomFlagStorageItem { +public: + struct TSkip { + ui32 Size; + }; + + struct TFlag { + TLogoBlobRec Record; + }; + + using TThreshold = TPhantomFlagThresholds::TThreshold; + +public: + static TPhantomFlagStorageItem CreateSkip(ui32 skipSize); + static TPhantomFlagStorageItem CreateFlag(const TLogoBlobRec* blobRec); + static TPhantomFlagStorageItem CreateThreshold(ui32 orderNumber, ui64 tabletId, ui8 channel, + ui32 generation, ui32 step); + static TPhantomFlagStorageItem CreateThreshold(ui32 orderNumber, const TLogoBlobID& blobId); + + static TPhantomFlagStorageItem DeserializeFromRaw(const char* data); + void Serialize(TString* buffer) const; + ui32 SerializedSize() const; + static void AlignWriteBlock(TString* buffer, ui32 appendBlockSize, ui32 sizeLimit); + + EPhantomFlagStorageItem GetType() const; + + TSkip GetSkip() const; + TThreshold GetThreshold() const; + TFlag GetFlag() const; + +private: + std::variant<TSkip, TFlag, TThreshold> Data; +}; + +struct TPhantomFlagStorageData { + struct TChunk { + ui32 DataSize; + }; + + void Deserialize(const TPhantomFlagStorageDataProto& proto); + void Serialize(TPhantomFlagStorageDataProto* proto) const; + + std::unordered_map<ui32, TChunk> Chunks; + ui32 ChunkSize; +}; + +} // namespace NKikimr::NSyncLog diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.cpp new file mode 100644 index 00000000000..ef3487eb3df --- /dev/null +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.cpp @@ -0,0 +1,421 @@ +#include "phantom_flags.h" +#include "phantom_flag_thresholds.h" +#include "phantom_flag_storage_processor.h" + +#include <util/generic/overloaded.h> + +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h> +#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h> +#include <ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h> +#include <ydb/core/util/stlog.h> + +#include <unordered_set> + +namespace NKikimr::NSyncLog { + +//////////////////////////////////////////////////////////////////////////// +// TPhantomFlagStorageProcessor +//////////////////////////////////////////////////////////////////////////// +class TPhantomFlagStorageProcessor : public TActorBootstrapped<TPhantomFlagStorageProcessor> { +public: + TPhantomFlagStorageProcessor(TPhantomFlagStorageData&& data, + TPhantomFlagStorageProcessorContext&& ctx) + : Ctx(std::move(ctx)) + , Data(std::move(data)) + , PendingRead(Ctx.SyncLogCtx->VCtx->Top->GType) + {} + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::BS_PHANTOM_FLAG_STORAGE_WRITER; + } + + void Bootstrap() { + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP01, VDISKP(Ctx.SyncLogCtx->VCtx, + "Bootstrap PhantomFlagStorageProcessor"), + (ChunkCount, Data.Chunks.size()), + (ChunkSize, Data.ChunkSize)); + Send(Ctx.ChunkKeeperId, new TEvChunkKeeperDiscover(SubsystemId)); + RequestInFlight = true; + Become(&TThis::StateInit); + } + +private: + ////////////////////////////////////////////////////////////////////// + // State functions + ////////////////////////////////////////////////////////////////////// + STRICT_STFUNC(StateInit, + hFunc(TEvPhantomFlagStorageWriteItems, Handle) + hFunc(TEvPhantomFlagStorageDrop, Handle) + hFunc(TEvChunkKeeperDiscoverResult, HandleInit) + cFunc(TEvents::TEvPoisonPill::EventType, PassAway) + ) + + STRICT_STFUNC(StateWork, + hFunc(TEvPhantomFlagStorageDrop, Handle) + hFunc(TEvChunkKeeperAllocateResult, Handle) + hFunc(TEvChunkKeeperFreeResult, Handle) + hFunc(NPDisk::TEvChunkWriteResult, Handle) + hFunc(NPDisk::TEvChunkReadResult, Handle) + hFunc(TEvPhantomFlagStorageWriteItems, Handle) + hFunc(TEvPhantomFlagStorageGetSnapshot, Handle) + cFunc(TEvents::TEvPoisonPill::EventType, PassAway) + ) + + PDISK_TERMINATE_STATE_FUNC_DEF; + + ////////////////////////////////////////////////////////////////////// + // Handlers + ////////////////////////////////////////////////////////////////////// + void HandleInit(const TEvChunkKeeperDiscoverResult::TPtr& ev) { + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP02, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvChunkKeeperDiscoverResult"), + (Event, ev->Get()->ToString())); + RequestInFlight = false; + switch (ev->Get()->Status) { + case NKikimrProto::OK: { + std::unordered_set<ui32> discoveredChunks; + for (const auto& [chunkIdx, chunk] : ev->Get()->Chunks) { + discoveredChunks.insert(chunkIdx); + if (!Data.Chunks.contains(chunkIdx)) { + // chunk was allocated but wasn't committed to SyncLog + // entryPoint before crash + Data.Chunks[chunkIdx] = TPhantomFlagStorageData::TChunk{ + .DataSize = 0, + }; + } + } + + std::unordered_map<ui32, TPhantomFlagStorageData::TChunk> chunks; + for (const auto& [chunkIdx, chunk] : Data.Chunks) { + if (discoveredChunks.contains(chunkIdx)) { + chunks[chunkIdx] = chunk; + } else { + EnqueueChunkDeletion(chunkIdx); + } + } + std::exchange(Data.Chunks, chunks); // filter out deallocated chunks + SelectTailChunk(); + break; + } + default: { + // ChunkKeeper is disabled, unable to manage chunks, terminate this actor + PassAway(); + return; + } + } + + Become(&TThis::StateWork); + ProcessQueues(); + } + + void Handle(TEvPhantomFlagStorageWriteItems::TPtr ev) { + STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_PROCESSOR, BSPFP03, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvPhantomFlagStorageWriteItems"), + (ItemCount, ev->Get()->Items.size()), + (WriteQueueSize, WriteQueue.size())); + std::ranges::move(ev->Get()->Items.begin(), ev->Get()->Items.end(), + std::back_inserter(WriteQueue)); + ProcessQueues(); + } + + void Handle(const TEvChunkKeeperAllocateResult::TPtr& ev) { + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP04, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvChunkKeeperAllocateResult"), + (Event, ev->Get()->ToString())); + RequestInFlight = false; + switch (ev->Get()->Status) { + case NKikimrProto::OK: { + const ui32 chunkIdx = *ev->Get()->ChunkIdx; + Data.Chunks[chunkIdx] = TPhantomFlagStorageData::TChunk{ + .DataSize = 0, + }; + TailChunkIdx = chunkIdx; + TailAvailableSize = Data.ChunkSize; + break; + } + default: + // retry + AllocateNewChunk(); + return; + } + ProcessQueues(); + } + + void Handle(const TEvChunkKeeperFreeResult::TPtr& ev) { + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP05, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvChunkKeeperFreeResult"), + (Event, ev->Get()->ToString())); + RequestInFlight = false; + Data.Chunks.erase(ev->Get()->ChunkIdx); + CommitState(); + ProcessQueues(); + } + + void Handle(const NPDisk::TEvChunkWriteResult::TPtr& ev) { + STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_PROCESSOR, BSPFP06, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvChunkWriteResult"), + (Event, ev->Get()->ToString())); + CHECK_PDISK_RESPONSE(Ctx.SyncLogCtx->VCtx, ev, TActivationContext::AsActorContext()); + RequestInFlight = false; + ui32 chunkIdx = ev->Get()->ChunkIdx; + if (chunkIdx == TailChunkIdx) { + TailAvailableSize -= PendingWriteSize; + } + Data.Chunks[chunkIdx].DataSize += std::exchange(PendingWriteSize, 0); + CommitState(); + ProcessQueues(); + } + + void Handle(const NPDisk::TEvChunkReadResult::TPtr& ev) { + STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_PROCESSOR, BSPFP07, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvChunkReadResult"), + (Event, ev->Get()->ToString())); + CHECK_PDISK_RESPONSE(Ctx.SyncLogCtx->VCtx, ev, TActivationContext::AsActorContext()); + ui32 chunkIdx = ev->Get()->ChunkIdx; + PendingRead.ChunksToRead.erase(chunkIdx); + ProcessReadBuffer(ev->Get()->Data, chunkIdx); + if (PendingRead.ChunksToRead.empty()) { + FinalizeRead(); + } + ProcessQueues(); + } + + void Handle(const TEvPhantomFlagStorageGetSnapshot::TPtr& ev) { + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP08, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvPhantomFlagStorageGetSnapshot")); + EnqueueGetSnapshot(ev->Sender); + ProcessQueues(); + } + + void Handle(const TEvPhantomFlagStorageDrop::TPtr&) { + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP11, VDISKP(Ctx.SyncLogCtx->VCtx, + "Handle TEvPhantomFlagStorageDrop"), + (ChunkCount, Data.Chunks.size())); + TailChunkIdx = std::nullopt; + TailAvailableSize = 0; + WriteQueue.clear(); + PendingWrite.clear(); + PendingWriteSize = 0; + for (const auto& [chunkIdx, chunk] : Data.Chunks) { + EnqueueChunkDeletion(chunkIdx); + } + ProcessQueues(); + } + + void HandlePoison(const TEvents::TEvPoisonPill::TPtr&, const TActorContext&) { + PassAway(); + } + + ////////////////////////////////////////////////////////////////////// + // Other methods + ////////////////////////////////////////////////////////////////////// + + void SelectTailChunk() { + TailChunkIdx.reset(); + TailAvailableSize = 0; + for (const auto& [chunkIdx, chunk] : Data.Chunks) { + if (Data.ChunkSize - chunk.DataSize > TailAvailableSize) { + TailChunkIdx.emplace(chunkIdx); + TailAvailableSize = Data.ChunkSize - chunk.DataSize; + } + } + } + + void EnqueueChunkDeletion(ui32 chunkIdx) { + RequestQueue.emplace_back(TDeleteChunk{chunkIdx}); + } + + void EnqueueGetSnapshot(TActorId requester) { + RequestQueue.emplace_front(TGetSnapshot{requester}); + } + + void ProcessQueues() { + while (!RequestInFlight && !RequestQueue.empty()) { + TRequest request = RequestQueue.front(); + RequestQueue.pop_front(); + RequestInFlight = true; + std::visit(TOverloaded{ + [&](const std::monostate&) {}, + [&](const TGetSnapshot& req) { GetSnapshot(req.Requester); }, + [&](const TDeleteChunk& req) { DeleteChunk(req.ChunkIdx); }, + }, request); + return; + } + ProcessWriteQueue(); + } + + void ProcessWriteQueue() { + if (RequestInFlight) { + return; + } + + if (!WriteQueue.empty()) { + ui32 nextItemSize = WriteQueue.front().SerializedSize(); + ui32 minRequiredSize = PendingWrite.size() + nextItemSize + Ctx.AppendBlockSize; + + if (TailAvailableSize < minRequiredSize + Ctx.AppendBlockSize) { + AllocateNewChunk(); + return; + } + } + + while (!WriteQueue.empty()) { + const TPhantomFlagStorageItem& item = WriteQueue.front(); + if (TailAvailableSize < PendingWrite.size() + item.SerializedSize() + Ctx.AppendBlockSize) { + break; + } + item.Serialize(&PendingWrite); + WriteQueue.pop_front(); + } + + if (!PendingWrite.empty()) { + IssueWrite(); + } + } + + void DeleteChunk(ui32 chunkIdx) { + if (Data.Chunks.contains(chunkIdx)) { + Send(Ctx.ChunkKeeperId, new TEvChunkKeeperFree(chunkIdx, SubsystemId)); + } else { + RequestInFlight = false; + } + } + + void GetSnapshot(TActorId requester) { + RequestInFlight = true; + PendingRead.Reset(requester); + for (const auto [chunkIdx, chunk] : Data.Chunks) { + if (chunk.DataSize > 0) { + Send(Ctx.SyncLogCtx->PDiskCtx->PDiskId, + new NPDisk::TEvChunkRead(Ctx.SyncLogCtx->PDiskCtx->Dsk->Owner, Ctx.SyncLogCtx->PDiskCtx->Dsk->OwnerRound, + chunkIdx, 0, chunk.DataSize, NPriWrite::SyncLog, nullptr)); + PendingRead.ChunksToRead.insert(chunkIdx); + } + } + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP10, VDISKP(Ctx.SyncLogCtx->VCtx, + "Start reading snapshot"), + (ChunkToReadCount, PendingRead.ChunksToRead.size())); + if (PendingRead.ChunksToRead.empty()) { + FinalizeRead(); + } + } + + void FinalizeRead() { + RequestInFlight = false; + STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP09, VDISKP(Ctx.SyncLogCtx->VCtx, + "Send Snapshot"), + (FlagCount, PendingRead.Flags.size())); + Send(PendingRead.Requester, new TEvPhantomFlagStorageGetSnapshotResult( + TPhantomFlagStorageSnapshot(std::move(PendingRead.Flags), + std::move(PendingRead.Thresholds)))); + } + + void AllocateNewChunk() { + RequestInFlight = true; + Send(Ctx.ChunkKeeperId, new TEvChunkKeeperAllocate(SubsystemId)); + } + + void CommitState() { + Send(Ctx.SyncLogKeeperId, new TEvPhantomFlagStorageCommitData(Data)); + } + + void IssueWrite() { + Y_ABORT_UNLESS(TailChunkIdx); + RequestInFlight = true; + ui32 offset = Data.Chunks[*TailChunkIdx].DataSize; + Y_ABORT_UNLESS(offset <= Data.ChunkSize); + ui32 maxSize = Data.ChunkSize - offset; + TPhantomFlagStorageItem::AlignWriteBlock(&PendingWrite, Ctx.AppendBlockSize, maxSize); + PendingWriteSize = PendingWrite.size(); + auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TAlignedParts>(std::move(PendingWrite)); + Send(Ctx.SyncLogCtx->PDiskCtx->PDiskId, + new NPDisk::TEvChunkWrite(Ctx.SyncLogCtx->PDiskCtx->Dsk->Owner, Ctx.SyncLogCtx->PDiskCtx->Dsk->OwnerRound, + *TailChunkIdx, offset, parts, nullptr, true, NPriWrite::SyncLog)); + } + + void ProcessReadBuffer(class TBufferWithGaps& bufferWithGaps, ui32 chunkIdx) { + if (!bufferWithGaps.IsReadable()) { + return; + } + TRcBuf buffer = bufferWithGaps.ToString(); + ui64 offset = 0; + while (offset < Data.Chunks[chunkIdx].DataSize) { + TPhantomFlagStorageItem item = TPhantomFlagStorageItem::DeserializeFromRaw( + buffer.Data() + offset); + switch (item.GetType()) { + case EPhantomFlagStorageItem::Flag: + PendingRead.Flags.push_back(item.GetFlag().Record); + break; + case EPhantomFlagStorageItem::Threshold: { + TPhantomFlagStorageItem::TThreshold threshold = item.GetThreshold(); + PendingRead.Thresholds.AddBlob(threshold.OrderNumber, threshold.TabletId, + threshold.Channel, threshold.Generation, threshold.Step); + break; + } + case EPhantomFlagStorageItem::Skip: + case EPhantomFlagStorageItem::SkipOneByte: + break; + } + + offset += item.SerializedSize(); + if (item.SerializedSize() == 0) { + return; + } + } + } + +private: + struct TGetSnapshot { + TActorId Requester; + }; + + struct TDeleteChunk { + ui32 ChunkIdx; + }; + + using TRequest = std::variant<TGetSnapshot, TDeleteChunk>; + + struct TReaderInfo { + TReaderInfo(const TBlobStorageGroupType& gtype) + : Thresholds(gtype) + {} + + std::unordered_set<ui32> ChunksToRead; + TPhantomFlags Flags; + TPhantomFlagThresholds Thresholds; + TActorId Requester = TActorId{}; + + void Reset(TActorId requester) { + ChunksToRead.clear(); + Flags.clear(); + Thresholds.Clear(); + Requester = requester; + } + }; + +private: + static constexpr NKikimrVDiskData::TChunkKeeperEntryPoint::ESubsystem SubsystemId = + NKikimrVDiskData::TChunkKeeperEntryPoint::PhantomFlagStorage; + + const TPhantomFlagStorageProcessorContext Ctx; + TPhantomFlagStorageData Data; + + std::deque<TRequest> RequestQueue; + bool RequestInFlight = false; + + std::deque<TPhantomFlagStorageItem> WriteQueue; + TString PendingWrite; + ui32 PendingWriteSize = 0; + TReaderInfo PendingRead; + + std::optional<ui32> TailChunkIdx; + ui64 TailAvailableSize = 0; +}; + +NActors::IActor* CreatePhantomFlagStorageProcessor(TPhantomFlagStorageData&& data, + TPhantomFlagStorageProcessorContext&& ctx) { + return new TPhantomFlagStorageProcessor(std::move(data), std::move(ctx)); +} + +} // namespace NKikimr::NSyncLog diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.h new file mode 100644 index 00000000000..894412d8572 --- /dev/null +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.h @@ -0,0 +1,25 @@ +#pragma once + +#include "phantom_flag_storage_data.h" + +#include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h> +#include <ydb/library/actors/core/actor.h> + +namespace NKikimr::NSyncLog { + +struct TPhantomFlagStorageProcessorContext { + const TIntrusivePtr<TSyncLogCtx> SyncLogCtx; + const NActors::TActorId SyncLogKeeperId; + const NActors::TActorId ChunkKeeperId; + const ui32 AppendBlockSize; +}; + +//////////////////////////////////////////////////////////////////////////// +// PHANTOM FLAG STORAGE PROCESSOR CREATOR +// Creates the actor that writes and reads PhantomFlagStorage data +// (flags and thresholds) on disk and manages chunks via the ChunkKeeper +//////////////////////////////////////////////////////////////////////////// +NActors::IActor* CreatePhantomFlagStorageProcessor(TPhantomFlagStorageData&& data, + TPhantomFlagStorageProcessorContext&& ctx); + +} // namespace NKikimr::NSyncLog diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp index 8cfe33e6e24..43db5654c8a 100644 --- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp @@ -4,8 +4,16 @@ namespace NKikimr { namespace NSyncLog { -TPhantomFlagStorageSnapshot::TPhantomFlagStorageSnapshot(const TPhantomFlags& flags) +TPhantomFlagStorageSnapshot::TPhantomFlagStorageSnapshot(TPhantomFlags&& flags, + TPhantomFlagThresholds&& thresholds) + : Flags(std::move(flags)) + , Thresholds(std::move(thresholds)) +{} + +TPhantomFlagStorageSnapshot::TPhantomFlagStorageSnapshot(const TPhantomFlags& flags, + const TPhantomFlagThresholds& thresholds) : Flags(flags) + , Thresholds(thresholds) {} } // namespace NSyncLog diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h index f33939a8779..2eae31af9bc 100644 --- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h @@ -7,10 +7,15 @@ namespace NKikimr { namespace NSyncLog { -// TODO: include thresholds in snapshot struct TPhantomFlagStorageSnapshot { - TPhantomFlagStorageSnapshot(const TPhantomFlags& flags); + TPhantomFlagStorageSnapshot(TPhantomFlags&& flags, + TPhantomFlagThresholds&& thresholds); + + TPhantomFlagStorageSnapshot(const TPhantomFlags& flags, + const TPhantomFlagThresholds& thresholds); + TPhantomFlags Flags; + TPhantomFlagThresholds Thresholds; }; } // namespace NSyncLog diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp index efcb7cfaab4..6cf7f0177ba 100644 --- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp @@ -1,4 +1,5 @@ #include "phantom_flag_storage_state.h" +#include "phantom_flag_storage_processor.h" #include <ydb/core/util/stlog.h> #include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h> @@ -11,9 +12,28 @@ TPhantomFlagStorageState::TPhantomFlagStorageState(TIntrusivePtr<TSyncLogCtx> sl : SlCtx(slCtx) , GType(slCtx->VCtx->Top->GType) , Thresholds(GType) - , MaxFlagsStoredCount(0) {} +void TPhantomFlagStorageState::InitializePersistent(TPhantomFlagStorageData&& data, + TActorId syncLogKeeperId, TActorId chunkKeeperId, ui32 appendBlockSize) { + IsPersistent = true; + + NActors::IActor* processorActor = CreatePhantomFlagStorageProcessor(std::move(data), + TPhantomFlagStorageProcessorContext{ + .SyncLogCtx = SlCtx, + .SyncLogKeeperId = syncLogKeeperId, + .ChunkKeeperId = chunkKeeperId, + .AppendBlockSize = appendBlockSize, + }); + ProcessorId = TActivationContext::Register(processorActor); + + if (!data.Chunks.empty()) { + Active = true; + Building = true; + TActivationContext::Send(new IEventHandle(ProcessorId, syncLogKeeperId, new TEvPhantomFlagStorageGetSnapshot)); + } +} + void TPhantomFlagStorageState::StartBuilding() { if (GType.BlobSubgroupSize() > MaxExpectedDisksInGroup) { STLOG(PRI_ERROR, BS_PHANTOM_FLAG_STORAGE, BSPFS01, @@ -41,7 +61,11 @@ void TPhantomFlagStorageState::ProcessBlobRecordFromSyncLog(const TLogoBlobRec* (Building, Building), (SyncedMask, SyncedMask.to_ullong()), (Thresholds, Thresholds.ToString())); - AddFlag(*blobRec); + if (IsPersistent) { + AddItemToWriteBuffer(TPhantomFlagStorageItem::CreateFlag(blobRec)); + } else { + AddFlag(*blobRec); + } } } @@ -58,46 +82,75 @@ void TPhantomFlagStorageState::ProcessBarrierRecordFromNeighbour(ui32 orderNumbe } } -void TPhantomFlagStorageState::FinishBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds, +void TPhantomFlagStorageState::FinishInitialBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds, ui64 sizeLimit) { if (!Active) { // PhantomFlagStorage was deactivated while building, do nothing return; } - AdjustSize(sizeLimit); - ui64 flagsAdded = 0; - for (const TLogoBlobRec& rec : flags) { - if (!AddFlag(rec)) { - break; + if (IsPersistent) { + for (const TLogoBlobRec& rec : flags) { + AddItemToWriteBuffer(TPhantomFlagStorageItem::CreateFlag(&rec)); + } + std::vector<TPhantomFlagThresholds::TThreshold> thresholdList = thresholds.GetList(); + for (const auto [tabletId, channel, generation, step, orderNumber] : thresholdList) { + AddItemToWriteBuffer(TPhantomFlagStorageItem::CreateThreshold(orderNumber, + tabletId, channel, generation, step)); + } + } else { + AdjustSize(sizeLimit); + ui64 flagsAdded = 0; + for (const TLogoBlobRec& rec : flags) { + if (!AddFlag(rec)) { + break; + } + ++flagsAdded; } - ++flagsAdded; + Thresholds.Merge(std::move(thresholds)); + + STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS06, + VDISKP(SlCtx->VCtx, "Finish building"), + (FlagsAdded, flagsAdded), + (FlagsReceived, flags.size())); } - Thresholds.Merge(std::move(thresholds)); Building = false; +} - STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS06, - VDISKP(SlCtx->VCtx, "Finish building"), - (FlagsAdded, flagsAdded), - (FlagsReceived, flags.size())); +void TPhantomFlagStorageState::Recover(TPhantomFlagStorageSnapshot&& snapshot) { + STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS10, + VDISKP(SlCtx->VCtx, "Recovering PhantomFlagStorage")); + Building = false; + Thresholds.Merge(std::move(snapshot.Thresholds)); } void TPhantomFlagStorageState::Deactivate() { STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_STORAGE, BSPFS07, VDISKP(SlCtx->VCtx, "Deactivating PhantomFlagStorage"), (FlagsDropped, StoredFlags.size())); - StoredFlags.clear(); Thresholds.Clear(); Active = false; Building = false; + if (IsPersistent) { + TActivationContext::Send(new IEventHandle(ProcessorId, TActorId{}, new TEvPhantomFlagStorageDrop)); + WriteBuffer.clear(); + WriteBufferSize = 0; + } else { + StoredFlags.clear(); + } } -TPhantomFlagStorageSnapshot TPhantomFlagStorageState::GetSnapshot() const { - STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS05, - VDISKP(SlCtx->VCtx, "Acquiring snapshot"), - (FlagsCount, StoredFlags.size())); - return TPhantomFlagStorageSnapshot(StoredFlags); +void TPhantomFlagStorageState::RequestSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr ev) const { + if (IsPersistent) { + TActivationContext::Send(ev->Forward(ProcessorId)); + } else { + STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS05, + VDISKP(SlCtx->VCtx, "Acquiring snapshot"), + (FlagsCount, StoredFlags.size())); + auto res = std::make_unique<TEvPhantomFlagStorageGetSnapshotResult>(TPhantomFlagStorageSnapshot(StoredFlags, Thresholds)); + TActivationContext::Send(new IEventHandle(ev->Sender, ev->Recipient, res.release())); + } } bool TPhantomFlagStorageState::IsActive() const { @@ -190,6 +243,50 @@ void TPhantomFlagStorageState::UpdateMetrics() { SlCtx->PhantomFlagStorageGroup.ThresholdsMemoryConsumption() = Thresholds.EstimatedMemoryConsumption(); } +void TPhantomFlagStorageState::AddItemToWriteBuffer(const TPhantomFlagStorageItem& item) { + if (WriteBufferSize + item.SerializedSize() > WriteBufferSizeLimit) { + FlushWriteBuffer(); + } + WriteBuffer.push_back(item); + WriteBufferSize += item.SerializedSize(); +} + +void TPhantomFlagStorageState::FlushWriteBuffer() { + if (!WriteBuffer.empty()) { + auto ev = std::make_unique<TEvPhantomFlagStorageWriteItems>(std::move(WriteBuffer)); + TActivationContext::Send(new IEventHandle(ProcessorId, TActorId{}, ev.release())); + WriteBufferSize = 0; + WriteBufferFlushTimestamp = TActivationContext::Monotonic(); + } +} + +void TPhantomFlagStorageState::FlushWriteBufferIfNeeded() { + TMonotonic now = TActivationContext::Monotonic(); + if (now - WriteBufferFlushTimestamp > WriteBufferFlushPeriod) { + FlushWriteBuffer(); + } +} + +void TPhantomFlagStorageState::SyncLogIsCut() { + FlushWriteBuffer(); +} + +std::optional<TPhantomFlagStorageData> TPhantomFlagStorageState::GetPersistentData() const { + return PersistentData; +} + +void TPhantomFlagStorageState::UpdatePersistentData(std::optional<TPhantomFlagStorageData>&& data) { + PersistentData = std::move(data); +} + +void TPhantomFlagStorageState::Terminate() { + if (ProcessorId != TActorId{}) { + TActivationContext::Send(new IEventHandle(ProcessorId, TActorId{}, new TEvents::TEvPoisonPill)); + WriteBuffer.clear(); + WriteBufferSize = 0; + } +} + } // namespace NSyncLog } // namespace NKikimr diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h index 956bef41d32..f8dd47069e1 100644 --- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h @@ -1,8 +1,10 @@ #pragma once #include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h> +#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h> #include "phantom_flags.h" +#include "phantom_flag_storage_data.h" #include "phantom_flag_storage_snapshot.h" #include "phantom_flag_thresholds.h" @@ -19,6 +21,8 @@ class TPhantomFlagStorageState { public: TPhantomFlagStorageState(TIntrusivePtr<TSyncLogCtx> slCtx); + void InitializePersistent(TPhantomFlagStorageData&& data, TActorId syncLogKeeperId, + TActorId chunkKeeperId, ui32 appendBlockSize); void StartBuilding(); // Adds DoNotKeep flags from synclog if needed @@ -27,13 +31,12 @@ public: // Add all DoNotKeep records from cut synclog snapshot up to sizeLimit // Note: in some obscure cases there may be two active builders simultaneously // It shouldn't make any difference though, we just add more flags - void FinishBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds, ui64 sizeLimit); + void FinishInitialBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds, ui64 sizeLimit); + void Recover(TPhantomFlagStorageSnapshot&& snapshot); void Deactivate(); - // TODO: rebuild thresholds structure after restart. Either write it to VDisk log or rebuild from hull snapshot - // Read everything from storage - TPhantomFlagStorageSnapshot GetSnapshot() const; + void RequestSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr request) const; bool IsActive() const; // Process sync data from neighbours, we do it to update Thresholds @@ -46,6 +49,12 @@ public: void UpdateMetrics(); + std::optional<TPhantomFlagStorageData> GetPersistentData() const; + void UpdatePersistentData(std::optional<TPhantomFlagStorageData>&& data); + void FlushWriteBufferIfNeeded(); + void SyncLogIsCut(); + void Terminate(); + private: // Adds DoNotKeep flags to storage and Keeps to Thresholds for specified neighbour void ProcessBlobRecordFromNeighbour(ui32 orderNumber, const TLogoBlobRec* blobRec); @@ -55,15 +64,31 @@ private: void AdjustSize(ui64 sizeLimit); bool AddFlag(const TLogoBlobRec& blobRec); + void AddItemToWriteBuffer(const TPhantomFlagStorageItem& item); + void FlushWriteBuffer(); + private: TIntrusivePtr<TSyncLogCtx> SlCtx; const TBlobStorageGroupType GType; TPhantomFlagThresholds Thresholds; TPhantomFlags StoredFlags; - ui64 MaxFlagsStoredCount; + ui64 MaxFlagsStoredCount = 0; TSyncedMask SyncedMask; bool Active = false; bool Building = false; + + // persistent phantom flag storage + bool IsPersistent = false; + TActorId ProcessorId; + std::vector<TPhantomFlagStorageItem> WriteBuffer; + ui32 WriteBufferSize = 0; + std::optional<TPhantomFlagStorageData> PersistentData; + TMonotonic WriteBufferFlushTimestamp = TMonotonic::Zero(); + +private: + // TODO: remove write buffer, use sync log + constexpr static ui32 WriteBufferSizeLimit = 1_MB; + constexpr static TDuration WriteBufferFlushPeriod = TDuration::Seconds(30); }; } // namespace NSyncLog diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp index 78b4176e6d3..2d456b32d3d 100644 --- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp @@ -15,10 +15,6 @@ TPhantomFlagThresholds::TTabletChannel TPhantomFlagThresholds::MakeTabletChannel return TTabletChannel(blobId.TabletID(), blobId.Channel()); } -inline ui64 TPhantomFlagThresholds::THasher::operator()(const TTabletChannel& x) const { - return std::hash<ui64>{}((x.first << 8) | x.second); -} - TPhantomFlagThresholds::TTabletThresholds::TTabletThresholds() { Thresholds.resize(MaxExpectedDisksInGroup); } @@ -64,6 +60,17 @@ void TPhantomFlagThresholds::TTabletThresholds::Merge(TBlobStorageGroupType grou } } +std::vector<TPhantomFlagThresholds::TTabletThresholds::TTabletThreshold> + TPhantomFlagThresholds::TTabletThresholds::GetList() const { + std::vector<TTabletThreshold> res; + for (ui32 orderNumber = 0; orderNumber < Thresholds.size(); ++orderNumber) { + if (Thresholds[orderNumber]) { + res.emplace_back(Thresholds[orderNumber]->first, Thresholds[orderNumber]->second, orderNumber); + } + } + return res; +} + TString TPhantomFlagThresholds::TTabletThresholds::ToString(TBlobStorageGroupType groupType) const { TStringStream str; str << "{"; @@ -94,6 +101,10 @@ void TPhantomFlagThresholds::AddBlob(ui32 orderNumber, const TLogoBlobID& blob) TabletThresholds[MakeTabletChannel(blob)].AddBlob(orderNumber, MakeGenStep(blob)); } +void TPhantomFlagThresholds::AddBlob(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step) { + TabletThresholds[{tabletId, channel}].AddBlob(orderNumber, {generation, step}); +} + void TPhantomFlagThresholds::AddHardBarrier(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step) { TTabletChannel tabletChannel{tabletId, channel}; if (auto it = TabletThresholds.find(tabletChannel); it != TabletThresholds.end()) { @@ -142,6 +153,19 @@ void TPhantomFlagThresholds::Clear() { TabletThresholds.clear(); } + +std::vector<TPhantomFlagThresholds::TThreshold> TPhantomFlagThresholds::GetList() const { + std::vector<TThreshold> res; + for (const auto& [tabletChannel, thresholds] : TabletThresholds) { + auto [tabletId, channel] = tabletChannel; + std::vector<TTabletThresholds::TTabletThreshold> tuples = thresholds.GetList(); + for (const auto& [generation, step, orderNumber] : tuples) { + res.emplace_back(tabletId, channel, generation, step, orderNumber); + } + } + return res; +} + TString TPhantomFlagThresholds::ToString() const { TStringStream str; str << "TPhantomFlagThresholds# { "; diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h index 014630b0983..2361152a0e6 100644 --- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h @@ -22,10 +22,20 @@ namespace NSyncLog { class TPhantomFlagThresholds { public: + struct TThreshold { + ui64 TabletId = 0; + ui8 Channel = 0; + ui32 Generation = 0; + ui32 Step = 0; + ui8 OrderNumber = 0; + }; + +public: TPhantomFlagThresholds(const TBlobStorageGroupType& gtype); void AddBlob(ui32 orderNumber, const TLogoBlobID& blob); void AddBlob(const TLogoBlobID& blob); + void AddBlob(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step); void AddHardBarrier(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step); bool IsBehindThresholdOnUnsynced(const TLogoBlobID& blob, const TSyncedMask& syncedMask) const; TPhantomFlags Sift(const TPhantomFlags& flags, const TSyncedMask& syncedMask); @@ -33,6 +43,7 @@ public: void Merge(TPhantomFlagThresholds&& other); void Clear(); + std::vector<TThreshold> GetList() const; TString ToString() const; private: @@ -43,13 +54,22 @@ private: static TTabletChannel MakeTabletChannel(const TLogoBlobID& blobId); struct THasher { - inline ui64 operator()(const TTabletChannel& x) const; + inline ui64 operator()(const TTabletChannel& x) const { + return std::hash<ui64>{}((x.first << 8) | x.second); + } }; private: // auxiliary classes class TTabletThresholds { public: + struct TTabletThreshold { + ui32 Generation = 0; + ui32 Step = 0; + ui8 OrderNumber = 0; + }; + + public: TTabletThresholds(); void AddBlob(ui32 orderNumber, TGenStep genStep); @@ -60,6 +80,7 @@ private: const TSyncedMask& syncedMask) const; void Merge(TBlobStorageGroupType groupType, TTabletThresholds&& other); TString ToString(TBlobStorageGroupType groupType) const; + std::vector<TTabletThreshold> GetList() const; private: TStackVec<std::optional<TGenStep>, MaxExpectedDisksInGroup> Thresholds; diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make index 7f1fe059873..7aeea4f639d 100644 --- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make +++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make @@ -11,6 +11,8 @@ PEERDIR( SRCS( phantom_flag_storage_builder.cpp + phantom_flag_storage_data.cpp + phantom_flag_storage_processor.cpp phantom_flag_storage_snapshot.cpp phantom_flag_storage_state.cpp phantom_flag_thresholds.cpp diff --git a/ydb/core/protos/blobstorage_vdisk_internal.proto b/ydb/core/protos/blobstorage_vdisk_internal.proto index f6ec56c6c45..a7d2f17f370 100644 --- a/ydb/core/protos/blobstorage_vdisk_internal.proto +++ b/ydb/core/protos/blobstorage_vdisk_internal.proto @@ -204,6 +204,16 @@ message TAddBulkSstRecoveryLogRec { //////////////////////////////////////////////////////////////////////////////// // TSyncLogEntryPoint -- entry point for SyncLog //////////////////////////////////////////////////////////////////////////////// +message TPhantomFlagStorageData { + message TPhantomFlagStorageChunk { + optional uint32 ChunkIdx = 1; + optional uint32 DataSize = 2; + }; + + repeated TPhantomFlagStorageChunk Chunks = 1; + optional uint32 ChunkSize = 2; +}; + message TSyncLogEntryPoint { // HEADER // PDisk guid for additional check @@ -223,6 +233,8 @@ message TSyncLogEntryPoint { repeated uint32 ChunksToDeleteDelayed = 20; // opaque DiskRecLog serialized bytes optional bytes DiskRecLogSerialized = 21; + + optional TPhantomFlagStorageData PhantomFlagStorageData = 22; }; message TScrubState { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 4cbd68c2cc3..4f4bb50804b 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1840,6 +1840,13 @@ message TImmediateControlsConfig { MaxValue: 1, DefaultValue: 0, }]; + + optional uint64 EnablePersistentPhantomFlagStorage = 41 [(ControlOptions) = { + Description: "Enables Persistent Phantom Flag Storage", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0, + }]; } message TTabletControls { diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 1a1fdcd4c7b..33ab56ba88e 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -62,6 +62,7 @@ enum EServiceKikimr { BS_PHANTOM_FLAG_STORAGE = 2607; BS_CHUNK_KEEPER = 2608; BS_COMP_BROKER = 2609; + BS_PHANTOM_FLAG_PROCESSOR = 2610; // DATASHARD section // TX_DATASHARD = 290; // @@ -1166,5 +1167,6 @@ message TActivity { BS_DDISK = 685; BS_PERSISTENT_BUFFER = 686; BS_SYNCER_MERGER = 687; + BS_PHANTOM_FLAG_STORAGE_WRITER = 688; }; }; |
