aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon <yentsovsemyon@ydb.tech>2025-06-03 18:58:22 +0300
committerGitHub <noreply@github.com>2025-06-03 15:58:22 +0000
commit9f92ebb034cea30aa04d9bcce9a98184a60af1a0 (patch)
tree67d221be4612353f403761933ecfbf4b7fb4e733
parent7ae0076cf92f2aa55f0e3742326d81d906cf2d12 (diff)
downloadydb-9f92ebb034cea30aa04d9bcce9a98184a60af1a0.tar.gz
duplicate filter construction task (#19120)
-rw-r--r--ydb/core/formats/arrow/reader/batch_iterator.h43
-rw-r--r--ydb/core/formats/arrow/reader/merger.h12
-rw-r--r--ydb/core/formats/arrow/ut/ut_arrow.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h1
-rw-r--r--ydb/core/tx/columnshard/common/snapshot.h4
-rw-r--r--ydb/core/tx/columnshard/counters/counters_manager.h2
-rw-r--r--ydb/core/tx/columnshard/counters/duplicate_filtering.cpp10
-rw-r--r--ydb/core/tx/columnshard/counters/duplicate_filtering.h26
-rw-r--r--ydb/core/tx/columnshard/counters/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h61
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h30
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.cpp76
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h56
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make2
-rw-r--r--ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp2
18 files changed, 322 insertions, 22 deletions
diff --git a/ydb/core/formats/arrow/reader/batch_iterator.h b/ydb/core/formats/arrow/reader/batch_iterator.h
index 91d47d78041..5fba3f53fc5 100644
--- a/ydb/core/formats/arrow/reader/batch_iterator.h
+++ b/ydb/core/formats/arrow/reader/batch_iterator.h
@@ -4,6 +4,26 @@
namespace NKikimr::NArrow::NMerger {
+class TIterationOrder {
+private:
+ YDB_READONLY_DEF(bool, IsReversed);
+ YDB_READONLY_DEF(ui64, Start);
+
+public:
+ TIterationOrder(const bool reverse, const ui64 start)
+ : IsReversed(reverse)
+ , Start(start) {
+ }
+
+ static TIterationOrder Forward(const ui64 start) {
+ return TIterationOrder(false, start);
+ }
+
+ static TIterationOrder Reversed(const ui64 start) {
+ return TIterationOrder(true, start);
+ }
+};
+
class TBatchIterator {
private:
bool ControlPointFlag;
@@ -16,11 +36,12 @@ private:
std::shared_ptr<NArrow::TColumnFilter> Filter;
std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
- i32 GetFirstPosition() const {
+ i32 GetPosition(const ui64 position) const {
+ AFL_VERIFY((i64)position < RecordsCount);
if (ReverseSortKff > 0) {
- return 0;
+ return position;
} else {
- return RecordsCount - 1;
+ return RecordsCount - position - 1;
}
}
@@ -52,20 +73,24 @@ public:
template <class TDataContainer>
TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const arrow::Schema& keySchema,
- const arrow::Schema& dataSchema, const bool reverseSort, const std::vector<std::string>& versionColumnNames, const ui64 sourceId)
+ const arrow::Schema& dataSchema, const std::vector<std::string>& versionColumnNames, const ui64 sourceId,
+ const TIterationOrder& order = TIterationOrder::Forward(0))
: ControlPointFlag(false)
- , KeyColumns(batch, 0, keySchema.field_names(), dataSchema.field_names(), reverseSort)
+ , KeyColumns(batch, 0, keySchema.field_names(), dataSchema.field_names(), order.GetIsReversed())
, VersionColumns(batch, 0, versionColumnNames, {}, false)
, RecordsCount(batch->num_rows())
- , ReverseSortKff(reverseSort ? -1 : 1)
+ , ReverseSortKff(order.GetIsReversed() ? -1 : 1)
, SourceId(sourceId)
, Filter(filter) {
AFL_VERIFY(KeyColumns.IsSameSortingSchema(keySchema))("batch", KeyColumns.DebugJson())("schema", keySchema.ToString());
AFL_VERIFY(KeyColumns.IsSameDataSchema(dataSchema))("batch", KeyColumns.DebugJson())("schema", dataSchema.ToString());
- Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
- Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
+ Y_ABORT_UNLESS(KeyColumns.InitPosition(GetPosition(order.GetStart())));
+ Y_ABORT_UNLESS(VersionColumns.InitPosition(GetPosition(order.GetStart())));
if (Filter) {
- FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
+ FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(order.GetIsReversed(), RecordsCount));
+ if (order.GetStart()) {
+ FilterIterator->Next(order.GetStart());
+ }
}
}
diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h
index b49e1b6a67e..282312361b2 100644
--- a/ydb/core/formats/arrow/reader/merger.h
+++ b/ydb/core/formats/arrow/reader/merger.h
@@ -159,18 +159,20 @@ public:
}
template <class TDataContainer>
- void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter,
- const std::optional<ui64> sourceIdExt = std::nullopt) {
+ void AddSource(const std::shared_ptr<TDataContainer>& batch,
+ const std::shared_ptr<NArrow::TColumnFilter>& filter, const TIterationOrder& order, const std::optional<ui64> sourceIdExt = std::nullopt) {
+ AFL_VERIFY(order.GetIsReversed() == Reverse);
const ui64 sourceId = sourceIdExt.value_or(SortHeap.Size());
- if (!batch || !batch->num_rows()) {
+ if (!batch || (i64)batch->num_rows() == (i64)order.GetStart()) {
return;
}
+ AFL_VERIFY((i64)order.GetStart() < (i64)batch->num_rows())("start", order.GetStart())("num_rows", batch->num_rows());
// Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
const bool isDenyFilter = filter && filter->IsTotalDenyFilter();
auto filterImpl = (!filter || filter->IsTotalAllowFilter()) ? nullptr : filter;
static const arrow::Schema emptySchema = arrow::Schema(arrow::FieldVector());
- TBatchIterator iterator(
- batch, filterImpl, *SortSchema, (!isDenyFilter && DataSchema) ? *DataSchema : emptySchema, Reverse, VersionColumnNames, sourceId);
+ TBatchIterator iterator(batch, filterImpl, *SortSchema, (!isDenyFilter && DataSchema) ? *DataSchema : emptySchema, VersionColumnNames,
+ sourceId, TIterationOrder(Reverse, order.GetStart()));
if (MaxVersion) {
MaxVersion->ValidateSchema(*iterator.GetVersionColumns().GetSorting());
}
diff --git a/ydb/core/formats/arrow/ut/ut_arrow.cpp b/ydb/core/formats/arrow/ut/ut_arrow.cpp
index ccbf117a5ee..f531ca4f4c6 100644
--- a/ydb/core/formats/arrow/ut/ut_arrow.cpp
+++ b/ydb/core/formats/arrow/ut/ut_arrow.cpp
@@ -694,7 +694,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
auto merger =
std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns, std::nullopt);
for (auto&& i : batches) {
- merger->AddSource(i, nullptr);
+ merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
}
merger->DrainAll(builder);
sorted = builder.Finalize();
@@ -721,7 +721,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns, std::nullopt);
for (auto&& i : batches) {
- merger->AddSource(i, nullptr);
+ merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Reversed(0));
}
merger->DrainAll(builder);
sorted = builder.Finalize();
@@ -748,7 +748,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
auto merger =
std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, std::nullopt);
for (auto&& i : batches) {
- merger->AddSource(i, nullptr);
+ merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
}
merger->DrainAll(builder);
sorted = builder.Finalize();
@@ -787,7 +787,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
auto merger =
std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, maxVersionCursor);
for (auto&& i : batches) {
- merger->AddSource(i, nullptr);
+ merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
}
merger->DrainAll(builder);
sorted = builder.Finalize();
@@ -833,7 +833,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
auto merger =
std::make_shared<NArrow::NMerger::TMergePartialStream>(sortingSchema, batches[0]->schema(), false, vColumns, maxVersionCursor);
for (auto&& i : batches) {
- merger->AddSource(i, nullptr);
+ merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
}
merger->DrainAll(builder);
sorted = builder.Finalize();
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 9ae18b686e6..f88aa69b39c 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -70,6 +70,7 @@ struct TEvPrivate {
EvMetadataAccessorsInfo,
EvRequestFilter,
+ EvFilterConstructionResult,
EvEnd
};
diff --git a/ydb/core/tx/columnshard/common/snapshot.h b/ydb/core/tx/columnshard/common/snapshot.h
index 4f9e3841f79..7e1b7a78b14 100644
--- a/ydb/core/tx/columnshard/common/snapshot.h
+++ b/ydb/core/tx/columnshard/common/snapshot.h
@@ -102,6 +102,10 @@ public:
TString DebugString() const;
NJson::TJsonValue DebugJson() const;
+
+ operator size_t() const {
+ return CombineHashes(PlanStep, TxId);
+ }
};
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/counters/counters_manager.h b/ydb/core/tx/columnshard/counters/counters_manager.h
index 2f6b2c4ad86..2ade7ca6770 100644
--- a/ydb/core/tx/columnshard/counters/counters_manager.h
+++ b/ydb/core/tx/columnshard/counters/counters_manager.h
@@ -17,6 +17,7 @@
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/columnshard/common/path_id.h>
+#include <ydb/core/tx/columnshard/counters/duplicate_filtering.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <library/cpp/time_provider/time_provider.h>
@@ -37,6 +38,7 @@ private:
YDB_READONLY(TIndexationCounters, IndexationCounters, TIndexationCounters("Indexation"));
YDB_READONLY(TIndexationCounters, CompactionCounters, TIndexationCounters("GeneralCompaction"));
YDB_READONLY(TScanCounters, ScanCounters, TScanCounters("Scan"));
+ YDB_READONLY_DEF(TDuplicateFilteringCounters, DuplicateFilteringCounters);
YDB_READONLY_DEF(std::shared_ptr<TRequestsTracerCounters>, RequestsTracingCounters);
YDB_READONLY_DEF(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>, SubscribeCounters);
diff --git a/ydb/core/tx/columnshard/counters/duplicate_filtering.cpp b/ydb/core/tx/columnshard/counters/duplicate_filtering.cpp
new file mode 100644
index 00000000000..daf1817df9b
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/duplicate_filtering.cpp
@@ -0,0 +1,10 @@
+#include "duplicate_filtering.h"
+
+namespace NKikimr::NColumnShard {
+TDuplicateFilteringCounters::TDuplicateFilteringCounters()
+ : TBase("DuplicateFiltering")
+ , MergeRowsAccepted(TBase::GetDeriviative("SourcesMerging/RowsAccepted"))
+ , MergeRowsRejected(TBase::GetDeriviative("SourcesMerging/RowsRejected"))
+ , MergeRowsBulkAccepted(TBase::GetDeriviative("SourcesMerging/RowsBulkAccepted")) {
+}
+} // namespace NKikimr::NColumnShard
diff --git a/ydb/core/tx/columnshard/counters/duplicate_filtering.h b/ydb/core/tx/columnshard/counters/duplicate_filtering.h
new file mode 100644
index 00000000000..06cda0b95c2
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/duplicate_filtering.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <ydb/library/signals/histogram.h>
+#include <ydb/library/signals/owner.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NKikimr::NColumnShard {
+class TDuplicateFilteringCounters: public TCommonCountersOwner {
+private:
+ using TBase = TCommonCountersOwner;
+
+ NMonitoring::TDynamicCounters::TCounterPtr MergeRowsAccepted;
+ NMonitoring::TDynamicCounters::TCounterPtr MergeRowsRejected;
+ NMonitoring::TDynamicCounters::TCounterPtr MergeRowsBulkAccepted;
+
+public:
+ TDuplicateFilteringCounters();
+
+ void OnRowsMerged(const ui64 accepted, const ui64 rejected, const ui64 bulkAccepted) const {
+ MergeRowsAccepted->Add(accepted);
+ MergeRowsRejected->Add(rejected);
+ MergeRowsBulkAccepted->Add(bulkAccepted);
+ }
+};
+} // namespace NKikimr::NColumnShard
diff --git a/ydb/core/tx/columnshard/counters/ya.make b/ydb/core/tx/columnshard/counters/ya.make
index fd81a293fbf..5f54a69ba72 100644
--- a/ydb/core/tx/columnshard/counters/ya.make
+++ b/ydb/core/tx/columnshard/counters/ya.make
@@ -16,6 +16,7 @@ SRCS(
portions.cpp
writes_monitor.cpp
portion_index.cpp
+ duplicate_filtering.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
index c3a38a045e7..3a1e3571bde 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
@@ -221,7 +221,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
IColumnMerger::PortionRecordIndexFieldName);
batch->AddField(IColumnMerger::PortionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
}
- mergeStream.AddSource(batch, Filters[idx]);
+ mergeStream.AddSource(batch, Filters[idx], NArrow::NMerger::TIterationOrder::Forward(0));
++idx;
}
auto batchResults = mergeStream.DrainAllParts(checkPoints, indexFields);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
index a27739ffcdf..411301cb67a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
@@ -133,7 +133,8 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
if (!i->GetStageResult().IsEmpty()) {
isEmpty = false;
}
- Merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter());
+ Merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter(),
+ NArrow::NMerger::TIterationOrder(Context->GetCommonContext()->IsReverse(), 0));
}
}
AFL_VERIFY(Merger->GetSourcesCount() <= Sources.size());
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.cpp
new file mode 100644
index 00000000000..b69eab2aa0c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.cpp
@@ -0,0 +1,3 @@
+#include "common.h"
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h
new file mode 100644
index 00000000000..13cd548bb50
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h
@@ -0,0 +1,61 @@
+#pragma once
+
+#include <ydb/core/formats/arrow/common/container.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class TColumnsData {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TGeneralContainer>, Data);
+ YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>, MemoryGuard);
+
+public:
+ TColumnsData(const std::shared_ptr<NArrow::TGeneralContainer>& data, const std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>& memory)
+ : Data(data)
+ , MemoryGuard(memory) {
+ AFL_VERIFY(MemoryGuard);
+ }
+
+ ui64 GetRawSize() const {
+ return MemoryGuard->GetMemory();
+ }
+};
+
+class TDuplicateMapInfo {
+private:
+ TSnapshot MaxVersion;
+ YDB_READONLY_DEF(ui64, Offset);
+ YDB_READONLY_DEF(ui64, RowsCount);
+ YDB_READONLY_DEF(ui64, SourceId);
+
+public:
+ TDuplicateMapInfo(const TSnapshot& maxVersion, const ui64 offset, const ui64 rowsCount, const ui64 sourceId)
+ : MaxVersion(maxVersion)
+ , Offset(offset)
+ , RowsCount(rowsCount)
+ , SourceId(sourceId) {
+ }
+
+ operator size_t() const {
+ ui64 h = 0;
+ h = CombineHashes(h, (size_t)MaxVersion);
+ h = CombineHashes(h, Offset);
+ h = CombineHashes(h, RowsCount);
+ h = CombineHashes(h, SourceId);
+ return h;
+ }
+ bool operator==(const TDuplicateMapInfo& other) const {
+ return std::tie(MaxVersion, Offset, RowsCount, SourceId) == std::tie(other.MaxVersion, other.Offset, other.RowsCount, other.SourceId);
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() << "MaxVersion=" << MaxVersion.DebugString() << ";Offset=" << Offset << ";RowsCount=" << RowsCount
+ << ";SourceId=" << SourceId;
+ }
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
index 6a386fdde4f..51c35837a30 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
@@ -1,10 +1,13 @@
#pragma once
+#include "common.h"
+
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h>
#include <ydb/library/actors/core/event_local.h>
+#include <ydb/library/conclusion/result.h>
namespace NKikimr::NOlap::NReader::NSimple {
class IDataSource;
@@ -32,4 +35,31 @@ public:
TEvRequestFilter(const IDataSource& source, const std::shared_ptr<IFilterSubscriber>& subscriber);
};
+class TEvFilterConstructionResult
+ : public NActors::TEventLocal<TEvFilterConstructionResult, NColumnShard::TEvPrivate::EvFilterConstructionResult> {
+private:
+ using TFilters = THashMap<TDuplicateMapInfo, NArrow::TColumnFilter>;
+ TConclusion<TFilters> Result;
+
+public:
+ TEvFilterConstructionResult(TConclusion<TFilters>&& result)
+ : Result(std::move(result)) {
+ if (Result.IsSuccess()) {
+ for (const auto& [info, filter] : *Result) {
+ AFL_VERIFY(!!filter.GetRecordsCount() && filter.GetRecordsCountVerified() == info.GetRowsCount())(
+ "filter", filter.GetRecordsCount().value_or(0))(
+ "info", info.DebugString());
+ }
+ }
+ }
+
+ const TConclusion<TFilters>& GetConclusion() const {
+ return Result;
+ }
+
+ TFilters&& ExtractResult() {
+ return Result.DetachResult();
+ }
+};
+
} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.cpp
new file mode 100644
index 00000000000..59ef376ebb4
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.cpp
@@ -0,0 +1,76 @@
+#include "merge.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class TFiltersBuilder {
+private:
+ THashMap<ui64, NArrow::TColumnFilter> Filters;
+ YDB_READONLY(ui64, RowsAdded, 0);
+ YDB_READONLY(ui64, RowsSkipped, 0);
+
+ void AddImpl(const ui64 sourceId, const bool value) {
+ auto* findFilter = Filters.FindPtr(sourceId);
+ AFL_VERIFY(findFilter);
+ findFilter->Add(value);
+ }
+
+public:
+ void AddRecord(const NArrow::NMerger::TBatchIterator& cursor) {
+ AddImpl(cursor.GetSourceId(), true);
+ ++RowsAdded;
+ }
+
+ void SkipRecord(const NArrow::NMerger::TBatchIterator& cursor) {
+ AddImpl(cursor.GetSourceId(), false);
+ ++RowsSkipped;
+ }
+
+ void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& /*schema*/) const {
+ }
+
+ bool IsBufferExhausted() const {
+ return false;
+ }
+
+ THashMap<ui64, NArrow::TColumnFilter>&& ExtractFilters() && {
+ return std::move(Filters);
+ }
+
+ void AddSource(const ui64 sourceId) {
+ AFL_VERIFY(Filters.emplace(sourceId, NArrow::TColumnFilter::BuildAllowFilter()).second);
+ }
+};
+
+void TBuildDuplicateFilters::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
+ NArrow::NMerger::TMergePartialStream merger(PKSchema, nullptr, false, VersionColumnNames, MaxVersion);
+ merger.PutControlPoint(Finish.BuildSortablePosition(), false);
+ TFiltersBuilder filtersBuilder;
+ for (const auto& [interval, data] : SourcesById) {
+ merger.AddSource(data->GetData(), nullptr, NArrow::NMerger::TIterationOrder::Forward(interval.GetOffset()), interval.GetSourceId());
+ filtersBuilder.AddSource(interval.GetSourceId());
+ }
+ merger.DrainToControlPoint(filtersBuilder, IncludeFinish);
+ Counters.OnRowsMerged(filtersBuilder.GetRowsAdded(), filtersBuilder.GetRowsSkipped(), 0);
+
+ THashMap<ui64, NArrow::TColumnFilter> filtersBySource = std::move(filtersBuilder).ExtractFilters();
+ THashMap<TDuplicateMapInfo, NArrow::TColumnFilter> filters;
+ for (auto&& [interval, data] : SourcesById) {
+ NArrow::TColumnFilter* findFilter = filtersBySource.FindPtr(interval.GetSourceId());
+ AFL_VERIFY(findFilter);
+ filters.emplace(interval, std::move(*findFilter));
+ }
+
+ AFL_VERIFY(Owner);
+ TActivationContext::AsActorContext().Send(Owner, new TEvFilterConstructionResult(std::move(filters)));
+ Owner = TActorId();
+}
+
+void TBuildDuplicateFilters::DoOnCannotExecute(const TString& reason) {
+ AFL_VERIFY(Owner);
+ TActivationContext::AsActorContext().Send(Owner, new TEvFilterConstructionResult(TConclusionStatus::Fail(reason)));
+ Owner = TActorId();
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h
new file mode 100644
index 00000000000..e12dbe66863
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include "common.h"
+
+#include <ydb/core/formats/arrow/reader/merger.h>
+#include <ydb/core/formats/arrow/rows/view.h>
+#include <ydb/core/tx/columnshard/counters/duplicate_filtering.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h>
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+
+#include <ydb/library/actors/interconnect/types.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class TBuildDuplicateFilters: public NConveyor::ITask {
+private:
+ THashMap<TDuplicateMapInfo, std::shared_ptr<TColumnsData>> SourcesById;
+ std::shared_ptr<arrow::Schema> PKSchema;
+ std::vector<std::string> VersionColumnNames;
+ TActorId Owner;
+ NColumnShard::TDuplicateFilteringCounters Counters;
+ std::optional<NArrow::NMerger::TCursor> MaxVersion;
+ NArrow::TSimpleRow Finish;
+ bool IncludeFinish;
+
+private:
+ virtual void DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override;
+ virtual void DoOnCannotExecute(const TString& reason) override;
+
+ virtual TString GetTaskClassIdentifier() const override {
+ return "BUILD_DUPLICATE_FILTERS";
+ }
+
+public:
+ TBuildDuplicateFilters(const std::shared_ptr<arrow::Schema>& sortingSchema, const std::optional<NArrow::NMerger::TCursor>& maxVersion,
+ const NArrow::TSimpleRow& finish, const bool includeFinish, const NColumnShard::TDuplicateFilteringCounters& counters,
+ const TActorId& owner)
+ : PKSchema(sortingSchema)
+ , VersionColumnNames(IIndexInfo::GetSnapshotColumnNames())
+ , Owner(owner)
+ , Counters(counters)
+ , MaxVersion(maxVersion)
+ , Finish(finish)
+ , IncludeFinish(includeFinish) {
+ AFL_VERIFY(finish.GetSchema()->Equals(sortingSchema));
+ }
+
+ void AddSource(const std::shared_ptr<TColumnsData>& batch, const TDuplicateMapInfo& interval) {
+ AFL_VERIFY(interval.GetRowsCount());
+ AFL_VERIFY(interval.GetOffset() < batch->GetData()->GetRecordsCount())("interval", interval.DebugString())(
+ "records", batch->GetData()->GetRecordsCount());
+ AFL_VERIFY(SourcesById.emplace(interval, batch).second);
+ }
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
index 3e972005540..3f3851979bf 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
@@ -3,6 +3,8 @@ LIBRARY()
SRCS(
manager.cpp
events.cpp
+ merge.cpp
+ common.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
index f697f6d3a4c..1661a6bf831 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
@@ -160,7 +160,7 @@ public:
NArrow::NMerger::TMergePartialStream stream(context.GetActualSchema()->GetIndexInfo().GetReplaceKey(), dataSchema, false,
{ IIndexInfo::GetWriteIdField()->name() }, std::nullopt);
for (auto&& i : containers) {
- stream.AddSource(i, nullptr);
+ stream.AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
}
NArrow::NMerger::TRecordBatchBuilder rbBuilder(dataSchema->fields(), recordsCountSum);
stream.DrainAll(rbBuilder);