aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-03-27 18:55:33 +0300
committerchertus <azuikov@ydb.tech>2023-03-27 18:55:33 +0300
commit66749428b482d3e03a59b7bfa4eef36144b804e6 (patch)
tree12db5ae04432fab884e139e5ef1b79b229d130f5
parent0d8a9f6e68724cbe829c3e06bc810184f1adc996 (diff)
downloadydb-66749428b482d3e03a59b7bfa4eef36144b804e6.tar.gz
early filter optimization
-rw-r--r--ydb/core/formats/merging_sorted_input_stream.cpp9
-rw-r--r--ydb/core/formats/program.cpp47
-rw-r--r--ydb/core/formats/program.h5
-rw-r--r--ydb/core/tx/columnshard/engines/filter.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/filter.h1
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp13
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h5
7 files changed, 77 insertions, 14 deletions
diff --git a/ydb/core/formats/merging_sorted_input_stream.cpp b/ydb/core/formats/merging_sorted_input_stream.cpp
index 2e2b5f7d42..e5b1ea3b03 100644
--- a/ydb/core/formats/merging_sorted_input_stream.cpp
+++ b/ydb/core/formats/merging_sorted_input_stream.cpp
@@ -122,6 +122,7 @@ TMergingSortedInputStream::TMergingSortedInputStream(const std::vector<IInputStr
void TMergingSortedInputStream::Init() {
Y_VERIFY(First);
First = false;
+ size_t totalRows = 0;
for (size_t i = 0; i < SourceBatches.size(); ++i) {
auto& batch = SourceBatches[i];
@@ -134,14 +135,12 @@ void TMergingSortedInputStream::Init() {
continue;
}
- const size_t rows = batch->num_rows();
- if (ExpectedBatchSize < rows) {
- ExpectedBatchSize = MaxBatchSize ? std::min(rows, MaxBatchSize) : rows;
- }
-
+ totalRows += batch->num_rows();
Cursors[i] = TSortCursorImpl(batch, Description, i);
}
+ ExpectedBatchSize = MaxBatchSize ? std::min(totalRows, MaxBatchSize) : totalRows;
+
Queue = TSortingHeap(Cursors, Description->NotNull);
/// Let's check that all source blocks have the same structure.
diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp
index 420955a610..3c9fcda4b1 100644
--- a/ydb/core/formats/program.cpp
+++ b/ydb/core/formats/program.cpp
@@ -608,10 +608,7 @@ arrow::Status TProgramStep::ApplyAggregates(
return arrow::Status::OK();
}
-arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
- if (Filters.empty()) {
- return arrow::Status::OK();
- }
+arrow::Status TProgramStep::MakeCombinedFilter(TDatumBatch& batch, std::vector<bool>& bits) const {
std::vector<std::vector<bool>> filters;
filters.reserve(Filters.size());
for (auto& colName : Filters) {
@@ -631,11 +628,22 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
}
}
- std::vector<bool> bits;
for (auto& f : filters) {
bits = NArrow::CombineFilters(std::move(bits), std::move(f));
}
+ return arrow::Status::OK();
+}
+arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const {
+ if (Filters.empty()) {
+ return arrow::Status::OK();
+ }
+
+ std::vector<bool> bits;
+ auto status = MakeCombinedFilter(batch, bits);
+ if (!status.ok()) {
+ return status;
+ }
if (bits.size()) {
auto filter = NArrow::MakeFilter(bits);
@@ -748,4 +756,33 @@ arrow::Status TProgramStep::Apply(std::shared_ptr<arrow::RecordBatch>& batch, ar
return arrow::Status::OK();
}
+std::vector<bool> TProgram::MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ arrow::compute::ExecContext* ctx) const
+{
+ try {
+ if (Steps.empty()) {
+ return {};
+ }
+ auto& step = Steps[0];
+ if (step->Filters.empty()) {
+ return {};
+ }
+
+ auto batch = srcBatch;
+ auto rb = TProgramStep::TDatumBatch::FromRecordBatch(batch);
+
+ if (!step->ApplyAssignes(*rb, ctx).ok()) {
+ return {};
+ }
+ std::vector<bool> filter;
+ if (!step->MakeCombinedFilter(*rb, filter).ok()) {
+ return {};
+ }
+ return filter;
+ } catch (const std::exception& ex) {
+ return {};
+ }
+ return {};
+}
+
}
diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h
index f9cdfa8bfd..2c246415ad 100644
--- a/ydb/core/formats/program.h
+++ b/ydb/core/formats/program.h
@@ -202,6 +202,8 @@ struct TProgramStep {
arrow::Status ApplyFilters(TDatumBatch& batch) const;
arrow::Status ApplyProjection(std::shared_ptr<arrow::RecordBatch>& batch) const;
arrow::Status ApplyProjection(TDatumBatch& batch) const;
+
+ arrow::Status MakeCombinedFilter(TDatumBatch& batch, std::vector<bool>& bits) const;
};
struct TProgram {
@@ -227,6 +229,9 @@ struct TProgram {
}
return arrow::Status::OK();
}
+
+ std::vector<bool> MakeEarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch,
+ arrow::compute::ExecContext* ctx) const;
};
inline arrow::Status ApplyProgram(
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index 8b6822700f..43fb510c65 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -2,6 +2,7 @@
#include "filter.h"
#include "indexed_read_data.h"
#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/formats/custom_registry.h>
namespace NKikimr::NOlap {
@@ -138,8 +139,7 @@ TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion,
return TFilteredBatch{portion, filter};
}
-TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata)
-{
+TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata) {
std::vector<bool> less;
if (readMetadata.LessPredicate) {
Y_VERIFY(NArrow::HasAllColumns(batch, readMetadata.LessPredicate->Batch->schema()));
@@ -166,6 +166,13 @@ TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch
return TFilteredBatch{batch, filter};
}
+TFilteredBatch EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa) {
+ return TFilteredBatch{
+ .Batch = batch,
+ .Filter = ssa->MakeEarlyFilter(batch, NArrow::GetCustomExecContext())
+ };
+}
+
void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins) {
THashSet<NArrow::TReplaceKey> replaces;
diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h
index 9649808e75..ddc5e4dbf4 100644
--- a/ydb/core/tx/columnshard/engines/filter.h
+++ b/ydb/core/tx/columnshard/engines/filter.h
@@ -35,5 +35,6 @@ void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
struct TReadMetadata;
TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
+TFilteredBatch EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa);
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index ea616f8e4b..993379169f 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -95,7 +95,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
continue;
}
-#if 1
+#if 0 // optimization
auto deduped = SliceSortedBatches(slices, description);
for (auto& batch : deduped) {
if (batch && batch->num_rows()) {
@@ -257,6 +257,15 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32
Y_VERIFY(filtered.Valid());
filtered.ApplyFilter();
}
+#if 1 // optimization
+ if (ReadMetadata->Program && portionInfo.AllowEarlyFilter()) {
+ filtered = NOlap::EarlyFilter(filtered.Batch, ReadMetadata->Program);
+ }
+ if (filtered.Batch) {
+ Y_VERIFY(filtered.Valid());
+ filtered.ApplyFilter();
+ }
+#endif
return filtered.Batch;
}
@@ -420,7 +429,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
auto deduped = SpecialMergeSorted(inGranule, IndexInfo(), SortReplaceDescription, BatchesToDedup);
out.emplace_back(std::move(deduped));
#else
- out.push_back();
+ out.push_back({});
out.back().emplace_back(CombineSortedBatches(inGranule, SortReplaceDescription));
#endif
} else {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index a37d5ea4ac..0c9123a454 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -57,6 +57,11 @@ struct TPortionInfo {
bool CanIntersectOthers() const { return !Valid() || IsInserted(); }
size_t NumRecords() const { return Records.size(); }
+ bool AllowEarlyFilter() const {
+ return Meta.Produced == TPortionMeta::COMPACTED
+ || Meta.Produced == TPortionMeta::SPLIT_COMPACTED;
+ }
+
bool EvictReady(size_t hotSize) const {
return Meta.Produced == TPortionMeta::COMPACTED
|| Meta.Produced == TPortionMeta::SPLIT_COMPACTED