diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-06-10 23:07:05 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-10 23:07:05 +0300 |
commit | d8e845f6a1e2628eb37b942a2ce8e84be901b8ca (patch) | |
tree | 31d90a30dd2c1bbe7ad2ab3ca4640b28b187010e | |
parent | e8af538b05a1bd7bc4a3bcba2fdcbe430675f69c (diff) | |
download | ydb-d8e845f6a1e2628eb37b942a2ce8e84be901b8ca.tar.gz |
fix deletion usage on compaction (#19576)
5 files changed, 57 insertions, 35 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 665928fa0b7..f57749aeda4 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -69,9 +69,19 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc AFL_VERIFY(usedPortionIds.emplace(i.GetPortionInfo().GetPortionId()).second); currentToMerge.emplace_back(std::make_shared<TReadPortionToMerge>(std::move(i), GranuleMeta)); } + + const auto buildPortionsToMerge = [&](const std::vector<std::shared_ptr<ISubsetToMerge>>& toMerge, const bool useDeletion) { + std::vector<TPortionToMerge> result; + for (auto&& i : toMerge) { + auto mergePortions = i->BuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds, useDeletion); + result.insert(result.end(), mergePortions.begin(), mergePortions.end()); + } + return result; + }; + auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId()); while (true) { - std::vector<TPortionToMerge> toMerge; + std::vector<std::shared_ptr<ISubsetToMerge>> toMerge; ui64 sumMemory = 0; ui64 totalSumMemory = 0; std::vector<std::shared_ptr<ISubsetToMerge>> appendedToMerge; @@ -80,9 +90,9 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc if (NYDBTest::TControllers::GetColumnShardController()->CheckPortionsToMergeOnCompaction( sumMemory + i->GetColumnMaxChunkMemory(), subsetsCount) && subsetsCount > 1) { - auto merged = BuildAppendedPortionsByChunks(context, std::move(toMerge), resultFiltered, stats); + auto merged = BuildAppendedPortionsByChunks(context, buildPortionsToMerge(toMerge, false), resultFiltered, stats); if (merged.size()) { - appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged))); + appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged), GranuleMeta)); } toMerge.clear(); sumMemory = 0; @@ -90,15 +100,14 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc } sumMemory += i->GetColumnMaxChunkMemory(); totalSumMemory += i->GetColumnMaxChunkMemory(); - auto mergePortions = i->BuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds); - toMerge.insert(toMerge.end(), mergePortions.begin(), mergePortions.end()); + toMerge.emplace_back(i); ++subsetsCount; } if (toMerge.size()) { - auto merged = BuildAppendedPortionsByChunks(context, std::move(toMerge), resultFiltered, stats); + auto merged = BuildAppendedPortionsByChunks(context, buildPortionsToMerge(toMerge, appendedToMerge.empty()), resultFiltered, stats); if (appendedToMerge.size()) { if (merged.size()) { - appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged))); + appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged), GranuleMeta)); } } else { context.Counters.OnCompactionCorrectMemory(totalSumMemory); diff --git a/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp b/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp index 6286026d6a2..c978142a346 100644 --- a/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp +++ b/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp @@ -2,8 +2,9 @@ namespace NKikimr::NOlap::NCompaction { -std::shared_ptr<NArrow::TColumnFilter> TReadPortionToMerge::BuildPortionFilter(const std::optional<TGranuleShardingInfo>& shardingActual, - const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage) const { +std::shared_ptr<NArrow::TColumnFilter> ISubsetToMerge::BuildPortionFilter(const std::optional<TGranuleShardingInfo>& shardingActual, + const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage, + const bool useDeletionFilter) const { std::shared_ptr<NArrow::TColumnFilter> filter; if (shardingActual && pInfo.NeedShardingFilter(*shardingActual)) { std::set<std::string> fieldNames; @@ -15,7 +16,7 @@ std::shared_ptr<NArrow::TColumnFilter> TReadPortionToMerge::BuildPortionFilter(c filter = shardingActual->GetShardingInfo()->GetFilter(table); } NArrow::TColumnFilter filterDeleted = NArrow::TColumnFilter::BuildAllowFilter(); - if (pInfo.GetMeta().GetDeletionsCount()) { + if (pInfo.GetMeta().GetDeletionsCount() && useDeletionFilter) { if (pInfo.GetPortionType() == EPortionType::Written) { AFL_VERIFY(pInfo.GetMeta().GetDeletionsCount() == pInfo.GetRecordsCount()); filterDeleted = NArrow::TColumnFilter::BuildDenyFilter(); @@ -45,23 +46,26 @@ std::shared_ptr<NArrow::TColumnFilter> TReadPortionToMerge::BuildPortionFilter(c } std::vector<TPortionToMerge> TReadPortionToMerge::DoBuildPortionsToMerge(const TConstructionContext& context, - const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, - const THashSet<ui64>& usedPortionIds) const { + const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds, + const bool useDeletionFilter) const { auto blobsSchema = ReadPortion.GetPortionInfo().GetSchema(context.SchemaVersions); auto batch = ReadPortion.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds, false).DetachResult(); auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId()); - std::shared_ptr<NArrow::TColumnFilter> filter = BuildPortionFilter(shardingActual, batch, ReadPortion.GetPortionInfo(), usedPortionIds); + std::shared_ptr<NArrow::TColumnFilter> filter = + BuildPortionFilter(shardingActual, batch, ReadPortion.GetPortionInfo(), usedPortionIds, useDeletionFilter); return { TPortionToMerge(batch, filter) }; } std::vector<TPortionToMerge> TWritePortionsToMerge::DoBuildPortionsToMerge(const TConstructionContext& context, - const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, - const THashSet<ui64>& /*usedPortionIds*/) const { + const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds, + const bool useDeletionFilter) const { std::vector<TPortionToMerge> result; for (auto&& i : WritePortions) { auto blobsSchema = i.GetPortionResult().GetPortionInfo().GetSchema(context.SchemaVersions); auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds, false).DetachResult(); - result.emplace_back(TPortionToMerge(batch, nullptr)); + std::shared_ptr<NArrow::TColumnFilter> filter = + BuildPortionFilter(std::nullopt, batch, i.GetPortionResult().GetPortionInfo(), usedPortionIds, useDeletionFilter); + result.emplace_back(TPortionToMerge(batch, filter)); } return result; } @@ -76,8 +80,10 @@ ui64 TWritePortionsToMerge::GetColumnMaxChunkMemory() const { return result; } -TWritePortionsToMerge::TWritePortionsToMerge(std::vector<TWritePortionInfoWithBlobsResult>&& portions) - : WritePortions(std::move(portions)) { +TWritePortionsToMerge::TWritePortionsToMerge( + std::vector<TWritePortionInfoWithBlobsResult>&& portions, const std::shared_ptr<TGranuleMeta>& granuleMeta) + : TBase(granuleMeta) + , WritePortions(std::move(portions)) { ui32 idx = 0; for (auto&& i : WritePortions) { i.GetPortionConstructor().MutablePortionConstructor().SetPortionId(++idx); diff --git a/ydb/core/tx/columnshard/engines/changes/merge_subset.h b/ydb/core/tx/columnshard/engines/changes/merge_subset.h index da4ce7545dd..03378a59f89 100644 --- a/ydb/core/tx/columnshard/engines/changes/merge_subset.h +++ b/ydb/core/tx/columnshard/engines/changes/merge_subset.h @@ -24,32 +24,42 @@ public: class ISubsetToMerge { private: virtual std::vector<TPortionToMerge> DoBuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds, - const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds) const = 0; + const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds, + const bool useDeletionFilter) const = 0; + +protected: + const std::shared_ptr<TGranuleMeta> GranuleMeta; + std::shared_ptr<NArrow::TColumnFilter> BuildPortionFilter(const std::optional<NKikimr::NOlap::TGranuleShardingInfo>& shardingActual, + const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage, + const bool useDeletionFilter) const; public: + ISubsetToMerge(const std::shared_ptr<TGranuleMeta>& granule) + : GranuleMeta(granule) { + AFL_VERIFY(GranuleMeta); + } virtual ~ISubsetToMerge() = default; std::vector<TPortionToMerge> BuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds, - const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds) const { - return DoBuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds); + const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds, + const bool useDeletionFilter) const { + return DoBuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds, useDeletionFilter); } virtual ui64 GetColumnMaxChunkMemory() const = 0; }; class TReadPortionToMerge: public ISubsetToMerge { private: + using TBase = ISubsetToMerge; TReadPortionInfoWithBlobs ReadPortion; - const std::shared_ptr<TGranuleMeta> GranuleMeta; - - std::shared_ptr<NArrow::TColumnFilter> BuildPortionFilter(const std::optional<NKikimr::NOlap::TGranuleShardingInfo>& shardingActual, - const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage) const; virtual std::vector<TPortionToMerge> DoBuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds, - const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds) const override; + const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds, + const bool useDeletionFilter) const override; public: TReadPortionToMerge(TReadPortionInfoWithBlobs&& rPortion, const std::shared_ptr<TGranuleMeta>& granuleMeta) - : ReadPortion(std::move(rPortion)) - , GranuleMeta(granuleMeta) { + : TBase(granuleMeta) + , ReadPortion(std::move(rPortion)) { } virtual ui64 GetColumnMaxChunkMemory() const override { @@ -63,15 +73,17 @@ public: class TWritePortionsToMerge: public ISubsetToMerge { private: + using TBase = ISubsetToMerge; std::vector<TWritePortionInfoWithBlobsResult> WritePortions; virtual std::vector<TPortionToMerge> DoBuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds, - const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& /*usedPortionIds*/) const override; + const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds, + const bool useDeletionFilter) const override; virtual ui64 GetColumnMaxChunkMemory() const override; public: - TWritePortionsToMerge(std::vector<TWritePortionInfoWithBlobsResult>&& portions); + TWritePortionsToMerge(std::vector<TWritePortionInfoWithBlobsResult>&& portions, const std::shared_ptr<TGranuleMeta>& granuleMeta); }; } // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 91f7f81943e..e389c9c4c63 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -27,7 +27,6 @@ private: YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideCompactionActualizationLag); YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag); YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideMaxReadStaleness); - YDB_ACCESSOR_DEF(std::optional<bool>, OverrideAllowMergeFull); YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100); YDB_ACCESSOR(std::optional<ui64>, OverrideLimitForPortionsMetadataAsk, 1); YDB_ACCESSOR(std::optional<NOlap::NSplitter::TSplitSettings>, OverrideBlobSplitSettings, NOlap::NSplitter::TSplitSettings::BuildForTests()); @@ -242,9 +241,6 @@ protected: public: virtual bool CheckPortionsToMergeOnCompaction(const ui64 /*memoryAfterAdd*/, const ui32 currentSubsetsCount) override { - if (OverrideAllowMergeFull && *OverrideAllowMergeFull) { - return false; - } return currentSubsetsCount > 1; } diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp index 678699e4683..c39db923f86 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp @@ -1023,7 +1023,6 @@ Y_UNIT_TEST_SUITE(TOlap) { csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); csController->SetOverrideMaxReadStaleness(TDuration::Seconds(1)); - csController->SetOverrideAllowMergeFull(true); // disable stats batching auto& appData = runtime.GetAppData(); |