| field | value | timestamp |
|---|---|---|
| author | Semyon <yentsovsemyon@ydb.tech> | 2025-06-03 18:58:22 +0300 |
| committer | GitHub <noreply@github.com> | 2025-06-03 15:58:22 +0000 |
| commit | 9f92ebb034cea30aa04d9bcce9a98184a60af1a0 (patch) | |
| tree | 67d221be4612353f403761933ecfbf4b7fb4e733 | |
| parent | 7ae0076cf92f2aa55f0e3742326d81d906cf2d12 (diff) | |
| download | ydb-9f92ebb034cea30aa04d9bcce9a98184a60af1a0.tar.gz | |
duplicate filter construction task (#19120)
18 files changed, 322 insertions, 22 deletions
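The central API change in this commit is that `TMergePartialStream::AddSource()` now takes a `TIterationOrder`, so a merge can start at an arbitrary row offset inside a source batch instead of always starting at row 0. A minimal caller sketch, assuming an `arrow::RecordBatch` source and a stream built with `reverse == false` (the function name and variables are illustrative, not from the commit):

```cpp
#include <ydb/core/formats/arrow/reader/merger.h>

#include <memory>

using namespace NKikimr::NArrow::NMerger;

// Adds the tail of `batch`, starting at `firstRow`, to an ascending merge.
// The stream's direction must match the order: the reworked AddSource()
// verifies order.GetIsReversed() == Reverse.
void AddTailOfBatch(TMergePartialStream& merger, const std::shared_ptr<arrow::RecordBatch>& batch, const ui64 firstRow) {
    merger.AddSource(batch, /*filter=*/nullptr, TIterationOrder::Forward(firstRow));
}
```

`TIterationOrder::Reversed(start)` is the descending counterpart; call sites that previously passed only a batch and a filter now pass `Forward(0)` or `Reversed(0)`, as the updated tests in the diff below show.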
diff --git a/ydb/core/formats/arrow/reader/batch_iterator.h b/ydb/core/formats/arrow/reader/batch_iterator.h
index 91d47d78041..5fba3f53fc5 100644
--- a/ydb/core/formats/arrow/reader/batch_iterator.h
+++ b/ydb/core/formats/arrow/reader/batch_iterator.h
@@ -4,6 +4,26 @@
 
 namespace NKikimr::NArrow::NMerger {
 
+class TIterationOrder {
+private:
+    YDB_READONLY_DEF(bool, IsReversed);
+    YDB_READONLY_DEF(ui64, Start);
+
+public:
+    TIterationOrder(const bool reverse, const ui64 start)
+        : IsReversed(reverse)
+        , Start(start) {
+    }
+
+    static TIterationOrder Forward(const ui64 start) {
+        return TIterationOrder(false, start);
+    }
+
+    static TIterationOrder Reversed(const ui64 start) {
+        return TIterationOrder(true, start);
+    }
+};
+
 class TBatchIterator {
 private:
     bool ControlPointFlag;
@@ -16,11 +36,12 @@ private:
     std::shared_ptr<NArrow::TColumnFilter> Filter;
     std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;
 
-    i32 GetFirstPosition() const {
+    i32 GetPosition(const ui64 position) const {
+        AFL_VERIFY((i64)position < RecordsCount);
         if (ReverseSortKff > 0) {
-            return 0;
+            return position;
         } else {
-            return RecordsCount - 1;
+            return RecordsCount - position - 1;
         }
     }
@@ -52,20 +73,24 @@ public:
     template <class TDataContainer>
     TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const arrow::Schema& keySchema,
-        const arrow::Schema& dataSchema, const bool reverseSort, const std::vector<std::string>& versionColumnNames, const ui64 sourceId)
+        const arrow::Schema& dataSchema, const std::vector<std::string>& versionColumnNames, const ui64 sourceId,
+        const TIterationOrder& order = TIterationOrder::Forward(0))
         : ControlPointFlag(false)
-        , KeyColumns(batch, 0, keySchema.field_names(), dataSchema.field_names(), reverseSort)
+        , KeyColumns(batch, 0, keySchema.field_names(), dataSchema.field_names(), order.GetIsReversed())
         , VersionColumns(batch, 0, versionColumnNames, {}, false)
         , RecordsCount(batch->num_rows())
-        , ReverseSortKff(reverseSort ? -1 : 1)
+        , ReverseSortKff(order.GetIsReversed() ? -1 : 1)
         , SourceId(sourceId)
         , Filter(filter) {
         AFL_VERIFY(KeyColumns.IsSameSortingSchema(keySchema))("batch", KeyColumns.DebugJson())("schema", keySchema.ToString());
         AFL_VERIFY(KeyColumns.IsSameDataSchema(dataSchema))("batch", KeyColumns.DebugJson())("schema", dataSchema.ToString());
-        Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
-        Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
+        Y_ABORT_UNLESS(KeyColumns.InitPosition(GetPosition(order.GetStart())));
+        Y_ABORT_UNLESS(VersionColumns.InitPosition(GetPosition(order.GetStart())));
         if (Filter) {
-            FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
+            FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(order.GetIsReversed(), RecordsCount));
+            if (order.GetStart()) {
+                FilterIterator->Next(order.GetStart());
+            }
         }
     }
diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h
index b49e1b6a67e..282312361b2 100644
--- a/ydb/core/formats/arrow/reader/merger.h
+++ b/ydb/core/formats/arrow/reader/merger.h
@@ -159,18 +159,20 @@ public:
     }
 
     template <class TDataContainer>
-    void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter,
-        const std::optional<ui64> sourceIdExt = std::nullopt) {
+    void AddSource(const std::shared_ptr<TDataContainer>& batch,
+        const std::shared_ptr<NArrow::TColumnFilter>& filter, const TIterationOrder& order, const std::optional<ui64> sourceIdExt = std::nullopt) {
+        AFL_VERIFY(order.GetIsReversed() == Reverse);
         const ui64 sourceId = sourceIdExt.value_or(SortHeap.Size());
-        if (!batch || !batch->num_rows()) {
+        if (!batch || (i64)batch->num_rows() == (i64)order.GetStart()) {
             return;
         }
+        AFL_VERIFY((i64)order.GetStart() < (i64)batch->num_rows())("start", order.GetStart())("num_rows", batch->num_rows());
         // Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
         const bool isDenyFilter = filter && filter->IsTotalDenyFilter();
         auto filterImpl = (!filter || filter->IsTotalAllowFilter()) ? nullptr : filter;
         static const arrow::Schema emptySchema = arrow::Schema(arrow::FieldVector());
-        TBatchIterator iterator(
-            batch, filterImpl, *SortSchema, (!isDenyFilter && DataSchema) ? *DataSchema : emptySchema, Reverse, VersionColumnNames, sourceId);
+        TBatchIterator iterator(batch, filterImpl, *SortSchema, (!isDenyFilter && DataSchema) ? *DataSchema : emptySchema, VersionColumnNames,
+            sourceId, TIterationOrder(Reverse, order.GetStart()));
         if (MaxVersion) {
             MaxVersion->ValidateSchema(*iterator.GetVersionColumns().GetSorting());
         }
diff --git a/ydb/core/formats/arrow/ut/ut_arrow.cpp b/ydb/core/formats/arrow/ut/ut_arrow.cpp
index ccbf117a5ee..f531ca4f4c6 100644
--- a/ydb/core/formats/arrow/ut/ut_arrow.cpp
+++ b/ydb/core/formats/arrow/ut/ut_arrow.cpp
@@ -694,7 +694,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), false, vColumns, std::nullopt);
         for (auto&& i : batches) {
-            merger->AddSource(i, nullptr);
+            merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
         }
         merger->DrainAll(builder);
         sorted = builder.Finalize();
@@ -721,7 +721,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         const std::vector<std::string> vColumns = {batch->schema()->field(0)->name()};
         auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batch->schema(), true, vColumns, std::nullopt);
         for (auto&& i : batches) {
-            merger->AddSource(i, nullptr);
+            merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Reversed(0));
         }
         merger->DrainAll(builder);
         sorted = builder.Finalize();
@@ -748,7 +748,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, std::nullopt);
         for (auto&& i : batches) {
-            merger->AddSource(i, nullptr);
+            merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
         }
         merger->DrainAll(builder);
         sorted = builder.Finalize();
@@ -787,7 +787,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(batch->schema(), batches[0]->schema(), false, vColumns, maxVersionCursor);
         for (auto&& i : batches) {
-            merger->AddSource(i, nullptr);
+            merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
         }
         merger->DrainAll(builder);
         sorted = builder.Finalize();
@@ -833,7 +833,7 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
         auto merger = std::make_shared<NArrow::NMerger::TMergePartialStream>(sortingSchema, batches[0]->schema(), false, vColumns, maxVersionCursor);
         for (auto&& i : batches) {
-            merger->AddSource(i, nullptr);
+            merger->AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
         }
         merger->DrainAll(builder);
         sorted = builder.Finalize();
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 9ae18b686e6..f88aa69b39c 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -70,6 +70,7 @@ struct TEvPrivate {
         EvMetadataAccessorsInfo,
         EvRequestFilter,
+        EvFilterConstructionResult,
 
         EvEnd
     };
diff --git a/ydb/core/tx/columnshard/common/snapshot.h b/ydb/core/tx/columnshard/common/snapshot.h
index 4f9e3841f79..7e1b7a78b14 100644
--- a/ydb/core/tx/columnshard/common/snapshot.h
+++ b/ydb/core/tx/columnshard/common/snapshot.h
@@ -102,6 +102,10 @@ public:
     TString DebugString() const;
     NJson::TJsonValue DebugJson() const;
+
+    operator size_t() const {
+        return CombineHashes(PlanStep, TxId);
+    }
 };
 
 }   // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/counters/counters_manager.h b/ydb/core/tx/columnshard/counters/counters_manager.h
index 2f6b2c4ad86..2ade7ca6770 100644
--- a/ydb/core/tx/columnshard/counters/counters_manager.h
+++ b/ydb/core/tx/columnshard/counters/counters_manager.h
@@ -17,6 +17,7 @@
 #include <ydb/core/tablet/tablet_counters.h>
 #include <ydb/core/tablet_flat/tablet_flat_executor.h>
 #include <ydb/core/tx/columnshard/common/path_id.h>
+#include <ydb/core/tx/columnshard/counters/duplicate_filtering.h>
 #include <ydb/core/tx/columnshard/engines/column_engine.h>
 
 #include <library/cpp/time_provider/time_provider.h>
@@ -37,6 +38,7 @@ private:
     YDB_READONLY(TIndexationCounters, IndexationCounters, TIndexationCounters("Indexation"));
     YDB_READONLY(TIndexationCounters, CompactionCounters, TIndexationCounters("GeneralCompaction"));
     YDB_READONLY(TScanCounters, ScanCounters, TScanCounters("Scan"));
+    YDB_READONLY_DEF(TDuplicateFilteringCounters, DuplicateFilteringCounters);
     YDB_READONLY_DEF(std::shared_ptr<TRequestsTracerCounters>, RequestsTracingCounters);
     YDB_READONLY_DEF(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>, SubscribeCounters);
diff --git a/ydb/core/tx/columnshard/counters/duplicate_filtering.cpp b/ydb/core/tx/columnshard/counters/duplicate_filtering.cpp
new file mode 100644
index 00000000000..daf1817df9b
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/duplicate_filtering.cpp
@@ -0,0 +1,10 @@
+#include "duplicate_filtering.h"
+
+namespace NKikimr::NColumnShard {
+TDuplicateFilteringCounters::TDuplicateFilteringCounters()
+    : TBase("DuplicateFiltering")
+    , MergeRowsAccepted(TBase::GetDeriviative("SourcesMerging/RowsAccepted"))
+    , MergeRowsRejected(TBase::GetDeriviative("SourcesMerging/RowsRejected"))
+    , MergeRowsBulkAccepted(TBase::GetDeriviative("SourcesMerging/RowsBulkAccepted")) {
+}
+}   // namespace NKikimr::NColumnShard
diff --git a/ydb/core/tx/columnshard/counters/duplicate_filtering.h b/ydb/core/tx/columnshard/counters/duplicate_filtering.h
new file mode 100644
index 00000000000..06cda0b95c2
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/duplicate_filtering.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <ydb/library/signals/histogram.h>
+#include <ydb/library/signals/owner.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NKikimr::NColumnShard {
+class TDuplicateFilteringCounters: public TCommonCountersOwner {
+private:
+    using TBase = TCommonCountersOwner;
+
+    NMonitoring::TDynamicCounters::TCounterPtr MergeRowsAccepted;
+    NMonitoring::TDynamicCounters::TCounterPtr MergeRowsRejected;
+    NMonitoring::TDynamicCounters::TCounterPtr MergeRowsBulkAccepted;
+
+public:
+    TDuplicateFilteringCounters();
+
+    void OnRowsMerged(const ui64 accepted, const ui64 rejected, const ui64 bulkAccepted) const {
+        MergeRowsAccepted->Add(accepted);
+        MergeRowsRejected->Add(rejected);
+        MergeRowsBulkAccepted->Add(bulkAccepted);
+    }
+};
+}   // namespace NKikimr::NColumnShard
diff --git a/ydb/core/tx/columnshard/counters/ya.make b/ydb/core/tx/columnshard/counters/ya.make
index fd81a293fbf..5f54a69ba72 100644
--- a/ydb/core/tx/columnshard/counters/ya.make
+++ b/ydb/core/tx/columnshard/counters/ya.make
@@ -16,6 +16,7 @@ SRCS(
     portions.cpp
     writes_monitor.cpp
     portion_index.cpp
+    duplicate_filtering.cpp
 )
 
 PEERDIR(
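`TDuplicateFilteringCounters` groups three derivative sensors under the `DuplicateFiltering` sub-group, and the counters manager (counters_manager.h) now owns an instance via `YDB_READONLY_DEF`, which generates a `GetDuplicateFilteringCounters()` accessor. A minimal usage sketch; standalone construction here is only for illustration, in the shard the instance lives inside the counters manager:

```cpp
#include <ydb/core/tx/columnshard/counters/duplicate_filtering.h>

void ReportMergeExample() {
    // Registers the "DuplicateFiltering" sensor group (see duplicate_filtering.cpp above).
    NKikimr::NColumnShard::TDuplicateFilteringCounters counters;
    // Feeds the SourcesMerging/RowsAccepted, RowsRejected and RowsBulkAccepted sensors.
    counters.OnRowsMerged(/*accepted=*/100, /*rejected=*/20, /*bulkAccepted=*/0);
}
```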
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
index c3a38a045e7..3a1e3571bde 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
@@ -221,7 +221,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
                 IColumnMerger::PortionRecordIndexFieldName);
             batch->AddField(IColumnMerger::PortionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
         }
-        mergeStream.AddSource(batch, Filters[idx]);
+        mergeStream.AddSource(batch, Filters[idx], NArrow::NMerger::TIterationOrder::Forward(0));
         ++idx;
     }
     auto batchResults = mergeStream.DrainAllParts(checkPoints, indexFields);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
index a27739ffcdf..411301cb67a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp
@@ -133,7 +133,8 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
             if (!i->GetStageResult().IsEmpty()) {
                 isEmpty = false;
             }
-            Merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter());
+            Merger->AddSource(rb, i->GetStageResult().GetNotAppliedFilter(),
+                NArrow::NMerger::TIterationOrder(Context->GetCommonContext()->IsReverse(), 0));
         }
     }
     AFL_VERIFY(Merger->GetSourcesCount() <= Sources.size());
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.cpp
new file mode 100644
index 00000000000..b69eab2aa0c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.cpp
@@ -0,0 +1,3 @@
+#include "common.h"
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h
new file mode 100644
index 00000000000..13cd548bb50
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h
@@ -0,0 +1,61 @@
+#pragma once
+
+#include <ydb/core/formats/arrow/common/container.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class TColumnsData {
+private:
+    YDB_READONLY_DEF(std::shared_ptr<NArrow::TGeneralContainer>, Data);
+    YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>, MemoryGuard);
+
+public:
+    TColumnsData(const std::shared_ptr<NArrow::TGeneralContainer>& data, const std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>& memory)
+        : Data(data)
+        , MemoryGuard(memory) {
+        AFL_VERIFY(MemoryGuard);
+    }
+
+    ui64 GetRawSize() const {
+        return MemoryGuard->GetMemory();
+    }
+};
+
+class TDuplicateMapInfo {
+private:
+    TSnapshot MaxVersion;
+    YDB_READONLY_DEF(ui64, Offset);
+    YDB_READONLY_DEF(ui64, RowsCount);
+    YDB_READONLY_DEF(ui64, SourceId);
+
+public:
+    TDuplicateMapInfo(const TSnapshot& maxVersion, const ui64 offset, const ui64 rowsCount, const ui64 sourceId)
+        : MaxVersion(maxVersion)
+        , Offset(offset)
+        , RowsCount(rowsCount)
+        , SourceId(sourceId) {
+    }
+
+    operator size_t() const {
+        ui64 h = 0;
+        h = CombineHashes(h, (size_t)MaxVersion);
+        h = CombineHashes(h, Offset);
+        h = CombineHashes(h, RowsCount);
+        h = CombineHashes(h, SourceId);
+        return h;
+    }
+    bool operator==(const TDuplicateMapInfo& other) const {
+        return std::tie(MaxVersion, Offset, RowsCount, SourceId) == std::tie(other.MaxVersion, other.Offset, other.RowsCount, other.SourceId);
+    }
+
+    TString DebugString() const {
+        return TStringBuilder() << "MaxVersion=" << MaxVersion.DebugString() << ";Offset=" << Offset << ";RowsCount=" << RowsCount
+                                << ";SourceId=" << SourceId;
+    }
+};
+
+}   // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
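`TDuplicateMapInfo` defines `operator size_t()` and `operator==`, the idiom that lets it serve as a `THashMap` key; events.h and merge.cpp below rely on exactly that. A minimal illustration with made-up interval values (the function and the values are illustrative only):

```cpp
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/common.h>

#include <util/generic/hash.h>

using namespace NKikimr;
using namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering;

void HashKeyExample() {
    // One filter per (maxVersion, offset, rowsCount, sourceId) interval; the
    // snapshot and interval values below are invented for the example.
    THashMap<TDuplicateMapInfo, NArrow::TColumnFilter> filters;
    const TDuplicateMapInfo key(NOlap::TSnapshot(/*planStep=*/10, /*txId=*/20), /*offset=*/0, /*rowsCount=*/100, /*sourceId=*/1);
    filters.emplace(key, NArrow::TColumnFilter::BuildAllowFilter());
}
```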
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
index 6a386fdde4f..51c35837a30 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
@@ -1,10 +1,13 @@
 #pragma once
 
+#include "common.h"
+
 #include <ydb/core/formats/arrow/arrow_filter.h>
 #include <ydb/core/tx/columnshard/columnshard_private_events.h>
 #include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h>
 
 #include <ydb/library/actors/core/event_local.h>
+#include <ydb/library/conclusion/result.h>
 
 namespace NKikimr::NOlap::NReader::NSimple {
 class IDataSource;
@@ -32,4 +35,31 @@ public:
     TEvRequestFilter(const IDataSource& source, const std::shared_ptr<IFilterSubscriber>& subscriber);
 };
 
+class TEvFilterConstructionResult
+    : public NActors::TEventLocal<TEvFilterConstructionResult, NColumnShard::TEvPrivate::EvFilterConstructionResult> {
+private:
+    using TFilters = THashMap<TDuplicateMapInfo, NArrow::TColumnFilter>;
+    TConclusion<TFilters> Result;
+
+public:
+    TEvFilterConstructionResult(TConclusion<TFilters>&& result)
+        : Result(std::move(result)) {
+        if (Result.IsSuccess()) {
+            for (const auto& [info, filter] : *Result) {
+                AFL_VERIFY(!!filter.GetRecordsCount() && filter.GetRecordsCountVerified() == info.GetRowsCount())(
+                    "filter", filter.GetRecordsCount().value_or(0))(
+                    "info", info.DebugString());
+            }
+        }
+    }
+
+    const TConclusion<TFilters>& GetConclusion() const {
+        return Result;
+    }
+
+    TFilters&& ExtractResult() {
+        return Result.DetachResult();
+    }
+};
+
 }   // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
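A sketch of how the actor that requested the filters might consume the new event; the handler and its surrounding actor are assumptions, while the event accessors (`GetConclusion()`, `ExtractResult()`) are the ones declared above:

```cpp
// Hypothetical handler inside the actor that owns the filter request; assumes
// the usual usings for NKikimr::NArrow and the NDuplicateFiltering namespace.
void Handle(TEvFilterConstructionResult::TPtr& ev) {
    if (!ev->Get()->GetConclusion().IsSuccess()) {
        // Filter construction failed; the read should be aborted with the error.
        return;
    }
    // One TColumnFilter per TDuplicateMapInfo interval, each verified by the
    // event constructor to cover exactly GetRowsCount() records.
    THashMap<TDuplicateMapInfo, NArrow::TColumnFilter> filters = ev->Get()->ExtractResult();
    Y_UNUSED(filters);
}
```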
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.cpp
new file mode 100644
index 00000000000..59ef376ebb4
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.cpp
@@ -0,0 +1,76 @@
+#include "merge.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class TFiltersBuilder {
+private:
+    THashMap<ui64, NArrow::TColumnFilter> Filters;
+    YDB_READONLY(ui64, RowsAdded, 0);
+    YDB_READONLY(ui64, RowsSkipped, 0);
+
+    void AddImpl(const ui64 sourceId, const bool value) {
+        auto* findFilter = Filters.FindPtr(sourceId);
+        AFL_VERIFY(findFilter);
+        findFilter->Add(value);
+    }
+
+public:
+    void AddRecord(const NArrow::NMerger::TBatchIterator& cursor) {
+        AddImpl(cursor.GetSourceId(), true);
+        ++RowsAdded;
+    }
+
+    void SkipRecord(const NArrow::NMerger::TBatchIterator& cursor) {
+        AddImpl(cursor.GetSourceId(), false);
+        ++RowsSkipped;
+    }
+
+    void ValidateDataSchema(const std::shared_ptr<arrow::Schema>& /*schema*/) const {
+    }
+
+    bool IsBufferExhausted() const {
+        return false;
+    }
+
+    THashMap<ui64, NArrow::TColumnFilter>&& ExtractFilters() && {
+        return std::move(Filters);
+    }
+
+    void AddSource(const ui64 sourceId) {
+        AFL_VERIFY(Filters.emplace(sourceId, NArrow::TColumnFilter::BuildAllowFilter()).second);
+    }
+};
+
+void TBuildDuplicateFilters::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
+    NArrow::NMerger::TMergePartialStream merger(PKSchema, nullptr, false, VersionColumnNames, MaxVersion);
+    merger.PutControlPoint(Finish.BuildSortablePosition(), false);
+    TFiltersBuilder filtersBuilder;
+    for (const auto& [interval, data] : SourcesById) {
+        merger.AddSource(data->GetData(), nullptr, NArrow::NMerger::TIterationOrder::Forward(interval.GetOffset()), interval.GetSourceId());
+        filtersBuilder.AddSource(interval.GetSourceId());
+    }
+    merger.DrainToControlPoint(filtersBuilder, IncludeFinish);
+    Counters.OnRowsMerged(filtersBuilder.GetRowsAdded(), filtersBuilder.GetRowsSkipped(), 0);
+
+    THashMap<ui64, NArrow::TColumnFilter> filtersBySource = std::move(filtersBuilder).ExtractFilters();
+    THashMap<TDuplicateMapInfo, NArrow::TColumnFilter> filters;
+    for (auto&& [interval, data] : SourcesById) {
+        NArrow::TColumnFilter* findFilter = filtersBySource.FindPtr(interval.GetSourceId());
+        AFL_VERIFY(findFilter);
+        filters.emplace(interval, std::move(*findFilter));
+    }
+
+    AFL_VERIFY(Owner);
+    TActivationContext::AsActorContext().Send(Owner, new TEvFilterConstructionResult(std::move(filters)));
+    Owner = TActorId();
+}
+
+void TBuildDuplicateFilters::DoOnCannotExecute(const TString& reason) {
+    AFL_VERIFY(Owner);
+    TActivationContext::AsActorContext().Send(Owner, new TEvFilterConstructionResult(TConclusionStatus::Fail(reason)));
+    Owner = TActorId();
+}
+
+}   // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h
new file mode 100644
index 00000000000..e12dbe66863
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include "common.h"
+
+#include <ydb/core/formats/arrow/reader/merger.h>
+#include <ydb/core/formats/arrow/rows/view.h>
+#include <ydb/core/tx/columnshard/counters/duplicate_filtering.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h>
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+
+#include <ydb/library/actors/interconnect/types.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class TBuildDuplicateFilters: public NConveyor::ITask {
+private:
+    THashMap<TDuplicateMapInfo, std::shared_ptr<TColumnsData>> SourcesById;
+    std::shared_ptr<arrow::Schema> PKSchema;
+    std::vector<std::string> VersionColumnNames;
+    TActorId Owner;
+    NColumnShard::TDuplicateFilteringCounters Counters;
+    std::optional<NArrow::NMerger::TCursor> MaxVersion;
+    NArrow::TSimpleRow Finish;
+    bool IncludeFinish;
+
+private:
+    virtual void DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override;
+    virtual void DoOnCannotExecute(const TString& reason) override;
+
+    virtual TString GetTaskClassIdentifier() const override {
+        return "BUILD_DUPLICATE_FILTERS";
+    }
+
+public:
+    TBuildDuplicateFilters(const std::shared_ptr<arrow::Schema>& sortingSchema, const std::optional<NArrow::NMerger::TCursor>& maxVersion,
+        const NArrow::TSimpleRow& finish, const bool includeFinish, const NColumnShard::TDuplicateFilteringCounters& counters,
+        const TActorId& owner)
+        : PKSchema(sortingSchema)
+        , VersionColumnNames(IIndexInfo::GetSnapshotColumnNames())
+        , Owner(owner)
+        , Counters(counters)
+        , MaxVersion(maxVersion)
+        , Finish(finish)
+        , IncludeFinish(includeFinish) {
+        AFL_VERIFY(finish.GetSchema()->Equals(sortingSchema));
+    }
+
+    void AddSource(const std::shared_ptr<TColumnsData>& batch, const TDuplicateMapInfo& interval) {
+        AFL_VERIFY(interval.GetRowsCount());
+        AFL_VERIFY(interval.GetOffset() < batch->GetData()->GetRecordsCount())("interval", interval.DebugString())(
+            "records", batch->GetData()->GetRecordsCount());
+        AFL_VERIFY(SourcesById.emplace(interval, batch).second);
+    }
+};
+
+}   // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
index 3e972005540..3f3851979bf 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
@@ -3,6 +3,8 @@ LIBRARY()
 SRCS(
     manager.cpp
     events.cpp
+    merge.cpp
+    common.cpp
 )
 
 PEERDIR(
diff --git a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
index f697f6d3a4c..1661a6bf831 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp
@@ -160,7 +160,7 @@ public:
         NArrow::NMerger::TMergePartialStream stream(context.GetActualSchema()->GetIndexInfo().GetReplaceKey(), dataSchema, false,
             { IIndexInfo::GetWriteIdField()->name() }, std::nullopt);
         for (auto&& i : containers) {
-            stream.AddSource(i, nullptr);
+            stream.AddSource(i, nullptr, NArrow::NMerger::TIterationOrder::Forward(0));
         }
         NArrow::NMerger::TRecordBatchBuilder rbBuilder(dataSchema->fields(), recordsCountSum);
         stream.DrainAll(rbBuilder);
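Taken together, `TBuildDuplicateFilters` is a conveyor task that merges the registered source intervals, builds one `TColumnFilter` per `TDuplicateMapInfo`, and reports the result back to its owner as `TEvFilterConstructionResult`. A hypothetical wiring sketch; the function name, its parameters and the scheduling step are assumptions, while the class APIs come from merge.h and common.h above:

```cpp
#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/merge.h>

using namespace NKikimr;
using namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering;

// Builds the filter-construction task for a set of source intervals. All
// arguments are assumed to be prepared by the calling reader code.
std::shared_ptr<TBuildDuplicateFilters> BuildFilterTask(const std::shared_ptr<arrow::Schema>& pkSchema,
    const std::optional<NArrow::NMerger::TCursor>& maxVersion, const NArrow::TSimpleRow& finish,
    const NColumnShard::TDuplicateFilteringCounters& counters, const NActors::TActorId& owner,
    const THashMap<TDuplicateMapInfo, std::shared_ptr<TColumnsData>>& sources) {
    auto task = std::make_shared<TBuildDuplicateFilters>(pkSchema, maxVersion, finish, /*includeFinish=*/true, counters, owner);
    for (const auto& [interval, data] : sources) {
        task->AddSource(data, interval);
    }
    // The task is then handed to the conveyor for execution; DoExecute() sends
    // TEvFilterConstructionResult back to `owner`. The scheduling call itself
    // is not part of this diff and is omitted here.
    return task;
}
```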