summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBarkovBG <[email protected]>2026-06-09 01:08:04 +0300
committerGitHub <[email protected]>2026-06-09 01:08:04 +0300
commitfec99c4e60e3f43eedc145ada280f0769861fa01 (patch)
treed50e3920c3baa3de37c966c7d1ceaf6104178936
parent5ebf61ab6d0bca1b3d9557723c0c9967804d5ce1 (diff)
Introduce barrier cleanup (#41766)
-rw-r--r--ydb/core/nbs/cloud/blockstore/config/config.cpp1
-rw-r--r--ydb/core/nbs/cloud/blockstore/config/config.h1
-rw-r--r--ydb/core/nbs/cloud/blockstore/config/protos/storage.proto4
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h11
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h6
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h7
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h7
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp2
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp3
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h11
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp118
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h12
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp24
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h11
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp33
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h5
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp45
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp36
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h20
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp32
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp2
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp86
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h19
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h1
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp233
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp3
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp2
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h1
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp20
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h6
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp7
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp9
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h4
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp8
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp40
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h7
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp81
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h17
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp7
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h43
-rw-r--r--ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h8
41 files changed, 861 insertions, 132 deletions
diff --git a/ydb/core/nbs/cloud/blockstore/config/config.cpp b/ydb/core/nbs/cloud/blockstore/config/config.cpp
index e12d288db3d..5c34ec6e837 100644
--- a/ydb/core/nbs/cloud/blockstore/config/config.cpp
+++ b/ydb/core/nbs/cloud/blockstore/config/config.cpp
@@ -40,6 +40,7 @@ TStorageConfig::TStorageConfig(
xxx(DirtyMapDebugPrintInterval, TDuration, TDuration::Seconds(0) )\
xxx(VhostThreadsCount, ui32, 4 )\
xxx(VhostQueuesCount, ui32, 4 )\
+ xxx(PBufferCleanupLsnStep, ui64, 0 )\
// BLOCKSTORE_STORAGE_CONFIG_RO
// clang-format on
diff --git a/ydb/core/nbs/cloud/blockstore/config/config.h b/ydb/core/nbs/cloud/blockstore/config/config.h
index f1ba614ec71..86283e0a8c2 100644
--- a/ydb/core/nbs/cloud/blockstore/config/config.h
+++ b/ydb/core/nbs/cloud/blockstore/config/config.h
@@ -41,6 +41,7 @@ public:
[[nodiscard]] TDuration GetDirtyMapDebugPrintInterval() const;
[[nodiscard]] ui32 GetVhostThreadsCount() const;
[[nodiscard]] ui32 GetVhostQueuesCount() const;
+ [[nodiscard]] ui64 GetPBufferCleanupLsnStep() const;
private:
NProto::TStorageServiceConfig StorageServiceConfig;
diff --git a/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto b/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto
index 35c32c65031..5fe0488db73 100644
--- a/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto
+++ b/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto
@@ -121,4 +121,8 @@ message TStorageServiceConfig
// (each queue can be served by a different VHOST thread). Capped
// at runtime by VhostThreadsCount.
optional uint32 VhostQueuesCount = 25;
+
+ // Persistent buffer cleanup is triggered once the cleanup LSN advances by
+ // at least this many LSNs since the last cleanup. 0 disables cleanup.
+ optional uint64 PBufferCleanupLsnStep = 26;
}
diff --git a/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h b/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h
index 52142b30805..abf602d5155 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h
@@ -214,6 +214,17 @@ public:
return Ranges.empty();
}
+ [[nodiscard]] std::optional<TKey> GetMinKey() const
+ {
+ std::optional<TKey> minKey;
+ for (const auto& [itemKey, value]: Ranges) {
+ if (!minKey || itemKey.Key < *minKey) {
+ minKey = itemKey.Key;
+ }
+ }
+ return minKey;
+ }
+
[[nodiscard]] size_t Size() const
{
return Ranges.size();
diff --git a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h
index 6aecb121cc6..424aecef9f3 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h
@@ -37,6 +37,12 @@ struct IPartitionDirectService
// local DB. Caller must ensure cfg.IsValid().
virtual void UpdateVChunkConfig(
const NStorage::NPartitionDirect::TVChunkConfig& cfg) = 0;
+
+ // Generates the next tablet-wide write LSN. Called by a vchunk on its
+ // executor thread when it starts processing a write, so generation and
+ // dirty-map registration happen on the same thread. Also drives periodic
+ // persistent buffer cleanup.
+ virtual ui64 GenerateLsn() = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h
index 212464fc152..fadd0108a70 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h
@@ -37,6 +37,13 @@ struct TPartitionDirectServiceMock: public IPartitionDirectService
{
Y_UNUSED(cfg);
}
+
+ ui64 LsnGenerator = 0;
+
+ ui64 GenerateLsn() override
+ {
+ return ++LsnGenerator;
+ }
};
using TPartitionDirectServiceMockPtr =
diff --git a/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h b/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h
index 9b23e313cfb..b4973d053ab 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h
@@ -46,6 +46,13 @@ public:
{
Y_UNUSED(cfg);
}
+
+ ui64 LsnGenerator = 0;
+
+ ui64 GenerateLsn() override
+ {
+ return ++LsnGenerator;
+ }
};
struct TTestStorage: public IStorage
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp
index 15b078ce163..65f5b049a9f 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp
@@ -203,7 +203,7 @@ void TBaseFixture::Init()
return future;
};
- DirectBlockGroup->EraseFromPBufferHandler = [&] //
+ DirectBlockGroup->BatchEraseFromPBufferHandler = [&] //
(ui32 vChunkIndex,
THostIndex hostIndex,
const TVector<TPBufferSegment>& segments,
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp
index 5df681ea283..6d56e7506c7 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp
@@ -355,16 +355,19 @@ Y_UNIT_TEST_SUITE(TDDiskDataCopierTest)
// Mark DDisk#1 completely fresh.
DirtyMap.MarkFresh(FreshDDisk, 0);
+ DirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
DirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10), // #0
MakePrimariesMask(),
MakePrimariesMask());
+ DirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(250, 10));
DirtyMap.WriteFinished(
124,
TBlockRange64::WithLength(250, 10), // #0 + #1
MakePrimariesMask(),
MakePrimariesMask());
+ DirtyMap.RegisterInflightWrite(125, TBlockRange64::WithLength(260, 10));
DirtyMap.WriteFinished(
125,
TBlockRange64::WithLength(260, 10), // #1
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h
index f6b2e2c4af7..9c562dd5292 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h
@@ -188,12 +188,21 @@ public:
const NWilson::TTraceId& traceId) = 0;
// Batch operation to erase a list of PBuffer entries.
- virtual NThreading::TFuture<TDBGEraseResponse> EraseFromPBuffer(
+ virtual NThreading::TFuture<TDBGEraseResponse> BatchEraseFromPBuffer(
ui32 vChunkIndex,
THostIndex hostIndex,
const TVector<TPBufferSegment>& segments,
const NWilson::TTraceId& traceId) = 0;
+ virtual void BarrierEraseFromPBuffer(ui64 lsn) = 0;
+
+ // The lowest lsn that must be preserved across all vchunks of this
+ // DirectBlockGroup (records below it are safe to erase). Used to compute
+ // the tablet-wide cleanup watermark. Resolves on the executor thread.
+ // nullopt means nothing is inflight here.
+ virtual NThreading::TFuture<std::optional<ui64>>
+ GatherSafeBarrierForErase() = 0;
+
// Get a list of all entries in PBuffers belonging to a given vChunkIndex.
virtual NThreading::TFuture<TDBGRestoreResponse> RestoreDBGPBuffers(
ui32 vChunkIndex) = 0;
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp
index 7e011564184..b01e603a9bb 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp
@@ -772,7 +772,7 @@ TDBGFlushResponse TDirectBlockGroup::HandleSyncWithPBufferResponse(
return result;
}
-NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::EraseFromPBuffer(
+NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::BatchEraseFromPBuffer(
ui32 vChunkIndex,
THostIndex hostIndex,
const TVector<TPBufferSegment>& segments,
@@ -795,11 +795,12 @@ NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::EraseFromPBuffer(
lsns.push_back(segment.Lsn);
}
- auto childSpan = CreateChildSpan(traceId, "NbsPartition.EraseFromPBuffer");
+ auto childSpan =
+ CreateChildSpan(traceId, "NbsPartition.BatchEraseFromPBuffer");
OnRequest(hostIndex, EOperation::Erase);
- auto future = StorageTransport->EraseFromPBuffer(
+ auto future = StorageTransport->BatchEraseFromPBuffer(
PBufferConnections[hostIndex].HostConnection,
std::move(selectors),
std::move(lsns),
@@ -851,6 +852,117 @@ NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::EraseFromPBuffer(
return result;
}
+void TDirectBlockGroup::BarrierEraseFromPBuffer(ui64 lsn)
+{
+ Executor->ExecuteSimple(
+ [weakSelf = weak_from_this(), lsn]()
+ {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ LOG_DEBUG(
+ *self->ActorSystem,
+ NKikimrServices::NBS_PARTITION,
+ "%s barrier-erase lsn=%lu on %lu PBuffer hosts",
+ self->LogTitle.GetWithTime().c_str(),
+ lsn,
+ self->PBufferConnections.size());
+ for (THostIndex h = 0; h < self->PBufferConnections.size(); ++h) {
+ self->DoBarrierEraseFromPBuffer(h, lsn, NWilson::TTraceId{});
+ }
+ });
+}
+
+void TDirectBlockGroup::DoBarrierEraseFromPBuffer(
+ THostIndex hostIndex,
+ ui64 lsn,
+ const NWilson::TTraceId& traceId)
+{
+ Y_ABORT_UNLESS(ExecutorThreadChecker.Check());
+
+ using TEvErasePersistentBufferResult =
+ NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult;
+
+ const auto startAt = TMonotonic::Now();
+
+ auto childSpan =
+ CreateChildSpan(traceId, "NbsPartition.DoBarrierEraseFromPBuffer");
+
+ OnRequest(hostIndex, EOperation::BarrierErase);
+
+ auto future = StorageTransport->BarrierEraseFromPBuffer(
+ PBufferConnections[hostIndex].HostConnection,
+ lsn,
+ childSpan.get());
+
+ future.Subscribe(
+ [weakSelf = weak_from_this(),
+ childSpan = std::move(childSpan),
+ hostIndex,
+ startAt,
+ executor = Executor,
+ threadChecker = ExecutorThreadChecker.CreateDelegate()] //
+ (const TFuture<TEvErasePersistentBufferResult>& f) mutable
+ {
+ // ActorSystem thread
+
+ executor->ExecuteSimple(
+ [weakSelf,
+ childSpan = std::move(childSpan),
+ hostIndex,
+ startAt,
+ threadChecker,
+ result = UnsafeExtractValue(f)] //
+ () mutable
+ {
+ Y_ABORT_UNLESS(threadChecker.Check());
+
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ self->OnResponse(
+ hostIndex,
+ TMonotonic::Now() - startAt,
+ EOperation::BarrierErase,
+ TranslateError(result));
+ });
+ });
+}
+
+NThreading::TFuture<std::optional<ui64>>
+TDirectBlockGroup::GatherSafeBarrierForErase()
+{
+ auto promise = NewPromise<std::optional<ui64>>();
+ auto future = promise.GetFuture();
+
+ Executor->ExecuteSimple(
+ [weakSelf = weak_from_this(), promise]() mutable
+ {
+ auto self = weakSelf.lock();
+ if (!self) {
+ promise.SetValue(std::nullopt);
+ return;
+ }
+
+ std::optional<ui64> safeBarrier;
+ for (const auto& weakVChunk: self->VChunks) {
+ auto vChunk = weakVChunk.lock();
+ if (!vChunk) {
+ continue;
+ }
+ const auto lsn = vChunk->GetSafeBarrierForErase();
+ if (lsn && (!safeBarrier || *lsn < *safeBarrier)) {
+ safeBarrier = lsn;
+ }
+ }
+ promise.SetValue(safeBarrier);
+ });
+
+ return future;
+}
+
NThreading::TFuture<TDBGRestoreResponse> TDirectBlockGroup::RestoreDBGPBuffers(
ui32 vChunkIndex)
{
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h
index 280ed627e47..a5ebf2465ce 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h
@@ -106,12 +106,17 @@ public:
const TVector<TPBufferSegment>& segments,
const NWilson::TTraceId& traceId) override;
- NThreading::TFuture<TDBGEraseResponse> EraseFromPBuffer(
+ NThreading::TFuture<TDBGEraseResponse> BatchEraseFromPBuffer(
ui32 vChunkIndex,
THostIndex hostIndex,
const TVector<TPBufferSegment>& segments,
const NWilson::TTraceId& traceId) override;
+ void BarrierEraseFromPBuffer(ui64 lsn) override;
+
+ NThreading::TFuture<std::optional<ui64>>
+ GatherSafeBarrierForErase() override;
+
NThreading::TFuture<TDBGRestoreResponse> RestoreDBGPBuffers(
ui32 vChunkIndex) override;
@@ -169,6 +174,11 @@ private:
const TEvSyncWithPersistentBufferResult& response,
size_t segmentCount);
+ void DoBarrierEraseFromPBuffer(
+ THostIndex hostIndex,
+ ui64 lsn,
+ const NWilson::TTraceId& traceId);
+
void DoRestore(
NThreading::TPromise<TDBGRestoreResponse> promise,
ui32 vChunkIndex);
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp
index 8f192b4d647..dfd73bdb244 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp
@@ -107,9 +107,9 @@ TDirectBlockGroupMock::TDirectBlockGroupMock()
Y_ABORT_UNLESS(false, "Should set SyncWithPBufferHandler");
return NThreading::TFuture<TDBGFlushResponse>();
};
- EraseFromPBufferHandler = [](const auto&...)
+ BatchEraseFromPBufferHandler = [](const auto&...)
{
- Y_ABORT_UNLESS(false, "Should set EraseFromPBufferHandler");
+ Y_ABORT_UNLESS(false, "Should set BatchEraseFromPBufferHandler");
return NThreading::TFuture<TDBGEraseResponse>();
};
RestoreDBGPBuffersHandler = [](const auto&...)
@@ -269,13 +269,29 @@ NThreading::TFuture<TDBGFlushResponse> TDirectBlockGroupMock::SyncWithPBuffer(
traceId);
}
-NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroupMock::EraseFromPBuffer(
+NThreading::TFuture<TDBGEraseResponse>
+TDirectBlockGroupMock::BatchEraseFromPBuffer(
ui32 vChunkIndex,
THostIndex hostIndex,
const TVector<TPBufferSegment>& segments,
const NWilson::TTraceId& traceId)
{
- return EraseFromPBufferHandler(vChunkIndex, hostIndex, segments, traceId);
+ return BatchEraseFromPBufferHandler(
+ vChunkIndex,
+ hostIndex,
+ segments,
+ traceId);
+}
+
+void TDirectBlockGroupMock::BarrierEraseFromPBuffer(ui64 lsn)
+{
+ Y_UNUSED(lsn);
+}
+
+NThreading::TFuture<std::optional<ui64>>
+TDirectBlockGroupMock::GatherSafeBarrierForErase()
+{
+ return NThreading::MakeFuture<std::optional<ui64>>(std::nullopt);
}
NThreading::TFuture<TDBGRestoreResponse>
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h
index 0e94b146000..3a8109497df 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h
@@ -94,7 +94,7 @@ public:
THostIndex ddiskHostIndex,
const TVector<TPBufferSegment>& segments,
const NWilson::TTraceId& traceId)>;
- using TEraseFromPBufferHandler =
+ using TBatchEraseFromPBufferHandler =
std::function<NThreading::TFuture<TDBGEraseResponse>(
ui32 vChunkIndex,
THostIndex hostIndex,
@@ -119,7 +119,7 @@ public:
TWriteBlocksToPBufferHandler WriteBlocksToPBufferHandler;
TWriteBlocksToManyPBuffersHandler WriteBlocksToManyPBuffersHandler;
TSyncWithPBufferHandler SyncWithPBufferHandler;
- TEraseFromPBufferHandler EraseFromPBufferHandler;
+ TBatchEraseFromPBufferHandler BatchEraseFromPBufferHandler;
TDBGRestoreHandler RestoreDBGPBuffersHandler;
TListPBuffersHandler ListPBuffersHandler;
TDBGDumpHandler DumpHandler;
@@ -189,12 +189,17 @@ public:
const TVector<TPBufferSegment>& segments,
const NWilson::TTraceId& traceId) override;
- NThreading::TFuture<TDBGEraseResponse> EraseFromPBuffer(
+ NThreading::TFuture<TDBGEraseResponse> BatchEraseFromPBuffer(
ui32 vChunkIndex,
THostIndex hostIndex,
const TVector<TPBufferSegment>& segments,
const NWilson::TTraceId& traceId) override;
+ void BarrierEraseFromPBuffer(ui64 lsn) override;
+
+ NThreading::TFuture<std::optional<ui64>>
+ GatherSafeBarrierForErase() override;
+
NThreading::TFuture<TDBGRestoreResponse> RestoreDBGPBuffers(
ui32 vChunkIndex) override;
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp
index 82a2627ad09..f93fb3dee18 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp
@@ -485,26 +485,34 @@ TEraseHints TBlocksDirtyMap::MakeEraseBelatedHint()
return result;
}
+void TBlocksDirtyMap::RegisterInflightWrite(ui64 lsn, TBlockRange64 range)
+{
+ const bool inserted = Inflight.AddRange(
+ lsn,
+ range,
+ TInflightInfo(this, lsn, range.Size() * BlockSize));
+ Y_ABORT_UNLESS(inserted);
+}
+
void TBlocksDirtyMap::WriteFinished(
ui64 lsn,
TBlockRange64 range,
THostMask requested,
THostMask confirmed)
{
+ // Every write is pre-registered as pending at generation time (see
+ // RegisterInflightWrite), so the entry always exists here.
+ auto item = Inflight.GetValue(lsn);
+ Y_ABORT_UNLESS(item);
+ Y_ABORT_UNLESS(item->Range == range);
+
if (confirmed.Count() < QuorumDirectBlockGroupHostCount) {
+ const bool removed = Inflight.RemoveRange(lsn);
+ Y_ABORT_UNLESS(removed);
return;
}
- const bool inserted = Inflight.AddRange(
- lsn,
- range,
- TInflightInfo(
- this,
- lsn,
- range.Size() * BlockSize,
- requested,
- confirmed));
- Y_ABORT_UNLESS(inserted);
+ item->Value.OnWritten(requested, confirmed);
}
void TBlocksDirtyMap::FlushFinished(
@@ -636,6 +644,11 @@ ui64 TBlocksDirtyMap::GetMinErasePendingLsn() const
return *ReadyToErase.begin();
}
+std::optional<ui64> TBlocksDirtyMap::GetSafeBarrierForErase() const
+{
+ return Inflight.GetMinKey();
+}
+
const TPBufferCounters& TBlocksDirtyMap::GetPBufferCounters(
THostIndex host) const
{
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h
index 792a77be8ed..91abd9100ab 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h
@@ -221,6 +221,10 @@ public:
[[nodiscard]] TEraseHints MakeEraseHint(size_t batchSize);
[[nodiscard]] TEraseHints MakeEraseBelatedHint();
+ // Registers a write as pending (lsn generated, data not in any PBuffer
+ // yet) so that the cleanup bound covers it from the moment of generation.
+ void RegisterInflightWrite(ui64 lsn, TBlockRange64 range);
+
void WriteFinished(
ui64 lsn,
TBlockRange64 range,
@@ -258,6 +262,7 @@ public:
[[nodiscard]] size_t GetEraseBelatedCount() const;
[[nodiscard]] ui64 GetMinFlushPendingLsn() const;
[[nodiscard]] ui64 GetMinErasePendingLsn() const;
+ [[nodiscard]] std::optional<ui64> GetSafeBarrierForErase() const;
[[nodiscard]] const TPBufferCounters& GetPBufferCounters(
THostIndex host) const;
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp
index 0dfded36268..e3b8473ddb8 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp
@@ -323,6 +323,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -371,12 +372,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
124,
TBlockRange64::WithLength(10, 10),
@@ -446,6 +449,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
// Flush commands should be generated after completing the required
// number of write operations.
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -458,6 +462,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
flushHint = dirtyMap.MakeFlushHint(2);
UNIT_ASSERT_EQUAL(true, flushHint.Empty());
+ dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(20, 10));
dirtyMap.WriteFinished(
124,
TBlockRange64::WithLength(20, 10),
@@ -580,6 +585,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
MakeHostMask(false, true, true, true, false);
const THostMask confirmed = requested;
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -634,6 +640,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
MakeHostMask(false, true, true, true, false);
const THostMask confirmed = requested;
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -696,6 +703,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
MakeHostMask(false, false, true, true, true);
const THostMask confirmed = requested;
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -753,6 +761,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
MakeHostMask(true, true, true, false, false);
const THostMask confirmed = requested;
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -806,18 +815,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
const THostMask confirmed = requested;
// Range below write watermark. Should be flushed to 4 ddisks.
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
requested,
confirmed);
// Range cross write watermark. Should be flushed to 4 ddisks.
+ dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(95, 10));
dirtyMap.WriteFinished(
124,
TBlockRange64::WithLength(95, 10),
requested,
confirmed);
// Range over write watermark. Should be flushed to 3 ddisks.
+ dirtyMap.RegisterInflightWrite(125, TBlockRange64::WithLength(100, 10));
dirtyMap.WriteFinished(
125,
TBlockRange64::WithLength(100, 10),
@@ -850,18 +862,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
const THostMask confirmed = requested;
// Range below write watermark. Should be flushed.
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
requested,
confirmed);
// Range cross write watermark. Should be flushed.
+ dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(95, 10));
dirtyMap.WriteFinished(
124,
TBlockRange64::WithLength(95, 10),
requested,
confirmed);
// Range over write watermark. Should not be flushed.
+ dirtyMap.RegisterInflightWrite(125, TBlockRange64::WithLength(100, 10));
dirtyMap.WriteFinished(
125,
TBlockRange64::WithLength(100, 10),
@@ -884,6 +899,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -925,6 +941,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
dirtyMap.LockDDiskRange(TBlockRange64::WithLength(5, 10), mask);
// User write to overlapped with locked range.
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(10, 10),
@@ -1075,6 +1092,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(0, 100));
dirtyMap.WriteFinished(
123,
TBlockRange64::WithLength(0, 100),
@@ -1110,6 +1128,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
dirtyMap.EraseFinished(THostIndex{0}, {123}, {});
+ dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
124,
TBlockRange64::WithLength(10, 10),
@@ -1178,12 +1197,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 10),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(30, 10));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(30, 10),
@@ -1210,12 +1231,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 41));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 41),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(20, 11));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(20, 11),
@@ -1232,6 +1255,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
"100{[H0,H1,H2][31..50][21..40]};",
readHint.DebugPrint());
+ dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(0, 50));
dirtyMap.WriteFinished(
300,
TBlockRange64::WithLength(0, 50),
@@ -1253,12 +1277,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 21));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 21),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(25, 21));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(25, 21),
@@ -1283,18 +1309,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 41));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 41),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(150, TBlockRange64::WithLength(20, 21));
dirtyMap.WriteFinished(
150,
TBlockRange64::WithLength(20, 21),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(30, 6));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(30, 6),
@@ -1322,6 +1351,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 10));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 10),
@@ -1345,12 +1375,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 100));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 100),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(10, 40));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(10, 40),
@@ -1378,6 +1410,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
const int lsnsCount = 100;
for (int i = 1; i <= lsnsCount; ++i) {
+ dirtyMap.RegisterInflightWrite(i, TBlockRange64::WithLength(i, 1));
dirtyMap.WriteFinished(
i,
TBlockRange64::WithLength(i, 1),
@@ -1413,18 +1446,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 21));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 21),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(25, 21));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(25, 21),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(40, 21));
dirtyMap.WriteFinished(
300,
TBlockRange64::WithLength(40, 21),
@@ -1450,18 +1486,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 6));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 6),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(25, 6));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(25, 6),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(45, 6));
dirtyMap.WriteFinished(
300,
TBlockRange64::WithLength(45, 6),
@@ -1490,24 +1529,28 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultBlockSize,
DefaultVChunkSize / DefaultBlockSize);
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 91));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 91),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(20, 6));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(20, 6),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(40, 6));
dirtyMap.WriteFinished(
300,
TBlockRange64::WithLength(40, 6),
MakePrimaryHosts(),
MakePrimaryHosts());
+ dirtyMap.RegisterInflightWrite(400, TBlockRange64::WithLength(70, 6));
dirtyMap.WriteFinished(
400,
TBlockRange64::WithLength(70, 6),
@@ -1538,6 +1581,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
DefaultVChunkSize / DefaultBlockSize);
auto inflightCounterBeforeWrite = dirtyMap.GetInflightCount();
+ dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 41));
dirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(10, 41),
@@ -1557,6 +1601,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest)
"0{[H0,H1,H2][10..50][0..40]};",
readHint.DebugPrint());
+ dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(10, 41));
dirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(10, 41),
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp
index 0ea01e62024..6f954dfad69 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp
@@ -31,21 +31,16 @@ TInflightInfo::TInflightInfo(
TInflightInfo::TInflightInfo(
IReadyQueue* readyQueue,
ui64 lsn,
- size_t byteCount,
- THostMask writeRequested,
- THostMask writeConfirmed)
- : State(EState::PBufferWritten)
+ size_t byteCount)
+ : State(EState::PBufferPendingWrite)
, ReadyQueue(readyQueue)
, Lsn(lsn)
, ByteCount(byteCount)
, StartAt(TInstant::Now())
- , WriteRequested(writeRequested)
- , WriteConfirmed(writeConfirmed)
{
- Y_ABORT_UNLESS(WriteConfirmed.Count() >= QuorumDirectBlockGroupHostCount);
-
- ReadyQueue->Register(Lsn, IReadyQueue::EQueueType::Flush);
- ApplyBytes(WriteRequested, IReadyQueue::EPBufferCounter::Total, true);
+ // Pending: no PBuffer holds the data yet, so nothing is registered in a
+ // ready queue and no bytes are accounted. The write is not acknowledged, so
+ // reads ignore it (PBufferPendingWrite reads from DDisk, never blocks).
}
TInflightInfo::TInflightInfo(TInflightInfo&& other) noexcept
@@ -99,6 +94,22 @@ void TInflightInfo::RestorePBuffer(THostIndex host)
}
}
+void TInflightInfo::OnWritten(
+ THostMask writeRequested,
+ THostMask writeConfirmed)
+{
+ Y_ABORT_UNLESS(State == EState::PBufferPendingWrite);
+ Y_ABORT_UNLESS(WriteConfirmed.Count() == 0);
+ Y_ABORT_UNLESS(writeConfirmed.Count() >= QuorumDirectBlockGroupHostCount);
+
+ WriteRequested = writeRequested;
+ WriteConfirmed = writeConfirmed;
+ State = EState::PBufferWritten;
+
+ ApplyBytes(WriteRequested, IReadyQueue::EPBufferCounter::Total, true);
+ ReadyQueue->Register(Lsn, IReadyQueue::EQueueType::Flush);
+}
+
TInflightInfo::EState TInflightInfo::GetState() const
{
return State;
@@ -115,6 +126,11 @@ NThreading::TFuture<void> TInflightInfo::GetQuorumReadyFuture()
TReadSource TInflightInfo::ReadMask() const
{
switch (State) {
+ case EState::PBufferPendingWrite:
+ // The write is not acknowledged yet, so it is invisible to reads:
+ // read the pre-write data from DDisk (Lsn=0). Never blocks.
+ return {THostMask::MakeAll(MaxHostCount), /*Lsn=*/0};
+
case EState::PBufferIncompleteWrite:
// Reading will be possible only after receiving a quorum.
return {THostMask::MakeEmpty(), /*Lsn=*/0};
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h
index 7bb1a74745d..654e7ff206b 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h
@@ -78,6 +78,11 @@ class TInflightInfo: public TDisableCopy
public:
enum class EState
{
+ // The lsn is generated but the write has not been acknowledged yet.
+ // Tracked only to hold the cleanup watermark; invisible to reads (a
+ // concurrent read sees the pre-write data on DDisk, as before).
+ PBufferPendingWrite,
+
// During the recovery, a item without quorum was detected. It must be
// copied to other PBuffers.
// Reading will be possible only after receiving a quorum.
@@ -109,12 +114,11 @@ public:
ui64 lsn,
size_t byteCount,
THostIndex host);
- TInflightInfo(
- IReadyQueue* readyQueues,
- ui64 lsn,
- size_t byteCount,
- THostMask writeRequested,
- THostMask writeConfirmed);
+
+ // Pending write: lsn is generated but data is not in any PBuffer yet.
+ // ReadMask is empty (reads wait on the quorum future) and the write is not
+ // flushable. Call OnWritten once a quorum of PBuffers confirms the write.
+ TInflightInfo(IReadyQueue* readyQueue, ui64 lsn, size_t byteCount);
TInflightInfo(TInflightInfo&& other) noexcept;
@@ -125,6 +129,10 @@ public:
void RestorePBuffer(THostIndex host);
+ // Transitions a pending write (see the byteCount-only constructor) to the
+ // written state once a quorum of PBuffers confirmed it.
+ void OnWritten(THostMask writeRequested, THostMask writeConfirmed);
+
[[nodiscard]] EState GetState() const;
// The subscription is triggered when the quorum is reached.
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp
index b9985ec07a6..5b80be73d6f 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp
@@ -99,12 +99,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests)
Y_UNIT_TEST(ShouldHandleConfirmedWrite)
{
TTestReadyQueue readyQueue;
- TInflightInfo inflightInfo(
- &readyQueue,
- 123,
- 4096,
- MakePrimaryHosts(),
- MakePrimaryHosts());
+ TInflightInfo inflightInfo(&readyQueue, 123, 4096);
+ inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts());
UNIT_ASSERT_VALUES_EQUAL(true, readyQueue.ReadyToFlush.contains(123));
// Start flushes
@@ -158,12 +154,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests)
Y_UNIT_TEST(ShouldHandleLock)
{
TTestReadyQueue readyQueue;
- TInflightInfo inflightInfo(
- &readyQueue,
- 123,
- 4096,
- MakePrimaryHosts(),
- MakePrimaryHosts());
+ TInflightInfo inflightInfo(&readyQueue, 123, 4096);
+ inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts());
UNIT_ASSERT_VALUES_EQUAL(true, readyQueue.ReadyToFlush.contains(123));
// Start flushes
@@ -219,12 +211,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests)
Y_UNIT_TEST(ShouldPutToReadyQueueOnFail)
{
TTestReadyQueue readyQueue;
- TInflightInfo inflightInfo(
- &readyQueue,
- 123,
- 4096,
- MakePrimaryHosts(),
- MakePrimaryHosts());
+ TInflightInfo inflightInfo(&readyQueue, 123, 4096);
+ inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts());
// Flush started
UNIT_ASSERT_VALUES_EQUAL(
@@ -308,12 +296,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests)
{
TTestReadyQueue readyQueue;
{
- TInflightInfo inflightInfo(
- &readyQueue,
- 123,
- 4096,
- MakePrimaryHosts(),
- MakePrimaryHosts());
+ TInflightInfo inflightInfo(&readyQueue, 123, 4096);
+ inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts());
UNIT_ASSERT_VALUES_EQUAL(
4096,
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp
index 434db986f4f..551fb965a70 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp
@@ -38,7 +38,7 @@ TEraseRequestExecutor::~TEraseRequestExecutor()
void TEraseRequestExecutor::Run()
{
- auto future = DirectBlockGroup->EraseFromPBuffer(
+ auto future = DirectBlockGroup->BatchEraseFromPBuffer(
VChunkConfig.GetVChunkIndex(),
Host,
Hint.Segments,
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp
index 0c45357b028..61501cebe07 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp
@@ -87,6 +87,11 @@ NMonitoring::TDynamicCounterPtr MakeCountersChain(
return result;
}
+size_t RegionCount(ui64 blockCount, ui32 blockSize)
+{
+ return AlignUp(blockCount * blockSize, RegionSize) / RegionSize;
+}
+
TVector<std::shared_ptr<TRegion>> CreateRegions(
IPartitionDirectService* partitionDirectService,
ui64 blockCount,
@@ -96,10 +101,9 @@ TVector<std::shared_ptr<TRegion>> CreateRegions(
const TStorageConfig& storageConfig,
NMonitoring::TDynamicCounterPtr counters)
{
- const ui64 volumeSize = blockCount * blockSize;
- const ui64 regionsCount = AlignUp(volumeSize, RegionSize) / RegionSize;
- TVector<std::shared_ptr<TRegion>> regions(regionsCount);
- for (size_t i = 0; i < regionsCount; i++) {
+ const size_t regionCount = RegionCount(blockCount, blockSize);
+ TVector<std::shared_ptr<TRegion>> regions(regionCount);
+ for (size_t i = 0; i < regionCount; i++) {
NMonitoring::TDynamicCounterPtr regionCounters =
counters->GetSubgroup("region", ToString(i));
@@ -260,7 +264,6 @@ TFastPathService::WriteBlocksLocal(
auto result = Regions[regionIndex]->WriteBlocksLocal(
std::move(callContext),
std::move(request),
- GenerateSequenceNumber(),
span->GetTraceId());
result.Subscribe(
@@ -338,9 +341,78 @@ void TFastPathService::UpdateVChunkConfig(const TVChunkConfig& cfg)
new TEvPartitionDirectPrivate::TEvUpdateVChunkConfig(cfg));
}
-ui64 TFastPathService::GenerateSequenceNumber()
+ui64 TFastPathService::GenerateLsn()
+{
+ const ui64 lsn = ++SequenceGenerator;
+ MaybeTriggerPBufferCleanup(lsn);
+ return lsn;
+}
+
+void TFastPathService::MaybeTriggerPBufferCleanup(ui64 lsn)
+{
+ const ui64 step = StorageConfig->GetPBufferCleanupLsnStep();
+ if (!step || lsn % step != 0) {
+ return;
+ }
+ // lsn values are unique, so exactly one generator hits each multiple of
+ // step. Skip if a previous cleanup is still gathering, to avoid overlap.
+ bool expected = false;
+ if (!CleanupGather.Active.compare_exchange_strong(expected, true)) {
+ return;
+ }
+ PBufferCleanup();
+}
+
+void TFastPathService::PBufferCleanup()
{
- return ++SequenceGenerator;
+ // Pull the smallest inflight lsn from every DirectBlockGroup. Each group
+ // writes its own result slot; the last responder computes the global
+ // minimum (see FinishPBufferCleanup).
+ const size_t dbgCount = DirectBlockGroups.size();
+ CleanupGather.SafeBarriers.assign(dbgCount, std::nullopt);
+ CleanupGather.PendingResponses.store(dbgCount);
+
+ for (size_t i = 0; i < dbgCount; ++i) {
+ DirectBlockGroups[i]->GatherSafeBarrierForErase().Subscribe(
+ [weakSelf = weak_from_this(), i] //
+ (const NThreading::TFuture<std::optional<ui64>>& f)
+ {
+ if (auto self = weakSelf.lock()) {
+ self->OnGatherSafeBarrierForErase(i, f.GetValue());
+ }
+ });
+ }
+}
+
+void TFastPathService::OnGatherSafeBarrierForErase(
+ size_t dbgIndex,
+ std::optional<ui64> safeBarrier)
+{
+ CleanupGather.SafeBarriers[dbgIndex] = safeBarrier;
+ if (CleanupGather.PendingResponses.fetch_sub(1) == 1) {
+ FinishPBufferCleanup();
+ }
+}
+
+void TFastPathService::FinishPBufferCleanup()
+{
+ std::optional<ui64> globalMin;
+ for (const auto& safeBarrier: CleanupGather.SafeBarriers) {
+ if (safeBarrier && (!globalMin || *safeBarrier < *globalMin)) {
+ globalMin = safeBarrier;
+ }
+ }
+
+ CleanupGather.Active.store(false);
+
+ if (!globalMin) {
+ return;
+ }
+
+ const ui64 cleanupBound = *globalMin - 1;
+ for (const auto& dbg: DirectBlockGroups) {
+ dbg->BarrierEraseFromPBuffer(cleanupBound);
+ }
}
void TFastPathService::ScheduleDirtyMapDebugPrint()
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h
index 4509a99c649..94ae6971b06 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h
@@ -44,6 +44,15 @@ private:
size_t DumpCount = 0;
TMap<size_t, TDBGDumpResponse> DebugDumps;
+ struct TPBufferCleanupGather
+ {
+ std::atomic<bool> Active{false};
+ TVector<std::optional<ui64>> SafeBarriers;
+ std::atomic<size_t> PendingResponses{0};
+ };
+
+ TPBufferCleanupGather CleanupGather;
+
public:
TFastPathService(
NActors::TActorSystem* actorSystem,
@@ -89,11 +98,19 @@ public:
void UpdateVChunkConfig(const TVChunkConfig& cfg) override;
+ ui64 GenerateLsn() override;
+
private:
- ui64 GenerateSequenceNumber();
void ScheduleDirtyMapDebugPrint();
void QueryDirtyMapDebugDump();
void OnDebugDump(size_t dbgIndex, TDBGDumpResponse dump);
+
+ void MaybeTriggerPBufferCleanup(ui64 lsn);
+ void PBufferCleanup();
+ void OnGatherSafeBarrierForErase(
+ size_t dbgIndex,
+ std::optional<ui64> safeBarrier);
+ void FinishPBufferCleanup();
};
} // namespace NYdb::NBS::NBlockStore::NStorage::NPartitionDirect
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h
index 53e8045de94..861b8954158 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h
@@ -18,6 +18,7 @@ enum class EOperation
Flush,
FlushCrossNode,
Erase,
+ BarrierErase,
// Must remain the last entry. Used to size per-operation containers.
Count_,
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp
index c847a9f34d8..a93a3c1beb6 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp
@@ -48,7 +48,9 @@ struct TScopedNbsService: TDisableCopyMove
[[nodiscard]] TScopedNbsService SetupStorage(
TEnvironmentSetup& env,
EWriteMode writeMode,
- TDuration writeHedgingDelay = TDuration::Seconds(1))
+ TDuration writeHedgingDelay = TDuration::Seconds(1),
+ ui64 pbufferCleanupLsnStep = 0,
+ ui32 syncRequestsBatchSize = 0)
{
env.CreateBoxAndPool();
env.Sim(TDuration::Seconds(30));
@@ -82,6 +84,10 @@ struct TScopedNbsService: TDisableCopyMove
storageConfig->SetWriteMode(GetProtoWriteMode(writeMode));
storageConfig->SetVChunkSize(DefaultVChunkSize);
storageConfig->SetWriteHedgingDelay(writeHedgingDelay.MicroSeconds());
+ storageConfig->SetPBufferCleanupLsnStep(pbufferCleanupLsnStep);
+ if (syncRequestsBatchSize) {
+ storageConfig->SetSyncRequestsBatchSize(syncRequestsBatchSize);
+ }
return TScopedNbsService(nbsConfig);
}
@@ -178,6 +184,55 @@ TActorId GetLoadActorAdapterActorId(
return loadActorAdapter;
}
+void WriteBlock(
+ TEnvironmentSetup& env,
+ const TActorId& loadActorAdapter,
+ const TActorId& edge,
+ ui64 index,
+ const TString& data)
+{
+ auto request = std::make_unique<TEvService::TEvWriteBlocksRequest>();
+ request->Record.SetStartIndex(index);
+ request->Record.MutableBlocks()->AddBuffers(data);
+
+ env.Runtime->Send(
+ new IEventHandle(loadActorAdapter, edge, request.release()),
+ edge.NodeId());
+
+ auto res = env.WaitForEdgeActorEvent<TEvService::TEvWriteBlocksResponse>(
+ edge,
+ false);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ S_OK,
+ res->Get()->Record.GetError().GetCode(),
+ FormatError(res->Get()->Record.GetError()));
+}
+
+TString ReadBlock(
+ TEnvironmentSetup& env,
+ const TActorId& loadActorAdapter,
+ const TActorId& edge,
+ ui64 index)
+{
+ auto request = std::make_unique<TEvService::TEvReadBlocksRequest>();
+ request->Record.SetStartIndex(index);
+ request->Record.SetBlocksCount(1);
+
+ env.Runtime->Send(
+ new IEventHandle(loadActorAdapter, edge, request.release()),
+ edge.NodeId());
+
+ auto res = env.WaitForEdgeActorEvent<TEvService::TEvReadBlocksResponse>(
+ edge,
+ false);
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ S_OK,
+ res->Get()->Record.GetError().GetCode(),
+ FormatError(res->Get()->Record.GetError()));
+ UNIT_ASSERT_VALUES_EQUAL(1, res->Get()->Record.GetBlocks().BuffersSize());
+ return res->Get()->Record.GetBlocks().GetBuffers(0);
+}
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
@@ -973,6 +1028,182 @@ Y_UNIT_TEST_SUITE(TPartitionDirectTest)
expectedData);
}
}
+
+ // PBuffer cleanup: once the write LSN advances by PBufferCleanupLsnStep the
+ // tablet barrier-erases PBuffer records up to the cleanup bound. Drive two
+ // write batches and assert a real barrier-erase (TEvErasePersistentBuffer
+ // with Lsn > 0) reaches the persistent buffer, with no data lost.
+ Y_UNIT_TEST(ShouldBarrierErasePBufferOnCleanup)
+ {
+ TEnvironmentSetup env{{
+ .NodeCount = 8,
+ .Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
+ }};
+ auto& runtime = env.Runtime;
+
+ auto scopedService = SetupStorage(
+ env,
+ EWriteMode::DirectPBuffersFilling,
+ TDuration::Seconds(1),
+ /*pbufferCleanupLsnStep=*/4,
+ /*syncRequestsBatchSize=*/1);
+
+ auto partition = CreatePartitionTablet(env);
+
+ const TActorId& edge = runtime->AllocateEdgeActor(
+ env.Settings.ControllerNodeId,
+ __FILE__,
+ __LINE__);
+ auto loadActorAdapter =
+ GetLoadActorAdapterActorId(env, partition, edge);
+
+ TVector<ui64> barrierEraseLsns;
+ runtime->FilterFunction =
+ [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev)
+ {
+ if (ev->GetTypeRewrite() ==
+ NDDisk::TEvErasePersistentBuffer::EventType)
+ {
+ barrierEraseLsns.push_back(
+ ev->Get<NDDisk::TEvErasePersistentBuffer>()
+ ->Record.GetLsn());
+ }
+ return true;
+ };
+
+ constexpr ui64 BlockCount = 16;
+ TVector<TString> data(BlockCount);
+ for (ui64 i = 0; i < BlockCount; ++i) {
+ data[i] = NUnitTest::RandomString(DefaultBlockSize, i);
+ }
+
+ // First batch: let it flush+erase so the cleanup floor moves past
+ // lsn 1 (a barrier of lsn 0 is suppressed by design).
+ for (ui64 i = 0; i < BlockCount / 2; ++i) {
+ WriteBlock(env, loadActorAdapter, edge, i, data[i]);
+ }
+ env.Sim(TDuration::Seconds(10));
+
+ // Second batch: still in flight when cleanup triggers, so the barrier
+ // fires at (oldest-in-flight lsn - 1) > 0.
+ for (ui64 i = BlockCount / 2; i < BlockCount; ++i) {
+ WriteBlock(env, loadActorAdapter, edge, i, data[i]);
+ }
+ env.Sim(TDuration::Seconds(10));
+
+ ui64 maxBarrierLsn = 0;
+ for (const ui64 lsn: barrierEraseLsns) {
+ if (lsn > maxBarrierLsn) {
+ maxBarrierLsn = lsn;
+ }
+ }
+ // A real barrier reached the PBuffer and never erased the newest write
+ // (exact lsn is timing-dependent in this sustained-flow test).
+ UNIT_ASSERT_C(
+ maxBarrierLsn > 0 && maxBarrierLsn < BlockCount,
+ "barrier lsn outside (0, " << BlockCount << "): " << maxBarrierLsn);
+
+ // Nothing extra was erased: every block still reads back correctly.
+ for (ui64 i = 0; i < BlockCount; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(
+ ReadBlock(env, loadActorAdapter, edge, i),
+ data[i]);
+ }
+ }
+
+ // PBuffer cleanup must never barrier-erase a record that has not been
+ // per-record erased yet (still in the dirty map) - even one already
+ // flushed to DDisk; erasing it out from under the dirty map desyncs it
+ // from the PBuffer. We let an initial batch flush+erase (advancing the
+ // floor so the barrier fires), then hold back every per-record erase and
+ // assert the barrier stops at the floor, never reaching the un-erased
+ // records.
+ Y_UNIT_TEST(ShouldNotBarrierEraseUnerasedRecords)
+ {
+ TEnvironmentSetup env{{
+ .NodeCount = 8,
+ .Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
+ }};
+ auto& runtime = env.Runtime;
+
+ auto scopedService = SetupStorage(
+ env,
+ EWriteMode::DirectPBuffersFilling,
+ TDuration::Seconds(1),
+ /*pbufferCleanupLsnStep=*/2,
+ /*syncRequestsBatchSize=*/1);
+
+ auto partition = CreatePartitionTablet(env);
+
+ const TActorId& edge = runtime->AllocateEdgeActor(
+ env.Settings.ControllerNodeId,
+ __FILE__,
+ __LINE__);
+ auto loadActorAdapter =
+ GetLoadActorAdapterActorId(env, partition, edge);
+
+ bool holdErases = false;
+ TVector<ui64> barrierEraseLsns;
+ runtime->FilterFunction =
+ [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev)
+ {
+ const auto type = ev->GetTypeRewrite();
+ if (type == NDDisk::TEvErasePersistentBuffer::EventType) {
+ barrierEraseLsns.push_back(
+ ev->Get<NDDisk::TEvErasePersistentBuffer>()
+ ->Record.GetLsn());
+ } else if (
+ holdErases &&
+ type == NDDisk::TEvBatchErasePersistentBuffer::EventType)
+ {
+ return false; // flush succeeds, but the record is never
+ // erased
+ }
+ return true;
+ };
+
+ // lsn == cumulative write count (one lsn per write on a fresh volume),
+ // so the first ErasedCount writes get lsns 1..ErasedCount and the held
+ // records get lsns > ErasedCount.
+ constexpr ui64 ErasedCount = 4;
+ constexpr ui64 UnerasedCount = 8;
+ TVector<TString> data(ErasedCount + UnerasedCount);
+ for (ui64 i = 0; i < data.size(); ++i) {
+ data[i] = NUnitTest::RandomString(DefaultBlockSize, 1000 + i);
+ }
+
+ // Erased batch: flush + per-record erase, advancing the cleanup floor
+ // past lsn 1 so the barrier can fire.
+ for (ui64 i = 0; i < ErasedCount; ++i) {
+ WriteBlock(env, loadActorAdapter, edge, i, data[i]);
+ }
+ env.Sim(TDuration::Seconds(10));
+
+ // From now on records flush but are never erased -> they stay in the
+ // dirty map, so the barrier must not advance past them.
+ holdErases = true;
+ for (ui64 i = ErasedCount; i < ErasedCount + UnerasedCount; ++i) {
+ WriteBlock(env, loadActorAdapter, edge, i, data[i]);
+ }
+ env.Sim(TDuration::Seconds(5));
+
+ ui64 maxBarrierLsn = 0;
+ for (const ui64 lsn: barrierEraseLsns) {
+ if (lsn > maxBarrierLsn) {
+ maxBarrierLsn = lsn;
+ }
+ }
+ // Floor pinned at ErasedCount+1, so the barrier lands exactly at
+ // ErasedCount and never reaches the un-erased records.
+ UNIT_ASSERT_VALUES_EQUAL(maxBarrierLsn, ErasedCount);
+
+ // The un-erased records still read back correctly.
+ for (ui64 i = ErasedCount; i < ErasedCount + UnerasedCount; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(
+ ReadBlock(env, loadActorAdapter, edge, i),
+ data[i]);
+ }
+ }
}
} // namespace NYdb::NBS::NBlockStore::NStorage::NPartitionDirect
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp
index 2c84200ef8a..7f0c76420ab 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp
@@ -65,6 +65,7 @@ Y_UNIT_TEST_SUITE(TReadRequestTest)
.RequestId = 1,
.Range = range});
+ DirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(20, 10));
DirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(20, 10),
@@ -131,12 +132,14 @@ Y_UNIT_TEST_SUITE(TReadRequestTest)
{
Init();
+ DirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(20, 10));
DirtyMap.WriteFinished(
100,
TBlockRange64::WithLength(20, 10),
VChunkConfig.GetDesiredPBuffers(),
VChunkConfig.GetDesiredPBuffers());
+ DirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(40, 10));
DirtyMap.WriteFinished(
200,
TBlockRange64::WithLength(40, 10),
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp
index 5a67f390ea6..557128e0c0c 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp
@@ -95,7 +95,6 @@ NThreading::TFuture<TReadBlocksLocalResponse> TRegion::ReadBlocksLocal(
NThreading::TFuture<TWriteBlocksLocalResponse> TRegion::WriteBlocksLocal(
TCallContextPtr callContext,
std::shared_ptr<TWriteBlocksLocalRequest> request,
- ui64 lsn,
const NWilson::TTraceId& traceId)
{
const size_t vChunkIndex = VChunkIndexFromHeaders(request->Headers);
@@ -103,7 +102,6 @@ NThreading::TFuture<TWriteBlocksLocalResponse> TRegion::WriteBlocksLocal(
return VChunks[vChunkIndex]->WriteBlocksLocal(
std::move(callContext),
std::move(request),
- lsn,
traceId);
}
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h
index 284ee47569d..784857c8da0 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h
@@ -40,7 +40,6 @@ public:
NThreading::TFuture<TWriteBlocksLocalResponse> WriteBlocksLocal(
TCallContextPtr callContext,
std::shared_ptr<TWriteBlocksLocalRequest> request,
- ui64 lsn,
const NWilson::TTraceId& traceId);
private:
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp
index 6d95c55dd52..4b792f2d05b 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp
@@ -146,7 +146,6 @@ TFuture<TReadBlocksLocalResponse> TVChunk::ReadBlocksLocal(
TFuture<TWriteBlocksLocalResponse> TVChunk::WriteBlocksLocal(
TCallContextPtr callContext,
std::shared_ptr<TWriteBlocksLocalRequest> request,
- ui64 lsn,
const NWilson::TTraceId& traceId)
{
// VHost thread
@@ -177,8 +176,7 @@ TFuture<TWriteBlocksLocalResponse> TVChunk::WriteBlocksLocal(
std::move(request),
traceId,
std::move(callContext),
- vchunkRange,
- lsn);
+ vchunkRange);
bundle->GetSpan().Attribute("VChunkIndex", VChunkConfig.GetVChunkIndex());
@@ -237,6 +235,13 @@ ui64 TVChunk::GetPBufferUsedSize(THostIndex hostIndex) const
return BlocksDirtyMap.GetPBufferCounters(hostIndex).CurrentBytesCount;
}
+std::optional<ui64> TVChunk::GetSafeBarrierForErase() const
+{
+ Y_ABORT_UNLESS(ExecutorThreadChecker.Check());
+
+ return BlocksDirtyMap.GetSafeBarrierForErase();
+}
+
TString TVChunk::DebugPrintDirtyMap()
{
Y_ABORT_UNLESS(ExecutorThreadChecker.Check());
@@ -478,11 +483,18 @@ void TVChunk::DoWriteBlocksLocal(std::shared_ptr<TWriteRequestBundle> bundle)
{
Y_ABORT_UNLESS(ExecutorThreadChecker.Check());
+ // Generate the lsn and register the write as inflight on the same executor
+ // thread, so the cleanup watermark covers it from the moment of generation.
+ const ui64 lsn = PartitionDirectService->GenerateLsn();
+ bundle->SetLsn(lsn);
+ BlocksDirtyMap.RegisterInflightWrite(lsn, bundle->GetVChunkRange());
+
LOG_DEBUG(
*ActorSystem,
NKikimrServices::NBS_PARTITION,
- "%s DoWriteBlocksLocal: %s",
+ "%s DoWriteBlocksLocal: lsn %lu %s",
LogTitle.GetWithTime().c_str(),
+ lsn,
bundle->GetVChunkRange().Print().c_str());
auto writeExecutor = CreateWriteRequestExecutor(
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h
index ab1507b54ef..da3877055b2 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h
@@ -55,7 +55,6 @@ public:
NThreading::TFuture<TWriteBlocksLocalResponse> WriteBlocksLocal(
TCallContextPtr callContext,
std::shared_ptr<TWriteBlocksLocalRequest> request,
- ui64 lsn,
const NWilson::TTraceId& traceId);
void SetHostState(THostIndex hostIndex, EHostState state);
@@ -64,6 +63,11 @@ public:
[[nodiscard]] ui64 GetPBufferUsedSize(THostIndex hostIndex) const;
[[nodiscard]] TString DebugPrintDirtyMap();
+ // This vchunk's contribution to the tablet-wide cleanup watermark: the
+ // smallest lsn still held in PBuffers, or nullopt when nothing is inflight.
+ // Must run on the executor thread.
+ [[nodiscard]] std::optional<ui64> GetSafeBarrierForErase() const;
+
// IWriteClient implementation
void OnWriteBlocksResponse(
std::shared_ptr<TWriteRequestBundle> bundle,
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp
index 71671961b2d..4322a93da7d 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp
@@ -40,11 +40,8 @@ Y_UNIT_TEST_SUITE(TVChunkTest)
vchunk->Start();
// Run write request
- auto future = vchunk->WriteBlocksLocal(
- callContext,
- request,
- 1000,
- NWilson::TTraceId());
+ auto future =
+ vchunk->WriteBlocksLocal(callContext, request, NWilson::TTraceId());
// Wait for three PBuffers write requests.
UNIT_ASSERT_VALUES_EQUAL(
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp
index 4c50b6a3eb9..c8148a6a72a 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp
@@ -13,8 +13,7 @@ TWriteRequestBundle::TWriteRequestBundle(
std::shared_ptr<TWriteBlocksLocalRequest> request,
const NWilson::TTraceId& traceId,
TCallContextPtr callContext,
- TBlockRange64 vchunkRange,
- ui64 lsn)
+ TBlockRange64 vchunkRange)
: WriteClient(std::move(writeClient))
, Request(std::move(request))
, Span(
@@ -25,7 +24,6 @@ TWriteRequestBundle::TWriteRequestBundle(
actorSystem)
, CallContext(std::move(callContext))
, VChunkRange(vchunkRange)
- , Lsn(lsn)
, Promise(NThreading::NewPromise<TWriteBlocksLocalResponse>())
{}
@@ -81,6 +79,11 @@ TBlockRange64 TWriteRequestBundle::GetVChunkRange() const
return VChunkRange;
}
+void TWriteRequestBundle::SetLsn(ui64 lsn)
+{
+ Lsn = lsn;
+}
+
ui64 TWriteRequestBundle::GetLsn() const
{
return Lsn;
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h
index 949c5ee9bc4..02d14a9b72e 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h
@@ -37,8 +37,7 @@ public:
std::shared_ptr<TWriteBlocksLocalRequest> request,
const NWilson::TTraceId& traceId,
TCallContextPtr callContext,
- TBlockRange64 vchunkRange,
- ui64 lsn);
+ TBlockRange64 vchunkRange);
// Respond via WriteClient to VChunk.
void Reply(
@@ -54,6 +53,7 @@ public:
NThreading::TFuture<TWriteBlocksLocalResponse> GetFuture();
NWilson::TSpan& GetSpan();
TBlockRange64 GetVChunkRange() const;
+ void SetLsn(ui64 lsn);
ui64 GetLsn() const;
TGuardedSgList& GetSgList();
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp
index dabe8095b6d..5ac55a4eb96 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp
@@ -194,8 +194,8 @@ TWriteRequestTestFixture::CreatePBufferReplicationExecutor(
std::move(originalRequest),
NWilson::TTraceId(),
MakeIntrusive<TCallContext>(),
- Range,
- UserLsn);
+ Range);
+ bundle->SetLsn(UserLsn);
WriteClient->Response.reset();
@@ -223,8 +223,8 @@ TWriteRequestTestFixture::CreateDirectReplicationExecutor(
std::move(originalRequest),
NWilson::TTraceId(),
MakeIntrusive<TCallContext>(),
- Range,
- UserLsn);
+ Range);
+ bundle->SetLsn(UserLsn);
WriteClient->Response.reset();
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp
index a2cc3d2856a..daa7b6f2b09 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp
@@ -128,7 +128,7 @@ TICStorageTransport::WriteToDDisk(
}
TFuture<NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult>
-TICStorageTransport::EraseFromPBuffer(
+TICStorageTransport::BatchEraseFromPBuffer(
const THostConnection& connection,
TVector<NKikimr::NDDisk::TBlockSelector> selectors,
TVector<ui64> lsns,
@@ -136,12 +136,38 @@ TICStorageTransport::EraseFromPBuffer(
{
Y_ABORT_UNLESS(connection.ConnectionType == EConnectionType::PBuffer);
- auto request = std::make_unique<TEvTransportPrivate::TEvEraseFromPBuffer>(
- connection.GetServiceId(),
- connection.Credentials,
- std::move(selectors),
- std::move(lsns),
- span ? span->GetTraceId() : NWilson::TTraceId());
+ auto request =
+ std::make_unique<TEvTransportPrivate::TEvBatchEraseFromPBuffer>(
+ connection.GetServiceId(),
+ connection.Credentials,
+ std::move(selectors),
+ std::move(lsns),
+ span ? span->GetTraceId() : NWilson::TTraceId());
+
+ auto future = request->Promise.GetFuture();
+
+ if (span) {
+ span->Event("ActorSystem_Send");
+ }
+ ActorSystem->Send(ICStorageTransportActorId, request.release());
+
+ return future;
+}
+
+TFuture<NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult>
+TICStorageTransport::BarrierEraseFromPBuffer(
+ const THostConnection& connection,
+ ui64 lsn,
+ NWilson::TSpan* span)
+{
+ Y_ABORT_UNLESS(connection.ConnectionType == EConnectionType::PBuffer);
+
+ auto request =
+ std::make_unique<TEvTransportPrivate::TEvBarrierEraseFromPBuffer>(
+ connection.GetServiceId(),
+ connection.Credentials,
+ lsn,
+ span ? span->GetTraceId() : NWilson::TTraceId());
auto future = request->Promise.GetFuture();
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h
index 0e5eb74f376..11acd98f67c 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h
@@ -64,12 +64,17 @@ public:
TVector<ui64> lsns,
NWilson::TSpan* span) override;
- NThreading::TFuture<TEvErasePersistentBufferResult> EraseFromPBuffer(
+ NThreading::TFuture<TEvErasePersistentBufferResult> BatchEraseFromPBuffer(
const THostConnection& connection,
TVector<NKikimr::NDDisk::TBlockSelector> selectors,
TVector<ui64> lsns,
NWilson::TSpan* span) override;
+ NThreading::TFuture<TEvErasePersistentBufferResult> BarrierEraseFromPBuffer(
+ const THostConnection& connection,
+ ui64 lsn,
+ NWilson::TSpan* span) override;
+
NThreading::TFuture<TEvListPersistentBufferResult> ListPBufferEntries(
const THostConnection& connection) override;
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp
index 856753245d6..8277aeb03a9 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp
@@ -93,7 +93,9 @@ TICStorageTransportActor::~TICStorageTransportActor()
RejectAllPending<NDDisk::TEvSyncWithPersistentBufferResult>(
FlushFromPBufferRequests);
RejectAllPending<NDDisk::TEvErasePersistentBufferResult>(
- EraseFromPBufferRequests);
+ BatchEraseFromPBufferRequests);
+ RejectAllPending<NDDisk::TEvErasePersistentBufferResult>(
+ BarrierEraseFromPBufferRequests);
RejectAllPending<NDDisk::TEvListPersistentBufferResult>(
ListPBufferEntriesRequests);
@@ -566,21 +568,22 @@ void TICStorageTransportActor::HandleWriteToDDiskResult(
}
}
-void TICStorageTransportActor::HandleErasePersistentBuffer(
- const TEvTransportPrivate::TEvEraseFromPBuffer::TPtr& ev,
+void TICStorageTransportActor::HandleBatchErasePersistentBuffer(
+ const TEvTransportPrivate::TEvBatchEraseFromPBuffer::TPtr& ev,
const TActorContext& ctx)
{
auto* msg = ev->Get();
const ui64 requestId = ++RequestIdGenerator;
- auto [it, inserted] =
- EraseFromPBufferRequests.emplace(requestId, ev->Release().Release());
+ auto [it, inserted] = BatchEraseFromPBufferRequests.emplace(
+ requestId,
+ ev->Release().Release());
Y_ABORT_UNLESS(inserted);
LOG_DEBUG(
ctx,
NKikimrServices::NBS_PARTITION,
- "Sent TEvEraseFromPBuffer with requestId# %lu",
+ "Sent TEvBatchEraseFromPBuffer with requestId# %lu",
requestId);
auto request = std::make_unique<NDDisk::TEvBatchErasePersistentBuffer>(
@@ -589,6 +592,39 @@ void TICStorageTransportActor::HandleErasePersistentBuffer(
request->AddErase(lsn, msg->Credentials.Generation);
}
+ ctx.Send(MakeHolder<IEventHandle>(
+ msg->ServiceId,
+ ctx.SelfID,
+ request.release(),
+ 0, // flags
+ requestId, // cookie
+ nullptr,
+ std::move(msg->TraceId)));
+}
+
+void TICStorageTransportActor::HandleErasePersistentBuffer(
+ const TEvTransportPrivate::TEvBarrierEraseFromPBuffer::TPtr& ev,
+ const TActorContext& ctx)
+{
+ auto* msg = ev->Get();
+
+ const ui64 requestId = ++RequestIdGenerator;
+ auto [it, inserted] = BarrierEraseFromPBufferRequests.emplace(
+ requestId,
+ ev->Release().Release());
+ Y_ABORT_UNLESS(inserted);
+
+ LOG_DEBUG(
+ ctx,
+ NKikimrServices::NBS_PARTITION,
+ "Sent TEvBarrierEraseFromPBuffer with requestId# %lu lsn# %lu",
+ requestId,
+ msg->Lsn);
+
+ auto request = std::make_unique<NDDisk::TEvErasePersistentBuffer>(
+ msg->Credentials,
+ msg->Lsn);
+
SendWithUndeliveryTracking(
ctx,
msg->ServiceId,
@@ -610,13 +646,13 @@ void TICStorageTransportActor::HandleErasePersistentBufferUndelivery(
"requestId# %lu",
requestId);
- if (auto* r = EraseFromPBufferRequests.FindPtr(requestId)) {
+ if (auto* r = BatchEraseFromPBufferRequests.FindPtr(requestId)) {
auto& request = **r;
auto result =
NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult();
SetUndeliveryError(result);
request.Promise.SetValue(std::move(result));
- EraseFromPBufferRequests.erase(requestId);
+ BatchEraseFromPBufferRequests.erase(requestId);
} else {
// That means that request is already completed
LOG_ERROR(
@@ -639,18 +675,24 @@ void TICStorageTransportActor::HandleErasePersistentBufferResult(
"Received TEvErasePersistentBufferResult with requestId# %lu",
requestId);
- if (auto* r = EraseFromPBufferRequests.FindPtr(requestId)) {
+ if (auto* r = BatchEraseFromPBufferRequests.FindPtr(requestId)) {
auto& request = **r;
request.Promise.SetValue(std::move(ev->Get()->Record));
- EraseFromPBufferRequests.erase(requestId);
- } else {
- // That means that request is already completed
- LOG_ERROR(
- ctx,
- NKikimrServices::NBS_PARTITION,
- "ErasePersistentBufferEvent with requestId# %lu not found",
- requestId);
+ BatchEraseFromPBufferRequests.erase(requestId);
+ return;
+ }
+ if (auto* r = BarrierEraseFromPBufferRequests.FindPtr(requestId)) {
+ auto& request = **r;
+ request.Promise.SetValue(std::move(ev->Get()->Record));
+ BarrierEraseFromPBufferRequests.erase(requestId);
+ return;
}
+ // That means that request is already completed
+ LOG_ERROR(
+ ctx,
+ NKikimrServices::NBS_PARTITION,
+ "ErasePersistentBufferEvent with requestId# %lu not found",
+ requestId);
}
void TICStorageTransportActor::HandleReadPersistentBuffer(
@@ -1076,7 +1118,10 @@ STFUNC(TICStorageTransportActor::StateWork)
HFunc(NDDisk::TEvWriteResult, HandleWriteToDDiskResult);
HFunc(
- TEvTransportPrivate::TEvEraseFromPBuffer,
+ TEvTransportPrivate::TEvBatchEraseFromPBuffer,
+ HandleBatchErasePersistentBuffer);
+ HFunc(
+ TEvTransportPrivate::TEvBarrierEraseFromPBuffer,
HandleErasePersistentBuffer);
HFunc(
NDDisk::TEvBatchErasePersistentBuffer,
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h
index 96d5553bcc7..a10fc47a53b 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h
@@ -33,8 +33,15 @@ private:
THashMap<ui64, std::unique_ptr<TEvTransportPrivate::TEvSyncWithPBuffer>>
FlushFromPBufferRequests;
- THashMap<ui64, std::unique_ptr<TEvTransportPrivate::TEvEraseFromPBuffer>>
- EraseFromPBufferRequests;
+ THashMap<
+ ui64,
+ std::unique_ptr<TEvTransportPrivate::TEvBatchEraseFromPBuffer>>
+ BatchEraseFromPBufferRequests;
+
+ THashMap<
+ ui64,
+ std::unique_ptr<TEvTransportPrivate::TEvBarrierEraseFromPBuffer>>
+ BarrierEraseFromPBufferRequests;
THashMap<ui64, std::unique_ptr<TEvTransportPrivate::TEvListPBufferEntries>>
ListPBufferEntriesRequests;
@@ -97,8 +104,12 @@ private:
const NKikimr::NDDisk::TEvWriteResult::TPtr& ev,
const NActors::TActorContext& ctx);
+ void HandleBatchErasePersistentBuffer(
+ const TEvTransportPrivate::TEvBatchEraseFromPBuffer::TPtr& ev,
+ const NActors::TActorContext& ctx);
+
void HandleErasePersistentBuffer(
- const TEvTransportPrivate::TEvEraseFromPBuffer::TPtr& ev,
+ const TEvTransportPrivate::TEvBarrierEraseFromPBuffer::TPtr& ev,
const NActors::TActorContext& ctx);
void HandleErasePersistentBufferUndelivery(
const NKikimr::NDDisk::TEvBatchErasePersistentBuffer::TPtr& ev,
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp
index e8fba2724d2..dcf6a1cffc5 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp
@@ -24,7 +24,12 @@ TEvTransportPrivate::TWriteToDDisk::~TWriteToDDisk()
Y_ABORT_UNLESS(Promise.IsReady());
}
-TEvTransportPrivate::TEraseFromPBuffer::~TEraseFromPBuffer()
+TEvTransportPrivate::TBatchEraseFromPBuffer::~TBatchEraseFromPBuffer()
+{
+ Y_ABORT_UNLESS(Promise.IsReady());
+}
+
+TEvTransportPrivate::TBarrierEraseFromPBuffer::~TBarrierEraseFromPBuffer()
{
Y_ABORT_UNLESS(Promise.IsReady());
}
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h
index 3bc49c0d690..dc94f6c6f75 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h
@@ -105,7 +105,7 @@ struct TEvTransportPrivate
~TWriteToDDisk();
};
- struct TEraseFromPBuffer: TDisableCopyMove
+ struct TBatchEraseFromPBuffer: TDisableCopyMove
{
using TResult =
NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult;
@@ -118,7 +118,7 @@ struct TEvTransportPrivate
NThreading::TPromise<TResult> Promise =
NThreading::NewPromise<TResult>();
- TEraseFromPBuffer(
+ TBatchEraseFromPBuffer(
const NActors::TActorId serviceId,
const NKikimr::NDDisk::TQueryCredentials& credentials,
TVector<NKikimr::NDDisk::TBlockSelector> selectors,
@@ -131,7 +131,33 @@ struct TEvTransportPrivate
, TraceId(std::move(traceId))
{}
- ~TEraseFromPBuffer();
+ ~TBatchEraseFromPBuffer();
+ };
+
+ struct TBarrierEraseFromPBuffer: TDisableCopyMove
+ {
+ using TResult =
+ NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult;
+
+ const NActors::TActorId ServiceId;
+ const NKikimr::NDDisk::TQueryCredentials Credentials;
+ const ui64 Lsn;
+ NWilson::TTraceId TraceId;
+ NThreading::TPromise<TResult> Promise =
+ NThreading::NewPromise<TResult>();
+
+ TBarrierEraseFromPBuffer(
+ const NActors::TActorId serviceId,
+ const NKikimr::NDDisk::TQueryCredentials& credentials,
+ ui64 lsn,
+ NWilson::TTraceId traceId)
+ : ServiceId(serviceId)
+ , Credentials(credentials)
+ , Lsn(lsn)
+ , TraceId(std::move(traceId))
+ {}
+
+ ~TBarrierEraseFromPBuffer();
};
struct TReadFromPBuffer: TDisableCopyMove
@@ -310,7 +336,8 @@ struct TEvTransportPrivate
EvConnect,
EvWriteToPBuffer,
EvWriteToDDisk,
- EvEraseFromPBuffer,
+ EvBatchEraseFromPBuffer,
+ EvBarrierEraseFromPBuffer,
EvReadFromPBuffer,
EvReadFromDDisk,
EvSyncWithPBuffer,
@@ -335,8 +362,12 @@ struct TEvTransportPrivate
using TEvSyncWithPBuffer =
TRequestEvent<TSyncWithPBuffer, EEvents::EvSyncWithPBuffer>;
- using TEvEraseFromPBuffer =
- TRequestEvent<TEraseFromPBuffer, EEvents::EvEraseFromPBuffer>;
+ using TEvBatchEraseFromPBuffer =
+ TRequestEvent<TBatchEraseFromPBuffer, EEvents::EvBatchEraseFromPBuffer>;
+
+ using TEvBarrierEraseFromPBuffer = TRequestEvent<
+ TBarrierEraseFromPBuffer,
+ EEvents::EvBarrierEraseFromPBuffer>;
using TEvListPBufferEntries =
TRequestEvent<TListPBufferEntries, EEvents::EvListPBufferEntries>;
diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h
index fafd07490ca..f7752c918b7 100644
--- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h
+++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h
@@ -116,12 +116,18 @@ public:
NWilson::TSpan* span) = 0;
virtual NThreading::TFuture<TEvErasePersistentBufferResult>
- EraseFromPBuffer(
+ BatchEraseFromPBuffer(
const THostConnection& connection,
TVector<NKikimr::NDDisk::TBlockSelector> selectors,
TVector<ui64> lsns,
NWilson::TSpan* span) = 0;
+ virtual NThreading::TFuture<TEvErasePersistentBufferResult>
+ BarrierEraseFromPBuffer(
+ const THostConnection& connection,
+ ui64 lsn,
+ NWilson::TSpan* span) = 0;
+
virtual NThreading::TFuture<TEvListPersistentBufferResult>
ListPBufferEntries(const THostConnection& connection) = 0;
};