diff options
22 files changed, 342 insertions, 19 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp index 7be4883bea8..f7544701c2c 100644 --- a/ydb/core/formats/arrow/arrow_filter.cpp +++ b/ydb/core/formats/arrow/arrow_filter.cpp @@ -297,6 +297,9 @@ const std::vector<bool>& TColumnFilter::BuildSimpleFilter() const { class TMergePolicyAnd { private: public: + static bool HasValue(const bool /*a*/, const bool /*b*/) { + return true; + } static bool Calc(const bool a, const bool b) { return a && b; } @@ -307,11 +310,17 @@ public: return TColumnFilter::BuildDenyFilter(); } } + static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) { + return MergeWithSimple(filter, simpleValue); + } }; class TMergePolicyOr { private: public: + static bool HasValue(const bool /*a*/, const bool /*b*/) { + return true; + } static bool Calc(const bool a, const bool b) { return a || b; } @@ -322,6 +331,43 @@ public: return filter; } } + static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) { + return MergeWithSimple(filter, simpleValue); + } +}; + +class TMergePolicyApplyFilter { +private: +public: + static bool HasValue(const bool /*a*/, const bool b) { + return b; + } + static bool Calc(const bool a, const bool /*b*/) { + return a; + } + static TColumnFilter MergeWithSimple(const TColumnFilter& filter, const bool simpleValue) { + if (simpleValue) { + return filter; + } else { + return TColumnFilter::BuildDenyFilter(); + } + } + static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) { + const auto count = filter.GetRecordsCount(); + if (simpleValue) { + TColumnFilter result = TColumnFilter::BuildAllowFilter(); + if (count) { + result.Add(true, *count); + } + return result; + } else { + TColumnFilter result = TColumnFilter::BuildDenyFilter(); + if (count) { + result.Add(false, *count); + } + return result; + } + } }; class TColumnFilter::TMergerImpl { @@ -340,7 +386,7 @@ public: if (Filter1.empty() && Filter2.empty()) { return TColumnFilter(TMergePolicy::Calc(Filter1.DefaultFilterValue, Filter2.DefaultFilterValue)); } else if (Filter1.empty()) { - return TMergePolicy::MergeWithSimple(Filter2, Filter1.DefaultFilterValue); + return TMergePolicy::MergeWithSimple(Filter1.DefaultFilterValue, Filter2); } else if (Filter2.empty()) { return TMergePolicy::MergeWithSimple(Filter1, Filter2.DefaultFilterValue); } else { @@ -359,7 +405,7 @@ public: while (it1 != Filter1.Filter.cend() && it2 != Filter2.Filter.cend()) { const ui32 delta = TColumnFilter::CrossSize(pos2, pos2 + *it2, pos1, pos1 + *it1); - if (delta) { + if (delta && TMergePolicy::HasValue(curValue1, curValue2)) { if (!count || curCurrent != TMergePolicy::Calc(curValue1, curValue2)) { resultFilter.emplace_back(delta); curCurrent = TMergePolicy::Calc(curValue1, curValue2); @@ -405,10 +451,15 @@ TColumnFilter TColumnFilter::Or(const TColumnFilter& extFilter) const { return TMergerImpl(*this, extFilter).Merge<TMergePolicyOr>(); } +TColumnFilter TColumnFilter::ApplyFilterFrom(const TColumnFilter& extFilter) const { + ResetCaches(); + return TMergerImpl(*this, extFilter).Merge<TMergePolicyApplyFilter>(); +} + TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter) const { ResetCaches(); if (Filter.empty()) { - return TMergePolicyAnd::MergeWithSimple(extFilter, DefaultFilterValue); + return TMergePolicyAnd::MergeWithSimple(DefaultFilterValue, extFilter); } else if (extFilter.Filter.empty()) { return TMergePolicyAnd::MergeWithSimple(*this, extFilter.DefaultFilterValue); } else { diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h index 9a135f5961a..4fae0884223 100644 --- a/ydb/core/formats/arrow/arrow_filter.h +++ b/ydb/core/formats/arrow/arrow_filter.h @@ -263,8 +263,9 @@ public: return TColumnFilter(false); } - TColumnFilter And(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT; - TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT; + [[nodiscard]] TColumnFilter And(const TColumnFilter& extFilter) const; + [[nodiscard]] TColumnFilter Or(const TColumnFilter& extFilter) const; + [[nodiscard]] TColumnFilter ApplyFilterFrom(const TColumnFilter& filter) const; class TApplyContext { private: diff --git a/ydb/core/formats/arrow/program/collection.h b/ydb/core/formats/arrow/program/collection.h index 5d1f6a5b299..ccd62cc6be6 100644 --- a/ydb/core/formats/arrow/program/collection.h +++ b/ydb/core/formats/arrow/program/collection.h @@ -453,6 +453,9 @@ public: } void AddFilter(const TColumnFilter& filter) { + if (filter.IsTotalAllowFilter()) { + return; + } if (!UseFilter) { *Filter = Filter->And(filter); } else { diff --git a/ydb/core/formats/arrow/ut/ut_column_filter.cpp b/ydb/core/formats/arrow/ut/ut_column_filter.cpp index 5994e8c45ce..11acbe16261 100644 --- a/ydb/core/formats/arrow/ut/ut_column_filter.cpp +++ b/ydb/core/formats/arrow/ut/ut_column_filter.cpp @@ -28,6 +28,27 @@ Y_UNIT_TEST_SUITE(ColumnFilter) { UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", resultVec), "1,1,1,0,1,0,0"); } + Y_UNIT_TEST(ApplyFilterToFilter) { + TColumnFilter filter1({ true, true, true, false, true, true, false }); + TColumnFilter filter2({ false, true, true, true, true, false, false }); + + { + auto result = filter1.ApplyFilterFrom(filter2); + + UNIT_ASSERT_VALUES_EQUAL(result.GetRecordsCountVerified(), 4); + auto resultVec = result.BuildSimpleFilter(); + UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", resultVec), "1,1,0,1"); + } + + { + auto result = filter2.ApplyFilterFrom(filter1); + + UNIT_ASSERT_VALUES_EQUAL(result.GetRecordsCountVerified(), 5); + auto resultVec = result.BuildSimpleFilter(); + UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", resultVec), "0,1,1,1,0"); + } + } + Y_UNIT_TEST(FilterSlice) { TColumnFilter filter({ true, true, true, false, true, true, false }); { diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index be8b01f88be..779cfe83e53 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -69,6 +69,8 @@ struct TEvPrivate { EvRemovePortionDataAccessor, EvMetadataAccessorsInfo, + EvRequestFilter, + EvEnd }; diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index 9c5313f240f..f1cc245931f 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -354,6 +354,7 @@ private: std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount = std::make_shared<TAtomicCounter>(); std::shared_ptr<TAtomicCounter> ResultsForSourceCount = std::make_shared<TAtomicCounter>(); std::shared_ptr<TAtomicCounter> ResultsForReplyGuard = std::make_shared<TAtomicCounter>(); + std::shared_ptr<TAtomicCounter> FilterFetchingGuard = std::make_shared<TAtomicCounter>(); std::shared_ptr<TAtomicCounter> TotalExecutionDurationUs = std::make_shared<TAtomicCounter>(); THashMap<ui32, std::shared_ptr<TAtomicCounter>> SkipNodesCount; THashMap<ui32, std::shared_ptr<TAtomicCounter>> ExecuteNodesCount; @@ -393,7 +394,8 @@ public: << "ReadTasksCount:" << ReadTasksCount->Val() << ";" << "ResourcesAllocationTasksCount:" << ResourcesAllocationTasksCount->Val() << ";" << "ResultsForSourceCount:" << ResultsForSourceCount->Val() << ";" - << "ResultsForReplyGuard:" << ResultsForReplyGuard->Val() << ";"; + << "ResultsForReplyGuard:" << ResultsForReplyGuard->Val() << ";" + << "FilterFetchingGuard:" << FilterFetchingGuard->Val() << ";"; } TCounterGuard GetResultsForReplyGuard() const { @@ -428,9 +430,14 @@ public: return TCounterGuard(AssembleTasksCount); } + TCounterGuard GetFilterFetchingGuard() const { + return TCounterGuard(FilterFetchingGuard); + } + bool InWaiting() const { return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() || - FetchAccessorsCount->Val() || ResultsForSourceCount->Val() || FetchBlobsCount->Val() || ResultsForReplyGuard->Val(); + FetchAccessorsCount->Val() || ResultsForSourceCount->Val() || FetchBlobsCount->Val() || ResultsForReplyGuard->Val() || + FilterFetchingGuard->Val(); } const THashMap<ui32, std::shared_ptr<TAtomicCounter>>& GetSkipStats() const { diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h index aac729f0a4a..b369cec4737 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h @@ -58,6 +58,7 @@ protected: std::shared_ptr<ISnapshotSchema> ResultIndexSchema; ui64 TxId = 0; std::optional<ui64> LockId; + EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES; public: using TConstPtr = std::shared_ptr<const TReadMetadataBase>; @@ -107,6 +108,10 @@ public: return LockId; } + EDeduplicationPolicy GetDeduplicationPolicy() const { + return DeduplicationPolicy; + } + void OnReadFinished(NColumnShard::TColumnShard& owner) const { DoOnReadFinished(owner); } diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h index 0572c4e8fc4..8c1edddea79 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/description.h +++ b/ydb/core/tx/columnshard/engines/reader/common/description.h @@ -13,6 +13,11 @@ enum class ERequestSorting { DESC /* "descending" */, }; +enum class EDeduplicationPolicy { + ALLOW_DUPLICATES = 0, + PREVENT_DUPLICATES, +}; + // Describes read/scan request struct TReadDescription { private: @@ -35,6 +40,7 @@ public: // operations with potentially different columns. We have to remove columns to support -Inf (Null) and +Inf. std::shared_ptr<NOlap::TPKRangesFilter> PKRangesFilter; NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; + EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES; // List of columns std::vector<ui32> ColumnIds; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp index 1e3999e2ede..934a603e4df 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp @@ -41,6 +41,7 @@ TConclusionStatus TReadMetadata::Init( } StatsMode = readDescription.StatsMode; + DeduplicationPolicy = readDescription.DeduplicationPolicy; return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp new file mode 100644 index 00000000000..602e4aa26bb --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp @@ -0,0 +1,16 @@ +#include "events.h" + +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h> + +namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering { + +TEvRequestFilter::TEvRequestFilter(const IDataSource& source, const std::shared_ptr<IFilterSubscriber>& subscriber) + : MinPK(source.GetMinPK()) + , MaxPK(source.GetMaxPK()) + , SourceId(source.GetSourceId()) + , RecordsCount(source.GetRecordsCount()) + , MaxVersion(source.GetContext()->GetCommonContext()->GetReadMetadata()->GetRequestSnapshot()) + , Subscriber(subscriber) { +} + +} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h new file mode 100644 index 00000000000..6a386fdde4f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h @@ -0,0 +1,35 @@ +#pragma once + +#include <ydb/core/formats/arrow/arrow_filter.h> +#include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h> + +#include <ydb/library/actors/core/event_local.h> + +namespace NKikimr::NOlap::NReader::NSimple { +class IDataSource; +} + +namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering { + +class IFilterSubscriber { +public: + virtual void OnFilterReady(NArrow::TColumnFilter&&) = 0; + virtual void OnFailure(const TString& reason) = 0; + virtual ~IFilterSubscriber() = default; +}; + +class TEvRequestFilter: public NActors::TEventLocal<TEvRequestFilter, NColumnShard::TEvPrivate::EvRequestFilter> { +private: + NArrow::TSimpleRow MinPK; + NArrow::TSimpleRow MaxPK; + YDB_READONLY_DEF(ui64, SourceId); + YDB_READONLY_DEF(ui64, RecordsCount); + TSnapshot MaxVersion; + YDB_READONLY_DEF(std::shared_ptr<IFilterSubscriber>, Subscriber); + +public: + TEvRequestFilter(const IDataSource& source, const std::shared_ptr<IFilterSubscriber>& subscriber); +}; + +} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.cpp new file mode 100644 index 00000000000..6a856b5b802 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.cpp @@ -0,0 +1,17 @@ +#include "manager.h" + +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h> + +namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering { + +TDuplicateManager::TDuplicateManager(const TSpecialReadContext& /*context*/) + : TActor(&TDuplicateManager::StateMain) { +} + +void TDuplicateManager::Handle(const TEvRequestFilter::TPtr& ev) { + NArrow::TColumnFilter filter = NArrow::TColumnFilter::BuildAllowFilter(); + filter.Add(true, ev->Get()->GetRecordsCount()); + ev->Get()->GetSubscriber()->OnFilterReady(std::move(filter)); +} + +} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h new file mode 100644 index 00000000000..9b710f35925 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h @@ -0,0 +1,33 @@ +#pragma once + +#include "events.h" + +#include <ydb/library/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NOlap::NReader::NSimple { +class TSpecialReadContext; +} + +namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering { + +class TDuplicateManager: public NActors::TActor<TDuplicateManager> { +private: + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvRequestFilter, Handle); + hFunc(NActors::TEvents::TEvPoison, Handle); + default: + AFL_VERIFY(false)("unexpected_event", ev->GetTypeName()); + } + } + + void Handle(const TEvRequestFilter::TPtr&); + void Handle(const NActors::TEvents::TEvPoison::TPtr&) { + PassAway(); + } + +public: + TDuplicateManager(const TSpecialReadContext& context); +}; + +} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make new file mode 100644 index 00000000000..3e972005540 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + manager.cpp + events.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/reader/common_reader/iterator +) + +END() diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index 59f3601bf55..59c560c9e98 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -2,6 +2,7 @@ #include "source.h" #include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h> +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h> #include <ydb/core/tx/limiter/grouped_memory/usage/service.h> namespace NKikimr::NOlap::NReader::NSimple { @@ -42,14 +43,15 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c needShardingFilter = true; } } + const bool preventDuplicates = GetReadMetadata()->GetDeduplicationPolicy() == EDeduplicationPolicy::PREVENT_DUPLICATES; { auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0] - [hasDeletions ? 1 : 0]; + [hasDeletions ? 1 : 0][preventDuplicates ? 1 : 0]; if (result.NeedInitialization()) { TGuard<TMutex> g(Mutex); if (auto gInit = result.StartInitialization()) { gInit->InitializationFinished( - BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions)); + BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions, preventDuplicates)); } AFL_VERIFY(!result.NeedInitialization()); } @@ -58,7 +60,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c } std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, - const bool /*useIndexes*/, const bool needFilterSharding, const bool needFilterDeletion) const { + const bool /*useIndexes*/, const bool needFilterSharding, const bool needFilterDeletion, const bool preventDuplicates) const { const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); NCommon::TFetchingScriptBuilder acc(*this); @@ -91,6 +93,9 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false); acc.AddStep(std::make_shared<TSnapshotFilter>()); } + if (preventDuplicates) { + acc.AddStep(std::make_shared<TDuplicateFilter>()); + } const auto& chainProgram = GetReadMetadata()->GetProgram().GetChainVerified(); acc.AddStep(std::make_shared<NCommon::TProgramStep>(chainProgram)); } @@ -99,8 +104,18 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c return std::move(acc).Build(); } -TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext) - : TBase(commonContext) { +void TSpecialReadContext::RegisterActors() { + AFL_VERIFY(!DuplicatesManager); + if (GetReadMetadata()->GetDeduplicationPolicy() == EDeduplicationPolicy::PREVENT_DUPLICATES) { + DuplicatesManager = NActors::TActivationContext::Register(new NDuplicateFiltering::TDuplicateManager(*this)); + } +} + +void TSpecialReadContext::UnregisterActors() { + if (DuplicatesManager) { + NActors::TActivationContext::AsActorContext().Send(DuplicatesManager, new NActors::TEvents::TEvPoison); + DuplicatesManager = TActorId(); + } } TString TSpecialReadContext::ProfileDebugString() const { @@ -109,8 +124,8 @@ TString TSpecialReadContext::ProfileDebugString() const { return (val & (1 << pos)) ? 1 : 0; }; - for (ui32 i = 0; i < (1 << 5); ++i) { - auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)]; + for (ui32 i = 0; i < (1 << 6); ++i) { + auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)]; if (script.HasScript()) { sb << script.ProfileDebugString() << ";"; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h index c1859a3062d..cb9b07b9834 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h @@ -11,18 +11,22 @@ namespace NKikimr::NOlap::NReader::NSimple { class IDataSource; +class TSourceConstructor; using TColumnsSet = NCommon::TColumnsSet; using TColumnsSetIds = NCommon::TColumnsSetIds; using EMemType = NCommon::EMemType; using TFetchingScript = NCommon::TFetchingScript; -class TSpecialReadContext: public NCommon::TSpecialReadContext { +class TSpecialReadContext: public NCommon::TSpecialReadContext, TNonCopyable { private: using TBase = NCommon::TSpecialReadContext; + TActorId DuplicatesManager = TActorId(); + +private: std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, - const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const; + const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion, const bool needFilterDuplicates) const; TMutex Mutex; - std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts; + std::array<std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts; std::shared_ptr<TFetchingScript> AskAccumulatorsScript; virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<NCommon::IDataSource>& source) override; @@ -30,7 +34,21 @@ private: public: virtual TString ProfileDebugString() const override; - TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext); + void RegisterActors(); + void UnregisterActors(); + + const TActorId& GetDuplicatesManagerVerified() const { + AFL_VERIFY(DuplicatesManager); + return DuplicatesManager; + } + + TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext) + : TBase(commonContext) { + } + + ~TSpecialReadContext() { + AFL_VERIFY(!DuplicatesManager); + } }; } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp index 23447a08d49..3db14fb1c9a 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp @@ -3,6 +3,7 @@ #include "source.h" #include <ydb/core/tx/columnshard/engines/filter.h> +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h> #include <ydb/core/tx/conveyor/usage/service.h> #include <ydb/core/tx/limiter/grouped_memory/usage/service.h> @@ -203,4 +204,37 @@ TConclusion<bool> TPrepareResultStep::DoExecuteInplace(const std::shared_ptr<IDa } } +void TDuplicateFilter::TFilterSubscriber::OnFilterReady(NArrow::TColumnFilter&& filter) { + if (auto source = Source.lock()) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetch_filter")("source", source->GetSourceId())("filter", filter.DebugString()); + if (source->GetContext()->IsAborted()) { + return; + } + if (const std::shared_ptr<NArrow::TColumnFilter> appliedFilter = source->GetStageData().GetAppliedFilter()) { + filter = filter.ApplyFilterFrom(*appliedFilter); + } + source->MutableStageData().AddFilter(std::move(filter)); + Step.Next(); + auto task = std::make_shared<TStepAction>(source, std::move(Step), source->GetContext()->GetCommonContext()->GetScanActorId(), false); + NConveyor::TScanServiceOperator::SendTaskToExecute(task, source->GetContext()->GetCommonContext()->GetConveyorProcessId()); + } +} + +void TDuplicateFilter::TFilterSubscriber::OnFailure(const TString& reason) { + if (auto source = Source.lock()) { + source->GetContext()->GetCommonContext()->AbortWithError("cannot build duplicate filter: " + reason); + } +} + +TDuplicateFilter::TFilterSubscriber::TFilterSubscriber(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) + : Source(source) + , Step(step) + , TaskGuard(source->GetContext()->GetCommonContext()->GetCounters().GetFilterFetchingGuard()) { +} + +TConclusion<bool> TDuplicateFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const { + source->StartFetchingDuplicateFilter(std::make_shared<TFilterSubscriber>(source, step)); + return false; +} + } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h index d1dcc822b03..3b9da2b28e0 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h @@ -4,6 +4,7 @@ #include <ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h> #include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h> #include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h> +#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h> @@ -212,4 +213,28 @@ public: } }; +class TDuplicateFilter: public IFetchingStep { +private: + using TBase = IFetchingStep; + + class TFilterSubscriber: public NDuplicateFiltering::IFilterSubscriber { + private: + std::weak_ptr<IDataSource> Source; + TFetchingScriptCursor Step; + NColumnShard::TCounterGuard TaskGuard; + + virtual void OnFilterReady(NArrow::TColumnFilter&& filter) override; + virtual void OnFailure(const TString& reason) override; + + public: + TFilterSubscriber(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step); + }; + +public: + virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override; + TDuplicateFilter() + : TBase("DUPLICATE") { + } +}; + } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h index 19c8e14aced..5a64e9250e9 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h @@ -19,6 +19,7 @@ private: protected: virtual TConclusionStatus DoStart() override { + SpecialReadContext->RegisterActors(); return Scanner->Start(); } @@ -69,6 +70,7 @@ public: if (SpecialReadContext->IsActive()) { Abort("unexpected on destructor"); } + SpecialReadContext->UnregisterActors(); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h index 0146875d97a..d2a0d456433 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h @@ -192,7 +192,7 @@ public: const TReplaceKeyAdapter& GetStart() const { return Start; } - const TReplaceKeyAdapter GetFinish() const { + const TReplaceKeyAdapter& GetFinish() const { return Finish; } @@ -250,6 +250,12 @@ public: return DoStartFetchingAccessor(sourcePtr, step); } + void StartFetchingDuplicateFilter(std::shared_ptr<NDuplicateFiltering::IFilterSubscriber>&& subscriber) { + NActors::TActivationContext::AsActorContext().Send( + std::static_pointer_cast<TSpecialReadContext>(GetContext())->GetDuplicatesManagerVerified(), + new NDuplicateFiltering::TEvRequestFilter(*this, std::move(subscriber))); + } + virtual TInternalPathId GetPathId() const = 0; virtual bool HasIndexes(const std::set<ui32>& indexIds) const = 0; @@ -260,6 +266,9 @@ public: virtual ui64 GetIndexRawBytes(const std::set<ui32>& indexIds) const = 0; + virtual NArrow::TSimpleRow GetMinPK() const = 0; + virtual NArrow::TSimpleRow GetMaxPK() const = 0; + void Abort() { DoAbort(); } @@ -430,6 +439,14 @@ public: return GetStageData().GetPortionAccessor().GetIndexRawBytes(indexIds, false); } + virtual NArrow::TSimpleRow GetMinPK() const override { + return Portion->GetMeta().IndexKeyStart(); + } + + virtual NArrow::TSimpleRow GetMaxPK() const override { + return Portion->GetMeta().IndexKeyEnd(); + } + const TPortionInfo& GetPortionInfo() const { return *Portion; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make index baff60941f2..37599539e47 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make @@ -16,6 +16,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/reader/common_reader/iterator ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points + ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates ydb/core/tx/conveyor/usage ydb/core/tx/limiter/grouped_memory/usage ) diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp index 14c0cb9ef08..863407ef326 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp @@ -49,6 +49,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) { read.PathId = request.GetPathId(); read.LockId = LockId; read.ReadNothing = !Self->TablesManager.HasTable(read.PathId); + read.DeduplicationPolicy = EDeduplicationPolicy::PREVENT_DUPLICATES; std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(context)); read.ColumnIds = request.GetColumnIds(); if (request.RangesFilter) { |