diff options
author | chertus <azuikov@ydb.tech> | 2023-03-27 18:55:33 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-03-27 18:55:33 +0300 |
commit | 66749428b482d3e03a59b7bfa4eef36144b804e6 (patch) | |
tree | 12db5ae04432fab884e139e5ef1b79b229d130f5 | |
parent | 0d8a9f6e68724cbe829c3e06bc810184f1adc996 (diff) | |
download | ydb-66749428b482d3e03a59b7bfa4eef36144b804e6.tar.gz |
early filter optimization
-rw-r--r-- | ydb/core/formats/merging_sorted_input_stream.cpp | 9 | ||||
-rw-r--r-- | ydb/core/formats/program.cpp | 47 | ||||
-rw-r--r-- | ydb/core/formats/program.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/filter.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/filter.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 5 |
7 files changed, 77 insertions, 14 deletions
diff --git a/ydb/core/formats/merging_sorted_input_stream.cpp b/ydb/core/formats/merging_sorted_input_stream.cpp index 2e2b5f7d42..e5b1ea3b03 100644 --- a/ydb/core/formats/merging_sorted_input_stream.cpp +++ b/ydb/core/formats/merging_sorted_input_stream.cpp @@ -122,6 +122,7 @@ TMergingSortedInputStream::TMergingSortedInputStream(const std::vector<IInputStr void TMergingSortedInputStream::Init() { Y_VERIFY(First); First = false; + size_t totalRows = 0; for (size_t i = 0; i < SourceBatches.size(); ++i) { auto& batch = SourceBatches[i]; @@ -134,14 +135,12 @@ void TMergingSortedInputStream::Init() { continue; } - const size_t rows = batch->num_rows(); - if (ExpectedBatchSize < rows) { - ExpectedBatchSize = MaxBatchSize ? std::min(rows, MaxBatchSize) : rows; - } - + totalRows += batch->num_rows(); Cursors[i] = TSortCursorImpl(batch, Description, i); } + ExpectedBatchSize = MaxBatchSize ? std::min(totalRows, MaxBatchSize) : totalRows; + Queue = TSortingHeap(Cursors, Description->NotNull); /// Let's check that all source blocks have the same structure. diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp index 420955a610..3c9fcda4b1 100644 --- a/ydb/core/formats/program.cpp +++ b/ydb/core/formats/program.cpp @@ -608,10 +608,7 @@ arrow::Status TProgramStep::ApplyAggregates( return arrow::Status::OK(); } -arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { - if (Filters.empty()) { - return arrow::Status::OK(); - } +arrow::Status TProgramStep::MakeCombinedFilter(TDatumBatch& batch, std::vector<bool>& bits) const { std::vector<std::vector<bool>> filters; filters.reserve(Filters.size()); for (auto& colName : Filters) { @@ -631,11 +628,22 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { } } - std::vector<bool> bits; for (auto& f : filters) { bits = NArrow::CombineFilters(std::move(bits), std::move(f)); } + return arrow::Status::OK(); +} +arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { + if (Filters.empty()) { + return arrow::Status::OK(); + } + + std::vector<bool> bits; + auto status = MakeCombinedFilter(batch, bits); + if (!status.ok()) { + return status; + } if (bits.size()) { auto filter = NArrow::MakeFilter(bits); @@ -748,4 +756,33 @@ arrow::Status TProgramStep::Apply(std::shared_ptr<arrow::RecordBatch>& batch, ar return arrow::Status::OK(); } +std::vector<bool> TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + arrow::compute::ExecContext* ctx) const +{ + try { + if (Steps.empty()) { + return {}; + } + auto& step = Steps[0]; + if (step->Filters.empty()) { + return {}; + } + + auto batch = srcBatch; + auto rb = TProgramStep::TDatumBatch::FromRecordBatch(batch); + + if (!step->ApplyAssignes(*rb, ctx).ok()) { + return {}; + } + std::vector<bool> filter; + if (!step->MakeCombinedFilter(*rb, filter).ok()) { + return {}; + } + return filter; + } catch (const std::exception& ex) { + return {}; + } + return {}; +} + } diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h index f9cdfa8bfd..2c246415ad 100644 --- a/ydb/core/formats/program.h +++ b/ydb/core/formats/program.h @@ -202,6 +202,8 @@ struct TProgramStep { arrow::Status ApplyFilters(TDatumBatch& batch) const; arrow::Status ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const; arrow::Status ApplyProjection(TDatumBatch& batch) const; + + arrow::Status MakeCombinedFilter(TDatumBatch& batch, std::vector<bool>& bits) const; }; struct TProgram { @@ -227,6 +229,9 @@ struct TProgram { } return arrow::Status::OK(); } + + std::vector<bool> MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, + arrow::compute::ExecContext* ctx) const; }; inline arrow::Status ApplyProgram( diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp index 8b6822700f..43fb510c65 100644 --- a/ydb/core/tx/columnshard/engines/filter.cpp +++ b/ydb/core/tx/columnshard/engines/filter.cpp @@ -2,6 +2,7 @@ #include "filter.h" #include "indexed_read_data.h" #include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/formats/custom_registry.h> namespace NKikimr::NOlap { @@ -138,8 +139,7 @@ TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, return TFilteredBatch{portion, filter}; } -TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata) -{ +TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata) { std::vector<bool> less; if (readMetadata.LessPredicate) { Y_VERIFY(NArrow::HasAllColumns(batch, readMetadata.LessPredicate->Batch->schema())); @@ -166,6 +166,13 @@ TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch return TFilteredBatch{batch, filter}; } +TFilteredBatch EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa) { + return TFilteredBatch{ + .Batch = batch, + .Filter = ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext()) + }; +} + void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins) { THashSet<NArrow::TReplaceKey> replaces; diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h index 9649808e75..ddc5e4dbf4 100644 --- a/ydb/core/tx/columnshard/engines/filter.h +++ b/ydb/core/tx/columnshard/engines/filter.h @@ -35,5 +35,6 @@ void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch, struct TReadMetadata; TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata); TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata); +TFilteredBatch EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa); } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index ea616f8e4b..993379169f 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -95,7 +95,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v } continue; } -#if 1 +#if 0 // optimization auto deduped = SliceSortedBatches(slices, description); for (auto& batch : deduped) { if (batch && batch->num_rows()) { @@ -257,6 +257,15 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32 Y_VERIFY(filtered.Valid()); filtered.ApplyFilter(); } +#if 1 // optimization + if (ReadMetadata->Program && portionInfo.AllowEarlyFilter()) { + filtered = NOlap::EarlyFilter(filtered.Batch, ReadMetadata->Program); + } + if (filtered.Batch) { + Y_VERIFY(filtered.Valid()); + filtered.ApplyFilter(); + } +#endif return filtered.Batch; } @@ -420,7 +429,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, BatchesToDedup); out.emplace_back(std::move(deduped)); #else - out.push_back(); + out.push_back({}); out.back().emplace_back(CombineSortedBatches(inGranule, SortReplaceDescription)); #endif } else { diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index a37d5ea4ac..0c9123a454 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -57,6 +57,11 @@ struct TPortionInfo { bool CanIntersectOthers() const { return !Valid() || IsInserted(); } size_t NumRecords() const { return Records.size(); } + bool AllowEarlyFilter() const { + return Meta.Produced == TPortionMeta::COMPACTED + || Meta.Produced == TPortionMeta::SPLIT_COMPACTED; + } + bool EvictReady(size_t hotSize) const { return Meta.Produced == TPortionMeta::COMPACTED || Meta.Produced == TPortionMeta::SPLIT_COMPACTED |