diff options
| author | BarkovBG <[email protected]> | 2026-06-09 01:08:04 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-06-09 01:08:04 +0300 |
| commit | fec99c4e60e3f43eedc145ada280f0769861fa01 (patch) | |
| tree | d50e3920c3baa3de37c966c7d1ceaf6104178936 | |
| parent | 5ebf61ab6d0bca1b3d9557723c0c9967804d5ce1 (diff) | |
Introduce barrier cleanup (#41766)
41 files changed, 861 insertions, 132 deletions
diff --git a/ydb/core/nbs/cloud/blockstore/config/config.cpp b/ydb/core/nbs/cloud/blockstore/config/config.cpp index e12d288db3d..5c34ec6e837 100644 --- a/ydb/core/nbs/cloud/blockstore/config/config.cpp +++ b/ydb/core/nbs/cloud/blockstore/config/config.cpp @@ -40,6 +40,7 @@ TStorageConfig::TStorageConfig( xxx(DirtyMapDebugPrintInterval, TDuration, TDuration::Seconds(0) )\ xxx(VhostThreadsCount, ui32, 4 )\ xxx(VhostQueuesCount, ui32, 4 )\ + xxx(PBufferCleanupLsnStep, ui64, 0 )\ // BLOCKSTORE_STORAGE_CONFIG_RO // clang-format on diff --git a/ydb/core/nbs/cloud/blockstore/config/config.h b/ydb/core/nbs/cloud/blockstore/config/config.h index f1ba614ec71..86283e0a8c2 100644 --- a/ydb/core/nbs/cloud/blockstore/config/config.h +++ b/ydb/core/nbs/cloud/blockstore/config/config.h @@ -41,6 +41,7 @@ public: [[nodiscard]] TDuration GetDirtyMapDebugPrintInterval() const; [[nodiscard]] ui32 GetVhostThreadsCount() const; [[nodiscard]] ui32 GetVhostQueuesCount() const; + [[nodiscard]] ui64 GetPBufferCleanupLsnStep() const; private: NProto::TStorageServiceConfig StorageServiceConfig; diff --git a/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto b/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto index 35c32c65031..5fe0488db73 100644 --- a/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto +++ b/ydb/core/nbs/cloud/blockstore/config/protos/storage.proto @@ -121,4 +121,8 @@ message TStorageServiceConfig // (each queue can be served by a different VHOST thread). Capped // at runtime by VhostThreadsCount. optional uint32 VhostQueuesCount = 25; + + // Persistent buffer cleanup is triggered once the cleanup LSN advances by + // at least this many LSNs since the last cleanup. 0 disables cleanup. + optional uint64 PBufferCleanupLsnStep = 26; } diff --git a/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h b/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h index 52142b30805..abf602d5155 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h +++ b/ydb/core/nbs/cloud/blockstore/libs/common/block_range_map.h @@ -214,6 +214,17 @@ public: return Ranges.empty(); } + [[nodiscard]] std::optional<TKey> GetMinKey() const + { + std::optional<TKey> minKey; + for (const auto& [itemKey, value]: Ranges) { + if (!minKey || itemKey.Key < *minKey) { + minKey = itemKey.Key; + } + } + return minKey; + } + [[nodiscard]] size_t Size() const { return Ranges.size(); diff --git a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h index 6aecb121cc6..424aecef9f3 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h +++ b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service.h @@ -37,6 +37,12 @@ struct IPartitionDirectService // local DB. Caller must ensure cfg.IsValid(). virtual void UpdateVChunkConfig( const NStorage::NPartitionDirect::TVChunkConfig& cfg) = 0; + + // Generates the next tablet-wide write LSN. Called by a vchunk on its + // executor thread when it starts processing a write, so generation and + // dirty-map registration happen on the same thread. Also drives periodic + // persistent buffer cleanup. + virtual ui64 GenerateLsn() = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h index 212464fc152..fadd0108a70 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h +++ b/ydb/core/nbs/cloud/blockstore/libs/service/partition_direct_service_mock.h @@ -37,6 +37,13 @@ struct TPartitionDirectServiceMock: public IPartitionDirectService { Y_UNUSED(cfg); } + + ui64 LsnGenerator = 0; + + ui64 GenerateLsn() override + { + return ++LsnGenerator; + } }; using TPartitionDirectServiceMockPtr = diff --git a/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h b/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h index 9b23e313cfb..b4973d053ab 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h +++ b/ydb/core/nbs/cloud/blockstore/libs/service/storage_test.h @@ -46,6 +46,13 @@ public: { Y_UNUSED(cfg); } + + ui64 LsnGenerator = 0; + + ui64 GenerateLsn() override + { + return ++LsnGenerator; + } }; struct TTestStorage: public IStorage diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp index 15b078ce163..65f5b049a9f 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/base_test_fixture.cpp @@ -203,7 +203,7 @@ void TBaseFixture::Init() return future; }; - DirectBlockGroup->EraseFromPBufferHandler = [&] // + DirectBlockGroup->BatchEraseFromPBufferHandler = [&] // (ui32 vChunkIndex, THostIndex hostIndex, const TVector<TPBufferSegment>& segments, diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp index 5df681ea283..6d56e7506c7 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/ddisk_data_copier_ut.cpp @@ -355,16 +355,19 @@ Y_UNIT_TEST_SUITE(TDDiskDataCopierTest) // Mark DDisk#1 completely fresh. DirtyMap.MarkFresh(FreshDDisk, 0); + DirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); DirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), // #0 MakePrimariesMask(), MakePrimariesMask()); + DirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(250, 10)); DirtyMap.WriteFinished( 124, TBlockRange64::WithLength(250, 10), // #0 + #1 MakePrimariesMask(), MakePrimariesMask()); + DirtyMap.RegisterInflightWrite(125, TBlockRange64::WithLength(260, 10)); DirtyMap.WriteFinished( 125, TBlockRange64::WithLength(260, 10), // #1 diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h index f6b2e2c4af7..9c562dd5292 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group.h @@ -188,12 +188,21 @@ public: const NWilson::TTraceId& traceId) = 0; // Batch operation to erase a list of PBuffer entries. - virtual NThreading::TFuture<TDBGEraseResponse> EraseFromPBuffer( + virtual NThreading::TFuture<TDBGEraseResponse> BatchEraseFromPBuffer( ui32 vChunkIndex, THostIndex hostIndex, const TVector<TPBufferSegment>& segments, const NWilson::TTraceId& traceId) = 0; + virtual void BarrierEraseFromPBuffer(ui64 lsn) = 0; + + // The lowest lsn that must be preserved across all vchunks of this + // DirectBlockGroup (records below it are safe to erase). Used to compute + // the tablet-wide cleanup watermark. Resolves on the executor thread. + // nullopt means nothing is inflight here. + virtual NThreading::TFuture<std::optional<ui64>> + GatherSafeBarrierForErase() = 0; + // Get a list of all entries in PBuffers belonging to a given vChunkIndex. virtual NThreading::TFuture<TDBGRestoreResponse> RestoreDBGPBuffers( ui32 vChunkIndex) = 0; diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp index 7e011564184..b01e603a9bb 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.cpp @@ -772,7 +772,7 @@ TDBGFlushResponse TDirectBlockGroup::HandleSyncWithPBufferResponse( return result; } -NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::EraseFromPBuffer( +NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::BatchEraseFromPBuffer( ui32 vChunkIndex, THostIndex hostIndex, const TVector<TPBufferSegment>& segments, @@ -795,11 +795,12 @@ NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::EraseFromPBuffer( lsns.push_back(segment.Lsn); } - auto childSpan = CreateChildSpan(traceId, "NbsPartition.EraseFromPBuffer"); + auto childSpan = + CreateChildSpan(traceId, "NbsPartition.BatchEraseFromPBuffer"); OnRequest(hostIndex, EOperation::Erase); - auto future = StorageTransport->EraseFromPBuffer( + auto future = StorageTransport->BatchEraseFromPBuffer( PBufferConnections[hostIndex].HostConnection, std::move(selectors), std::move(lsns), @@ -851,6 +852,117 @@ NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroup::EraseFromPBuffer( return result; } +void TDirectBlockGroup::BarrierEraseFromPBuffer(ui64 lsn) +{ + Executor->ExecuteSimple( + [weakSelf = weak_from_this(), lsn]() + { + auto self = weakSelf.lock(); + if (!self) { + return; + } + LOG_DEBUG( + *self->ActorSystem, + NKikimrServices::NBS_PARTITION, + "%s barrier-erase lsn=%lu on %lu PBuffer hosts", + self->LogTitle.GetWithTime().c_str(), + lsn, + self->PBufferConnections.size()); + for (THostIndex h = 0; h < self->PBufferConnections.size(); ++h) { + self->DoBarrierEraseFromPBuffer(h, lsn, NWilson::TTraceId{}); + } + }); +} + +void TDirectBlockGroup::DoBarrierEraseFromPBuffer( + THostIndex hostIndex, + ui64 lsn, + const NWilson::TTraceId& traceId) +{ + Y_ABORT_UNLESS(ExecutorThreadChecker.Check()); + + using TEvErasePersistentBufferResult = + NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult; + + const auto startAt = TMonotonic::Now(); + + auto childSpan = + CreateChildSpan(traceId, "NbsPartition.DoBarrierEraseFromPBuffer"); + + OnRequest(hostIndex, EOperation::BarrierErase); + + auto future = StorageTransport->BarrierEraseFromPBuffer( + PBufferConnections[hostIndex].HostConnection, + lsn, + childSpan.get()); + + future.Subscribe( + [weakSelf = weak_from_this(), + childSpan = std::move(childSpan), + hostIndex, + startAt, + executor = Executor, + threadChecker = ExecutorThreadChecker.CreateDelegate()] // + (const TFuture<TEvErasePersistentBufferResult>& f) mutable + { + // ActorSystem thread + + executor->ExecuteSimple( + [weakSelf, + childSpan = std::move(childSpan), + hostIndex, + startAt, + threadChecker, + result = UnsafeExtractValue(f)] // + () mutable + { + Y_ABORT_UNLESS(threadChecker.Check()); + + auto self = weakSelf.lock(); + if (!self) { + return; + } + self->OnResponse( + hostIndex, + TMonotonic::Now() - startAt, + EOperation::BarrierErase, + TranslateError(result)); + }); + }); +} + +NThreading::TFuture<std::optional<ui64>> +TDirectBlockGroup::GatherSafeBarrierForErase() +{ + auto promise = NewPromise<std::optional<ui64>>(); + auto future = promise.GetFuture(); + + Executor->ExecuteSimple( + [weakSelf = weak_from_this(), promise]() mutable + { + auto self = weakSelf.lock(); + if (!self) { + promise.SetValue(std::nullopt); + return; + } + + std::optional<ui64> safeBarrier; + for (const auto& weakVChunk: self->VChunks) { + auto vChunk = weakVChunk.lock(); + if (!vChunk) { + continue; + } + const auto lsn = vChunk->GetSafeBarrierForErase(); + if (lsn && (!safeBarrier || *lsn < *safeBarrier)) { + safeBarrier = lsn; + } + } + promise.SetValue(safeBarrier); + }); + + return future; +} + NThreading::TFuture<TDBGRestoreResponse> TDirectBlockGroup::RestoreDBGPBuffers( ui32 vChunkIndex) { diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h index 280ed627e47..a5ebf2465ce 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_impl.h @@ -106,12 +106,17 @@ public: const TVector<TPBufferSegment>& segments, const NWilson::TTraceId& traceId) override; - NThreading::TFuture<TDBGEraseResponse> EraseFromPBuffer( + NThreading::TFuture<TDBGEraseResponse> BatchEraseFromPBuffer( ui32 vChunkIndex, THostIndex hostIndex, const TVector<TPBufferSegment>& segments, const NWilson::TTraceId& traceId) override; + void BarrierEraseFromPBuffer(ui64 lsn) override; + + NThreading::TFuture<std::optional<ui64>> + GatherSafeBarrierForErase() override; + NThreading::TFuture<TDBGRestoreResponse> RestoreDBGPBuffers( ui32 vChunkIndex) override; @@ -169,6 +174,11 @@ private: const TEvSyncWithPersistentBufferResult& response, size_t segmentCount); + void DoBarrierEraseFromPBuffer( + THostIndex hostIndex, + ui64 lsn, + const NWilson::TTraceId& traceId); + void DoRestore( NThreading::TPromise<TDBGRestoreResponse> promise, ui32 vChunkIndex); diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp index 8f192b4d647..dfd73bdb244 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.cpp @@ -107,9 +107,9 @@ TDirectBlockGroupMock::TDirectBlockGroupMock() Y_ABORT_UNLESS(false, "Should set SyncWithPBufferHandler"); return NThreading::TFuture<TDBGFlushResponse>(); }; - EraseFromPBufferHandler = [](const auto&...) + BatchEraseFromPBufferHandler = [](const auto&...) { - Y_ABORT_UNLESS(false, "Should set EraseFromPBufferHandler"); + Y_ABORT_UNLESS(false, "Should set BatchEraseFromPBufferHandler"); return NThreading::TFuture<TDBGEraseResponse>(); }; RestoreDBGPBuffersHandler = [](const auto&...) @@ -269,13 +269,29 @@ NThreading::TFuture<TDBGFlushResponse> TDirectBlockGroupMock::SyncWithPBuffer( traceId); } -NThreading::TFuture<TDBGEraseResponse> TDirectBlockGroupMock::EraseFromPBuffer( +NThreading::TFuture<TDBGEraseResponse> +TDirectBlockGroupMock::BatchEraseFromPBuffer( ui32 vChunkIndex, THostIndex hostIndex, const TVector<TPBufferSegment>& segments, const NWilson::TTraceId& traceId) { - return EraseFromPBufferHandler(vChunkIndex, hostIndex, segments, traceId); + return BatchEraseFromPBufferHandler( + vChunkIndex, + hostIndex, + segments, + traceId); +} + +void TDirectBlockGroupMock::BarrierEraseFromPBuffer(ui64 lsn) +{ + Y_UNUSED(lsn); +} + +NThreading::TFuture<std::optional<ui64>> +TDirectBlockGroupMock::GatherSafeBarrierForErase() +{ + return NThreading::MakeFuture<std::optional<ui64>>(std::nullopt); } NThreading::TFuture<TDBGRestoreResponse> diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h index 0e94b146000..3a8109497df 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/direct_block_group_mock.h @@ -94,7 +94,7 @@ public: THostIndex ddiskHostIndex, const TVector<TPBufferSegment>& segments, const NWilson::TTraceId& traceId)>; - using TEraseFromPBufferHandler = + using TBatchEraseFromPBufferHandler = std::function<NThreading::TFuture<TDBGEraseResponse>( ui32 vChunkIndex, THostIndex hostIndex, @@ -119,7 +119,7 @@ public: TWriteBlocksToPBufferHandler WriteBlocksToPBufferHandler; TWriteBlocksToManyPBuffersHandler WriteBlocksToManyPBuffersHandler; TSyncWithPBufferHandler SyncWithPBufferHandler; - TEraseFromPBufferHandler EraseFromPBufferHandler; + TBatchEraseFromPBufferHandler BatchEraseFromPBufferHandler; TDBGRestoreHandler RestoreDBGPBuffersHandler; TListPBuffersHandler ListPBuffersHandler; TDBGDumpHandler DumpHandler; @@ -189,12 +189,17 @@ public: const TVector<TPBufferSegment>& segments, const NWilson::TTraceId& traceId) override; - NThreading::TFuture<TDBGEraseResponse> EraseFromPBuffer( + NThreading::TFuture<TDBGEraseResponse> BatchEraseFromPBuffer( ui32 vChunkIndex, THostIndex hostIndex, const TVector<TPBufferSegment>& segments, const NWilson::TTraceId& traceId) override; + void BarrierEraseFromPBuffer(ui64 lsn) override; + + NThreading::TFuture<std::optional<ui64>> + GatherSafeBarrierForErase() override; + NThreading::TFuture<TDBGRestoreResponse> RestoreDBGPBuffers( ui32 vChunkIndex) override; diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp index 82a2627ad09..f93fb3dee18 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.cpp @@ -485,26 +485,34 @@ TEraseHints TBlocksDirtyMap::MakeEraseBelatedHint() return result; } +void TBlocksDirtyMap::RegisterInflightWrite(ui64 lsn, TBlockRange64 range) +{ + const bool inserted = Inflight.AddRange( + lsn, + range, + TInflightInfo(this, lsn, range.Size() * BlockSize)); + Y_ABORT_UNLESS(inserted); +} + void TBlocksDirtyMap::WriteFinished( ui64 lsn, TBlockRange64 range, THostMask requested, THostMask confirmed) { + // Every write is pre-registered as pending at generation time (see + // RegisterInflightWrite), so the entry always exists here. + auto item = Inflight.GetValue(lsn); + Y_ABORT_UNLESS(item); + Y_ABORT_UNLESS(item->Range == range); + if (confirmed.Count() < QuorumDirectBlockGroupHostCount) { + const bool removed = Inflight.RemoveRange(lsn); + Y_ABORT_UNLESS(removed); return; } - const bool inserted = Inflight.AddRange( - lsn, - range, - TInflightInfo( - this, - lsn, - range.Size() * BlockSize, - requested, - confirmed)); - Y_ABORT_UNLESS(inserted); + item->Value.OnWritten(requested, confirmed); } void TBlocksDirtyMap::FlushFinished( @@ -636,6 +644,11 @@ ui64 TBlocksDirtyMap::GetMinErasePendingLsn() const return *ReadyToErase.begin(); } +std::optional<ui64> TBlocksDirtyMap::GetSafeBarrierForErase() const +{ + return Inflight.GetMinKey(); +} + const TPBufferCounters& TBlocksDirtyMap::GetPBufferCounters( THostIndex host) const { diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h index 792a77be8ed..91abd9100ab 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map.h @@ -221,6 +221,10 @@ public: [[nodiscard]] TEraseHints MakeEraseHint(size_t batchSize); [[nodiscard]] TEraseHints MakeEraseBelatedHint(); + // Registers a write as pending (lsn generated, data not in any PBuffer + // yet) so that the cleanup bound covers it from the moment of generation. + void RegisterInflightWrite(ui64 lsn, TBlockRange64 range); + void WriteFinished( ui64 lsn, TBlockRange64 range, @@ -258,6 +262,7 @@ public: [[nodiscard]] size_t GetEraseBelatedCount() const; [[nodiscard]] ui64 GetMinFlushPendingLsn() const; [[nodiscard]] ui64 GetMinErasePendingLsn() const; + [[nodiscard]] std::optional<ui64> GetSafeBarrierForErase() const; [[nodiscard]] const TPBufferCounters& GetPBufferCounters( THostIndex host) const; diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp index 0dfded36268..e3b8473ddb8 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/dirty_map_ut.cpp @@ -323,6 +323,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -371,12 +372,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 124, TBlockRange64::WithLength(10, 10), @@ -446,6 +449,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) // Flush commands should be generated after completing the required // number of write operations. + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -458,6 +462,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) flushHint = dirtyMap.MakeFlushHint(2); UNIT_ASSERT_EQUAL(true, flushHint.Empty()); + dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(20, 10)); dirtyMap.WriteFinished( 124, TBlockRange64::WithLength(20, 10), @@ -580,6 +585,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) MakeHostMask(false, true, true, true, false); const THostMask confirmed = requested; + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -634,6 +640,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) MakeHostMask(false, true, true, true, false); const THostMask confirmed = requested; + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -696,6 +703,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) MakeHostMask(false, false, true, true, true); const THostMask confirmed = requested; + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -753,6 +761,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) MakeHostMask(true, true, true, false, false); const THostMask confirmed = requested; + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -806,18 +815,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) const THostMask confirmed = requested; // Range below write watermark. Should be flushed to 4 ddisks. + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), requested, confirmed); // Range cross write watermark. Should be flushed to 4 ddisks. + dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(95, 10)); dirtyMap.WriteFinished( 124, TBlockRange64::WithLength(95, 10), requested, confirmed); // Range over write watermark. Should be flushed to 3 ddisks. + dirtyMap.RegisterInflightWrite(125, TBlockRange64::WithLength(100, 10)); dirtyMap.WriteFinished( 125, TBlockRange64::WithLength(100, 10), @@ -850,18 +862,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) const THostMask confirmed = requested; // Range below write watermark. Should be flushed. + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), requested, confirmed); // Range cross write watermark. Should be flushed. + dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(95, 10)); dirtyMap.WriteFinished( 124, TBlockRange64::WithLength(95, 10), requested, confirmed); // Range over write watermark. Should not be flushed. + dirtyMap.RegisterInflightWrite(125, TBlockRange64::WithLength(100, 10)); dirtyMap.WriteFinished( 125, TBlockRange64::WithLength(100, 10), @@ -884,6 +899,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -925,6 +941,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) dirtyMap.LockDDiskRange(TBlockRange64::WithLength(5, 10), mask); // User write to overlapped with locked range. + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(10, 10), @@ -1075,6 +1092,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(123, TBlockRange64::WithLength(0, 100)); dirtyMap.WriteFinished( 123, TBlockRange64::WithLength(0, 100), @@ -1110,6 +1128,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) dirtyMap.EraseFinished(THostIndex{0}, {123}, {}); + dirtyMap.RegisterInflightWrite(124, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 124, TBlockRange64::WithLength(10, 10), @@ -1178,12 +1197,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 10), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(30, 10)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(30, 10), @@ -1210,12 +1231,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 41)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 41), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(20, 11)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(20, 11), @@ -1232,6 +1255,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) "100{[H0,H1,H2][31..50][21..40]};", readHint.DebugPrint()); + dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(0, 50)); dirtyMap.WriteFinished( 300, TBlockRange64::WithLength(0, 50), @@ -1253,12 +1277,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 21)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 21), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(25, 21)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(25, 21), @@ -1283,18 +1309,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 41)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 41), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(150, TBlockRange64::WithLength(20, 21)); dirtyMap.WriteFinished( 150, TBlockRange64::WithLength(20, 21), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(30, 6)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(30, 6), @@ -1322,6 +1351,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 10)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 10), @@ -1345,12 +1375,14 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 100)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 100), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(10, 40)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(10, 40), @@ -1378,6 +1410,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) const int lsnsCount = 100; for (int i = 1; i <= lsnsCount; ++i) { + dirtyMap.RegisterInflightWrite(i, TBlockRange64::WithLength(i, 1)); dirtyMap.WriteFinished( i, TBlockRange64::WithLength(i, 1), @@ -1413,18 +1446,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 21)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 21), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(25, 21)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(25, 21), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(40, 21)); dirtyMap.WriteFinished( 300, TBlockRange64::WithLength(40, 21), @@ -1450,18 +1486,21 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 6)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 6), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(25, 6)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(25, 6), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(45, 6)); dirtyMap.WriteFinished( 300, TBlockRange64::WithLength(45, 6), @@ -1490,24 +1529,28 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultBlockSize, DefaultVChunkSize / DefaultBlockSize); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 91)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 91), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(20, 6)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(20, 6), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(300, TBlockRange64::WithLength(40, 6)); dirtyMap.WriteFinished( 300, TBlockRange64::WithLength(40, 6), MakePrimaryHosts(), MakePrimaryHosts()); + dirtyMap.RegisterInflightWrite(400, TBlockRange64::WithLength(70, 6)); dirtyMap.WriteFinished( 400, TBlockRange64::WithLength(70, 6), @@ -1538,6 +1581,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) DefaultVChunkSize / DefaultBlockSize); auto inflightCounterBeforeWrite = dirtyMap.GetInflightCount(); + dirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(10, 41)); dirtyMap.WriteFinished( 100, TBlockRange64::WithLength(10, 41), @@ -1557,6 +1601,7 @@ Y_UNIT_TEST_SUITE(TDirtyMapTest) "0{[H0,H1,H2][10..50][0..40]};", readHint.DebugPrint()); + dirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(10, 41)); dirtyMap.WriteFinished( 200, TBlockRange64::WithLength(10, 41), diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp index 0ea01e62024..6f954dfad69 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.cpp @@ -31,21 +31,16 @@ TInflightInfo::TInflightInfo( TInflightInfo::TInflightInfo( IReadyQueue* readyQueue, ui64 lsn, - size_t byteCount, - THostMask writeRequested, - THostMask writeConfirmed) - : State(EState::PBufferWritten) + size_t byteCount) + : State(EState::PBufferPendingWrite) , ReadyQueue(readyQueue) , Lsn(lsn) , ByteCount(byteCount) , StartAt(TInstant::Now()) - , WriteRequested(writeRequested) - , WriteConfirmed(writeConfirmed) { - Y_ABORT_UNLESS(WriteConfirmed.Count() >= QuorumDirectBlockGroupHostCount); - - ReadyQueue->Register(Lsn, IReadyQueue::EQueueType::Flush); - ApplyBytes(WriteRequested, IReadyQueue::EPBufferCounter::Total, true); + // Pending: no PBuffer holds the data yet, so nothing is registered in a + // ready queue and no bytes are accounted. The write is not acknowledged, so + // reads ignore it (PBufferPendingWrite reads from DDisk, never blocks). } TInflightInfo::TInflightInfo(TInflightInfo&& other) noexcept @@ -99,6 +94,22 @@ void TInflightInfo::RestorePBuffer(THostIndex host) } } +void TInflightInfo::OnWritten( + THostMask writeRequested, + THostMask writeConfirmed) +{ + Y_ABORT_UNLESS(State == EState::PBufferPendingWrite); + Y_ABORT_UNLESS(WriteConfirmed.Count() == 0); + Y_ABORT_UNLESS(writeConfirmed.Count() >= QuorumDirectBlockGroupHostCount); + + WriteRequested = writeRequested; + WriteConfirmed = writeConfirmed; + State = EState::PBufferWritten; + + ApplyBytes(WriteRequested, IReadyQueue::EPBufferCounter::Total, true); + ReadyQueue->Register(Lsn, IReadyQueue::EQueueType::Flush); +} + TInflightInfo::EState TInflightInfo::GetState() const { return State; @@ -115,6 +126,11 @@ NThreading::TFuture<void> TInflightInfo::GetQuorumReadyFuture() TReadSource TInflightInfo::ReadMask() const { switch (State) { + case EState::PBufferPendingWrite: + // The write is not acknowledged yet, so it is invisible to reads: + // read the pre-write data from DDisk (Lsn=0). Never blocks. + return {THostMask::MakeAll(MaxHostCount), /*Lsn=*/0}; + case EState::PBufferIncompleteWrite: // Reading will be possible only after receiving a quorum. return {THostMask::MakeEmpty(), /*Lsn=*/0}; diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h index 7bb1a74745d..654e7ff206b 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info.h @@ -78,6 +78,11 @@ class TInflightInfo: public TDisableCopy public: enum class EState { + // The lsn is generated but the write has not been acknowledged yet. + // Tracked only to hold the cleanup watermark; invisible to reads (a + // concurrent read sees the pre-write data on DDisk, as before). + PBufferPendingWrite, + // During the recovery, a item without quorum was detected. It must be // copied to other PBuffers. // Reading will be possible only after receiving a quorum. @@ -109,12 +114,11 @@ public: ui64 lsn, size_t byteCount, THostIndex host); - TInflightInfo( - IReadyQueue* readyQueues, - ui64 lsn, - size_t byteCount, - THostMask writeRequested, - THostMask writeConfirmed); + + // Pending write: lsn is generated but data is not in any PBuffer yet. + // ReadMask is empty (reads wait on the quorum future) and the write is not + // flushable. Call OnWritten once a quorum of PBuffers confirms the write. + TInflightInfo(IReadyQueue* readyQueue, ui64 lsn, size_t byteCount); TInflightInfo(TInflightInfo&& other) noexcept; @@ -125,6 +129,10 @@ public: void RestorePBuffer(THostIndex host); + // Transitions a pending write (see the byteCount-only constructor) to the + // written state once a quorum of PBuffers confirmed it. + void OnWritten(THostMask writeRequested, THostMask writeConfirmed); + [[nodiscard]] EState GetState() const; // The subscription is triggered when the quorum is reached. diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp index b9985ec07a6..5b80be73d6f 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/dirty_map/inflight_info_ut.cpp @@ -99,12 +99,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests) Y_UNIT_TEST(ShouldHandleConfirmedWrite) { TTestReadyQueue readyQueue; - TInflightInfo inflightInfo( - &readyQueue, - 123, - 4096, - MakePrimaryHosts(), - MakePrimaryHosts()); + TInflightInfo inflightInfo(&readyQueue, 123, 4096); + inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts()); UNIT_ASSERT_VALUES_EQUAL(true, readyQueue.ReadyToFlush.contains(123)); // Start flushes @@ -158,12 +154,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests) Y_UNIT_TEST(ShouldHandleLock) { TTestReadyQueue readyQueue; - TInflightInfo inflightInfo( - &readyQueue, - 123, - 4096, - MakePrimaryHosts(), - MakePrimaryHosts()); + TInflightInfo inflightInfo(&readyQueue, 123, 4096); + inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts()); UNIT_ASSERT_VALUES_EQUAL(true, readyQueue.ReadyToFlush.contains(123)); // Start flushes @@ -219,12 +211,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests) Y_UNIT_TEST(ShouldPutToReadyQueueOnFail) { TTestReadyQueue readyQueue; - TInflightInfo inflightInfo( - &readyQueue, - 123, - 4096, - MakePrimaryHosts(), - MakePrimaryHosts()); + TInflightInfo inflightInfo(&readyQueue, 123, 4096); + inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts()); // Flush started UNIT_ASSERT_VALUES_EQUAL( @@ -308,12 +296,8 @@ Y_UNIT_TEST_SUITE(TInflightInfoTests) { TTestReadyQueue readyQueue; { - TInflightInfo inflightInfo( - &readyQueue, - 123, - 4096, - MakePrimaryHosts(), - MakePrimaryHosts()); + TInflightInfo inflightInfo(&readyQueue, 123, 4096); + inflightInfo.OnWritten(MakePrimaryHosts(), MakePrimaryHosts()); UNIT_ASSERT_VALUES_EQUAL( 4096, diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp index 434db986f4f..551fb965a70 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/erase_request.cpp @@ -38,7 +38,7 @@ TEraseRequestExecutor::~TEraseRequestExecutor() void TEraseRequestExecutor::Run() { - auto future = DirectBlockGroup->EraseFromPBuffer( + auto future = DirectBlockGroup->BatchEraseFromPBuffer( VChunkConfig.GetVChunkIndex(), Host, Hint.Segments, diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp index 0c45357b028..61501cebe07 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.cpp @@ -87,6 +87,11 @@ NMonitoring::TDynamicCounterPtr MakeCountersChain( return result; } +size_t RegionCount(ui64 blockCount, ui32 blockSize) +{ + return AlignUp(blockCount * blockSize, RegionSize) / RegionSize; +} + TVector<std::shared_ptr<TRegion>> CreateRegions( IPartitionDirectService* partitionDirectService, ui64 blockCount, @@ -96,10 +101,9 @@ TVector<std::shared_ptr<TRegion>> CreateRegions( const TStorageConfig& storageConfig, NMonitoring::TDynamicCounterPtr counters) { - const ui64 volumeSize = blockCount * blockSize; - const ui64 regionsCount = AlignUp(volumeSize, RegionSize) / RegionSize; - TVector<std::shared_ptr<TRegion>> regions(regionsCount); - for (size_t i = 0; i < regionsCount; i++) { + const size_t regionCount = RegionCount(blockCount, blockSize); + TVector<std::shared_ptr<TRegion>> regions(regionCount); + for (size_t i = 0; i < regionCount; i++) { NMonitoring::TDynamicCounterPtr regionCounters = counters->GetSubgroup("region", ToString(i)); @@ -260,7 +264,6 @@ TFastPathService::WriteBlocksLocal( auto result = Regions[regionIndex]->WriteBlocksLocal( std::move(callContext), std::move(request), - GenerateSequenceNumber(), span->GetTraceId()); result.Subscribe( @@ -338,9 +341,78 @@ void TFastPathService::UpdateVChunkConfig(const TVChunkConfig& cfg) new TEvPartitionDirectPrivate::TEvUpdateVChunkConfig(cfg)); } -ui64 TFastPathService::GenerateSequenceNumber() +ui64 TFastPathService::GenerateLsn() +{ + const ui64 lsn = ++SequenceGenerator; + MaybeTriggerPBufferCleanup(lsn); + return lsn; +} + +void TFastPathService::MaybeTriggerPBufferCleanup(ui64 lsn) +{ + const ui64 step = StorageConfig->GetPBufferCleanupLsnStep(); + if (!step || lsn % step != 0) { + return; + } + // lsn values are unique, so exactly one generator hits each multiple of + // step. Skip if a previous cleanup is still gathering, to avoid overlap. + bool expected = false; + if (!CleanupGather.Active.compare_exchange_strong(expected, true)) { + return; + } + PBufferCleanup(); +} + +void TFastPathService::PBufferCleanup() { - return ++SequenceGenerator; + // Pull the smallest inflight lsn from every DirectBlockGroup. Each group + // writes its own result slot; the last responder computes the global + // minimum (see FinishPBufferCleanup). + const size_t dbgCount = DirectBlockGroups.size(); + CleanupGather.SafeBarriers.assign(dbgCount, std::nullopt); + CleanupGather.PendingResponses.store(dbgCount); + + for (size_t i = 0; i < dbgCount; ++i) { + DirectBlockGroups[i]->GatherSafeBarrierForErase().Subscribe( + [weakSelf = weak_from_this(), i] // + (const NThreading::TFuture<std::optional<ui64>>& f) + { + if (auto self = weakSelf.lock()) { + self->OnGatherSafeBarrierForErase(i, f.GetValue()); + } + }); + } +} + +void TFastPathService::OnGatherSafeBarrierForErase( + size_t dbgIndex, + std::optional<ui64> safeBarrier) +{ + CleanupGather.SafeBarriers[dbgIndex] = safeBarrier; + if (CleanupGather.PendingResponses.fetch_sub(1) == 1) { + FinishPBufferCleanup(); + } +} + +void TFastPathService::FinishPBufferCleanup() +{ + std::optional<ui64> globalMin; + for (const auto& safeBarrier: CleanupGather.SafeBarriers) { + if (safeBarrier && (!globalMin || *safeBarrier < *globalMin)) { + globalMin = safeBarrier; + } + } + + CleanupGather.Active.store(false); + + if (!globalMin) { + return; + } + + const ui64 cleanupBound = *globalMin - 1; + for (const auto& dbg: DirectBlockGroups) { + dbg->BarrierEraseFromPBuffer(cleanupBound); + } } void TFastPathService::ScheduleDirtyMapDebugPrint() diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h index 4509a99c649..94ae6971b06 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/fast_path_service.h @@ -44,6 +44,15 @@ private: size_t DumpCount = 0; TMap<size_t, TDBGDumpResponse> DebugDumps; + struct TPBufferCleanupGather + { + std::atomic<bool> Active{false}; + TVector<std::optional<ui64>> SafeBarriers; + std::atomic<size_t> PendingResponses{0}; + }; + + TPBufferCleanupGather CleanupGather; + public: TFastPathService( NActors::TActorSystem* actorSystem, @@ -89,11 +98,19 @@ public: void UpdateVChunkConfig(const TVChunkConfig& cfg) override; + ui64 GenerateLsn() override; + private: - ui64 GenerateSequenceNumber(); void ScheduleDirtyMapDebugPrint(); void QueryDirtyMapDebugDump(); void OnDebugDump(size_t dbgIndex, TDBGDumpResponse dump); + + void MaybeTriggerPBufferCleanup(ui64 lsn); + void PBufferCleanup(); + void OnGatherSafeBarrierForErase( + size_t dbgIndex, + std::optional<ui64> safeBarrier); + void FinishPBufferCleanup(); }; } // namespace NYdb::NBS::NBlockStore::NStorage::NPartitionDirect diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h index 53e8045de94..861b8954158 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/model/host_stat.h @@ -18,6 +18,7 @@ enum class EOperation Flush, FlushCrossNode, Erase, + BarrierErase, // Must remain the last entry. Used to size per-operation containers. Count_, diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp index c847a9f34d8..a93a3c1beb6 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/partition_direct_ut.cpp @@ -48,7 +48,9 @@ struct TScopedNbsService: TDisableCopyMove [[nodiscard]] TScopedNbsService SetupStorage( TEnvironmentSetup& env, EWriteMode writeMode, - TDuration writeHedgingDelay = TDuration::Seconds(1)) + TDuration writeHedgingDelay = TDuration::Seconds(1), + ui64 pbufferCleanupLsnStep = 0, + ui32 syncRequestsBatchSize = 0) { env.CreateBoxAndPool(); env.Sim(TDuration::Seconds(30)); @@ -82,6 +84,10 @@ struct TScopedNbsService: TDisableCopyMove storageConfig->SetWriteMode(GetProtoWriteMode(writeMode)); storageConfig->SetVChunkSize(DefaultVChunkSize); storageConfig->SetWriteHedgingDelay(writeHedgingDelay.MicroSeconds()); + storageConfig->SetPBufferCleanupLsnStep(pbufferCleanupLsnStep); + if (syncRequestsBatchSize) { + storageConfig->SetSyncRequestsBatchSize(syncRequestsBatchSize); + } return TScopedNbsService(nbsConfig); } @@ -178,6 +184,55 @@ TActorId GetLoadActorAdapterActorId( return loadActorAdapter; } +void WriteBlock( + TEnvironmentSetup& env, + const TActorId& loadActorAdapter, + const TActorId& edge, + ui64 index, + const TString& data) +{ + auto request = std::make_unique<TEvService::TEvWriteBlocksRequest>(); + request->Record.SetStartIndex(index); + request->Record.MutableBlocks()->AddBuffers(data); + + env.Runtime->Send( + new IEventHandle(loadActorAdapter, edge, request.release()), + edge.NodeId()); + + auto res = env.WaitForEdgeActorEvent<TEvService::TEvWriteBlocksResponse>( + edge, + false); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + res->Get()->Record.GetError().GetCode(), + FormatError(res->Get()->Record.GetError())); +} + +TString ReadBlock( + TEnvironmentSetup& env, + const TActorId& loadActorAdapter, + const TActorId& edge, + ui64 index) +{ + auto request = std::make_unique<TEvService::TEvReadBlocksRequest>(); + request->Record.SetStartIndex(index); + request->Record.SetBlocksCount(1); + + env.Runtime->Send( + new IEventHandle(loadActorAdapter, edge, request.release()), + edge.NodeId()); + + auto res = env.WaitForEdgeActorEvent<TEvService::TEvReadBlocksResponse>( + edge, + false); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + res->Get()->Record.GetError().GetCode(), + FormatError(res->Get()->Record.GetError())); + UNIT_ASSERT_VALUES_EQUAL(1, res->Get()->Record.GetBlocks().BuffersSize()); + return res->Get()->Record.GetBlocks().GetBuffers(0); +} + } // namespace //////////////////////////////////////////////////////////////////////////////// @@ -973,6 +1028,182 @@ Y_UNIT_TEST_SUITE(TPartitionDirectTest) expectedData); } } + + // PBuffer cleanup: once the write LSN advances by PBufferCleanupLsnStep the + // tablet barrier-erases PBuffer records up to the cleanup bound. Drive two + // write batches and assert a real barrier-erase (TEvErasePersistentBuffer + // with Lsn > 0) reaches the persistent buffer, with no data lost. + Y_UNIT_TEST(ShouldBarrierErasePBufferOnCleanup) + { + TEnvironmentSetup env{{ + .NodeCount = 8, + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block, + }}; + auto& runtime = env.Runtime; + + auto scopedService = SetupStorage( + env, + EWriteMode::DirectPBuffersFilling, + TDuration::Seconds(1), + /*pbufferCleanupLsnStep=*/4, + /*syncRequestsBatchSize=*/1); + + auto partition = CreatePartitionTablet(env); + + const TActorId& edge = runtime->AllocateEdgeActor( + env.Settings.ControllerNodeId, + __FILE__, + __LINE__); + auto loadActorAdapter = + GetLoadActorAdapterActorId(env, partition, edge); + + TVector<ui64> barrierEraseLsns; + runtime->FilterFunction = + [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) + { + if (ev->GetTypeRewrite() == + NDDisk::TEvErasePersistentBuffer::EventType) + { + barrierEraseLsns.push_back( + ev->Get<NDDisk::TEvErasePersistentBuffer>() + ->Record.GetLsn()); + } + return true; + }; + + constexpr ui64 BlockCount = 16; + TVector<TString> data(BlockCount); + for (ui64 i = 0; i < BlockCount; ++i) { + data[i] = NUnitTest::RandomString(DefaultBlockSize, i); + } + + // First batch: let it flush+erase so the cleanup floor moves past + // lsn 1 (a barrier of lsn 0 is suppressed by design). + for (ui64 i = 0; i < BlockCount / 2; ++i) { + WriteBlock(env, loadActorAdapter, edge, i, data[i]); + } + env.Sim(TDuration::Seconds(10)); + + // Second batch: still in flight when cleanup triggers, so the barrier + // fires at (oldest-in-flight lsn - 1) > 0. + for (ui64 i = BlockCount / 2; i < BlockCount; ++i) { + WriteBlock(env, loadActorAdapter, edge, i, data[i]); + } + env.Sim(TDuration::Seconds(10)); + + ui64 maxBarrierLsn = 0; + for (const ui64 lsn: barrierEraseLsns) { + if (lsn > maxBarrierLsn) { + maxBarrierLsn = lsn; + } + } + // A real barrier reached the PBuffer and never erased the newest write + // (exact lsn is timing-dependent in this sustained-flow test). + UNIT_ASSERT_C( + maxBarrierLsn > 0 && maxBarrierLsn < BlockCount, + "barrier lsn outside (0, " << BlockCount << "): " << maxBarrierLsn); + + // Nothing extra was erased: every block still reads back correctly. + for (ui64 i = 0; i < BlockCount; ++i) { + UNIT_ASSERT_VALUES_EQUAL( + ReadBlock(env, loadActorAdapter, edge, i), + data[i]); + } + } + + // PBuffer cleanup must never barrier-erase a record that has not been + // per-record erased yet (still in the dirty map) - even one already + // flushed to DDisk; erasing it out from under the dirty map desyncs it + // from the PBuffer. We let an initial batch flush+erase (advancing the + // floor so the barrier fires), then hold back every per-record erase and + // assert the barrier stops at the floor, never reaching the un-erased + // records. + Y_UNIT_TEST(ShouldNotBarrierEraseUnerasedRecords) + { + TEnvironmentSetup env{{ + .NodeCount = 8, + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block, + }}; + auto& runtime = env.Runtime; + + auto scopedService = SetupStorage( + env, + EWriteMode::DirectPBuffersFilling, + TDuration::Seconds(1), + /*pbufferCleanupLsnStep=*/2, + /*syncRequestsBatchSize=*/1); + + auto partition = CreatePartitionTablet(env); + + const TActorId& edge = runtime->AllocateEdgeActor( + env.Settings.ControllerNodeId, + __FILE__, + __LINE__); + auto loadActorAdapter = + GetLoadActorAdapterActorId(env, partition, edge); + + bool holdErases = false; + TVector<ui64> barrierEraseLsns; + runtime->FilterFunction = + [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) + { + const auto type = ev->GetTypeRewrite(); + if (type == NDDisk::TEvErasePersistentBuffer::EventType) { + barrierEraseLsns.push_back( + ev->Get<NDDisk::TEvErasePersistentBuffer>() + ->Record.GetLsn()); + } else if ( + holdErases && + type == NDDisk::TEvBatchErasePersistentBuffer::EventType) + { + return false; // flush succeeds, but the record is never + // erased + } + return true; + }; + + // lsn == cumulative write count (one lsn per write on a fresh volume), + // so the first ErasedCount writes get lsns 1..ErasedCount and the held + // records get lsns > ErasedCount. + constexpr ui64 ErasedCount = 4; + constexpr ui64 UnerasedCount = 8; + TVector<TString> data(ErasedCount + UnerasedCount); + for (ui64 i = 0; i < data.size(); ++i) { + data[i] = NUnitTest::RandomString(DefaultBlockSize, 1000 + i); + } + + // Erased batch: flush + per-record erase, advancing the cleanup floor + // past lsn 1 so the barrier can fire. + for (ui64 i = 0; i < ErasedCount; ++i) { + WriteBlock(env, loadActorAdapter, edge, i, data[i]); + } + env.Sim(TDuration::Seconds(10)); + + // From now on records flush but are never erased -> they stay in the + // dirty map, so the barrier must not advance past them. + holdErases = true; + for (ui64 i = ErasedCount; i < ErasedCount + UnerasedCount; ++i) { + WriteBlock(env, loadActorAdapter, edge, i, data[i]); + } + env.Sim(TDuration::Seconds(5)); + + ui64 maxBarrierLsn = 0; + for (const ui64 lsn: barrierEraseLsns) { + if (lsn > maxBarrierLsn) { + maxBarrierLsn = lsn; + } + } + // Floor pinned at ErasedCount+1, so the barrier lands exactly at + // ErasedCount and never reaches the un-erased records. + UNIT_ASSERT_VALUES_EQUAL(maxBarrierLsn, ErasedCount); + + // The un-erased records still read back correctly. + for (ui64 i = ErasedCount; i < ErasedCount + UnerasedCount; ++i) { + UNIT_ASSERT_VALUES_EQUAL( + ReadBlock(env, loadActorAdapter, edge, i), + data[i]); + } + } } } // namespace NYdb::NBS::NBlockStore::NStorage::NPartitionDirect diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp index 2c84200ef8a..7f0c76420ab 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/read_request_ut.cpp @@ -65,6 +65,7 @@ Y_UNIT_TEST_SUITE(TReadRequestTest) .RequestId = 1, .Range = range}); + DirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(20, 10)); DirtyMap.WriteFinished( 100, TBlockRange64::WithLength(20, 10), @@ -131,12 +132,14 @@ Y_UNIT_TEST_SUITE(TReadRequestTest) { Init(); + DirtyMap.RegisterInflightWrite(100, TBlockRange64::WithLength(20, 10)); DirtyMap.WriteFinished( 100, TBlockRange64::WithLength(20, 10), VChunkConfig.GetDesiredPBuffers(), VChunkConfig.GetDesiredPBuffers()); + DirtyMap.RegisterInflightWrite(200, TBlockRange64::WithLength(40, 10)); DirtyMap.WriteFinished( 200, TBlockRange64::WithLength(40, 10), diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp index 5a67f390ea6..557128e0c0c 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.cpp @@ -95,7 +95,6 @@ NThreading::TFuture<TReadBlocksLocalResponse> TRegion::ReadBlocksLocal( NThreading::TFuture<TWriteBlocksLocalResponse> TRegion::WriteBlocksLocal( TCallContextPtr callContext, std::shared_ptr<TWriteBlocksLocalRequest> request, - ui64 lsn, const NWilson::TTraceId& traceId) { const size_t vChunkIndex = VChunkIndexFromHeaders(request->Headers); @@ -103,7 +102,6 @@ NThreading::TFuture<TWriteBlocksLocalResponse> TRegion::WriteBlocksLocal( return VChunks[vChunkIndex]->WriteBlocksLocal( std::move(callContext), std::move(request), - lsn, traceId); } diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h index 284ee47569d..784857c8da0 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/region.h @@ -40,7 +40,6 @@ public: NThreading::TFuture<TWriteBlocksLocalResponse> WriteBlocksLocal( TCallContextPtr callContext, std::shared_ptr<TWriteBlocksLocalRequest> request, - ui64 lsn, const NWilson::TTraceId& traceId); private: diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp index 6d95c55dd52..4b792f2d05b 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.cpp @@ -146,7 +146,6 @@ TFuture<TReadBlocksLocalResponse> TVChunk::ReadBlocksLocal( TFuture<TWriteBlocksLocalResponse> TVChunk::WriteBlocksLocal( TCallContextPtr callContext, std::shared_ptr<TWriteBlocksLocalRequest> request, - ui64 lsn, const NWilson::TTraceId& traceId) { // VHost thread @@ -177,8 +176,7 @@ TFuture<TWriteBlocksLocalResponse> TVChunk::WriteBlocksLocal( std::move(request), traceId, std::move(callContext), - vchunkRange, - lsn); + vchunkRange); bundle->GetSpan().Attribute("VChunkIndex", VChunkConfig.GetVChunkIndex()); @@ -237,6 +235,13 @@ ui64 TVChunk::GetPBufferUsedSize(THostIndex hostIndex) const return BlocksDirtyMap.GetPBufferCounters(hostIndex).CurrentBytesCount; } +std::optional<ui64> TVChunk::GetSafeBarrierForErase() const +{ + Y_ABORT_UNLESS(ExecutorThreadChecker.Check()); + + return BlocksDirtyMap.GetSafeBarrierForErase(); +} + TString TVChunk::DebugPrintDirtyMap() { Y_ABORT_UNLESS(ExecutorThreadChecker.Check()); @@ -478,11 +483,18 @@ void TVChunk::DoWriteBlocksLocal(std::shared_ptr<TWriteRequestBundle> bundle) { Y_ABORT_UNLESS(ExecutorThreadChecker.Check()); + // Generate the lsn and register the write as inflight on the same executor + // thread, so the cleanup watermark covers it from the moment of generation. + const ui64 lsn = PartitionDirectService->GenerateLsn(); + bundle->SetLsn(lsn); + BlocksDirtyMap.RegisterInflightWrite(lsn, bundle->GetVChunkRange()); + LOG_DEBUG( *ActorSystem, NKikimrServices::NBS_PARTITION, - "%s DoWriteBlocksLocal: %s", + "%s DoWriteBlocksLocal: lsn %lu %s", LogTitle.GetWithTime().c_str(), + lsn, bundle->GetVChunkRange().Print().c_str()); auto writeExecutor = CreateWriteRequestExecutor( diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h index ab1507b54ef..da3877055b2 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk.h @@ -55,7 +55,6 @@ public: NThreading::TFuture<TWriteBlocksLocalResponse> WriteBlocksLocal( TCallContextPtr callContext, std::shared_ptr<TWriteBlocksLocalRequest> request, - ui64 lsn, const NWilson::TTraceId& traceId); void SetHostState(THostIndex hostIndex, EHostState state); @@ -64,6 +63,11 @@ public: [[nodiscard]] ui64 GetPBufferUsedSize(THostIndex hostIndex) const; [[nodiscard]] TString DebugPrintDirtyMap(); + // This vchunk's contribution to the tablet-wide cleanup watermark: the + // smallest lsn still held in PBuffers, or nullopt when nothing is inflight. + // Must run on the executor thread. + [[nodiscard]] std::optional<ui64> GetSafeBarrierForErase() const; + // IWriteClient implementation void OnWriteBlocksResponse( std::shared_ptr<TWriteRequestBundle> bundle, diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp index 71671961b2d..4322a93da7d 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/vchunk_ut.cpp @@ -40,11 +40,8 @@ Y_UNIT_TEST_SUITE(TVChunkTest) vchunk->Start(); // Run write request - auto future = vchunk->WriteBlocksLocal( - callContext, - request, - 1000, - NWilson::TTraceId()); + auto future = + vchunk->WriteBlocksLocal(callContext, request, NWilson::TTraceId()); // Wait for three PBuffers write requests. UNIT_ASSERT_VALUES_EQUAL( diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp index 4c50b6a3eb9..c8148a6a72a 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.cpp @@ -13,8 +13,7 @@ TWriteRequestBundle::TWriteRequestBundle( std::shared_ptr<TWriteBlocksLocalRequest> request, const NWilson::TTraceId& traceId, TCallContextPtr callContext, - TBlockRange64 vchunkRange, - ui64 lsn) + TBlockRange64 vchunkRange) : WriteClient(std::move(writeClient)) , Request(std::move(request)) , Span( @@ -25,7 +24,6 @@ TWriteRequestBundle::TWriteRequestBundle( actorSystem) , CallContext(std::move(callContext)) , VChunkRange(vchunkRange) - , Lsn(lsn) , Promise(NThreading::NewPromise<TWriteBlocksLocalResponse>()) {} @@ -81,6 +79,11 @@ TBlockRange64 TWriteRequestBundle::GetVChunkRange() const return VChunkRange; } +void TWriteRequestBundle::SetLsn(ui64 lsn) +{ + Lsn = lsn; +} + ui64 TWriteRequestBundle::GetLsn() const { return Lsn; diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h index 949c5ee9bc4..02d14a9b72e 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_bundle.h @@ -37,8 +37,7 @@ public: std::shared_ptr<TWriteBlocksLocalRequest> request, const NWilson::TTraceId& traceId, TCallContextPtr callContext, - TBlockRange64 vchunkRange, - ui64 lsn); + TBlockRange64 vchunkRange); // Respond via WriteClient to VChunk. void Reply( @@ -54,6 +53,7 @@ public: NThreading::TFuture<TWriteBlocksLocalResponse> GetFuture(); NWilson::TSpan& GetSpan(); TBlockRange64 GetVChunkRange() const; + void SetLsn(ui64 lsn); ui64 GetLsn() const; TGuardedSgList& GetSgList(); diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp index dabe8095b6d..5ac55a4eb96 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/partition_direct/write_request_test_fixture.cpp @@ -194,8 +194,8 @@ TWriteRequestTestFixture::CreatePBufferReplicationExecutor( std::move(originalRequest), NWilson::TTraceId(), MakeIntrusive<TCallContext>(), - Range, - UserLsn); + Range); + bundle->SetLsn(UserLsn); WriteClient->Response.reset(); @@ -223,8 +223,8 @@ TWriteRequestTestFixture::CreateDirectReplicationExecutor( std::move(originalRequest), NWilson::TTraceId(), MakeIntrusive<TCallContext>(), - Range, - UserLsn); + Range); + bundle->SetLsn(UserLsn); WriteClient->Response.reset(); diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp index a2cc3d2856a..daa7b6f2b09 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.cpp @@ -128,7 +128,7 @@ TICStorageTransport::WriteToDDisk( } TFuture<NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult> -TICStorageTransport::EraseFromPBuffer( +TICStorageTransport::BatchEraseFromPBuffer( const THostConnection& connection, TVector<NKikimr::NDDisk::TBlockSelector> selectors, TVector<ui64> lsns, @@ -136,12 +136,38 @@ TICStorageTransport::EraseFromPBuffer( { Y_ABORT_UNLESS(connection.ConnectionType == EConnectionType::PBuffer); - auto request = std::make_unique<TEvTransportPrivate::TEvEraseFromPBuffer>( - connection.GetServiceId(), - connection.Credentials, - std::move(selectors), - std::move(lsns), - span ? span->GetTraceId() : NWilson::TTraceId()); + auto request = + std::make_unique<TEvTransportPrivate::TEvBatchEraseFromPBuffer>( + connection.GetServiceId(), + connection.Credentials, + std::move(selectors), + std::move(lsns), + span ? span->GetTraceId() : NWilson::TTraceId()); + + auto future = request->Promise.GetFuture(); + + if (span) { + span->Event("ActorSystem_Send"); + } + ActorSystem->Send(ICStorageTransportActorId, request.release()); + + return future; +} + +TFuture<NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult> +TICStorageTransport::BarrierEraseFromPBuffer( + const THostConnection& connection, + ui64 lsn, + NWilson::TSpan* span) +{ + Y_ABORT_UNLESS(connection.ConnectionType == EConnectionType::PBuffer); + + auto request = + std::make_unique<TEvTransportPrivate::TEvBarrierEraseFromPBuffer>( + connection.GetServiceId(), + connection.Credentials, + lsn, + span ? span->GetTraceId() : NWilson::TTraceId()); auto future = request->Promise.GetFuture(); diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h index 0e5eb74f376..11acd98f67c 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport.h @@ -64,12 +64,17 @@ public: TVector<ui64> lsns, NWilson::TSpan* span) override; - NThreading::TFuture<TEvErasePersistentBufferResult> EraseFromPBuffer( + NThreading::TFuture<TEvErasePersistentBufferResult> BatchEraseFromPBuffer( const THostConnection& connection, TVector<NKikimr::NDDisk::TBlockSelector> selectors, TVector<ui64> lsns, NWilson::TSpan* span) override; + NThreading::TFuture<TEvErasePersistentBufferResult> BarrierEraseFromPBuffer( + const THostConnection& connection, + ui64 lsn, + NWilson::TSpan* span) override; + NThreading::TFuture<TEvListPersistentBufferResult> ListPBufferEntries( const THostConnection& connection) override; diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp index 856753245d6..8277aeb03a9 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.cpp @@ -93,7 +93,9 @@ TICStorageTransportActor::~TICStorageTransportActor() RejectAllPending<NDDisk::TEvSyncWithPersistentBufferResult>( FlushFromPBufferRequests); RejectAllPending<NDDisk::TEvErasePersistentBufferResult>( - EraseFromPBufferRequests); + BatchEraseFromPBufferRequests); + RejectAllPending<NDDisk::TEvErasePersistentBufferResult>( + BarrierEraseFromPBufferRequests); RejectAllPending<NDDisk::TEvListPersistentBufferResult>( ListPBufferEntriesRequests); @@ -566,21 +568,22 @@ void TICStorageTransportActor::HandleWriteToDDiskResult( } } -void TICStorageTransportActor::HandleErasePersistentBuffer( - const TEvTransportPrivate::TEvEraseFromPBuffer::TPtr& ev, +void TICStorageTransportActor::HandleBatchErasePersistentBuffer( + const TEvTransportPrivate::TEvBatchEraseFromPBuffer::TPtr& ev, const TActorContext& ctx) { auto* msg = ev->Get(); const ui64 requestId = ++RequestIdGenerator; - auto [it, inserted] = - EraseFromPBufferRequests.emplace(requestId, ev->Release().Release()); + auto [it, inserted] = BatchEraseFromPBufferRequests.emplace( + requestId, + ev->Release().Release()); Y_ABORT_UNLESS(inserted); LOG_DEBUG( ctx, NKikimrServices::NBS_PARTITION, - "Sent TEvEraseFromPBuffer with requestId# %lu", + "Sent TEvBatchEraseFromPBuffer with requestId# %lu", requestId); auto request = std::make_unique<NDDisk::TEvBatchErasePersistentBuffer>( @@ -589,6 +592,39 @@ void TICStorageTransportActor::HandleErasePersistentBuffer( request->AddErase(lsn, msg->Credentials.Generation); } + ctx.Send(MakeHolder<IEventHandle>( + msg->ServiceId, + ctx.SelfID, + request.release(), + 0, // flags + requestId, // cookie + nullptr, + std::move(msg->TraceId))); +} + +void TICStorageTransportActor::HandleErasePersistentBuffer( + const TEvTransportPrivate::TEvBarrierEraseFromPBuffer::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + const ui64 requestId = ++RequestIdGenerator; + auto [it, inserted] = BarrierEraseFromPBufferRequests.emplace( + requestId, + ev->Release().Release()); + Y_ABORT_UNLESS(inserted); + + LOG_DEBUG( + ctx, + NKikimrServices::NBS_PARTITION, + "Sent TEvBarrierEraseFromPBuffer with requestId# %lu lsn# %lu", + requestId, + msg->Lsn); + + auto request = std::make_unique<NDDisk::TEvErasePersistentBuffer>( + msg->Credentials, + msg->Lsn); + SendWithUndeliveryTracking( ctx, msg->ServiceId, @@ -610,13 +646,13 @@ void TICStorageTransportActor::HandleErasePersistentBufferUndelivery( "requestId# %lu", requestId); - if (auto* r = EraseFromPBufferRequests.FindPtr(requestId)) { + if (auto* r = BatchEraseFromPBufferRequests.FindPtr(requestId)) { auto& request = **r; auto result = NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult(); SetUndeliveryError(result); request.Promise.SetValue(std::move(result)); - EraseFromPBufferRequests.erase(requestId); + BatchEraseFromPBufferRequests.erase(requestId); } else { // That means that request is already completed LOG_ERROR( @@ -639,18 +675,24 @@ void TICStorageTransportActor::HandleErasePersistentBufferResult( "Received TEvErasePersistentBufferResult with requestId# %lu", requestId); - if (auto* r = EraseFromPBufferRequests.FindPtr(requestId)) { + if (auto* r = BatchEraseFromPBufferRequests.FindPtr(requestId)) { auto& request = **r; request.Promise.SetValue(std::move(ev->Get()->Record)); - EraseFromPBufferRequests.erase(requestId); - } else { - // That means that request is already completed - LOG_ERROR( - ctx, - NKikimrServices::NBS_PARTITION, - "ErasePersistentBufferEvent with requestId# %lu not found", - requestId); + BatchEraseFromPBufferRequests.erase(requestId); + return; + } + if (auto* r = BarrierEraseFromPBufferRequests.FindPtr(requestId)) { + auto& request = **r; + request.Promise.SetValue(std::move(ev->Get()->Record)); + BarrierEraseFromPBufferRequests.erase(requestId); + return; } + // That means that request is already completed + LOG_ERROR( + ctx, + NKikimrServices::NBS_PARTITION, + "ErasePersistentBufferEvent with requestId# %lu not found", + requestId); } void TICStorageTransportActor::HandleReadPersistentBuffer( @@ -1076,7 +1118,10 @@ STFUNC(TICStorageTransportActor::StateWork) HFunc(NDDisk::TEvWriteResult, HandleWriteToDDiskResult); HFunc( - TEvTransportPrivate::TEvEraseFromPBuffer, + TEvTransportPrivate::TEvBatchEraseFromPBuffer, + HandleBatchErasePersistentBuffer); + HFunc( + TEvTransportPrivate::TEvBarrierEraseFromPBuffer, HandleErasePersistentBuffer); HFunc( NDDisk::TEvBatchErasePersistentBuffer, diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h index 96d5553bcc7..a10fc47a53b 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_actor.h @@ -33,8 +33,15 @@ private: THashMap<ui64, std::unique_ptr<TEvTransportPrivate::TEvSyncWithPBuffer>> FlushFromPBufferRequests; - THashMap<ui64, std::unique_ptr<TEvTransportPrivate::TEvEraseFromPBuffer>> - EraseFromPBufferRequests; + THashMap< + ui64, + std::unique_ptr<TEvTransportPrivate::TEvBatchEraseFromPBuffer>> + BatchEraseFromPBufferRequests; + + THashMap< + ui64, + std::unique_ptr<TEvTransportPrivate::TEvBarrierEraseFromPBuffer>> + BarrierEraseFromPBufferRequests; THashMap<ui64, std::unique_ptr<TEvTransportPrivate::TEvListPBufferEntries>> ListPBufferEntriesRequests; @@ -97,8 +104,12 @@ private: const NKikimr::NDDisk::TEvWriteResult::TPtr& ev, const NActors::TActorContext& ctx); + void HandleBatchErasePersistentBuffer( + const TEvTransportPrivate::TEvBatchEraseFromPBuffer::TPtr& ev, + const NActors::TActorContext& ctx); + void HandleErasePersistentBuffer( - const TEvTransportPrivate::TEvEraseFromPBuffer::TPtr& ev, + const TEvTransportPrivate::TEvBarrierEraseFromPBuffer::TPtr& ev, const NActors::TActorContext& ctx); void HandleErasePersistentBufferUndelivery( const NKikimr::NDDisk::TEvBatchErasePersistentBuffer::TPtr& ev, diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp index e8fba2724d2..dcf6a1cffc5 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.cpp @@ -24,7 +24,12 @@ TEvTransportPrivate::TWriteToDDisk::~TWriteToDDisk() Y_ABORT_UNLESS(Promise.IsReady()); } -TEvTransportPrivate::TEraseFromPBuffer::~TEraseFromPBuffer() +TEvTransportPrivate::TBatchEraseFromPBuffer::~TBatchEraseFromPBuffer() +{ + Y_ABORT_UNLESS(Promise.IsReady()); +} + +TEvTransportPrivate::TBarrierEraseFromPBuffer::~TBarrierEraseFromPBuffer() { Y_ABORT_UNLESS(Promise.IsReady()); } diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h index 3bc49c0d690..dc94f6c6f75 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/ic_storage_transport_events.h @@ -105,7 +105,7 @@ struct TEvTransportPrivate ~TWriteToDDisk(); }; - struct TEraseFromPBuffer: TDisableCopyMove + struct TBatchEraseFromPBuffer: TDisableCopyMove { using TResult = NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult; @@ -118,7 +118,7 @@ struct TEvTransportPrivate NThreading::TPromise<TResult> Promise = NThreading::NewPromise<TResult>(); - TEraseFromPBuffer( + TBatchEraseFromPBuffer( const NActors::TActorId serviceId, const NKikimr::NDDisk::TQueryCredentials& credentials, TVector<NKikimr::NDDisk::TBlockSelector> selectors, @@ -131,7 +131,33 @@ struct TEvTransportPrivate , TraceId(std::move(traceId)) {} - ~TEraseFromPBuffer(); + ~TBatchEraseFromPBuffer(); + }; + + struct TBarrierEraseFromPBuffer: TDisableCopyMove + { + using TResult = + NKikimrBlobStorage::NDDisk::TEvErasePersistentBufferResult; + + const NActors::TActorId ServiceId; + const NKikimr::NDDisk::TQueryCredentials Credentials; + const ui64 Lsn; + NWilson::TTraceId TraceId; + NThreading::TPromise<TResult> Promise = + NThreading::NewPromise<TResult>(); + + TBarrierEraseFromPBuffer( + const NActors::TActorId serviceId, + const NKikimr::NDDisk::TQueryCredentials& credentials, + ui64 lsn, + NWilson::TTraceId traceId) + : ServiceId(serviceId) + , Credentials(credentials) + , Lsn(lsn) + , TraceId(std::move(traceId)) + {} + + ~TBarrierEraseFromPBuffer(); }; struct TReadFromPBuffer: TDisableCopyMove @@ -310,7 +336,8 @@ struct TEvTransportPrivate EvConnect, EvWriteToPBuffer, EvWriteToDDisk, - EvEraseFromPBuffer, + EvBatchEraseFromPBuffer, + EvBarrierEraseFromPBuffer, EvReadFromPBuffer, EvReadFromDDisk, EvSyncWithPBuffer, @@ -335,8 +362,12 @@ struct TEvTransportPrivate using TEvSyncWithPBuffer = TRequestEvent<TSyncWithPBuffer, EEvents::EvSyncWithPBuffer>; - using TEvEraseFromPBuffer = - TRequestEvent<TEraseFromPBuffer, EEvents::EvEraseFromPBuffer>; + using TEvBatchEraseFromPBuffer = + TRequestEvent<TBatchEraseFromPBuffer, EEvents::EvBatchEraseFromPBuffer>; + + using TEvBarrierEraseFromPBuffer = TRequestEvent< + TBarrierEraseFromPBuffer, + EEvents::EvBarrierEraseFromPBuffer>; using TEvListPBufferEntries = TRequestEvent<TListPBufferEntries, EEvents::EvListPBufferEntries>; diff --git a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h index fafd07490ca..f7752c918b7 100644 --- a/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h +++ b/ydb/core/nbs/cloud/blockstore/libs/storage/storage_transport/storage_transport.h @@ -116,12 +116,18 @@ public: NWilson::TSpan* span) = 0; virtual NThreading::TFuture<TEvErasePersistentBufferResult> - EraseFromPBuffer( + BatchEraseFromPBuffer( const THostConnection& connection, TVector<NKikimr::NDDisk::TBlockSelector> selectors, TVector<ui64> lsns, NWilson::TSpan* span) = 0; + virtual NThreading::TFuture<TEvErasePersistentBufferResult> + BarrierEraseFromPBuffer( + const THostConnection& connection, + ui64 lsn, + NWilson::TSpan* span) = 0; + virtual NThreading::TFuture<TEvListPersistentBufferResult> ListPBufferEntries(const THostConnection& connection) = 0; }; |
