diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-02-01 18:09:58 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-01 18:09:58 +0300 |
commit | 160238c33a4ed5bd4e088b7615690b6536a59ecf (patch) | |
tree | 919bc84ae808895f24ea57ad18d11acf2f10f859 | |
parent | 320f5201bac519d51b66e15b6e83c0ae38e69b44 (diff) | |
download | ydb-160238c33a4ed5bd4e088b7615690b6536a59ecf.tar.gz |
optimizer predicates usage on fetching (#1505)
* dont use predicate columns fetching and filter usage in case whole portion matched in predicates ranges
* fix
* fix
9 files changed, 100 insertions, 29 deletions
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.cpp b/ydb/core/tx/columnshard/engines/predicate/filter.cpp index 77816507e0c..10d66a832c1 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/filter.cpp @@ -84,6 +84,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInf return SortedRanges.empty(); } +bool TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const { + for (auto&& i : SortedRanges) { + if (i.IsPortionInPartialUsage(start, end, indexInfo)) { + return true; + } + } + return false; +} + TPKRangesFilter::TPKRangesFilter(const bool reverse) : ReverseFlag(reverse) { diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h index d9448d43808..6d30adfdc22 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.h +++ b/ydb/core/tx/columnshard/engines/predicate/filter.h @@ -38,6 +38,7 @@ public: } bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const; + bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const; NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const; diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp index 53e10174b82..1025fa055ca 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp @@ -59,6 +59,37 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo return true; } +bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const { + bool startUsage = false; + bool endUsage = false; + if (auto from = PredicateFrom.ExtractKey(indexInfo.GetPrimaryKey())) { + AFL_VERIFY(from->Size() <= start.Size()); + if (PredicateFrom.IsInclude()) { + startUsage = std::is_lt(start.ComparePartNotNull(*from, from->Size())); + } else { + startUsage = std::is_lteq(start.ComparePartNotNull(*from, from->Size())); + } + } else { + startUsage = true; + } + + if (auto to = PredicateTo.ExtractKey(indexInfo.GetPrimaryKey())) { + AFL_VERIFY(to->Size() <= end.Size()); + if (PredicateTo.IsInclude()) { + endUsage = std::is_gt(end.ComparePartNotNull(*to, to->Size())); + } else { + endUsage = std::is_gteq(end.ComparePartNotNull(*to, to->Size())); + } + } else { + endUsage = true; + } + +// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", start.DebugString())("end", end.DebugString())("from", PredicateFrom.DebugString())("to", PredicateTo.DebugString()) +// ("start_usage", startUsage)("end_usage", endUsage); + + return endUsage || startUsage; +} + std::optional<NKikimr::NOlap::TPKRangeFilter> TPKRangeFilter::Build(TPredicateContainer&& from, TPredicateContainer&& to) { if (!from.CrossRanges(to)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot_build_predicate_range")("error", "predicates from/to not intersected"); diff --git a/ydb/core/tx/columnshard/engines/predicate/range.h b/ydb/core/tx/columnshard/engines/predicate/range.h index 7e23da4f13d..ff84f35408a 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.h +++ b/ydb/core/tx/columnshard/engines/predicate/range.h @@ -41,6 +41,7 @@ public: NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const; bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const; + bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const; std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const; TString DebugString() const; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index cf07daf49f8..6476e426c38 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -19,21 +19,23 @@ ui64 TSpecialReadContext::GetMemoryForSources(const std::map<ui32, std::shared_p std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const { const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); - auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; + const bool partialUsageByPK = ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey(), ReadMetadata->GetIndexInfo()); + auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0]; if (!result) { return std::make_shared<TBuildFakeSpec>(source->GetRecordsCount(), "fake"); } return result; } -std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const { +std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt) const { std::shared_ptr<IFetchingStep> result = std::make_shared<TFakeStep>(); std::shared_ptr<IFetchingStep> current = result; + const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount(); if (!!IndexChecker) { current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TIndexesSet>(IndexChecker->GetIndexIds()))); current = current->AttachNext(std::make_shared<TApplyIndexStep>(IndexChecker)); } - if (!EFColumns->GetColumnsCount()) { + if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) { TColumnsSet columnsFetch = *FFColumns; if (needSnapshots) { columnsFetch = columnsFetch + *SpecColumns; @@ -52,6 +54,9 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext if (needSnapshots || FFColumns->Contains(SpecColumns)) { columnsFetch = columnsFetch + *SpecColumns; } + if (partialUsageByPredicate) { + columnsFetch = columnsFetch + *PredicateColumns; + } AFL_VERIFY(columnsFetch.GetColumnsCount()); current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef")); @@ -60,9 +65,13 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext current = current->AttachNext(std::make_shared<TSnapshotFilter>()); columnsFetch = columnsFetch - *SpecColumns; } - current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch))); - if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + if (partialUsageByPredicate) { + current = current->AttachNext(std::make_shared<TAssemblerStep>(PredicateColumns)); current = current->AttachNext(std::make_shared<TPredicateFilter>()); + columnsFetch = columnsFetch - *PredicateColumns; + } + if (columnsFetch.GetColumnsCount()) { + current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch))); } for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { if (!i->IsFilterOnly()) { @@ -70,7 +79,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext } current = current->AttachNext(std::make_shared<TFilterProgramStep>(i)); } - const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns; + const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PredicateColumns; if (columnsAdditionalFetch.GetColumnsCount()) { current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); @@ -84,7 +93,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext current = current->AttachNext(std::make_shared<TSnapshotFilter>()); } current = current->AttachNext(std::make_shared<TAssemblerStep>(PKColumns)); - if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + if (partialUsageByPredicate) { current = current->AttachNext(std::make_shared<TPredicateFilter>()); } const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns; @@ -95,7 +104,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext } current = current->AttachNext(std::make_shared<TFilterProgramStep>(i)); } - const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns; + const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns - *PredicateColumns; if (columnsAdditionalFetch.GetColumnsCount()) { current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch))); @@ -115,6 +124,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); IndexChecker = ReadMetadata->GetProgram().GetIndexChecker(); { + auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo()); + if (predicateColumns.size()) { + PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, ReadMetadata->GetIndexInfo(), readSchema); + } else { + PredicateColumns = std::make_shared<TColumnsSet>(); + } + } + { auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); if (efColumns.size()) { EFColumns = std::make_shared<TColumnsSet>(efColumns, ReadMetadata->GetIndexInfo(), readSchema); @@ -144,10 +161,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); - CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false); - CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true); - CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false); - CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true); + CacheFetchingScripts[0][0][0] = BuildColumnsFetchingPlan(false, false, false); + CacheFetchingScripts[0][1][0] = BuildColumnsFetchingPlan(false, true, false); + CacheFetchingScripts[1][0][0] = BuildColumnsFetchingPlan(true, false, false); + CacheFetchingScripts[1][1][0] = BuildColumnsFetchingPlan(true, true, false); + CacheFetchingScripts[0][0][1] = BuildColumnsFetchingPlan(false, false, true); + CacheFetchingScripts[0][1][1] = BuildColumnsFetchingPlan(false, true, true); + CacheFetchingScripts[1][0][1] = BuildColumnsFetchingPlan(true, false, true); + CacheFetchingScripts[1][1][1] = BuildColumnsFetchingPlan(true, true, true); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h index a8b727a5385..17d52413e39 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h @@ -15,6 +15,7 @@ private: YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, SpecColumns); YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, MergeColumns); YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns); + YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PredicateColumns); YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns); YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns); YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ProgramInputColumns); @@ -25,8 +26,8 @@ private: std::shared_ptr<TColumnsSet> PKFFColumns; std::shared_ptr<TColumnsSet> EFPKColumns; std::shared_ptr<TColumnsSet> FFMinusEFColumns; - std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const; - std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2> CacheFetchingScripts; + std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate) const; + std::array<std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2>, 2> CacheFetchingScripts; public: ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index 0720416cddb..26b397fe2e3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -33,15 +33,10 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte } else { insertedPortionsBytes += (*itPortion)->BlobsBytes(); } - auto start = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyStart()); - auto finish = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyEnd()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", start.DebugJson())("finish", finish.DebugJson()); - sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, start, finish)); + sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, (*itPortion)->IndexKeyStart(), (*itPortion)->IndexKeyEnd())); ++itPortion; } else { - auto start = GetReadMetadata()->BuildSortedPosition(itCommitted->GetFirstVerified()); - auto finish = GetReadMetadata()->BuildSortedPosition(itCommitted->GetLastVerified()); - sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, start, finish)); + sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, itCommitted->GetFirstVerified(), itCommitted->GetLastVerified())); committedPortionsBytes += itCommitted->GetSize(); ++itCommitted; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index f339312ecfd..3cce8e16eef 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -26,6 +26,8 @@ private: YDB_READONLY(ui32, SourceIdx, 0); YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start); YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish); + NArrow::TReplaceKey StartReplaceKey; + NArrow::TReplaceKey FinishReplaceKey; YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context); YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero()); std::optional<ui32> RecordsCount; @@ -52,6 +54,13 @@ protected: virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; public: + const NArrow::TReplaceKey& GetStartReplaceKey() const { + return StartReplaceKey; + } + const NArrow::TReplaceKey& GetFinishReplaceKey() const { + return FinishReplaceKey; + } + const TFetchedResult& GetStageResult() const { AFL_VERIFY(!!StageResult); return *StageResult; @@ -147,16 +156,19 @@ public: void RegisterInterval(TFetchingInterval& interval); IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context, - const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, + const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish, const TSnapshot& recordSnapshotMax, const std::optional<ui32> recordsCount ) : SourceIdx(sourceIdx) - , Start(start) - , Finish(finish) + , Start(context->GetReadMetadata()->BuildSortedPosition(start)) + , Finish(context->GetReadMetadata()->BuildSortedPosition(finish)) + , StartReplaceKey(start) + , FinishReplaceKey(finish) , Context(context) , RecordSnapshotMax(recordSnapshotMax) , RecordsCount(recordsCount) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson()); if (Start.IsReverseSort()) { std::swap(Start, Finish); } @@ -210,10 +222,10 @@ public: } TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context, - const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) + const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish) : TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount()) - , Portion(portion) { - + , Portion(portion) + { } }; @@ -256,7 +268,7 @@ public: } TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context, - const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) + const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish) : TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {}) , CommittedBlob(committed) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 988d6553b32..e9a9e804977 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -48,7 +48,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const { auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - std::set<ui32> result = GetPKRangesFilter().GetColumnIds(indexInfo); + std::set<ui32> result; for (auto&& i : GetProgram().GetEarlyFilterColumns()) { auto id = indexInfo.GetColumnIdOptional(i); if (id) { |