summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Belyakov <[email protected]>2026-04-24 10:19:34 +0300
committerGitHub <[email protected]>2026-04-24 10:19:34 +0300
commit0070166bd0cebfc656d8a8d95c27cce91ceda2c6 (patch)
treed9cffa3ff5d67348052e25ace8dc26d5511f659d
parenta8f115547d190dd2c4e7ab3ca59a4507e7c4dd77 (diff)
Introduce persistent PhantomFlagStorage (#37626)
-rw-r--r--intelliboba.vsixbin0 -> 27652816 bytes
-rw-r--r--ydb/core/base/blobstorage.h3
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.cpp2
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h1
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp1
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp1
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/env.h2
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp44
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp46
-rw-r--r--ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h6
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_config.h1
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h6
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp10
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h24
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp36
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h12
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h2
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp15
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp40
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h14
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.cpp161
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h73
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.cpp421
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.h25
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp10
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h9
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp137
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h35
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp32
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h23
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make2
-rw-r--r--ydb/core/protos/blobstorage_vdisk_internal.proto12
-rw-r--r--ydb/core/protos/config.proto7
-rw-r--r--ydb/library/services/services.proto2
37 files changed, 1154 insertions, 67 deletions
diff --git a/intelliboba.vsix b/intelliboba.vsix
new file mode 100644
index 00000000000..0d95ac75f47
--- /dev/null
+++ b/intelliboba.vsix
Binary files differ
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 173e3173545..99810da17d4 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -783,6 +783,9 @@ struct TEvBlobStorage {
EvChunkWriteRaw,
EvStartCompactionFromDefrag,
EvSyncerFullSyncFinished,
+ EvPhantomFlagStorageWriteItems,
+ EvPhantomFlagStorageCommitData,
+ EvPhantomFlagStorageDrop,
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index ff189bf15e5..aae34d3be0d 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -61,6 +61,7 @@ TNodeWarden::TNodeWarden(const TIntrusivePtr<TNodeWardenConfig> &cfg)
, ThrottlingMaxLogChunkCount(130, 1, 100000)
, MaxInProgressSyncCount(0, 0, 1000)
, EnablePhantomFlagStorage(1, 0, 1)
+ , EnablePersistentPhantomFlagStorage(0, 0, 1)
, PhantomFlagStorageLimitPerVDiskBytes(10'000'000, 0, 100'000'000'000)
, EnableChunkKeeper(0, 0, 1)
, MaxCommonLogChunksHDD(NPDisk::MaxCommonLogChunks, 1, 1'000'000)
@@ -432,6 +433,7 @@ void TNodeWarden::Bootstrap() {
TControlBoard::RegisterSharedControl(MaxInProgressSyncCount, icb->VDiskControls.MaxInProgressSyncCount);
TControlBoard::RegisterSharedControl(EnablePhantomFlagStorage, icb->VDiskControls.EnablePhantomFlagStorage);
+ TControlBoard::RegisterSharedControl(EnablePersistentPhantomFlagStorage, icb->VDiskControls.EnablePersistentPhantomFlagStorage);
TControlBoard::RegisterSharedControl(PhantomFlagStorageLimitPerVDiskBytes, icb->VDiskControls.PhantomFlagStorageLimitPerVDiskBytes);
TControlBoard::RegisterSharedControl(EnableChunkKeeper, icb->VDiskControls.EnableChunkKeeper);
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 29c37d801aa..34a73ab097a 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -242,6 +242,7 @@ namespace NKikimr::NStorage {
TControlWrapper MaxInProgressSyncCount;
TControlWrapper EnablePhantomFlagStorage;
+ TControlWrapper EnablePersistentPhantomFlagStorage;
TControlWrapper PhantomFlagStorageLimitPerVDiskBytes;
TControlWrapper EnableChunkKeeper;
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
index fed226301bf..89d3251dc33 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
@@ -276,6 +276,7 @@ namespace NKikimr::NStorage {
vdiskConfig->MaxInProgressSyncCount = MaxInProgressSyncCount;
vdiskConfig->EnablePhantomFlagStorage = EnablePhantomFlagStorage;
+ vdiskConfig->EnablePersistentPhantomFlagStorage = EnablePersistentPhantomFlagStorage;
vdiskConfig->PhantomFlagStorageLimit = PhantomFlagStorageLimitPerVDiskBytes;
vdiskConfig->EnableChunkKeeper = EnableChunkKeeper;
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp b/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp
index 9bf79e1b42f..dbf84c3260e 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/debug_log.cpp
@@ -35,4 +35,5 @@ const std::initializer_list<ui32> TEnvironmentSetup::DebugLogComponents{
// NKikimrServices::BS_VDISK_BALANCING,
// NKikimrServices::BS_PHANTOM_FLAG_STORAGE,
// NKikimrServices::BS_CHUNK_KEEPER,
+// NKikimrServices::BS_PHANTOM_FLAG_PROCESSOR,
};
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
index 5aff48ff4b3..59630bf6493 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
@@ -76,6 +76,7 @@ struct TEnvironmentSetup {
const TDuration MaxPutTimeoutDSProxy = TDuration::Seconds(60);
const bool StartFakeWilsonCollectors = false;
const bool EnableChunkKeeper = true;
+ const bool EnablePersistentPhantomFlagStorage = false;
};
const TSettings Settings;
@@ -572,6 +573,7 @@ config:
ADD_ICB_CONTROL(PDiskControls.MaxActiveCompactionsPerPDisk, 0, 0, 1'000'000, 0);
ADD_ICB_CONTROL(VDiskControls.GarbageThresholdToRunFullCompactionPerMille, 0, 0, 300, 0);
ADD_ICB_CONTROL(VDiskControls.EnablePhantomFlagStorage, true, false, true, Settings.EnablePhantomFlagStorage);
+ ADD_ICB_CONTROL(VDiskControls.EnablePersistentPhantomFlagStorage, false, false, true, Settings.EnablePersistentPhantomFlagStorage);
ADD_ICB_CONTROL(VDiskControls.PhantomFlagStorageLimitPerVDiskBytes, 10'000'000, 0, 100'000'000'000, Settings.PhantomFlagStorageLimitPerVDiskBytes);
ADD_ICB_CONTROL(VDiskControls.EnableChunkKeeper, true, false, true, Settings.EnableChunkKeeper);
ADD_ICB_CONTROL(VDiskControls.HullCompFreeSpaceThresholdPerMille, 2000, 0, 100'000, 2000);
diff --git a/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp b/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp
index 75d5c347951..2e87a68581c 100644
--- a/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/phantom_blobs.cpp
@@ -17,6 +17,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
EOnline Online;
bool PhantomFlagStorageEnabled;
ui64 MemoryLimit = 10_MB;
+ bool PersistentPhantomFlagStorageEnabled = false;
};
struct TTestCtx : public TTestCtxBase {
@@ -38,6 +39,8 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
NodeStates[nodeId - 1].PhantomFlagStorageEnabled);
Env->SetIcbControl(nodeId, "VDiskControls.PhantomFlagStorageLimitPerVDiskBytes",
NodeStates[nodeId - 1].MemoryLimit);
+ Env->SetIcbControl(nodeId, "VDiskControls.EnablePersistentPhantomFlagStorage",
+ NodeStates[nodeId - 1].PersistentPhantomFlagStorageEnabled);
}
}
@@ -306,27 +309,28 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
};
std::vector<TNodeState> GetStates(TBlobStorageGroupType erasure, EOnline online,
- bool phantomFlagStorageEnabled, ui64 memoryLimit) {
+ bool phantomFlagStorageEnabled, bool isPersistent, ui64 memoryLimit) {
return std::vector<TNodeState>(erasure.BlobSubgroupSize(),
TNodeState{
.Online = online,
.PhantomFlagStorageEnabled = phantomFlagStorageEnabled,
- .MemoryLimit = memoryLimit
+ .MemoryLimit = memoryLimit,
+ .PersistentPhantomFlagStorageEnabled = isPersistent,
});
}
std::vector<TNodeState> GetStatesAllAlive(TBlobStorageGroupType erasure, ui64 memoryLimit) {
- return GetStates(erasure, EOnline::Alive, true, memoryLimit);
+ return GetStates(erasure, EOnline::Alive, true, false, memoryLimit);
}
std::vector<TNodeState> GetStatesOneDead(TBlobStorageGroupType erasure, ui64 memoryLimit) {
- std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit);
+ std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit);
states[0].Online = EOnline::Dead;
return states;
}
std::vector<TNodeState> GetStatesTwoDead(TBlobStorageGroupType erasure, ui64 memoryLimit) {
- std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit);
+ std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit);
states[0].Online = EOnline::Dead;
states[4].Online = EOnline::Dead;
return states;
@@ -334,14 +338,14 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
std::vector<TNodeState> GetStatesOneDeadAllRestart(TBlobStorageGroupType erasure,
ui64 memoryLimit) {
- std::vector<TNodeState> states = GetStates(erasure, EOnline::Restart, true, memoryLimit);
+ std::vector<TNodeState> states = GetStates(erasure, EOnline::Restart, true, true, memoryLimit);
states[0].Online = EOnline::Dead;
return states;
}
std::vector<TNodeState> GetStatesOneDeadActiveOneDeadInactive(TBlobStorageGroupType erasure,
ui64 memoryLimit) {
- std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit);
+ std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit);
states[0].Online = EOnline::Dead;
states[0].PhantomFlagStorageEnabled = false;
states[4].Online = EOnline::Dead;
@@ -351,7 +355,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
std::vector<TNodeState> GetStatesTwoDeadInactive(TBlobStorageGroupType erasure,
ui64 memoryLimit) {
- std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, memoryLimit);
+ std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, true, false, memoryLimit);
states[0].Online = EOnline::Dead;
states[0].PhantomFlagStorageEnabled = false;
states[4].Online = EOnline::Dead;
@@ -361,7 +365,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
std::vector<TNodeState> GetStatesTwoDeadAllAliveInactive(TBlobStorageGroupType erasure,
ui64 memoryLimit) {
- std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, memoryLimit);
+ std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, false, memoryLimit);
states[0].Online = EOnline::Dead;
states[0].PhantomFlagStorageEnabled = true;
states[4].Online = EOnline::Dead;
@@ -371,7 +375,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
std::vector<TNodeState> GetStatesTwoDeadSomeAliveInactive(TBlobStorageGroupType erasure,
ui64 memoryLimit) {
- std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, memoryLimit);
+ std::vector<TNodeState> states = GetStates(erasure, EOnline::Alive, false, false, memoryLimit);
states[0].Online = EOnline::Dead;
states[0].PhantomFlagStorageEnabled = true;
states[1].PhantomFlagStorageEnabled = true;
@@ -393,6 +397,7 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
.PDiskChunkSize = 32_MB,
.EnablePhantomFlagStorage = false,
.TinySyncLog = true,
+ .EnablePersistentPhantomFlagStorage = false,
}, 100, 10000, nodeStates, expectPhantoms);
if (nodeStates2) {
ctx.RunTestWithUpdate(*nodeStates2);
@@ -415,10 +420,9 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
TEST_PHANTOM_BLOBS(TwoDead, Mirror3dc, false, 10_KB);
- // TODO (serg-belyakov@): persistent phantom flag storage
- // TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3dc, false);
- // TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3of4, false);
- // TEST_PHANTOM_BLOBS(OneDeadAllRestart, 4Plus2Block, false);
+ TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3dc, false, 10_KB);
+ TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3of4, false, 10_KB);
+ TEST_PHANTOM_BLOBS(OneDeadAllRestart, 4Plus2Block, false, 10_KB);
TEST_PHANTOM_BLOBS(TwoDeadInactive, Mirror3dc, false, 10_KB);
TEST_PHANTOM_BLOBS(OneDeadActiveOneDeadInactive, Mirror3dc, false, 10_KB);
@@ -429,26 +433,26 @@ Y_UNIT_TEST_SUITE(PhantomBlobs) {
Y_UNIT_TEST(TestDisabling) {
auto erasure = TBlobStorageGroupType::ErasureMirror3dc;
- auto states1 = GetStates(erasure, EOnline::Alive, true, 10_KB);
+ auto states1 = GetStates(erasure, EOnline::Alive, true, false, 10_KB);
states1[0].Online = EOnline::Dead;
- auto states2 = GetStates(erasure, EOnline::Alive, false, 10_KB);
+ auto states2 = GetStates(erasure, EOnline::Alive, false, false, 10_KB);
states2[0].Online = EOnline::Dead;
Test(erasure, states1, states2, true);
}
Y_UNIT_TEST(TestEnabling) {
auto erasure = TBlobStorageGroupType::ErasureMirror3dc;
- auto states1 = GetStates(erasure, EOnline::Alive, false, 10_KB);
+ auto states1 = GetStates(erasure, EOnline::Alive, false, false, 10_KB);
states1[0].Online = EOnline::Dead;
- auto states2 = GetStates(erasure, EOnline::Alive, true, 10_KB);
+ auto states2 = GetStates(erasure, EOnline::Alive, true, false, 10_KB);
states2[0].Online = EOnline::Dead;
Test(erasure, states1, states2, true);
}
Y_UNIT_TEST(TestLoweringMemoryLimit) {
auto erasure = TBlobStorageGroupType::ErasureMirror3dc;
- auto states1 = GetStates(erasure, EOnline::Alive, true, 10_KB);
+ auto states1 = GetStates(erasure, EOnline::Alive, true, false, 10_KB);
states1[0].Online = EOnline::Dead;
- auto states2 = GetStates(erasure, EOnline::Alive, true, 100_B);
+ auto states2 = GetStates(erasure, EOnline::Alive, true, false, 100_B);
states2[0].Online = EOnline::Dead;
Test(erasure, states1, states2, true);
}
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp
index a65a23a669d..b0d0ffc87a9 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp
@@ -223,6 +223,7 @@ class TSyncLogTestWriteActor : public TActorBootstrapped<TSyncLogTestWriteActor>
TestCtx->LoggerId,
LogCutterId,
TActorId{},
+ TActorId{},
VDiskConfig->SyncLogMaxDiskAmount,
VDiskConfig->SyncLogMaxEntryPointSize,
VDiskConfig->SyncLogMaxMemAmount,
@@ -230,6 +231,7 @@ class TSyncLogTestWriteActor : public TActorBootstrapped<TSyncLogTestWriteActor>
Db->SyncLogFirstLsnToKeep,
false,
TControlWrapper(0, 0, 1),
+ false,
TControlWrapper(20'000'000, 1, 100'000'000'000));
TestCtx->SyncLogId = ctx.Register(CreateSyncLogActor(slCtx, Conf->GroupInfo, TestCtx->SelfVDiskId, std::move(repaired)));
// Send Db birth lsn
diff --git a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp
index a7514929a86..242e050a8b6 100644
--- a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp
+++ b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.cpp
@@ -13,6 +13,23 @@ TEvChunkKeeperAllocateResult::TEvChunkKeeperAllocateResult(std::optional<ui32> c
, ErrorReason(errorReason)
{}
+TString TEvChunkKeeperAllocateResult::ToString() const {
+ TStringStream str;
+ str << "TEvChunkKeeperAllocateResult{";
+ str << " ChunkIdx# ";
+ if (ChunkIdx) {
+ str << *ChunkIdx;
+ } else {
+ str << "<nullopt>";
+ }
+ str << " Status# " << NKikimrProto::EReplyStatus_Name(Status);
+ if (!ErrorReason.empty()) {
+ str << " ErrorReason# " << ErrorReason;
+ }
+ str << " }";
+ return str.Str();
+}
+
TEvChunkKeeperFree::TEvChunkKeeperFree(ui32 chunkIdx, TSubsystem subsystem)
: ChunkIdx(chunkIdx)
, Subsystem(subsystem)
@@ -26,6 +43,18 @@ TEvChunkKeeperFreeResult::TEvChunkKeeperFreeResult(ui32 chunkIdx, NKikimrProto::
, ErrorReason(errorReason)
{}
+TString TEvChunkKeeperFreeResult::ToString() const {
+ TStringStream str;
+ str << "TEvChunkKeeperFreeResult{";
+ str << " ChunkIdx# " << ChunkIdx;
+ str << " Status# " << NKikimrProto::EReplyStatus_Name(Status);
+ if (!ErrorReason.empty()) {
+ str << " ErrorReason# " << ErrorReason;
+ }
+ str << " }";
+ return str.Str();
+}
+
TEvChunkKeeperDiscover::TEvChunkKeeperDiscover(TSubsystem subsystem)
: Subsystem(subsystem)
{}
@@ -38,4 +67,21 @@ TEvChunkKeeperDiscoverResult::TEvChunkKeeperDiscoverResult(std::vector<TChunkInf
, ErrorReason(errorReason)
{}
+TString TEvChunkKeeperDiscoverResult::ToString() const {
+ TStringStream str;
+ str << "TEvChunkKeeperDiscoverResult{";
+ str << " Status# " << NKikimrProto::EReplyStatus_Name(Status);
+ if (!ErrorReason.empty()) {
+ str << " ErrorReason# " << ErrorReason;
+ }
+ str << " ChunkCount# " << Chunks.size();
+ str << " Chunks# [";
+ for (const TChunkInfo& chunk : Chunks) {
+ str << " { ChunkIdx# " << chunk.ChunkIdx;
+ str << " ShredRequested# " << chunk.ShredRequested << " }";
+ }
+ str << " ] }";
+ return str.Str();
+}
+
} // namespace NKikimr
diff --git a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h
index 7e27de8f2f0..03ca7197601 100644
--- a/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h
+++ b/ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h
@@ -49,6 +49,8 @@ struct TEvChunkKeeperAllocateResult : TEventLocal<TEvChunkKeeperAllocateResult,
TEvChunkKeeperAllocateResult(std::optional<ui32> chunkIdx, NKikimrProto::EReplyStatus status,
TString errorReason = "");
+
+ TString ToString() const;
};
struct TEvChunkKeeperFree : TEventLocal<TEvChunkKeeperFree, TEvBlobStorage::EvChunkKeeperFree> {
@@ -66,6 +68,8 @@ struct TEvChunkKeeperFreeResult : TEventLocal<TEvChunkKeeperFreeResult, TEvBlobS
TString ErrorReason;
TEvChunkKeeperFreeResult(ui32 chunkIdx, NKikimrProto::EReplyStatus status, TString errorReason = "");
+
+ TString ToString() const;
};
struct TEvChunkKeeperDiscover : TEventLocal<TEvChunkKeeperDiscover, TEvBlobStorage::EvChunkKeeperDiscover> {
@@ -90,6 +94,8 @@ struct TEvChunkKeeperDiscoverResult : TEventLocal<TEvChunkKeeperDiscoverResult,
TEvChunkKeeperDiscoverResult(std::vector<TChunkInfo>&& chunks, NKikimrProto::EReplyStatus status,
TString errorReason = "");
+
+ TString ToString() const;
};
} // namespace NKikimr
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.h b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
index 330cabb7378..3b6e7f7fc13 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_config.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
@@ -278,6 +278,7 @@ namespace NKikimr {
///////////// SYNC SETTINGS //////////////////
TControlWrapper MaxInProgressSyncCount;
TControlWrapper EnablePhantomFlagStorage;
+ bool EnablePersistentPhantomFlagStorage = false;
TControlWrapper PhantomFlagStorageLimit;
///////////// CHUNK Keeper //////////////////
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 83eac9226e5..c4632041986 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -2038,6 +2038,7 @@ namespace NKikimr {
Db->LoggerID,
Db->LogCutterID,
Db->SkeletonID,
+ Db->ChunkKeeperActorID,
Config->SyncLogMaxDiskAmount,
Config->SyncLogMaxEntryPointSize,
Config->SyncLogMaxMemAmount,
@@ -2045,6 +2046,7 @@ namespace NKikimr {
Db->SyncLogFirstLsnToKeep,
Config->BaseInfo.ReadOnly,
Config->EnablePhantomFlagStorage,
+ Config->EnablePersistentPhantomFlagStorage,
Config->PhantomFlagStorageLimit);
Db->SyncLogID.Set(ctx.Register(CreateSyncLogActor(slCtx, GInfo, SelfVDiskId, std::move(repairedSyncLog))));
ActiveActors.Insert(Db->SyncLogID, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); // keep forever
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h
index 56a7d8156f1..0e5ba1062fe 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_context.h
@@ -45,6 +45,7 @@ public:
const TActorId LoggerID;
const TActorId LogCutterID;
const TActorId SkeletonId;
+ const TActorId ChunkKeeperId;
const ui64 SyncLogMaxDiskAmount;
const ui64 SyncLogMaxEntryPointSize;
@@ -59,6 +60,7 @@ public:
const bool IsReadOnlyVDisk;
TControlWrapper EnablePhantomFlagStorage;
+ bool EnablePersistentPhantomFlagStorage;
TControlWrapper PhantomFlagStorageLimit;
TSyncLogCtx(TIntrusivePtr<TVDiskContext> vctx,
@@ -67,6 +69,7 @@ public:
const TActorId &loggerId,
const TActorId &logCutterId,
const TActorId& skeletonId,
+ const TActorId& chunkKeeperId,
ui64 syncLogMaxDiskAmount,
ui64 syncLogMaxEntryPointSize,
ui64 syncLogMaxMemAmount,
@@ -74,6 +77,7 @@ public:
std::shared_ptr<TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep,
bool isReadOnlyVDisk,
const TControlWrapper& enablePhantomFlagStorage,
+ bool enablePersistentPhantomFlagStorage,
const TControlWrapper& phantomFlagStorageLimit)
: VCtx(std::move(vctx))
, LsnMngr(std::move(lsnMngr))
@@ -81,6 +85,7 @@ public:
, LoggerID(loggerId)
, LogCutterID(logCutterId)
, SkeletonId(skeletonId)
+ , ChunkKeeperId(chunkKeeperId)
, SyncLogMaxDiskAmount(syncLogMaxDiskAmount)
, SyncLogMaxEntryPointSize(syncLogMaxEntryPointSize)
, SyncLogMaxMemAmount(syncLogMaxMemAmount)
@@ -91,6 +96,7 @@ public:
, PhantomFlagStorageGroup(VCtx->VDiskCounters, "subsystem", "phantomflagstorage")
, IsReadOnlyVDisk(isReadOnlyVDisk)
, EnablePhantomFlagStorage(enablePhantomFlagStorage)
+ , EnablePersistentPhantomFlagStorage(enablePersistentPhantomFlagStorage)
, PhantomFlagStorageLimit(phantomFlagStorageLimit)
{}
};
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp
index bafc233992c..31a1386349e 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.cpp
@@ -13,6 +13,16 @@ namespace NKikimr {
TEvSyncLogSnapshotResult::~TEvSyncLogSnapshotResult() = default;
+ TEvPhantomFlagStorageWriteItems::TEvPhantomFlagStorageWriteItems(
+ std::vector<TPhantomFlagStorageItem>&& items)
+ : Items(std::move(items))
+ {}
+
+ TEvPhantomFlagStorageCommitData::TEvPhantomFlagStorageCommitData(
+ const std::optional<TPhantomFlagStorageData>& data)
+ : Data(data)
+ {}
+
} // NSyncLog
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h
index e64631d00db..8c7995e976a 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h
@@ -3,6 +3,7 @@
#include "defs.h"
#include "blobstorage_synclogdata.h"
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+#include <ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h>
#include <ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h>
#include <ydb/core/base/blobstorage.h>
@@ -113,5 +114,28 @@ namespace NKikimr {
, SyncedLsn(syncedLsn)
{}
};
+
+ struct TEvPhantomFlagStorageWriteItems
+ : public TEventLocal<TEvPhantomFlagStorageWriteItems,
+ TEvBlobStorage::EvPhantomFlagStorageWriteItems>
+ {
+ TEvPhantomFlagStorageWriteItems(std::vector<TPhantomFlagStorageItem>&& items);
+
+ std::vector<TPhantomFlagStorageItem> Items;
+ };
+
+ struct TEvPhantomFlagStorageCommitData
+ : public TEventLocal<TEvPhantomFlagStorageCommitData,
+ TEvBlobStorage::EvPhantomFlagStorageCommitData>
+ {
+ TEvPhantomFlagStorageCommitData(const std::optional<TPhantomFlagStorageData>& data);
+
+ std::optional<TPhantomFlagStorageData> Data;
+ };
+
+ struct TEvPhantomFlagStorageDrop
+ : public TEventLocal<TEvPhantomFlagStorageDrop,
+ TEvBlobStorage::EvPhantomFlagStorageDrop>
+ {};
} // NSyncLog
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp
index 9a86377809e..06c2506f407 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.cpp
@@ -111,13 +111,15 @@ namespace NKikimr {
ui64 logStartLsn,
ui32 appendBlockSize,
const TEntryPointDbgInfo &lastEntryPointDbgInfo,
- const TSyncLogHeader &header)
+ const TSyncLogHeader &header,
+ const std::optional<TPhantomFlagStorageData>& phantomFlagStorageData)
: DiskSnapPtr(diskSnapPtr)
, MemSnapPtr(memSnapPtr)
, LogStartLsn(logStartLsn)
, AppendBlockSize(appendBlockSize)
, LastEntryPointDbgInfo(lastEntryPointDbgInfo)
, Header(header)
+ , PhantomFlagStorageData(phantomFlagStorageData)
{
CheckSnapshotConsistency(); // For debug
}
@@ -146,6 +148,10 @@ namespace NKikimr {
pb.SetPDiskGuid(Header.PDiskGuid);
pb.SetVDiskIncarnationGuid(Header.VDiskIncarnationGuid);
pb.SetLogStartLsn(LogStartLsn);
+ if (PhantomFlagStorageData) {
+ auto* data = pb.MutablePhantomFlagStorageData();
+ PhantomFlagStorageData->Serialize(data);
+ }
// DiskRecLog
TStringStream s;
ui32 indexRecsNum = DiskSnapPtr->Serialize(s, delta);
@@ -276,7 +282,8 @@ namespace NKikimr {
LogStartLsn,
DiskRecLog.AppendBlockSize,
LastEntryPointDbgInfo,
- Header));
+ Header,
+ PhantomFlagStorageData));
}
bool TSyncLog::CheckMemAndDiskRecLogsDoNotIntersect() const {
@@ -398,6 +405,23 @@ namespace NKikimr {
}
////////////////////////////////////////////////////////////////////////
+ // TSyncLog: PhantomFlagStorageData
+ ////////////////////////////////////////////////////////////////////////
+ void TSyncLog::UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data) {
+ PhantomFlagStorageData = std::move(data);
+ }
+
+ TPhantomFlagStorageData TSyncLog::GetPhantomFlagStorageData() const {
+ TPhantomFlagStorageData res;
+ if (PhantomFlagStorageData) {
+ res = *PhantomFlagStorageData;
+ } else {
+ res.ChunkSize = GetChunkSize();
+ }
+ return res;
+ }
+
+ ////////////////////////////////////////////////////////////////////////
// TSyncLog: PRIVATE
////////////////////////////////////////////////////////////////////////
TSyncLog::TSyncLog(const TSyncLogHeader &header,
@@ -505,6 +529,14 @@ namespace NKikimr {
ChunksToDelete.push_back(pb.GetChunksToDeleteDelayed(i));
}
+ if (pb.HasPhantomFlagStorageData()) {
+ TPhantomFlagStorageData data;
+ data.Deserialize(pb.GetPhantomFlagStorageData());
+ SyncLogPtr->UpdatePhantomFlagStorageData(std::move(data));
+ } else {
+ SyncLogPtr->UpdatePhantomFlagStorageData(std::nullopt);
+ }
+
return true;
}
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h
index de296a09418..baa24686792 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogdata.h
@@ -7,6 +7,7 @@
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
#include <ydb/core/blobstorage/vdisk/common/blobstorage_vdisk_guids.h>
+#include <ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h>
#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
#include <ydb/core/base/blobstorage.h>
@@ -105,6 +106,7 @@ namespace NKikimr {
const ui32 AppendBlockSize;
const TEntryPointDbgInfo LastEntryPointDbgInfo;
const TSyncLogHeader Header;
+ const std::optional<TPhantomFlagStorageData> PhantomFlagStorageData;
private:
TSyncLogSnapshot(TDiskRecLogSnapshotPtr diskSnapPtr,
@@ -112,7 +114,8 @@ namespace NKikimr {
ui64 logStartLsn,
ui32 appendBlockSize,
const TEntryPointDbgInfo &lastEntryPointDbgInfo,
- const TSyncLogHeader &header);
+ const TSyncLogHeader &header,
+ const std::optional<TPhantomFlagStorageData>& phantomFlagStorageData);
friend class TSyncLog;
};
@@ -239,6 +242,12 @@ namespace NKikimr {
// returns chunks owned by DiskRecLog
void GetOwnedChunks(TSet<TChunkIdx>& chunks) const;
+ ////////////////////////////////////////////////////////////////////////
+ // PhantomFlagStorage
+ ////////////////////////////////////////////////////////////////////////
+ void UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data);
+ TPhantomFlagStorageData GetPhantomFlagStorageData() const;
+
private:
// part of the log on disk
TDiskRecLog DiskRecLog;
@@ -249,6 +258,7 @@ namespace NKikimr {
ui64 LogStartLsn;
// info about last serialized data written to the log as an entry point
TEntryPointDbgInfo LastEntryPointDbgInfo;
+ std::optional<TPhantomFlagStorageData> PhantomFlagStorageData;
TSyncLog(const TSyncLogHeader &header,
TDiskRecLog &&diskRecLog,
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h
index 65371506729..3609d3b508d 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h
@@ -20,6 +20,8 @@ namespace NKikimr {
ui64 Raw[3]; // TLogoBlobID
TIngress Ingress;
+ TLogoBlobRec() : TLogoBlobRec(TLogoBlobID(0, 0, 0), 0) {}
+
explicit TLogoBlobRec(const TLogoBlobID &id, ui64 ingressRaw)
: Ingress(ingressRaw)
{
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp
index 38d2e72cadf..69af7875837 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper.cpp
@@ -230,6 +230,7 @@ namespace NKikimr {
if (CommitterId) {
ctx.Send(CommitterId, new TEvents::TEvPoisonPill());
}
+ KeepState.Terminate();
Die(ctx);
}
@@ -255,7 +256,13 @@ namespace NKikimr {
}
void Handle(const TEvPhantomFlagStorageGetSnapshot::TPtr& ev) {
- Send(ev->Sender, new TEvPhantomFlagStorageGetSnapshotResult(KeepState.GetPhantomFlagStorageSnapshot()));
+ KeepState.RequestPhantomFlagStorageSnapshot(ev);
+ }
+
+ void Handle(const TEvPhantomFlagStorageGetSnapshotResult::TPtr& ev) {
+ // This actor only requests PhantomFlagStorage snapshot on restart
+ // to rebuild ThresholdsStructure
+ KeepState.RecoverPhantomFlagStorage(std::move(ev->Get()->Snapshot));
}
void Handle(const TEvLocalSyncData::TPtr& ev) {
@@ -268,8 +275,13 @@ namespace NKikimr {
KeepState.UpdateNeighbourSyncedLsn(ev->Get()->OrderNumber, ev->Get()->SyncedLsn);
}
+ void Handle(TEvPhantomFlagStorageCommitData::TPtr ev) {
+ KeepState.UpdatePhantomFlagStorageData(std::move(ev->Get()->Data));
+ }
+
void UpdateCounters() {
KeepState.UpdateMetrics();
+ KeepState.FlushPhantomFlagStorageWriteBufferIfNeeded();
Schedule(TDuration::Seconds(15), new TEvents::TEvWakeup);
}
@@ -287,6 +299,7 @@ namespace NKikimr {
HFunc(TEvListChunks, Handle)
hFunc(TEvPhantomFlagStorageFinishBuilder, Handle)
hFunc(TEvPhantomFlagStorageGetSnapshot, Handle)
+ hFunc(TEvPhantomFlagStorageCommitData, Handle)
hFunc(TEvLocalSyncData, Handle)
hFunc(TEvSyncLogUpdateNeighbourSyncedLsn, Handle)
cFunc(TEvents::TEvWakeup::EventType, UpdateCounters)
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp
index f9351a7fd27..cd679920046 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.cpp
@@ -45,6 +45,7 @@ namespace NKikimr {
, NeedsInitialCommit(repaired->NeedsInitialCommit)
, PhantomFlagStorageState(SlCtx)
, EnablePhantomFlagStorage(SlCtx->EnablePhantomFlagStorage)
+ , EnablePersistentPhantomFlagStorage(SlCtx->EnablePersistentPhantomFlagStorage)
, PhantomFlagStorageLimit(SlCtx->PhantomFlagStorageLimit)
, SelfOrderNumber(SlCtx->VCtx->Top->GetOrderNumber(SlCtx->VCtx->ShortSelfVDisk))
{
@@ -56,6 +57,19 @@ namespace NKikimr {
SyncedMask.set(SelfOrderNumber, true);
}
+ void TSyncLogKeeperState::Init(std::shared_ptr<IActorNotify> notifier,
+ std::shared_ptr<ILoggerCtx> loggerCtx, const TActorId& selfId) {
+ Notifier = std::move(notifier);
+ LoggerCtx = std::move(loggerCtx);
+ SelfId = selfId;
+
+ if (EnablePersistentPhantomFlagStorage) {
+ TPhantomFlagStorageData phantomFlagStorageData = SyncLogPtr->GetPhantomFlagStorageData();
+ PhantomFlagStorageState.InitializePersistent(std::move(phantomFlagStorageData), SelfId,
+ SlCtx->ChunkKeeperId, SyncLogPtr->GetAppendBlockSize());
+ }
+ }
+
// Calculate first lsn in recovery log we must keep
ui64 TSyncLogKeeperState::CalculateFirstLsnToKeep() const {
// calculate first lsn for data
@@ -460,11 +474,15 @@ namespace NKikimr {
}
void TSyncLogKeeperState::FinishPhantomFlagStorageBuilder(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds) {
- PhantomFlagStorageState.FinishBuilding(std::move(flags), std::move(thresholds), PhantomFlagStorageLimit);
+ PhantomFlagStorageState.FinishInitialBuilding(std::move(flags), std::move(thresholds), PhantomFlagStorageLimit);
+ }
+
+ void TSyncLogKeeperState::RecoverPhantomFlagStorage(TPhantomFlagStorageSnapshot&& snapshot) {
+ PhantomFlagStorageState.Recover(std::move(snapshot));
}
- TPhantomFlagStorageSnapshot TSyncLogKeeperState::GetPhantomFlagStorageSnapshot() const {
- return PhantomFlagStorageState.GetSnapshot();
+ void TSyncLogKeeperState::RequestPhantomFlagStorageSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr request) const {
+ PhantomFlagStorageState.RequestSnapshot(request);
}
void TSyncLogKeeperState::ProcessLocalSyncData(ui32 orderNumber, const TString& data) {
@@ -480,7 +498,9 @@ namespace NKikimr {
if (EnablePhantomFlagStorage) {
PhantomFlagStorageState.UpdateSyncedMask(SyncedMask);
- if (!chunks.empty() && !PhantomFlagStorageState.IsActive() && SelfId != TActorId{}) {
+ if (PhantomFlagStorageState.IsActive()) {
+ PhantomFlagStorageState.SyncLogIsCut();
+ } else if (!chunks.empty() && SelfId != TActorId{}) {
PhantomFlagStorageState.StartBuilding();
TActivationContext::Register(CreatePhantomFlagStorageBuilderActor(SlCtx, SelfId, snapshot));
}
@@ -498,5 +518,17 @@ namespace NKikimr {
SlCtx->PhantomFlagStorageGroup.SyncedMask() = SyncedMask.to_ullong();
}
+ void TSyncLogKeeperState::UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data) {
+ SyncLogPtr->UpdatePhantomFlagStorageData(std::move(data));
+ }
+
+ void TSyncLogKeeperState::FlushPhantomFlagStorageWriteBufferIfNeeded() {
+ PhantomFlagStorageState.FlushWriteBufferIfNeeded();
+ }
+
+ void TSyncLogKeeperState::Terminate() {
+ PhantomFlagStorageState.Terminate();
+ }
+
} // NSyncLog
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h
index f538111a84f..f9a0f829a06 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_state.h
@@ -42,11 +42,7 @@ namespace NKikimr {
ui64 syncLogMaxEntryPointSize);
void Init(std::shared_ptr<IActorNotify> notifier, std::shared_ptr<ILoggerCtx> loggerCtx,
- const TActorId& selfId) {
- Notifier = std::move(notifier);
- LoggerCtx = std::move(loggerCtx);
- SelfId = selfId;
- }
+ const TActorId& selfId);
bool HasDelayedActions() const { return DelayedActions.HasActions(); }
bool GetDeleteChunkAndClear() { return std::exchange(DelayedActions.DeleteChunk, false); }
@@ -92,15 +88,20 @@ namespace NKikimr {
// Add flags from cut sync log snapshot
void FinishPhantomFlagStorageBuilder(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds);
- TPhantomFlagStorageSnapshot GetPhantomFlagStorageSnapshot() const;
+ void RecoverPhantomFlagStorage(TPhantomFlagStorageSnapshot&& snapshot);
+ void RequestPhantomFlagStorageSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr request) const;
+ void UpdatePhantomFlagStorageData(std::optional<TPhantomFlagStorageData>&& data);
void ProcessLocalSyncData(ui32 orderNumber, const TString& data);
void UpdateMetrics();
+ void FlushPhantomFlagStorageWriteBufferIfNeeded();
TVector<ui32> GetChunksToForget() {
return std::exchange(ChunksToForget, {});
}
+ void Terminate();
+
private:
// VDisk Context
TIntrusivePtr<TSyncLogCtx> SlCtx;
@@ -138,6 +139,7 @@ namespace NKikimr {
// phantom flag storage
TPhantomFlagStorageState PhantomFlagStorageState;
TMemorizableControlWrapper EnablePhantomFlagStorage;
+ bool EnablePersistentPhantomFlagStorage;
TMemorizableControlWrapper PhantomFlagStorageLimit;
ui32 SelfOrderNumber;
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp
index a03bc4970d9..97b136c2c48 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogkeeper_ut.cpp
@@ -110,6 +110,7 @@ namespace NKikimr {
TActorId{},
TActorId{},
TActorId{},
+ TActorId{},
syncLogMaxDiskAmount,
syncLogMaxEntryPointSize,
syncLogMaxMemAmount,
@@ -117,6 +118,7 @@ namespace NKikimr {
nullptr,
false,
TControlWrapper(0, 0, 1),
+ false,
TControlWrapper(20'000'000, 1, 100'000'000'000));
State = std::make_unique<TSyncLogKeeperState>(slCtx, std::move(repaired), syncLogMaxMemAmount, syncLogMaxDiskAmount,
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.cpp
new file mode 100644
index 00000000000..5d2fa7418e4
--- /dev/null
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.cpp
@@ -0,0 +1,161 @@
+#include "phantom_flag_storage_data.h"
+
+#include <util/generic/overloaded.h>
+
+namespace NKikimr::NSyncLog {
+
+TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateSkip(ui32 skipSize) {
+ TPhantomFlagStorageItem res;
+ res.Data.emplace<TSkip>(skipSize);
+ return res;
+}
+
+TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateFlag(const TLogoBlobRec* blobRec) {
+ TPhantomFlagStorageItem res;
+ res.Data.emplace<TFlag>(*blobRec);
+ return res;
+}
+
+TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateThreshold(ui32 orderNumber,
+ ui64 tabletId, ui8 channel, ui32 generation, ui32 step) {
+ TPhantomFlagStorageItem res;
+ res.Data.emplace<TThreshold>(tabletId, channel, generation, step, orderNumber);
+ return res;
+}
+
+TPhantomFlagStorageItem TPhantomFlagStorageItem::CreateThreshold(ui32 orderNumber,
+ const TLogoBlobID& blobId) {
+ TPhantomFlagStorageItem res;
+ res.Data.emplace<TThreshold>(blobId.TabletID(), blobId.Channel(), blobId.Generation(),
+ blobId.Step(), orderNumber);
+ return res;
+}
+
+EPhantomFlagStorageItem TPhantomFlagStorageItem::GetType() const {
+ EPhantomFlagStorageItem res = EPhantomFlagStorageItem::Skip;
+ std::visit(TOverloaded{
+ [&](const std::monostate&) {},
+ [&](const TSkip&) {},
+ [&](const TFlag&) { res = EPhantomFlagStorageItem::Flag; },
+ [&](const TThreshold&) { res = EPhantomFlagStorageItem::Threshold; }
+ }, Data);
+ return res;
+}
+
+TPhantomFlagStorageItem::TSkip TPhantomFlagStorageItem::GetSkip() const {
+ return std::get<TSkip>(Data);
+}
+
+TPhantomFlagStorageItem::TThreshold TPhantomFlagStorageItem::GetThreshold() const {
+ return std::get<TThreshold>(Data);
+}
+
+TPhantomFlagStorageItem::TFlag TPhantomFlagStorageItem::GetFlag() const {
+ return std::get<TFlag>(Data);
+}
+
+void TPhantomFlagStorageItem::Serialize(TString* buffer) const {
+ std::visit(TOverloaded{
+ [&](const std::monostate&) {},
+ [&](const TSkip& skip) {
+ constexpr static EPhantomFlagStorageItem type = EPhantomFlagStorageItem::Skip;
+ if (skip.Size >= sizeof(type) + sizeof(skip.Size)) {
+ buffer->append(reinterpret_cast<const char*>(&type), sizeof(type));
+ buffer->append(reinterpret_cast<const char*>(&skip), sizeof(skip));
+ } else {
+ TString zeros(skip.Size, '\0');
+ buffer->append(zeros.data(), skip.Size);
+ }
+ },
+ [&](const TFlag& flag) {
+ constexpr static EPhantomFlagStorageItem type = EPhantomFlagStorageItem::Flag;
+ buffer->append(reinterpret_cast<const char*>(&type), sizeof(type));
+ buffer->append(reinterpret_cast<const char*>(&flag), sizeof(flag));
+ },
+ [&](const TThreshold& threshold) {
+ constexpr static EPhantomFlagStorageItem type = EPhantomFlagStorageItem::Threshold;
+ buffer->append(reinterpret_cast<const char*>(&type), sizeof(type));
+ buffer->append(reinterpret_cast<const char*>(&threshold), sizeof(threshold));
+ },
+ }, Data);
+}
+
+TPhantomFlagStorageItem TPhantomFlagStorageItem::DeserializeFromRaw(const char* data) {
+ const EPhantomFlagStorageItem type = *reinterpret_cast<const EPhantomFlagStorageItem*>(data);
+ data += sizeof(type);
+ switch (type) {
+ case EPhantomFlagStorageItem::SkipOneByte: {
+ return TPhantomFlagStorageItem::CreateSkip(1);
+ }
+ case EPhantomFlagStorageItem::Flag: {
+ const TLogoBlobRec rec = ReadUnaligned<TLogoBlobRec>(data);
+ return TPhantomFlagStorageItem::CreateFlag(&rec);
+ }
+ case EPhantomFlagStorageItem::Threshold: {
+ const TThreshold threshold = ReadUnaligned<TThreshold>(data);
+ return TPhantomFlagStorageItem::CreateThreshold(threshold.OrderNumber, threshold.TabletId,
+ threshold.Channel, threshold.Generation, threshold.Step);
+ }
+ case EPhantomFlagStorageItem::Skip:
+ default: {
+ const ui32 size = ReadUnaligned<ui32>(data);
+ return TPhantomFlagStorageItem::CreateSkip(size);
+ }
+ }
+}
+
+ui32 TPhantomFlagStorageItem::SerializedSize() const {
+ ui32 res = 0;
+ std::visit(TOverloaded{
+ [&](const std::monostate&) { },
+ [&](const TSkip& skip) { res = skip.Size; },
+ [&](const TFlag&) { res = sizeof(EPhantomFlagStorageItem) + sizeof(TFlag); },
+ [&](const TThreshold&) { res = sizeof(EPhantomFlagStorageItem) + sizeof(TThreshold); },
+ }, Data);
+ return res;
+}
+
+void TPhantomFlagStorageItem::AlignWriteBlock(TString* buffer, ui32 appendBlockSize, ui32 sizeLimit) {
+ Y_ABORT_UNLESS(buffer);
+ ui32 bufferSize = buffer->size();
+ Y_VERIFY_S(bufferSize <= sizeLimit, "BufferSize# " << bufferSize << " SizeLimit# " << sizeLimit);
+ if (buffer->size() % appendBlockSize == 0) {
+ // write block is already aligned
+ return;
+ }
+
+ ui32 fillSize = std::min(sizeLimit - bufferSize, appendBlockSize - bufferSize % appendBlockSize);
+ ui32 skipRecSize = sizeof(EPhantomFlagStorageItem) + sizeof(TSkip);
+ if (fillSize >= skipRecSize) {
+ // add one Skip record
+ TPhantomFlagStorageItem skip = CreateSkip(fillSize);
+ skip.Serialize(buffer);
+ fillSize -= skipRecSize;
+ }
+
+ // fill the rest with zeros
+ if (fillSize > 0) {
+ TString zeros(fillSize, '\0');
+ buffer->append(zeros.data(), fillSize);
+ }
+}
+
+void TPhantomFlagStorageData::Deserialize(const TPhantomFlagStorageDataProto& proto) {
+ ChunkSize = proto.GetChunkSize();
+ for (const auto& chunk : proto.GetChunks()) {
+ Chunks[chunk.GetChunkIdx()] = TChunk{
+ .DataSize = chunk.GetDataSize(),
+ };
+ }
+}
+
+void TPhantomFlagStorageData::Serialize(TPhantomFlagStorageDataProto* proto) const {
+ for (const auto& [chunkIdx, chunk] : Chunks) {
+ auto* chunkProto = proto->AddChunks();
+ chunkProto->SetChunkIdx(chunkIdx);
+ chunkProto->SetDataSize(chunk.DataSize);
+ }
+ proto->SetChunkSize(ChunkSize);
+}
+
+}
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h
new file mode 100644
index 00000000000..b23ca228fe1
--- /dev/null
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_data.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include "phantom_flag_thresholds.h"
+
+#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h>
+
+#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
+
+namespace NKikimr::NSyncLog {
+
+using TPhantomFlagStorageDataProto = NKikimrVDiskData::TPhantomFlagStorageData;
+
+enum EPhantomFlagStorageItem : ui8 {
+ SkipOneByte = 0b00,
+ Flag = 0b01,
+ Threshold = 0b10,
+ Skip = 0b11,
+ // All types other than Flag and Threshold must contain ui32 field
+ // equal to the total serialized size of structure
+ // (including Type and Size) fields
+ // in the beginning of byte serialization, for compatibility reasons:
+ // when attempting to read unsupported field it fill be interpreted
+ // as Skip{ Size } and skipped
+ // SkipOneByte records are used to skip alignment blocks
+};
+
+class TPhantomFlagStorageItem {
+public:
+ struct TSkip {
+ ui32 Size;
+ };
+
+ struct TFlag {
+ TLogoBlobRec Record;
+ };
+
+ using TThreshold = TPhantomFlagThresholds::TThreshold;
+
+public:
+ static TPhantomFlagStorageItem CreateSkip(ui32 skipSize);
+ static TPhantomFlagStorageItem CreateFlag(const TLogoBlobRec* blobRec);
+ static TPhantomFlagStorageItem CreateThreshold(ui32 orderNumber, ui64 tabletId, ui8 channel,
+ ui32 generation, ui32 step);
+ static TPhantomFlagStorageItem CreateThreshold(ui32 orderNumber, const TLogoBlobID& blobId);
+
+ static TPhantomFlagStorageItem DeserializeFromRaw(const char* data);
+ void Serialize(TString* buffer) const;
+ ui32 SerializedSize() const;
+ static void AlignWriteBlock(TString* buffer, ui32 appendBlockSize, ui32 sizeLimit);
+
+ EPhantomFlagStorageItem GetType() const;
+
+ TSkip GetSkip() const;
+ TThreshold GetThreshold() const;
+ TFlag GetFlag() const;
+
+private:
+ std::variant<TSkip, TFlag, TThreshold> Data;
+};
+
+struct TPhantomFlagStorageData {
+ struct TChunk {
+ ui32 DataSize;
+ };
+
+ void Deserialize(const TPhantomFlagStorageDataProto& proto);
+ void Serialize(TPhantomFlagStorageDataProto* proto) const;
+
+ std::unordered_map<ui32, TChunk> Chunks;
+ ui32 ChunkSize;
+};
+
+} // namespace NKikimr::NSyncLog
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.cpp
new file mode 100644
index 00000000000..ef3487eb3df
--- /dev/null
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.cpp
@@ -0,0 +1,421 @@
+#include "phantom_flags.h"
+#include "phantom_flag_thresholds.h"
+#include "phantom_flag_storage_processor.h"
+
+#include <util/generic/overloaded.h>
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h>
+#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h>
+#include <ydb/core/blobstorage/vdisk/chunk_keeper/chunk_keeper_events.h>
+#include <ydb/core/util/stlog.h>
+
+#include <unordered_set>
+
+namespace NKikimr::NSyncLog {
+
+////////////////////////////////////////////////////////////////////////////
+// TPhantomFlagStorageProcessor
+////////////////////////////////////////////////////////////////////////////
+class TPhantomFlagStorageProcessor : public TActorBootstrapped<TPhantomFlagStorageProcessor> {
+public:
+ TPhantomFlagStorageProcessor(TPhantomFlagStorageData&& data,
+ TPhantomFlagStorageProcessorContext&& ctx)
+ : Ctx(std::move(ctx))
+ , Data(std::move(data))
+ , PendingRead(Ctx.SyncLogCtx->VCtx->Top->GType)
+ {}
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::BS_PHANTOM_FLAG_STORAGE_WRITER;
+ }
+
+ void Bootstrap() {
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP01, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Bootstrap PhantomFlagStorageProcessor"),
+ (ChunkCount, Data.Chunks.size()),
+ (ChunkSize, Data.ChunkSize));
+ Send(Ctx.ChunkKeeperId, new TEvChunkKeeperDiscover(SubsystemId));
+ RequestInFlight = true;
+ Become(&TThis::StateInit);
+ }
+
+private:
+ //////////////////////////////////////////////////////////////////////
+ // State functions
+ //////////////////////////////////////////////////////////////////////
+ STRICT_STFUNC(StateInit,
+ hFunc(TEvPhantomFlagStorageWriteItems, Handle)
+ hFunc(TEvPhantomFlagStorageDrop, Handle)
+ hFunc(TEvChunkKeeperDiscoverResult, HandleInit)
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway)
+ )
+
+ STRICT_STFUNC(StateWork,
+ hFunc(TEvPhantomFlagStorageDrop, Handle)
+ hFunc(TEvChunkKeeperAllocateResult, Handle)
+ hFunc(TEvChunkKeeperFreeResult, Handle)
+ hFunc(NPDisk::TEvChunkWriteResult, Handle)
+ hFunc(NPDisk::TEvChunkReadResult, Handle)
+ hFunc(TEvPhantomFlagStorageWriteItems, Handle)
+ hFunc(TEvPhantomFlagStorageGetSnapshot, Handle)
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway)
+ )
+
+ PDISK_TERMINATE_STATE_FUNC_DEF;
+
+ //////////////////////////////////////////////////////////////////////
+ // Handlers
+ //////////////////////////////////////////////////////////////////////
+ void HandleInit(const TEvChunkKeeperDiscoverResult::TPtr& ev) {
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP02, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvChunkKeeperDiscoverResult"),
+ (Event, ev->Get()->ToString()));
+ RequestInFlight = false;
+ switch (ev->Get()->Status) {
+ case NKikimrProto::OK: {
+ std::unordered_set<ui32> discoveredChunks;
+ for (const auto& [chunkIdx, chunk] : ev->Get()->Chunks) {
+ discoveredChunks.insert(chunkIdx);
+ if (!Data.Chunks.contains(chunkIdx)) {
+ // chunk was allocated but wasn't committed to SyncLog
+ // entryPoint before crash
+ Data.Chunks[chunkIdx] = TPhantomFlagStorageData::TChunk{
+ .DataSize = 0,
+ };
+ }
+ }
+
+ std::unordered_map<ui32, TPhantomFlagStorageData::TChunk> chunks;
+ for (const auto& [chunkIdx, chunk] : Data.Chunks) {
+ if (discoveredChunks.contains(chunkIdx)) {
+ chunks[chunkIdx] = chunk;
+ } else {
+ EnqueueChunkDeletion(chunkIdx);
+ }
+ }
+ std::exchange(Data.Chunks, chunks); // filter out deallocated chunks
+ SelectTailChunk();
+ break;
+ }
+ default: {
+ // ChunkKeeper is disabled, unable to manage chunks, terminate this actor
+ PassAway();
+ return;
+ }
+ }
+
+ Become(&TThis::StateWork);
+ ProcessQueues();
+ }
+
+ void Handle(TEvPhantomFlagStorageWriteItems::TPtr ev) {
+ STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_PROCESSOR, BSPFP03, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvPhantomFlagStorageWriteItems"),
+ (ItemCount, ev->Get()->Items.size()),
+ (WriteQueueSize, WriteQueue.size()));
+ std::ranges::move(ev->Get()->Items.begin(), ev->Get()->Items.end(),
+ std::back_inserter(WriteQueue));
+ ProcessQueues();
+ }
+
+ void Handle(const TEvChunkKeeperAllocateResult::TPtr& ev) {
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP04, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvChunkKeeperAllocateResult"),
+ (Event, ev->Get()->ToString()));
+ RequestInFlight = false;
+ switch (ev->Get()->Status) {
+ case NKikimrProto::OK: {
+ const ui32 chunkIdx = *ev->Get()->ChunkIdx;
+ Data.Chunks[chunkIdx] = TPhantomFlagStorageData::TChunk{
+ .DataSize = 0,
+ };
+ TailChunkIdx = chunkIdx;
+ TailAvailableSize = Data.ChunkSize;
+ break;
+ }
+ default:
+ // retry
+ AllocateNewChunk();
+ return;
+ }
+ ProcessQueues();
+ }
+
+ void Handle(const TEvChunkKeeperFreeResult::TPtr& ev) {
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP05, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvChunkKeeperFreeResult"),
+ (Event, ev->Get()->ToString()));
+ RequestInFlight = false;
+ Data.Chunks.erase(ev->Get()->ChunkIdx);
+ CommitState();
+ ProcessQueues();
+ }
+
+ void Handle(const NPDisk::TEvChunkWriteResult::TPtr& ev) {
+ STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_PROCESSOR, BSPFP06, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvChunkWriteResult"),
+ (Event, ev->Get()->ToString()));
+ CHECK_PDISK_RESPONSE(Ctx.SyncLogCtx->VCtx, ev, TActivationContext::AsActorContext());
+ RequestInFlight = false;
+ ui32 chunkIdx = ev->Get()->ChunkIdx;
+ if (chunkIdx == TailChunkIdx) {
+ TailAvailableSize -= PendingWriteSize;
+ }
+ Data.Chunks[chunkIdx].DataSize += std::exchange(PendingWriteSize, 0);
+ CommitState();
+ ProcessQueues();
+ }
+
+ void Handle(const NPDisk::TEvChunkReadResult::TPtr& ev) {
+ STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_PROCESSOR, BSPFP07, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvChunkReadResult"),
+ (Event, ev->Get()->ToString()));
+ CHECK_PDISK_RESPONSE(Ctx.SyncLogCtx->VCtx, ev, TActivationContext::AsActorContext());
+ ui32 chunkIdx = ev->Get()->ChunkIdx;
+ PendingRead.ChunksToRead.erase(chunkIdx);
+ ProcessReadBuffer(ev->Get()->Data, chunkIdx);
+ if (PendingRead.ChunksToRead.empty()) {
+ FinalizeRead();
+ }
+ ProcessQueues();
+ }
+
+ void Handle(const TEvPhantomFlagStorageGetSnapshot::TPtr& ev) {
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP08, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvPhantomFlagStorageGetSnapshot"));
+ EnqueueGetSnapshot(ev->Sender);
+ ProcessQueues();
+ }
+
+ void Handle(const TEvPhantomFlagStorageDrop::TPtr&) {
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP11, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Handle TEvPhantomFlagStorageDrop"),
+ (ChunkCount, Data.Chunks.size()));
+ TailChunkIdx = std::nullopt;
+ TailAvailableSize = 0;
+ WriteQueue.clear();
+ PendingWrite.clear();
+ PendingWriteSize = 0;
+ for (const auto& [chunkIdx, chunk] : Data.Chunks) {
+ EnqueueChunkDeletion(chunkIdx);
+ }
+ ProcessQueues();
+ }
+
+ void HandlePoison(const TEvents::TEvPoisonPill::TPtr&, const TActorContext&) {
+ PassAway();
+ }
+
+ //////////////////////////////////////////////////////////////////////
+ // Other methods
+ //////////////////////////////////////////////////////////////////////
+
+ void SelectTailChunk() {
+ TailChunkIdx.reset();
+ TailAvailableSize = 0;
+ for (const auto& [chunkIdx, chunk] : Data.Chunks) {
+ if (Data.ChunkSize - chunk.DataSize > TailAvailableSize) {
+ TailChunkIdx.emplace(chunkIdx);
+ TailAvailableSize = Data.ChunkSize - chunk.DataSize;
+ }
+ }
+ }
+
+ void EnqueueChunkDeletion(ui32 chunkIdx) {
+ RequestQueue.emplace_back(TDeleteChunk{chunkIdx});
+ }
+
+ void EnqueueGetSnapshot(TActorId requester) {
+ RequestQueue.emplace_front(TGetSnapshot{requester});
+ }
+
+ void ProcessQueues() {
+ while (!RequestInFlight && !RequestQueue.empty()) {
+ TRequest request = RequestQueue.front();
+ RequestQueue.pop_front();
+ RequestInFlight = true;
+ std::visit(TOverloaded{
+ [&](const std::monostate&) {},
+ [&](const TGetSnapshot& req) { GetSnapshot(req.Requester); },
+ [&](const TDeleteChunk& req) { DeleteChunk(req.ChunkIdx); },
+ }, request);
+ return;
+ }
+ ProcessWriteQueue();
+ }
+
+ void ProcessWriteQueue() {
+ if (RequestInFlight) {
+ return;
+ }
+
+ if (!WriteQueue.empty()) {
+ ui32 nextItemSize = WriteQueue.front().SerializedSize();
+ ui32 minRequiredSize = PendingWrite.size() + nextItemSize + Ctx.AppendBlockSize;
+
+ if (TailAvailableSize < minRequiredSize + Ctx.AppendBlockSize) {
+ AllocateNewChunk();
+ return;
+ }
+ }
+
+ while (!WriteQueue.empty()) {
+ const TPhantomFlagStorageItem& item = WriteQueue.front();
+ if (TailAvailableSize < PendingWrite.size() + item.SerializedSize() + Ctx.AppendBlockSize) {
+ break;
+ }
+ item.Serialize(&PendingWrite);
+ WriteQueue.pop_front();
+ }
+
+ if (!PendingWrite.empty()) {
+ IssueWrite();
+ }
+ }
+
+ void DeleteChunk(ui32 chunkIdx) {
+ if (Data.Chunks.contains(chunkIdx)) {
+ Send(Ctx.ChunkKeeperId, new TEvChunkKeeperFree(chunkIdx, SubsystemId));
+ } else {
+ RequestInFlight = false;
+ }
+ }
+
+ void GetSnapshot(TActorId requester) {
+ RequestInFlight = true;
+ PendingRead.Reset(requester);
+ for (const auto [chunkIdx, chunk] : Data.Chunks) {
+ if (chunk.DataSize > 0) {
+ Send(Ctx.SyncLogCtx->PDiskCtx->PDiskId,
+ new NPDisk::TEvChunkRead(Ctx.SyncLogCtx->PDiskCtx->Dsk->Owner, Ctx.SyncLogCtx->PDiskCtx->Dsk->OwnerRound,
+ chunkIdx, 0, chunk.DataSize, NPriWrite::SyncLog, nullptr));
+ PendingRead.ChunksToRead.insert(chunkIdx);
+ }
+ }
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP10, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Start reading snapshot"),
+ (ChunkToReadCount, PendingRead.ChunksToRead.size()));
+ if (PendingRead.ChunksToRead.empty()) {
+ FinalizeRead();
+ }
+ }
+
+ void FinalizeRead() {
+ RequestInFlight = false;
+ STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_PROCESSOR, BSPFP09, VDISKP(Ctx.SyncLogCtx->VCtx,
+ "Send Snapshot"),
+ (FlagCount, PendingRead.Flags.size()));
+ Send(PendingRead.Requester, new TEvPhantomFlagStorageGetSnapshotResult(
+ TPhantomFlagStorageSnapshot(std::move(PendingRead.Flags),
+ std::move(PendingRead.Thresholds))));
+ }
+
+ void AllocateNewChunk() {
+ RequestInFlight = true;
+ Send(Ctx.ChunkKeeperId, new TEvChunkKeeperAllocate(SubsystemId));
+ }
+
+ void CommitState() {
+ Send(Ctx.SyncLogKeeperId, new TEvPhantomFlagStorageCommitData(Data));
+ }
+
+ void IssueWrite() {
+ Y_ABORT_UNLESS(TailChunkIdx);
+ RequestInFlight = true;
+ ui32 offset = Data.Chunks[*TailChunkIdx].DataSize;
+ Y_ABORT_UNLESS(offset <= Data.ChunkSize);
+ ui32 maxSize = Data.ChunkSize - offset;
+ TPhantomFlagStorageItem::AlignWriteBlock(&PendingWrite, Ctx.AppendBlockSize, maxSize);
+ PendingWriteSize = PendingWrite.size();
+ auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TAlignedParts>(std::move(PendingWrite));
+ Send(Ctx.SyncLogCtx->PDiskCtx->PDiskId,
+ new NPDisk::TEvChunkWrite(Ctx.SyncLogCtx->PDiskCtx->Dsk->Owner, Ctx.SyncLogCtx->PDiskCtx->Dsk->OwnerRound,
+ *TailChunkIdx, offset, parts, nullptr, true, NPriWrite::SyncLog));
+ }
+
+ void ProcessReadBuffer(class TBufferWithGaps& bufferWithGaps, ui32 chunkIdx) {
+ if (!bufferWithGaps.IsReadable()) {
+ return;
+ }
+ TRcBuf buffer = bufferWithGaps.ToString();
+ ui64 offset = 0;
+ while (offset < Data.Chunks[chunkIdx].DataSize) {
+ TPhantomFlagStorageItem item = TPhantomFlagStorageItem::DeserializeFromRaw(
+ buffer.Data() + offset);
+ switch (item.GetType()) {
+ case EPhantomFlagStorageItem::Flag:
+ PendingRead.Flags.push_back(item.GetFlag().Record);
+ break;
+ case EPhantomFlagStorageItem::Threshold: {
+ TPhantomFlagStorageItem::TThreshold threshold = item.GetThreshold();
+ PendingRead.Thresholds.AddBlob(threshold.OrderNumber, threshold.TabletId,
+ threshold.Channel, threshold.Generation, threshold.Step);
+ break;
+ }
+ case EPhantomFlagStorageItem::Skip:
+ case EPhantomFlagStorageItem::SkipOneByte:
+ break;
+ }
+
+ offset += item.SerializedSize();
+ if (item.SerializedSize() == 0) {
+ return;
+ }
+ }
+ }
+
+private:
+ struct TGetSnapshot {
+ TActorId Requester;
+ };
+
+ struct TDeleteChunk {
+ ui32 ChunkIdx;
+ };
+
+ using TRequest = std::variant<TGetSnapshot, TDeleteChunk>;
+
+ struct TReaderInfo {
+ TReaderInfo(const TBlobStorageGroupType& gtype)
+ : Thresholds(gtype)
+ {}
+
+ std::unordered_set<ui32> ChunksToRead;
+ TPhantomFlags Flags;
+ TPhantomFlagThresholds Thresholds;
+ TActorId Requester = TActorId{};
+
+ void Reset(TActorId requester) {
+ ChunksToRead.clear();
+ Flags.clear();
+ Thresholds.Clear();
+ Requester = requester;
+ }
+ };
+
+private:
+ static constexpr NKikimrVDiskData::TChunkKeeperEntryPoint::ESubsystem SubsystemId =
+ NKikimrVDiskData::TChunkKeeperEntryPoint::PhantomFlagStorage;
+
+ const TPhantomFlagStorageProcessorContext Ctx;
+ TPhantomFlagStorageData Data;
+
+ std::deque<TRequest> RequestQueue;
+ bool RequestInFlight = false;
+
+ std::deque<TPhantomFlagStorageItem> WriteQueue;
+ TString PendingWrite;
+ ui32 PendingWriteSize = 0;
+ TReaderInfo PendingRead;
+
+ std::optional<ui32> TailChunkIdx;
+ ui64 TailAvailableSize = 0;
+};
+
+NActors::IActor* CreatePhantomFlagStorageProcessor(TPhantomFlagStorageData&& data,
+ TPhantomFlagStorageProcessorContext&& ctx) {
+ return new TPhantomFlagStorageProcessor(std::move(data), std::move(ctx));
+}
+
+} // namespace NKikimr::NSyncLog
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.h
new file mode 100644
index 00000000000..894412d8572
--- /dev/null
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_processor.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "phantom_flag_storage_data.h"
+
+#include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h>
+#include <ydb/library/actors/core/actor.h>
+
+namespace NKikimr::NSyncLog {
+
+struct TPhantomFlagStorageProcessorContext {
+ const TIntrusivePtr<TSyncLogCtx> SyncLogCtx;
+ const NActors::TActorId SyncLogKeeperId;
+ const NActors::TActorId ChunkKeeperId;
+ const ui32 AppendBlockSize;
+};
+
+////////////////////////////////////////////////////////////////////////////
+// PHANTOM FLAG STORAGE PROCESSOR CREATOR
+// Creates the actor that writes and reads PhantomFlagStorage data
+// (flags and thresholds) on disk and manages chunks via the ChunkKeeper
+////////////////////////////////////////////////////////////////////////////
+NActors::IActor* CreatePhantomFlagStorageProcessor(TPhantomFlagStorageData&& data,
+ TPhantomFlagStorageProcessorContext&& ctx);
+
+} // namespace NKikimr::NSyncLog
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp
index 8cfe33e6e24..43db5654c8a 100644
--- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.cpp
@@ -4,8 +4,16 @@ namespace NKikimr {
namespace NSyncLog {
-TPhantomFlagStorageSnapshot::TPhantomFlagStorageSnapshot(const TPhantomFlags& flags)
+TPhantomFlagStorageSnapshot::TPhantomFlagStorageSnapshot(TPhantomFlags&& flags,
+ TPhantomFlagThresholds&& thresholds)
+ : Flags(std::move(flags))
+ , Thresholds(std::move(thresholds))
+{}
+
+TPhantomFlagStorageSnapshot::TPhantomFlagStorageSnapshot(const TPhantomFlags& flags,
+ const TPhantomFlagThresholds& thresholds)
: Flags(flags)
+ , Thresholds(thresholds)
{}
} // namespace NSyncLog
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h
index f33939a8779..2eae31af9bc 100644
--- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_snapshot.h
@@ -7,10 +7,15 @@ namespace NKikimr {
namespace NSyncLog {
-// TODO: include thresholds in snapshot
struct TPhantomFlagStorageSnapshot {
- TPhantomFlagStorageSnapshot(const TPhantomFlags& flags);
+ TPhantomFlagStorageSnapshot(TPhantomFlags&& flags,
+ TPhantomFlagThresholds&& thresholds);
+
+ TPhantomFlagStorageSnapshot(const TPhantomFlags& flags,
+ const TPhantomFlagThresholds& thresholds);
+
TPhantomFlags Flags;
+ TPhantomFlagThresholds Thresholds;
};
} // namespace NSyncLog
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp
index efcb7cfaab4..6cf7f0177ba 100644
--- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.cpp
@@ -1,4 +1,5 @@
#include "phantom_flag_storage_state.h"
+#include "phantom_flag_storage_processor.h"
#include <ydb/core/util/stlog.h>
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h>
@@ -11,9 +12,28 @@ TPhantomFlagStorageState::TPhantomFlagStorageState(TIntrusivePtr<TSyncLogCtx> sl
: SlCtx(slCtx)
, GType(slCtx->VCtx->Top->GType)
, Thresholds(GType)
- , MaxFlagsStoredCount(0)
{}
+void TPhantomFlagStorageState::InitializePersistent(TPhantomFlagStorageData&& data,
+ TActorId syncLogKeeperId, TActorId chunkKeeperId, ui32 appendBlockSize) {
+ IsPersistent = true;
+
+ NActors::IActor* processorActor = CreatePhantomFlagStorageProcessor(std::move(data),
+ TPhantomFlagStorageProcessorContext{
+ .SyncLogCtx = SlCtx,
+ .SyncLogKeeperId = syncLogKeeperId,
+ .ChunkKeeperId = chunkKeeperId,
+ .AppendBlockSize = appendBlockSize,
+ });
+ ProcessorId = TActivationContext::Register(processorActor);
+
+ if (!data.Chunks.empty()) {
+ Active = true;
+ Building = true;
+ TActivationContext::Send(new IEventHandle(ProcessorId, syncLogKeeperId, new TEvPhantomFlagStorageGetSnapshot));
+ }
+}
+
void TPhantomFlagStorageState::StartBuilding() {
if (GType.BlobSubgroupSize() > MaxExpectedDisksInGroup) {
STLOG(PRI_ERROR, BS_PHANTOM_FLAG_STORAGE, BSPFS01,
@@ -41,7 +61,11 @@ void TPhantomFlagStorageState::ProcessBlobRecordFromSyncLog(const TLogoBlobRec*
(Building, Building),
(SyncedMask, SyncedMask.to_ullong()),
(Thresholds, Thresholds.ToString()));
- AddFlag(*blobRec);
+ if (IsPersistent) {
+ AddItemToWriteBuffer(TPhantomFlagStorageItem::CreateFlag(blobRec));
+ } else {
+ AddFlag(*blobRec);
+ }
}
}
@@ -58,46 +82,75 @@ void TPhantomFlagStorageState::ProcessBarrierRecordFromNeighbour(ui32 orderNumbe
}
}
-void TPhantomFlagStorageState::FinishBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds,
+void TPhantomFlagStorageState::FinishInitialBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds,
ui64 sizeLimit) {
if (!Active) {
// PhantomFlagStorage was deactivated while building, do nothing
return;
}
- AdjustSize(sizeLimit);
- ui64 flagsAdded = 0;
- for (const TLogoBlobRec& rec : flags) {
- if (!AddFlag(rec)) {
- break;
+ if (IsPersistent) {
+ for (const TLogoBlobRec& rec : flags) {
+ AddItemToWriteBuffer(TPhantomFlagStorageItem::CreateFlag(&rec));
+ }
+ std::vector<TPhantomFlagThresholds::TThreshold> thresholdList = thresholds.GetList();
+ for (const auto [tabletId, channel, generation, step, orderNumber] : thresholdList) {
+ AddItemToWriteBuffer(TPhantomFlagStorageItem::CreateThreshold(orderNumber,
+ tabletId, channel, generation, step));
+ }
+ } else {
+ AdjustSize(sizeLimit);
+ ui64 flagsAdded = 0;
+ for (const TLogoBlobRec& rec : flags) {
+ if (!AddFlag(rec)) {
+ break;
+ }
+ ++flagsAdded;
}
- ++flagsAdded;
+ Thresholds.Merge(std::move(thresholds));
+
+ STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS06,
+ VDISKP(SlCtx->VCtx, "Finish building"),
+ (FlagsAdded, flagsAdded),
+ (FlagsReceived, flags.size()));
}
- Thresholds.Merge(std::move(thresholds));
Building = false;
+}
- STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS06,
- VDISKP(SlCtx->VCtx, "Finish building"),
- (FlagsAdded, flagsAdded),
- (FlagsReceived, flags.size()));
+void TPhantomFlagStorageState::Recover(TPhantomFlagStorageSnapshot&& snapshot) {
+ STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS10,
+ VDISKP(SlCtx->VCtx, "Recovering PhantomFlagStorage"));
+ Building = false;
+ Thresholds.Merge(std::move(snapshot.Thresholds));
}
void TPhantomFlagStorageState::Deactivate() {
STLOG(PRI_NOTICE, BS_PHANTOM_FLAG_STORAGE, BSPFS07,
VDISKP(SlCtx->VCtx, "Deactivating PhantomFlagStorage"),
(FlagsDropped, StoredFlags.size()));
- StoredFlags.clear();
Thresholds.Clear();
Active = false;
Building = false;
+ if (IsPersistent) {
+ TActivationContext::Send(new IEventHandle(ProcessorId, TActorId{}, new TEvPhantomFlagStorageDrop));
+ WriteBuffer.clear();
+ WriteBufferSize = 0;
+ } else {
+ StoredFlags.clear();
+ }
}
-TPhantomFlagStorageSnapshot TPhantomFlagStorageState::GetSnapshot() const {
- STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS05,
- VDISKP(SlCtx->VCtx, "Acquiring snapshot"),
- (FlagsCount, StoredFlags.size()));
- return TPhantomFlagStorageSnapshot(StoredFlags);
+void TPhantomFlagStorageState::RequestSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr ev) const {
+ if (IsPersistent) {
+ TActivationContext::Send(ev->Forward(ProcessorId));
+ } else {
+ STLOG(PRI_DEBUG, BS_PHANTOM_FLAG_STORAGE, BSPFS05,
+ VDISKP(SlCtx->VCtx, "Acquiring snapshot"),
+ (FlagsCount, StoredFlags.size()));
+ auto res = std::make_unique<TEvPhantomFlagStorageGetSnapshotResult>(TPhantomFlagStorageSnapshot(StoredFlags, Thresholds));
+ TActivationContext::Send(new IEventHandle(ev->Sender, ev->Recipient, res.release()));
+ }
}
bool TPhantomFlagStorageState::IsActive() const {
@@ -190,6 +243,50 @@ void TPhantomFlagStorageState::UpdateMetrics() {
SlCtx->PhantomFlagStorageGroup.ThresholdsMemoryConsumption() = Thresholds.EstimatedMemoryConsumption();
}
+void TPhantomFlagStorageState::AddItemToWriteBuffer(const TPhantomFlagStorageItem& item) {
+ if (WriteBufferSize + item.SerializedSize() > WriteBufferSizeLimit) {
+ FlushWriteBuffer();
+ }
+ WriteBuffer.push_back(item);
+ WriteBufferSize += item.SerializedSize();
+}
+
+void TPhantomFlagStorageState::FlushWriteBuffer() {
+ if (!WriteBuffer.empty()) {
+ auto ev = std::make_unique<TEvPhantomFlagStorageWriteItems>(std::move(WriteBuffer));
+ TActivationContext::Send(new IEventHandle(ProcessorId, TActorId{}, ev.release()));
+ WriteBufferSize = 0;
+ WriteBufferFlushTimestamp = TActivationContext::Monotonic();
+ }
+}
+
+void TPhantomFlagStorageState::FlushWriteBufferIfNeeded() {
+ TMonotonic now = TActivationContext::Monotonic();
+ if (now - WriteBufferFlushTimestamp > WriteBufferFlushPeriod) {
+ FlushWriteBuffer();
+ }
+}
+
+void TPhantomFlagStorageState::SyncLogIsCut() {
+ FlushWriteBuffer();
+}
+
+std::optional<TPhantomFlagStorageData> TPhantomFlagStorageState::GetPersistentData() const {
+ return PersistentData;
+}
+
+void TPhantomFlagStorageState::UpdatePersistentData(std::optional<TPhantomFlagStorageData>&& data) {
+ PersistentData = std::move(data);
+}
+
+void TPhantomFlagStorageState::Terminate() {
+ if (ProcessorId != TActorId{}) {
+ TActivationContext::Send(new IEventHandle(ProcessorId, TActorId{}, new TEvents::TEvPoisonPill));
+ WriteBuffer.clear();
+ WriteBufferSize = 0;
+ }
+}
+
} // namespace NSyncLog
} // namespace NKikimr
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h
index 956bef41d32..f8dd47069e1 100644
--- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_storage_state.h
@@ -1,8 +1,10 @@
#pragma once
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogformat.h>
+#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog_private_events.h>
#include "phantom_flags.h"
+#include "phantom_flag_storage_data.h"
#include "phantom_flag_storage_snapshot.h"
#include "phantom_flag_thresholds.h"
@@ -19,6 +21,8 @@ class TPhantomFlagStorageState {
public:
TPhantomFlagStorageState(TIntrusivePtr<TSyncLogCtx> slCtx);
+ void InitializePersistent(TPhantomFlagStorageData&& data, TActorId syncLogKeeperId,
+ TActorId chunkKeeperId, ui32 appendBlockSize);
void StartBuilding();
// Adds DoNotKeep flags from synclog if needed
@@ -27,13 +31,12 @@ public:
// Add all DoNotKeep records from cut synclog snapshot up to sizeLimit
// Note: in some obscure cases there may be two active builders simultaneously
// It shouldn't make any difference though, we just add more flags
- void FinishBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds, ui64 sizeLimit);
+ void FinishInitialBuilding(TPhantomFlags&& flags, TPhantomFlagThresholds&& thresholds, ui64 sizeLimit);
+ void Recover(TPhantomFlagStorageSnapshot&& snapshot);
void Deactivate();
- // TODO: rebuild thresholds structure after restart. Either write it to VDisk log or rebuild from hull snapshot
-
// Read everything from storage
- TPhantomFlagStorageSnapshot GetSnapshot() const;
+ void RequestSnapshot(TEvPhantomFlagStorageGetSnapshot::TPtr request) const;
bool IsActive() const;
// Process sync data from neighbours, we do it to update Thresholds
@@ -46,6 +49,12 @@ public:
void UpdateMetrics();
+ std::optional<TPhantomFlagStorageData> GetPersistentData() const;
+ void UpdatePersistentData(std::optional<TPhantomFlagStorageData>&& data);
+ void FlushWriteBufferIfNeeded();
+ void SyncLogIsCut();
+ void Terminate();
+
private:
// Adds DoNotKeep flags to storage and Keeps to Thresholds for specified neighbour
void ProcessBlobRecordFromNeighbour(ui32 orderNumber, const TLogoBlobRec* blobRec);
@@ -55,15 +64,31 @@ private:
void AdjustSize(ui64 sizeLimit);
bool AddFlag(const TLogoBlobRec& blobRec);
+ void AddItemToWriteBuffer(const TPhantomFlagStorageItem& item);
+ void FlushWriteBuffer();
+
private:
TIntrusivePtr<TSyncLogCtx> SlCtx;
const TBlobStorageGroupType GType;
TPhantomFlagThresholds Thresholds;
TPhantomFlags StoredFlags;
- ui64 MaxFlagsStoredCount;
+ ui64 MaxFlagsStoredCount = 0;
TSyncedMask SyncedMask;
bool Active = false;
bool Building = false;
+
+ // persistent phantom flag storage
+ bool IsPersistent = false;
+ TActorId ProcessorId;
+ std::vector<TPhantomFlagStorageItem> WriteBuffer;
+ ui32 WriteBufferSize = 0;
+ std::optional<TPhantomFlagStorageData> PersistentData;
+ TMonotonic WriteBufferFlushTimestamp = TMonotonic::Zero();
+
+private:
+ // TODO: remove write buffer, use sync log
+ constexpr static ui32 WriteBufferSizeLimit = 1_MB;
+ constexpr static TDuration WriteBufferFlushPeriod = TDuration::Seconds(30);
};
} // namespace NSyncLog
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp
index 78b4176e6d3..2d456b32d3d 100644
--- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.cpp
@@ -15,10 +15,6 @@ TPhantomFlagThresholds::TTabletChannel TPhantomFlagThresholds::MakeTabletChannel
return TTabletChannel(blobId.TabletID(), blobId.Channel());
}
-inline ui64 TPhantomFlagThresholds::THasher::operator()(const TTabletChannel& x) const {
- return std::hash<ui64>{}((x.first << 8) | x.second);
-}
-
TPhantomFlagThresholds::TTabletThresholds::TTabletThresholds() {
Thresholds.resize(MaxExpectedDisksInGroup);
}
@@ -64,6 +60,17 @@ void TPhantomFlagThresholds::TTabletThresholds::Merge(TBlobStorageGroupType grou
}
}
+std::vector<TPhantomFlagThresholds::TTabletThresholds::TTabletThreshold>
+ TPhantomFlagThresholds::TTabletThresholds::GetList() const {
+ std::vector<TTabletThreshold> res;
+ for (ui32 orderNumber = 0; orderNumber < Thresholds.size(); ++orderNumber) {
+ if (Thresholds[orderNumber]) {
+ res.emplace_back(Thresholds[orderNumber]->first, Thresholds[orderNumber]->second, orderNumber);
+ }
+ }
+ return res;
+}
+
TString TPhantomFlagThresholds::TTabletThresholds::ToString(TBlobStorageGroupType groupType) const {
TStringStream str;
str << "{";
@@ -94,6 +101,10 @@ void TPhantomFlagThresholds::AddBlob(ui32 orderNumber, const TLogoBlobID& blob)
TabletThresholds[MakeTabletChannel(blob)].AddBlob(orderNumber, MakeGenStep(blob));
}
+void TPhantomFlagThresholds::AddBlob(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step) {
+ TabletThresholds[{tabletId, channel}].AddBlob(orderNumber, {generation, step});
+}
+
void TPhantomFlagThresholds::AddHardBarrier(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step) {
TTabletChannel tabletChannel{tabletId, channel};
if (auto it = TabletThresholds.find(tabletChannel); it != TabletThresholds.end()) {
@@ -142,6 +153,19 @@ void TPhantomFlagThresholds::Clear() {
TabletThresholds.clear();
}
+
+std::vector<TPhantomFlagThresholds::TThreshold> TPhantomFlagThresholds::GetList() const {
+ std::vector<TThreshold> res;
+ for (const auto& [tabletChannel, thresholds] : TabletThresholds) {
+ auto [tabletId, channel] = tabletChannel;
+ std::vector<TTabletThresholds::TTabletThreshold> tuples = thresholds.GetList();
+ for (const auto& [generation, step, orderNumber] : tuples) {
+ res.emplace_back(tabletId, channel, generation, step, orderNumber);
+ }
+ }
+ return res;
+}
+
TString TPhantomFlagThresholds::ToString() const {
TStringStream str;
str << "TPhantomFlagThresholds# { ";
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h
index 014630b0983..2361152a0e6 100644
--- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/phantom_flag_thresholds.h
@@ -22,10 +22,20 @@ namespace NSyncLog {
class TPhantomFlagThresholds {
public:
+ struct TThreshold {
+ ui64 TabletId = 0;
+ ui8 Channel = 0;
+ ui32 Generation = 0;
+ ui32 Step = 0;
+ ui8 OrderNumber = 0;
+ };
+
+public:
TPhantomFlagThresholds(const TBlobStorageGroupType& gtype);
void AddBlob(ui32 orderNumber, const TLogoBlobID& blob);
void AddBlob(const TLogoBlobID& blob);
+ void AddBlob(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step);
void AddHardBarrier(ui32 orderNumber, ui64 tabletId, ui8 channel, ui32 generation, ui32 step);
bool IsBehindThresholdOnUnsynced(const TLogoBlobID& blob, const TSyncedMask& syncedMask) const;
TPhantomFlags Sift(const TPhantomFlags& flags, const TSyncedMask& syncedMask);
@@ -33,6 +43,7 @@ public:
void Merge(TPhantomFlagThresholds&& other);
void Clear();
+ std::vector<TThreshold> GetList() const;
TString ToString() const;
private:
@@ -43,13 +54,22 @@ private:
static TTabletChannel MakeTabletChannel(const TLogoBlobID& blobId);
struct THasher {
- inline ui64 operator()(const TTabletChannel& x) const;
+ inline ui64 operator()(const TTabletChannel& x) const {
+ return std::hash<ui64>{}((x.first << 8) | x.second);
+ }
};
private:
// auxiliary classes
class TTabletThresholds {
public:
+ struct TTabletThreshold {
+ ui32 Generation = 0;
+ ui32 Step = 0;
+ ui8 OrderNumber = 0;
+ };
+
+ public:
TTabletThresholds();
void AddBlob(ui32 orderNumber, TGenStep genStep);
@@ -60,6 +80,7 @@ private:
const TSyncedMask& syncedMask) const;
void Merge(TBlobStorageGroupType groupType, TTabletThresholds&& other);
TString ToString(TBlobStorageGroupType groupType) const;
+ std::vector<TTabletThreshold> GetList() const;
private:
TStackVec<std::optional<TGenStep>, MaxExpectedDisksInGroup> Thresholds;
diff --git a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make
index 7f1fe059873..7aeea4f639d 100644
--- a/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make
+++ b/ydb/core/blobstorage/vdisk/synclog/phantom_flag_storage/ya.make
@@ -11,6 +11,8 @@ PEERDIR(
SRCS(
phantom_flag_storage_builder.cpp
+ phantom_flag_storage_data.cpp
+ phantom_flag_storage_processor.cpp
phantom_flag_storage_snapshot.cpp
phantom_flag_storage_state.cpp
phantom_flag_thresholds.cpp
diff --git a/ydb/core/protos/blobstorage_vdisk_internal.proto b/ydb/core/protos/blobstorage_vdisk_internal.proto
index f6ec56c6c45..a7d2f17f370 100644
--- a/ydb/core/protos/blobstorage_vdisk_internal.proto
+++ b/ydb/core/protos/blobstorage_vdisk_internal.proto
@@ -204,6 +204,16 @@ message TAddBulkSstRecoveryLogRec {
////////////////////////////////////////////////////////////////////////////////
// TSyncLogEntryPoint -- entry point for SyncLog
////////////////////////////////////////////////////////////////////////////////
+message TPhantomFlagStorageData {
+ message TPhantomFlagStorageChunk {
+ optional uint32 ChunkIdx = 1;
+ optional uint32 DataSize = 2;
+ };
+
+ repeated TPhantomFlagStorageChunk Chunks = 1;
+ optional uint32 ChunkSize = 2;
+};
+
message TSyncLogEntryPoint {
// HEADER
// PDisk guid for additional check
@@ -223,6 +233,8 @@ message TSyncLogEntryPoint {
repeated uint32 ChunksToDeleteDelayed = 20;
// opaque DiskRecLog serialized bytes
optional bytes DiskRecLogSerialized = 21;
+
+ optional TPhantomFlagStorageData PhantomFlagStorageData = 22;
};
message TScrubState {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 4cbd68c2cc3..4f4bb50804b 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1840,6 +1840,13 @@ message TImmediateControlsConfig {
MaxValue: 1,
DefaultValue: 0,
}];
+
+ optional uint64 EnablePersistentPhantomFlagStorage = 41 [(ControlOptions) = {
+ Description: "Enables Persistent Phantom Flag Storage",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0,
+ }];
}
message TTabletControls {
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 1a1fdcd4c7b..33ab56ba88e 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -62,6 +62,7 @@ enum EServiceKikimr {
BS_PHANTOM_FLAG_STORAGE = 2607;
BS_CHUNK_KEEPER = 2608;
BS_COMP_BROKER = 2609;
+ BS_PHANTOM_FLAG_PROCESSOR = 2610;
// DATASHARD section //
TX_DATASHARD = 290; //
@@ -1166,5 +1167,6 @@ message TActivity {
BS_DDISK = 685;
BS_PERSISTENT_BUFFER = 686;
BS_SYNCER_MERGER = 687;
+ BS_PHANTOM_FLAG_STORAGE_WRITER = 688;
};
};