author     chertus <azuikov@ydb.tech>    2023-03-22 12:51:03 +0300
committer  chertus <azuikov@ydb.tech>    2023-03-22 12:51:03 +0300
commit     0a462e22a244df5f5006af2a158864e01e5bc9c7
tree       44a60c30f977cf67141ea5cf1d0d9b6c9d430f6e
parent     c662852c21526004a43573404fc01a6ac70a7b64
early remove empty portions in SELECT
-rw-r--r--   ydb/core/formats/arrow_helpers.cpp                      |  24
-rw-r--r--   ydb/core/formats/arrow_helpers.h                        |   1
-rw-r--r--   ydb/core/tx/columnshard/engines/filter.cpp              |  85
-rw-r--r--   ydb/core/tx/columnshard/engines/filter.h                |  20
-rw-r--r--   ydb/core/tx/columnshard/engines/indexed_read_data.cpp   |  72
5 files changed, 119 insertions, 83 deletions
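The core mechanism of the patch below is a CombineFilters overload that ANDs two selection bitmaps and counts the surviving rows in the same pass, so a portion whose combined filter selects nothing can be dropped before any Arrow kernel runs on it. Here is a minimal standalone sketch of that idea; the names (CombineAndCount) and the plain assert are illustrative stand-ins, not the actual NArrow helpers or Y_VERIFY.

#include <cassert>
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

// AND two selection bitmaps in place and count surviving rows on the way.
// An empty bitmap means "keep every row" (no filter present on that side).
std::vector<bool> CombineAndCount(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count) {
    count = 0;
    if (f1.empty() && !f2.empty()) {
        f1.swap(f2);                      // only the second filter exists: use it
    }
    if (f1.empty()) {
        return {};                        // no filter at all: caller keeps the whole batch
    }
    if (f2.empty()) {
        for (bool bit : f1) {
            count += bit;                 // single filter: just count the selected rows
        }
        return f1;
    }
    assert(f1.size() == f2.size());
    for (size_t i = 0; i < f1.size(); ++i) {
        f1[i] = f1[i] && f2[i];           // intersection of the two filters
        count += f1[i];
    }
    return f1;
}

int main() {
    size_t selected = 0;
    auto filter = CombineAndCount({true, false, true}, {false, false, true}, selected);

    // selected == 1 here, so this portion still carries data. A non-empty
    // filter with selected == 0 is the "empty portion" case the patch below
    // removes early, before any arrow::compute::Filter call.
    std::cout << "selected rows: " << selected << "\n";
    return filter.empty() ? 1 : 0;
}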
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 362c66a33cc..b92512dd284 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -974,6 +974,30 @@ std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2)
     return f1;
 }
 
+std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count) {
+    count = 0;
+    if (f1.empty() && !f2.empty()) {
+        f1.swap(f2);
+    }
+    if (f1.empty()) {
+        return {};
+    }
+
+    if (f2.empty()) {
+        for (bool bit : f1) {
+            count += bit;
+        }
+        return f1;
+    }
+
+    Y_VERIFY(f1.size() == f2.size());
+    for (size_t i = 0; i < f1.size(); ++i) {
+        f1[i] = f1[i] && f2[i];
+        count += f1[i];
+    }
+    return f1;
+}
+
 std::vector<bool> MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border,
                                       ECompareType compareType) {
     std::vector<NArrow::ECompareResult> cmps;
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 8895d4bf92a..de87cc36f60 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -101,6 +101,7 @@ std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size);
 std::shared_ptr<arrow::UInt64Array> MakePermutation(int size, bool reverse = false);
 std::shared_ptr<arrow::BooleanArray> MakeFilter(const std::vector<bool>& bits);
 std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2);
+std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count);
 TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
 // Return size in bytes including size of bitmap mask
 ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index ea9160c43c1..8b6822700fe 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -5,27 +5,33 @@
 
 namespace NKikimr::NOlap {
 
-std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::Table> table,
+void TFilteredBatch::ApplyFilter() {
+    if (Filter.empty()) {
+        return;
+    }
+    auto res = arrow::compute::Filter(Batch, NArrow::MakeFilter(Filter));
+    Y_VERIFY_S(res.ok(), res.status().message());
+    Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH);
+    Batch = (*res).record_batch();
+    Filter.clear();
+}
+
+std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::RecordBatch> batch,
                                      std::shared_ptr<arrow::Schema> snapSchema,
                                      ui64 planStep, ui64 txId) {
-    Y_VERIFY(table);
+    Y_VERIFY(batch);
     Y_VERIFY(snapSchema);
     Y_VERIFY(snapSchema->num_fields() == 2);
 
-    std::vector<std::shared_ptr<arrow::ChunkedArray>> snapColumns;
-    snapColumns.reserve(snapSchema->num_fields());
-    for (auto& field : snapSchema->fields()) {
-        snapColumns.push_back(table->GetColumnByName(field->name()));
-        Y_VERIFY(snapColumns.back());
-    }
-
     bool alwaysTrue = true;
     std::vector<bool> bits;
-    bits.reserve(snapColumns[0]->length());
+    bits.reserve(batch->num_rows());
 
-    for (int ch = 0; ch < snapColumns[0]->num_chunks(); ++ch) {
-        auto steps = snapColumns[0]->chunk(ch);
-        auto ids = snapColumns[1]->chunk(ch);
+    {
+        auto steps = batch->GetColumnByName(snapSchema->fields()[0]->name());
+        auto ids = batch->GetColumnByName(snapSchema->fields()[1]->name());
+        Y_VERIFY(steps);
+        Y_VERIFY(ids);
         Y_VERIFY(steps->length() == ids->length());
 
         const auto* rawSteps = std::static_pointer_cast<arrow::UInt64Array>(steps)->raw_values();
@@ -101,8 +107,7 @@ std::vector<bool> MakeReplaceFilterLastWins(std::shared_ptr<arrow::RecordBatch>
     return bits;
 }
 
-std::shared_ptr<arrow::RecordBatch> FilterPortion(std::shared_ptr<arrow::Table> portion,
-                                                  const TReadMetadata& readMetadata) {
+TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) {
     Y_VERIFY(portion);
     std::vector<bool> snapFilter;
     if (readMetadata.PlanStep) {
@@ -124,31 +129,41 @@ std::shared_ptr<arrow::RecordBatch> FilterPortion(std::shared_ptr<arrow::Table>
         greater = NArrow::MakePredicateFilter(portion, readMetadata.GreaterPredicate->Batch, cmpType);
     }
 
-    std::vector<bool> bits = NArrow::CombineFilters(std::move(snapFilter),
-                                                    NArrow::CombineFilters(std::move(less), std::move(greater)));
-    if (bits.size()) {
-        auto res = arrow::compute::Filter(portion, NArrow::MakeFilter(bits));
-        Y_VERIFY_S(res.ok(), res.status().message());
-        Y_VERIFY((*res).kind() == arrow::Datum::TABLE);
-        portion = (*res).table();
+    size_t numRows = 0;
+    std::vector<bool> filter = NArrow::CombineFilters(
+        std::move(snapFilter), NArrow::CombineFilters(std::move(less), std::move(greater)), numRows);
+    if (filter.size() && !numRows) {
+        return {};
     }
+    return TFilteredBatch{portion, filter};
+}
 
-    Y_VERIFY(portion);
-    if (!portion->num_rows()) {
-        // TableBatchReader return nullptr in case of empty table. We need a valid batch with 0 rows.
-        return NArrow::MakeEmptyBatch(portion->schema());
+TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata)
+{
+    std::vector<bool> less;
+    if (readMetadata.LessPredicate) {
+        Y_VERIFY(NArrow::HasAllColumns(batch, readMetadata.LessPredicate->Batch->schema()));
+
+        auto cmpType = readMetadata.LessPredicate->Inclusive ?
+            NArrow::ECompareType::LESS_OR_EQUAL : NArrow::ECompareType::LESS;
+        less = NArrow::MakePredicateFilter(batch, readMetadata.LessPredicate->Batch, cmpType);
     }
 
-    auto res = portion->CombineChunks();
-    Y_VERIFY(res.ok());
+    std::vector<bool> greater;
+    if (readMetadata.GreaterPredicate) {
+        Y_VERIFY(NArrow::HasAllColumns(batch, readMetadata.GreaterPredicate->Batch->schema()));
 
-    arrow::TableBatchReader reader(*portion);
-    auto result = reader.Next();
-    Y_VERIFY(result.ok());
-    auto batch = *result;
-    result = reader.Next();
-    Y_VERIFY(result.ok() && !(*result));
-    return batch;
+        auto cmpType = readMetadata.GreaterPredicate->Inclusive ?
+            NArrow::ECompareType::GREATER_OR_EQUAL : NArrow::ECompareType::GREATER;
+        greater = NArrow::MakePredicateFilter(batch, readMetadata.GreaterPredicate->Batch, cmpType);
+    }
+
+    size_t numRows = 0;
+    std::vector<bool> filter = NArrow::CombineFilters(std::move(less), std::move(greater), numRows);
+    if (filter.size() && !numRows) {
+        return {};
+    }
+    return TFilteredBatch{batch, filter};
 }
 
 void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h
index 8926e535adb..9649808e75e 100644
--- a/ydb/core/tx/columnshard/engines/filter.h
+++ b/ydb/core/tx/columnshard/engines/filter.h
@@ -3,7 +3,21 @@
 
 namespace NKikimr::NOlap {
 
-std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::Table> table,
+struct TFilteredBatch {
+    std::shared_ptr<arrow::RecordBatch> Batch;
+    std::vector<bool> Filter;
+
+    bool Valid() const {
+        if (Batch) {
+            return Filter.empty() || (Filter.size() == (size_t)Batch->num_rows());
+        }
+        return false;
+    }
+
+    void ApplyFilter();
+};
+
+std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::RecordBatch> batch,
                                      std::shared_ptr<arrow::Schema> snapSchema,
                                      ui64 planStep, ui64 txId);
 
@@ -19,7 +33,7 @@ void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
                     const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins = false);
 
 struct TReadMetadata;
-std::shared_ptr<arrow::RecordBatch> FilterPortion(std::shared_ptr<arrow::Table> table,
-                                                  const TReadMetadata& readMetadata);
+TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
+TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
 
 }
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 0ddde19a5a4..b20269dfa99 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -244,7 +244,9 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& co
     if (waitingFor.empty()) {
         WaitIndexed.erase(batchNo);
-        Indexed[batchNo] = AssembleIndexedBatch(batchNo);
+        if (auto batch = AssembleIndexedBatch(batchNo)) {
+            Indexed[batchNo] = batch;
+        }
         UpdateGranuleWaits(batchNo);
     }
 }
 
@@ -252,13 +254,7 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& co
 std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32 batchNo) {
     auto& portionInfo = Portion(batchNo);
 
-    auto portion = portionInfo.Assemble(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
-    Y_VERIFY(portion);
-
-    /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
-    /// It's not OK to apply predicate before replacing key duplicates otherwise.
-    /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
-    auto batch = NOlap::FilterPortion(portion, *ReadMetadata);
+    auto batch = portionInfo.AssembleInBatch(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
     Y_VERIFY(batch);
 
     for (auto& rec : portionInfo.Records) {
@@ -266,7 +262,15 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32
         Data.erase(blobRange);
     }
 
-    return batch;
+    /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
+    /// It's not OK to apply predicate before replacing key duplicates otherwise.
+    /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
+    auto filtered = NOlap::FilterPortion(batch, *ReadMetadata);
+    if (filtered.Batch) {
+        Y_VERIFY(filtered.Valid());
+        filtered.ApplyFilter();
+    }
+    return filtered.Batch;
 }
 
 void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
@@ -282,8 +286,7 @@ void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
 }
 
 std::shared_ptr<arrow::RecordBatch>
-TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
-                                      ui64 planStep, ui64 txId) const {
+TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, ui64 planStep, ui64 txId) const {
     Y_VERIFY(srcBatch);
 
     // Extract columns (without check), filter, attach snapshot, extract columns with check
@@ -292,41 +295,20 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
     auto batch = NArrow::ExtractExistedColumns(srcBatch, ReadMetadata->LoadSchema);
     Y_VERIFY(batch);
 
-    { // Apply predicate
-        // TODO: Extract this info function
-        std::vector<bool> less;
-        if (ReadMetadata->LessPredicate) {
-            Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->LessPredicate->Batch->schema()));
-
-            auto cmpType = ReadMetadata->LessPredicate->Inclusive ?
-                NArrow::ECompareType::LESS_OR_EQUAL : NArrow::ECompareType::LESS;
-            less = NArrow::MakePredicateFilter(batch, ReadMetadata->LessPredicate->Batch, cmpType);
-        }
-
-        std::vector<bool> greater;
-        if (ReadMetadata->GreaterPredicate) {
-            Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->GreaterPredicate->Batch->schema()));
-
-            auto cmpType = ReadMetadata->GreaterPredicate->Inclusive ?
-                NArrow::ECompareType::GREATER_OR_EQUAL : NArrow::ECompareType::GREATER;
-            greater = NArrow::MakePredicateFilter(batch, ReadMetadata->GreaterPredicate->Batch, cmpType);
-        }
-
-        std::vector<bool> bits = NArrow::CombineFilters(std::move(less), std::move(greater));
-        if (bits.size()) {
-            auto res = arrow::compute::Filter(batch, NArrow::MakeFilter(bits));
-            Y_VERIFY_S(res.ok(), res.status().message());
-            Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH);
-            batch = (*res).record_batch();
-        }
+    auto filtered = FilterNotIndexed(batch, *ReadMetadata);
+    if (!filtered.Batch) {
+        return {};
     }
 
-    batch = TIndexInfo::AddSpecialColumns(batch, planStep, txId);
-    Y_VERIFY(batch);
+    filtered.Batch = TIndexInfo::AddSpecialColumns(filtered.Batch, planStep, txId);
+    Y_VERIFY(filtered.Batch);
 
-    batch = NArrow::ExtractColumns(batch, ReadMetadata->LoadSchema);
-    Y_VERIFY(batch);
-    return batch;
+    filtered.Batch = NArrow::ExtractColumns(filtered.Batch, ReadMetadata->LoadSchema);
+    Y_VERIFY(filtered.Batch);
+
+    Y_VERIFY(filtered.Valid());
+    filtered.ApplyFilter();
+    return filtered.Batch;
 }
 
 TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) {
@@ -350,7 +332,7 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
     }
 
     // Extact ready granules (they are ready themselves but probably not ready to go out)
-    TVector<ui32> ready;
+    std::vector<ui32> ready;
     for (auto& [batchNo, batch] : Indexed) {
         ui64 granule = BatchGranule(batchNo);
         if (ReadyGranules.count(granule)) {
@@ -477,7 +459,7 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch
     { // remove empty batches
         size_t dst = 0;
         for (size_t src = 0; src < batches.size(); ++src) {
-            if (batches[src]->num_rows()) {
+            if (batches[src] && batches[src]->num_rows()) {
                 if (dst != src) {
                     batches[dst] = batches[src];
                 }
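For context, this is roughly how the new TFilteredBatch is meant to be consumed: the selection is kept as a bitmap next to the batch and is only materialized with arrow::compute::Filter when the rows are actually needed, while a filter that selects nothing makes the callers (AssembleIndexedBatch, MakeNotIndexedBatch) drop the batch instead of carrying an empty one through the merge. The sketch below uses the public Arrow C++ API directly rather than the NArrow/Y_VERIFY helpers, and the names TDeferredBatch and SelectRows are illustrative, not part of the patch.

#include <cstdlib>
#include <memory>
#include <utility>
#include <vector>

#include <arrow/api.h>
#include <arrow/compute/api.h>

// Illustrative stand-in for TFilteredBatch: a RecordBatch plus a deferred
// selection bitmap. An empty bitmap means "keep every row".
struct TDeferredBatch {
    std::shared_ptr<arrow::RecordBatch> Batch;
    std::vector<bool> Filter;

    bool Valid() const {
        return Batch && (Filter.empty() || Filter.size() == (size_t)Batch->num_rows());
    }

    // Materialize the selection only when the surviving rows are needed.
    void ApplyFilter() {
        if (Filter.empty()) {
            return;                                        // nothing to cut away
        }
        arrow::BooleanBuilder builder;
        if (!builder.AppendValues(Filter).ok()) {
            std::abort();
        }
        std::shared_ptr<arrow::Array> mask;
        if (!builder.Finish(&mask).ok()) {
            std::abort();
        }
        auto res = arrow::compute::Filter(Batch, mask);    // copies only selected rows
        if (!res.ok() || res->kind() != arrow::Datum::RECORD_BATCH) {
            std::abort();
        }
        Batch = res->record_batch();
        Filter.clear();
    }
};

// Usage sketch: a combined filter that selects zero rows means the whole
// portion can be dropped up front instead of producing an empty batch.
TDeferredBatch SelectRows(std::shared_ptr<arrow::RecordBatch> batch,
                          std::vector<bool> filter, size_t selectedRows) {
    if (!filter.empty() && selectedRows == 0) {
        return {};                                         // empty portion, removed early
    }
    return TDeferredBatch{std::move(batch), std::move(filter)};
}

The caller-side effect matches the AddIndexed change above: AssembleIndexedBatch can now return a null batch for a fully filtered portion, and such batches are never inserted into Indexed or considered when empty batches are pruned before the merge.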