about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ivanmorozov333 <ivanmorozov@ydb.tech> 2025-06-10 23:07:05 +0300
committer GitHub <noreply@github.com> 2025-06-10 23:07:05 +0300
commit d8e845f6a1e2628eb37b942a2ce8e84be901b8ca (patch)
tree 31d90a30dd2c1bbe7ad2ab3ca4640b28b187010e
parent e8af538b05a1bd7bc4a3bcba2fdcbe430675f69c (diff)
download ydb-d8e845f6a1e2628eb37b942a2ce8e84be901b8ca.tar.gz
fix deletion usage on compaction (#19576)
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/general_compaction.cpp 23
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/merge_subset.cpp 28
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/merge_subset.h 36
-rw-r--r-- ydb/core/tx/columnshard/hooks/testing/controller.h 4
-rw-r--r-- ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp 1
5 files changed, 57 insertions, 35 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index 665928fa0b7..f57749aeda4 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -69,9 +69,19 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
AFL_VERIFY(usedPortionIds.emplace(i.GetPortionInfo().GetPortionId()).second);
currentToMerge.emplace_back(std::make_shared<TReadPortionToMerge>(std::move(i), GranuleMeta));
}
+
+ const auto buildPortionsToMerge = [&](const std::vector<std::shared_ptr<ISubsetToMerge>>& toMerge, const bool useDeletion) {
+ std::vector<TPortionToMerge> result;
+ for (auto&& i : toMerge) {
+ auto mergePortions = i->BuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds, useDeletion);
+ result.insert(result.end(), mergePortions.begin(), mergePortions.end());
+ }
+ return result;
+ };
+
auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId());
while (true) {
- std::vector<TPortionToMerge> toMerge;
+ std::vector<std::shared_ptr<ISubsetToMerge>> toMerge;
ui64 sumMemory = 0;
ui64 totalSumMemory = 0;
std::vector<std::shared_ptr<ISubsetToMerge>> appendedToMerge;
@@ -80,9 +90,9 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
if (NYDBTest::TControllers::GetColumnShardController()->CheckPortionsToMergeOnCompaction(
sumMemory + i->GetColumnMaxChunkMemory(), subsetsCount) &&
subsetsCount > 1) {
- auto merged = BuildAppendedPortionsByChunks(context, std::move(toMerge), resultFiltered, stats);
+ auto merged = BuildAppendedPortionsByChunks(context, buildPortionsToMerge(toMerge, false), resultFiltered, stats);
if (merged.size()) {
- appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged)));
+ appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged), GranuleMeta));
}
toMerge.clear();
sumMemory = 0;
@@ -90,15 +100,14 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
}
sumMemory += i->GetColumnMaxChunkMemory();
totalSumMemory += i->GetColumnMaxChunkMemory();
- auto mergePortions = i->BuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds);
- toMerge.insert(toMerge.end(), mergePortions.begin(), mergePortions.end());
+ toMerge.emplace_back(i);
++subsetsCount;
}
if (toMerge.size()) {
- auto merged = BuildAppendedPortionsByChunks(context, std::move(toMerge), resultFiltered, stats);
+ auto merged = BuildAppendedPortionsByChunks(context, buildPortionsToMerge(toMerge, appendedToMerge.empty()), resultFiltered, stats);
if (appendedToMerge.size()) {
if (merged.size()) {
- appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged)));
+ appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged), GranuleMeta));
}
} else {
context.Counters.OnCompactionCorrectMemory(totalSumMemory);
diff --git a/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp b/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp
index 6286026d6a2..c978142a346 100644
--- a/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/merge_subset.cpp
@@ -2,8 +2,9 @@
namespace NKikimr::NOlap::NCompaction {
-std::shared_ptr<NArrow::TColumnFilter> TReadPortionToMerge::BuildPortionFilter(const std::optional<TGranuleShardingInfo>& shardingActual,
- const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage) const {
+std::shared_ptr<NArrow::TColumnFilter> ISubsetToMerge::BuildPortionFilter(const std::optional<TGranuleShardingInfo>& shardingActual,
+ const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage,
+ const bool useDeletionFilter) const {
std::shared_ptr<NArrow::TColumnFilter> filter;
if (shardingActual && pInfo.NeedShardingFilter(*shardingActual)) {
std::set<std::string> fieldNames;
@@ -15,7 +16,7 @@ std::shared_ptr<NArrow::TColumnFilter> TReadPortionToMerge::BuildPortionFilter(c
filter = shardingActual->GetShardingInfo()->GetFilter(table);
}
NArrow::TColumnFilter filterDeleted = NArrow::TColumnFilter::BuildAllowFilter();
- if (pInfo.GetMeta().GetDeletionsCount()) {
+ if (pInfo.GetMeta().GetDeletionsCount() && useDeletionFilter) {
if (pInfo.GetPortionType() == EPortionType::Written) {
AFL_VERIFY(pInfo.GetMeta().GetDeletionsCount() == pInfo.GetRecordsCount());
filterDeleted = NArrow::TColumnFilter::BuildDenyFilter();
@@ -45,23 +46,26 @@ std::shared_ptr<NArrow::TColumnFilter> TReadPortionToMerge::BuildPortionFilter(c
}
std::vector<TPortionToMerge> TReadPortionToMerge::DoBuildPortionsToMerge(const TConstructionContext& context,
- const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
- const THashSet<ui64>& usedPortionIds) const {
+ const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds,
+ const bool useDeletionFilter) const {
auto blobsSchema = ReadPortion.GetPortionInfo().GetSchema(context.SchemaVersions);
auto batch = ReadPortion.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds, false).DetachResult();
auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId());
- std::shared_ptr<NArrow::TColumnFilter> filter = BuildPortionFilter(shardingActual, batch, ReadPortion.GetPortionInfo(), usedPortionIds);
+ std::shared_ptr<NArrow::TColumnFilter> filter =
+ BuildPortionFilter(shardingActual, batch, ReadPortion.GetPortionInfo(), usedPortionIds, useDeletionFilter);
return { TPortionToMerge(batch, filter) };
}
std::vector<TPortionToMerge> TWritePortionsToMerge::DoBuildPortionsToMerge(const TConstructionContext& context,
- const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
- const THashSet<ui64>& /*usedPortionIds*/) const {
+ const std::set<ui32>& seqDataColumnIds, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds,
+ const bool useDeletionFilter) const {
std::vector<TPortionToMerge> result;
for (auto&& i : WritePortions) {
auto blobsSchema = i.GetPortionResult().GetPortionInfo().GetSchema(context.SchemaVersions);
auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds, false).DetachResult();
- result.emplace_back(TPortionToMerge(batch, nullptr));
+ std::shared_ptr<NArrow::TColumnFilter> filter =
+ BuildPortionFilter(std::nullopt, batch, i.GetPortionResult().GetPortionInfo(), usedPortionIds, useDeletionFilter);
+ result.emplace_back(TPortionToMerge(batch, filter));
}
return result;
}
@@ -76,8 +80,10 @@ ui64 TWritePortionsToMerge::GetColumnMaxChunkMemory() const {
return result;
}
-TWritePortionsToMerge::TWritePortionsToMerge(std::vector<TWritePortionInfoWithBlobsResult>&& portions)
- : WritePortions(std::move(portions)) {
+TWritePortionsToMerge::TWritePortionsToMerge(
+ std::vector<TWritePortionInfoWithBlobsResult>&& portions, const std::shared_ptr<TGranuleMeta>& granuleMeta)
+ : TBase(granuleMeta)
+ , WritePortions(std::move(portions)) {
ui32 idx = 0;
for (auto&& i : WritePortions) {
i.GetPortionConstructor().MutablePortionConstructor().SetPortionId(++idx);
diff --git a/ydb/core/tx/columnshard/engines/changes/merge_subset.h b/ydb/core/tx/columnshard/engines/changes/merge_subset.h
index da4ce7545dd..03378a59f89 100644
--- a/ydb/core/tx/columnshard/engines/changes/merge_subset.h
+++ b/ydb/core/tx/columnshard/engines/changes/merge_subset.h
@@ -24,32 +24,42 @@ public:
class ISubsetToMerge {
private:
virtual std::vector<TPortionToMerge> DoBuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds,
- const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds) const = 0;
+ const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds,
+ const bool useDeletionFilter) const = 0;
+
+protected:
+ const std::shared_ptr<TGranuleMeta> GranuleMeta;
+ std::shared_ptr<NArrow::TColumnFilter> BuildPortionFilter(const std::optional<NKikimr::NOlap::TGranuleShardingInfo>& shardingActual,
+ const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage,
+ const bool useDeletionFilter) const;
public:
+ ISubsetToMerge(const std::shared_ptr<TGranuleMeta>& granule)
+ : GranuleMeta(granule) {
+ AFL_VERIFY(GranuleMeta);
+ }
virtual ~ISubsetToMerge() = default;
std::vector<TPortionToMerge> BuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds,
- const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds) const {
- return DoBuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds);
+ const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds,
+ const bool useDeletionFilter) const {
+ return DoBuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds, useDeletionFilter);
}
virtual ui64 GetColumnMaxChunkMemory() const = 0;
};
class TReadPortionToMerge: public ISubsetToMerge {
private:
+ using TBase = ISubsetToMerge;
TReadPortionInfoWithBlobs ReadPortion;
- const std::shared_ptr<TGranuleMeta> GranuleMeta;
-
- std::shared_ptr<NArrow::TColumnFilter> BuildPortionFilter(const std::optional<NKikimr::NOlap::TGranuleShardingInfo>& shardingActual,
- const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage) const;
virtual std::vector<TPortionToMerge> DoBuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds,
- const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds) const override;
+ const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds,
+ const bool useDeletionFilter) const override;
public:
TReadPortionToMerge(TReadPortionInfoWithBlobs&& rPortion, const std::shared_ptr<TGranuleMeta>& granuleMeta)
- : ReadPortion(std::move(rPortion))
- , GranuleMeta(granuleMeta) {
+ : TBase(granuleMeta)
+ , ReadPortion(std::move(rPortion)) {
}
virtual ui64 GetColumnMaxChunkMemory() const override {
@@ -63,15 +73,17 @@ public:
class TWritePortionsToMerge: public ISubsetToMerge {
private:
+ using TBase = ISubsetToMerge;
std::vector<TWritePortionInfoWithBlobsResult> WritePortions;
virtual std::vector<TPortionToMerge> DoBuildPortionsToMerge(const TConstructionContext& context, const std::set<ui32>& seqDataColumnIds,
- const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& /*usedPortionIds*/) const override;
+ const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const THashSet<ui64>& usedPortionIds,
+ const bool useDeletionFilter) const override;
virtual ui64 GetColumnMaxChunkMemory() const override;
public:
- TWritePortionsToMerge(std::vector<TWritePortionInfoWithBlobsResult>&& portions);
+ TWritePortionsToMerge(std::vector<TWritePortionInfoWithBlobsResult>&& portions, const std::shared_ptr<TGranuleMeta>& granuleMeta);
};
} // namespace NKikimr::NOlap::NCompaction
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index 91f7f81943e..e389c9c4c63 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -27,7 +27,6 @@ private:
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideCompactionActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideMaxReadStaleness);
- YDB_ACCESSOR_DEF(std::optional<bool>, OverrideAllowMergeFull);
YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100);
YDB_ACCESSOR(std::optional<ui64>, OverrideLimitForPortionsMetadataAsk, 1);
YDB_ACCESSOR(std::optional<NOlap::NSplitter::TSplitSettings>, OverrideBlobSplitSettings, NOlap::NSplitter::TSplitSettings::BuildForTests());
@@ -242,9 +241,6 @@ protected:
public:
virtual bool CheckPortionsToMergeOnCompaction(const ui64 /*memoryAfterAdd*/, const ui32 currentSubsetsCount) override {
- if (OverrideAllowMergeFull && *OverrideAllowMergeFull) {
- return false;
- }
return currentSubsetsCount > 1;
}
diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
index 678699e4683..c39db923f86 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
@@ -1023,7 +1023,6 @@ Y_UNIT_TEST_SUITE(TOlap) {
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideMaxReadStaleness(TDuration::Seconds(1));
- csController->SetOverrideAllowMergeFull(true);
// disable stats batching
auto& appData = runtime.GetAppData();