aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-02-01 18:09:58 +0300
committerGitHub <noreply@github.com>2024-02-01 18:09:58 +0300
commit160238c33a4ed5bd4e088b7615690b6536a59ecf (patch)
tree919bc84ae808895f24ea57ad18d11acf2f10f859
parent320f5201bac519d51b66e15b6e83c0ae38e69b44 (diff)
downloadydb-160238c33a4ed5bd4e088b7615690b6536a59ecf.tar.gz
optimizer predicates usage on fetching (#1505)
* dont use predicate columns fetching and filter usage in case whole portion matched in predicates ranges * fix * fix
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/filter.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/filter.h1
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/range.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/range.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp45
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/context.h5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/source.h26
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp2
9 files changed, 100 insertions, 29 deletions
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.cpp b/ydb/core/tx/columnshard/engines/predicate/filter.cpp
index 77816507e0c..10d66a832c1 100644
--- a/ydb/core/tx/columnshard/engines/predicate/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/filter.cpp
@@ -84,6 +84,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInf
return SortedRanges.empty();
}
+bool TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
+ for (auto&& i : SortedRanges) {
+ if (i.IsPortionInPartialUsage(start, end, indexInfo)) {
+ return true;
+ }
+ }
+ return false;
+}
+
TPKRangesFilter::TPKRangesFilter(const bool reverse)
: ReverseFlag(reverse)
{
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h
index d9448d43808..6d30adfdc22 100644
--- a/ydb/core/tx/columnshard/engines/predicate/filter.h
+++ b/ydb/core/tx/columnshard/engines/predicate/filter.h
@@ -38,6 +38,7 @@ public:
}
bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
+ bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp
index 53e10174b82..1025fa055ca 100644
--- a/ydb/core/tx/columnshard/engines/predicate/range.cpp
+++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp
@@ -59,6 +59,37 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo
return true;
}
+bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
+ bool startUsage = false;
+ bool endUsage = false;
+ if (auto from = PredicateFrom.ExtractKey(indexInfo.GetPrimaryKey())) {
+ AFL_VERIFY(from->Size() <= start.Size());
+ if (PredicateFrom.IsInclude()) {
+ startUsage = std::is_lt(start.ComparePartNotNull(*from, from->Size()));
+ } else {
+ startUsage = std::is_lteq(start.ComparePartNotNull(*from, from->Size()));
+ }
+ } else {
+ startUsage = true;
+ }
+
+ if (auto to = PredicateTo.ExtractKey(indexInfo.GetPrimaryKey())) {
+ AFL_VERIFY(to->Size() <= end.Size());
+ if (PredicateTo.IsInclude()) {
+ endUsage = std::is_gt(end.ComparePartNotNull(*to, to->Size()));
+ } else {
+ endUsage = std::is_gteq(end.ComparePartNotNull(*to, to->Size()));
+ }
+ } else {
+ endUsage = true;
+ }
+
+// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", start.DebugString())("end", end.DebugString())("from", PredicateFrom.DebugString())("to", PredicateTo.DebugString())
+// ("start_usage", startUsage)("end_usage", endUsage);
+
+ return endUsage || startUsage;
+}
+
std::optional<NKikimr::NOlap::TPKRangeFilter> TPKRangeFilter::Build(TPredicateContainer&& from, TPredicateContainer&& to) {
if (!from.CrossRanges(to)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot_build_predicate_range")("error", "predicates from/to not intersected");
diff --git a/ydb/core/tx/columnshard/engines/predicate/range.h b/ydb/core/tx/columnshard/engines/predicate/range.h
index 7e23da4f13d..ff84f35408a 100644
--- a/ydb/core/tx/columnshard/engines/predicate/range.h
+++ b/ydb/core/tx/columnshard/engines/predicate/range.h
@@ -41,6 +41,7 @@ public:
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
+ bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;
std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const;
TString DebugString() const;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
index cf07daf49f8..6476e426c38 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
@@ -19,21 +19,23 @@ ui64 TSpecialReadContext::GetMemoryForSources(const std::map<ui32, std::shared_p
std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const {
const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax();
- auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0];
+ const bool partialUsageByPK = ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey(), ReadMetadata->GetIndexInfo());
+ auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0];
if (!result) {
return std::make_shared<TBuildFakeSpec>(source->GetRecordsCount(), "fake");
}
return result;
}
-std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const {
+std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt) const {
std::shared_ptr<IFetchingStep> result = std::make_shared<TFakeStep>();
std::shared_ptr<IFetchingStep> current = result;
+ const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount();
if (!!IndexChecker) {
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TIndexesSet>(IndexChecker->GetIndexIds())));
current = current->AttachNext(std::make_shared<TApplyIndexStep>(IndexChecker));
}
- if (!EFColumns->GetColumnsCount()) {
+ if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) {
TColumnsSet columnsFetch = *FFColumns;
if (needSnapshots) {
columnsFetch = columnsFetch + *SpecColumns;
@@ -52,6 +54,9 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
if (needSnapshots || FFColumns->Contains(SpecColumns)) {
columnsFetch = columnsFetch + *SpecColumns;
}
+ if (partialUsageByPredicate) {
+ columnsFetch = columnsFetch + *PredicateColumns;
+ }
AFL_VERIFY(columnsFetch.GetColumnsCount());
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef"));
@@ -60,9 +65,13 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
columnsFetch = columnsFetch - *SpecColumns;
}
- current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
- if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
+ if (partialUsageByPredicate) {
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(PredicateColumns));
current = current->AttachNext(std::make_shared<TPredicateFilter>());
+ columnsFetch = columnsFetch - *PredicateColumns;
+ }
+ if (columnsFetch.GetColumnsCount()) {
+ current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
}
for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
if (!i->IsFilterOnly()) {
@@ -70,7 +79,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
}
current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
}
- const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns;
+ const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PredicateColumns;
if (columnsAdditionalFetch.GetColumnsCount()) {
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
@@ -84,7 +93,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
}
current = current->AttachNext(std::make_shared<TAssemblerStep>(PKColumns));
- if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
+ if (partialUsageByPredicate) {
current = current->AttachNext(std::make_shared<TPredicateFilter>());
}
const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns;
@@ -95,7 +104,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
}
current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
}
- const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns;
+ const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns - *PredicateColumns;
if (columnsAdditionalFetch.GetColumnsCount()) {
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
@@ -115,6 +124,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
{
+ auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
+ if (predicateColumns.size()) {
+ PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, ReadMetadata->GetIndexInfo(), readSchema);
+ } else {
+ PredicateColumns = std::make_shared<TColumnsSet>();
+ }
+ }
+ {
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
if (efColumns.size()) {
EFColumns = std::make_shared<TColumnsSet>(efColumns, ReadMetadata->GetIndexInfo(), readSchema);
@@ -144,10 +161,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
- CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false);
- CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true);
- CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false);
- CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true);
+ CacheFetchingScripts[0][0][0] = BuildColumnsFetchingPlan(false, false, false);
+ CacheFetchingScripts[0][1][0] = BuildColumnsFetchingPlan(false, true, false);
+ CacheFetchingScripts[1][0][0] = BuildColumnsFetchingPlan(true, false, false);
+ CacheFetchingScripts[1][1][0] = BuildColumnsFetchingPlan(true, true, false);
+ CacheFetchingScripts[0][0][1] = BuildColumnsFetchingPlan(false, false, true);
+ CacheFetchingScripts[0][1][1] = BuildColumnsFetchingPlan(false, true, true);
+ CacheFetchingScripts[1][0][1] = BuildColumnsFetchingPlan(true, false, true);
+ CacheFetchingScripts[1][1][1] = BuildColumnsFetchingPlan(true, true, true);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
index a8b727a5385..17d52413e39 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
@@ -15,6 +15,7 @@ private:
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, SpecColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, MergeColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PredicateColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ProgramInputColumns);
@@ -25,8 +26,8 @@ private:
std::shared_ptr<TColumnsSet> PKFFColumns;
std::shared_ptr<TColumnsSet> EFPKColumns;
std::shared_ptr<TColumnsSet> FFMinusEFColumns;
- std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const;
- std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2> CacheFetchingScripts;
+ std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate) const;
+ std::array<std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2>, 2> CacheFetchingScripts;
public:
ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
index 0720416cddb..26b397fe2e3 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
@@ -33,15 +33,10 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte
} else {
insertedPortionsBytes += (*itPortion)->BlobsBytes();
}
- auto start = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyStart());
- auto finish = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyEnd());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", start.DebugJson())("finish", finish.DebugJson());
- sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, start, finish));
+ sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, (*itPortion)->IndexKeyStart(), (*itPortion)->IndexKeyEnd()));
++itPortion;
} else {
- auto start = GetReadMetadata()->BuildSortedPosition(itCommitted->GetFirstVerified());
- auto finish = GetReadMetadata()->BuildSortedPosition(itCommitted->GetLastVerified());
- sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, start, finish));
+ sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, itCommitted->GetFirstVerified(), itCommitted->GetLastVerified()));
committedPortionsBytes += itCommitted->GetSize();
++itCommitted;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
index f339312ecfd..3cce8e16eef 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
@@ -26,6 +26,8 @@ private:
YDB_READONLY(ui32, SourceIdx, 0);
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start);
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish);
+ NArrow::TReplaceKey StartReplaceKey;
+ NArrow::TReplaceKey FinishReplaceKey;
YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context);
YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero());
std::optional<ui32> RecordsCount;
@@ -52,6 +54,13 @@ protected:
virtual void DoAbort() = 0;
virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0;
public:
+ const NArrow::TReplaceKey& GetStartReplaceKey() const {
+ return StartReplaceKey;
+ }
+ const NArrow::TReplaceKey& GetFinishReplaceKey() const {
+ return FinishReplaceKey;
+ }
+
const TFetchedResult& GetStageResult() const {
AFL_VERIFY(!!StageResult);
return *StageResult;
@@ -147,16 +156,19 @@ public:
void RegisterInterval(TFetchingInterval& interval);
IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context,
- const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
+ const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish,
const TSnapshot& recordSnapshotMax, const std::optional<ui32> recordsCount
)
: SourceIdx(sourceIdx)
- , Start(start)
- , Finish(finish)
+ , Start(context->GetReadMetadata()->BuildSortedPosition(start))
+ , Finish(context->GetReadMetadata()->BuildSortedPosition(finish))
+ , StartReplaceKey(start)
+ , FinishReplaceKey(finish)
, Context(context)
, RecordSnapshotMax(recordSnapshotMax)
, RecordsCount(recordsCount)
{
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson());
if (Start.IsReverseSort()) {
std::swap(Start, Finish);
}
@@ -210,10 +222,10 @@ public:
}
TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context,
- const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
+ const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
: TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount())
- , Portion(portion) {
-
+ , Portion(portion)
+ {
}
};
@@ -256,7 +268,7 @@ public:
}
TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context,
- const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
+ const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
: TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {})
, CommittedBlob(committed) {
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 988d6553b32..e9a9e804977 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -48,7 +48,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto
std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
- std::set<ui32> result = GetPKRangesFilter().GetColumnIds(indexInfo);
+ std::set<ui32> result;
for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
auto id = indexInfo.GetColumnIdOptional(i);
if (id) {