aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-03-22 12:51:03 +0300
committerchertus <azuikov@ydb.tech>2023-03-22 12:51:03 +0300
commit0a462e22a244df5f5006af2a158864e01e5bc9c7 (patch)
tree44a60c30f977cf67141ea5cf1d0d9b6c9d430f6e
parentc662852c21526004a43573404fc01a6ac70a7b64 (diff)
downloadydb-0a462e22a244df5f5006af2a158864e01e5bc9c7.tar.gz
early remove empty portions in SELECT
-rw-r--r--ydb/core/formats/arrow_helpers.cpp24
-rw-r--r--ydb/core/formats/arrow_helpers.h1
-rw-r--r--ydb/core/tx/columnshard/engines/filter.cpp85
-rw-r--r--ydb/core/tx/columnshard/engines/filter.h20
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp72
5 files changed, 119 insertions, 83 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 362c66a33cc..b92512dd284 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -974,6 +974,30 @@ std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2)
return f1;
}
+std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count) {
+ count = 0;
+ if (f1.empty() && !f2.empty()) {
+ f1.swap(f2);
+ }
+ if (f1.empty()) {
+ return {};
+ }
+
+ if (f2.empty()) {
+ for (bool bit : f1) {
+ count += bit;
+ }
+ return f1;
+ }
+
+ Y_VERIFY(f1.size() == f2.size());
+ for (size_t i = 0; i < f1.size(); ++i) {
+ f1[i] = f1[i] && f2[i];
+ count += f1[i];
+ }
+ return f1;
+}
+
std::vector<bool> MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border,
ECompareType compareType) {
std::vector<NArrow::ECompareResult> cmps;
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 8895d4bf92a..de87cc36f60 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -101,6 +101,7 @@ std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size);
std::shared_ptr<arrow::UInt64Array> MakePermutation(int size, bool reverse = false);
std::shared_ptr<arrow::BooleanArray> MakeFilter(const std::vector<bool>& bits);
std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2);
+std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count);
TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
// Return size in bytes including size of bitmap mask
ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index ea9160c43c1..8b6822700fe 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -5,27 +5,33 @@
namespace NKikimr::NOlap {
-std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::Table> table,
+void TFilteredBatch::ApplyFilter() {
+ if (Filter.empty()) {
+ return;
+ }
+ auto res = arrow::compute::Filter(Batch, NArrow::MakeFilter(Filter));
+ Y_VERIFY_S(res.ok(), res.status().message());
+ Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH);
+ Batch = (*res).record_batch();
+ Filter.clear();
+}
+
+std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<arrow::Schema> snapSchema,
ui64 planStep, ui64 txId) {
- Y_VERIFY(table);
+ Y_VERIFY(batch);
Y_VERIFY(snapSchema);
Y_VERIFY(snapSchema->num_fields() == 2);
- std::vector<std::shared_ptr<arrow::ChunkedArray>> snapColumns;
- snapColumns.reserve(snapSchema->num_fields());
- for (auto& field : snapSchema->fields()) {
- snapColumns.push_back(table->GetColumnByName(field->name()));
- Y_VERIFY(snapColumns.back());
- }
-
bool alwaysTrue = true;
std::vector<bool> bits;
- bits.reserve(snapColumns[0]->length());
+ bits.reserve(batch->num_rows());
- for (int ch = 0; ch < snapColumns[0]->num_chunks(); ++ch) {
- auto steps = snapColumns[0]->chunk(ch);
- auto ids = snapColumns[1]->chunk(ch);
+ {
+ auto steps = batch->GetColumnByName(snapSchema->fields()[0]->name());
+ auto ids = batch->GetColumnByName(snapSchema->fields()[1]->name());
+ Y_VERIFY(steps);
+ Y_VERIFY(ids);
Y_VERIFY(steps->length() == ids->length());
const auto* rawSteps = std::static_pointer_cast<arrow::UInt64Array>(steps)->raw_values();
@@ -101,8 +107,7 @@ std::vector<bool> MakeReplaceFilterLastWins(std::shared_ptr<arrow::RecordBatch>
return bits;
}
-std::shared_ptr<arrow::RecordBatch> FilterPortion(std::shared_ptr<arrow::Table> portion,
- const TReadMetadata& readMetadata) {
+TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) {
Y_VERIFY(portion);
std::vector<bool> snapFilter;
if (readMetadata.PlanStep) {
@@ -124,31 +129,41 @@ std::shared_ptr<arrow::RecordBatch> FilterPortion(std::shared_ptr<arrow::Table>
greater = NArrow::MakePredicateFilter(portion, readMetadata.GreaterPredicate->Batch, cmpType);
}
- std::vector<bool> bits = NArrow::CombineFilters(std::move(snapFilter),
- NArrow::CombineFilters(std::move(less), std::move(greater)));
- if (bits.size()) {
- auto res = arrow::compute::Filter(portion, NArrow::MakeFilter(bits));
- Y_VERIFY_S(res.ok(), res.status().message());
- Y_VERIFY((*res).kind() == arrow::Datum::TABLE);
- portion = (*res).table();
+ size_t numRows = 0;
+ std::vector<bool> filter = NArrow::CombineFilters(
+ std::move(snapFilter), NArrow::CombineFilters(std::move(less), std::move(greater)), numRows);
+ if (filter.size() && !numRows) {
+ return {};
}
+ return TFilteredBatch{portion, filter};
+}
- Y_VERIFY(portion);
- if (!portion->num_rows()) {
- // TableBatchReader return nullptr in case of empty table. We need a valid batch with 0 rows.
- return NArrow::MakeEmptyBatch(portion->schema());
+TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata)
+{
+ std::vector<bool> less;
+ if (readMetadata.LessPredicate) {
+ Y_VERIFY(NArrow::HasAllColumns(batch, readMetadata.LessPredicate->Batch->schema()));
+
+ auto cmpType = readMetadata.LessPredicate->Inclusive ?
+ NArrow::ECompareType::LESS_OR_EQUAL : NArrow::ECompareType::LESS;
+ less = NArrow::MakePredicateFilter(batch, readMetadata.LessPredicate->Batch, cmpType);
}
- auto res = portion->CombineChunks();
- Y_VERIFY(res.ok());
+ std::vector<bool> greater;
+ if (readMetadata.GreaterPredicate) {
+ Y_VERIFY(NArrow::HasAllColumns(batch, readMetadata.GreaterPredicate->Batch->schema()));
- arrow::TableBatchReader reader(*portion);
- auto result = reader.Next();
- Y_VERIFY(result.ok());
- auto batch = *result;
- result = reader.Next();
- Y_VERIFY(result.ok() && !(*result));
- return batch;
+ auto cmpType = readMetadata.GreaterPredicate->Inclusive ?
+ NArrow::ECompareType::GREATER_OR_EQUAL : NArrow::ECompareType::GREATER;
+ greater = NArrow::MakePredicateFilter(batch, readMetadata.GreaterPredicate->Batch, cmpType);
+ }
+
+ size_t numRows = 0;
+ std::vector<bool> filter = NArrow::CombineFilters(std::move(less), std::move(greater), numRows);
+ if (filter.size() && !numRows) {
+ return {};
+ }
+ return TFilteredBatch{batch, filter};
}
void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h
index 8926e535adb..9649808e75e 100644
--- a/ydb/core/tx/columnshard/engines/filter.h
+++ b/ydb/core/tx/columnshard/engines/filter.h
@@ -3,7 +3,21 @@
namespace NKikimr::NOlap {
-std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::Table> table,
+struct TFilteredBatch {
+ std::shared_ptr<arrow::RecordBatch> Batch;
+ std::vector<bool> Filter;
+
+ bool Valid() const {
+ if (Batch) {
+ return Filter.empty() || (Filter.size() == (size_t)Batch->num_rows());
+ }
+ return false;
+ }
+
+ void ApplyFilter();
+};
+
+std::vector<bool> MakeSnapshotFilter(std::shared_ptr<arrow::RecordBatch> batch,
std::shared_ptr<arrow::Schema> snapSchema,
ui64 planStep, ui64 txId);
@@ -19,7 +33,7 @@ void ReplaceDupKeys(std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& replaceSchema, bool lastWins = false);
struct TReadMetadata;
-std::shared_ptr<arrow::RecordBatch> FilterPortion(std::shared_ptr<arrow::Table> table,
- const TReadMetadata& readMetadata);
+TFilteredBatch FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
+TFilteredBatch FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 0ddde19a5a4..b20269dfa99 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -244,7 +244,9 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& co
if (waitingFor.empty()) {
WaitIndexed.erase(batchNo);
- Indexed[batchNo] = AssembleIndexedBatch(batchNo);
+ if (auto batch = AssembleIndexedBatch(batchNo)) {
+ Indexed[batchNo] = batch;
+ }
UpdateGranuleWaits(batchNo);
}
}
@@ -252,13 +254,7 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& co
std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32 batchNo) {
auto& portionInfo = Portion(batchNo);
- auto portion = portionInfo.Assemble(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
- Y_VERIFY(portion);
-
- /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey.
- /// It's not OK to apply predicate before replacing key duplicates otherwise.
- /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
- auto batch = NOlap::FilterPortion(portion, *ReadMetadata);
+ auto batch = portionInfo.AssembleInBatch(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
Y_VERIFY(batch);
for (auto& rec : portionInfo.Records) {
@@ -266,7 +262,15 @@ std::shared_ptr<arrow::RecordBatch> TIndexedReadData::AssembleIndexedBatch(ui32
Data.erase(blobRange);
}
- return batch;
+    /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
+ /// It's not OK to apply predicate before replacing key duplicates otherwise.
+ /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
+ auto filtered = NOlap::FilterPortion(batch, *ReadMetadata);
+ if (filtered.Batch) {
+ Y_VERIFY(filtered.Valid());
+ filtered.ApplyFilter();
+ }
+ return filtered.Batch;
}
void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
@@ -282,8 +286,7 @@ void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
}
std::shared_ptr<arrow::RecordBatch>
-TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
- ui64 planStep, ui64 txId) const {
+TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch, ui64 planStep, ui64 txId) const {
Y_VERIFY(srcBatch);
// Extract columns (without check), filter, attach snapshot, extract columns with check
@@ -292,41 +295,20 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
auto batch = NArrow::ExtractExistedColumns(srcBatch, ReadMetadata->LoadSchema);
Y_VERIFY(batch);
- { // Apply predicate
- // TODO: Extract this info function
- std::vector<bool> less;
- if (ReadMetadata->LessPredicate) {
- Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->LessPredicate->Batch->schema()));
-
- auto cmpType = ReadMetadata->LessPredicate->Inclusive ?
- NArrow::ECompareType::LESS_OR_EQUAL : NArrow::ECompareType::LESS;
- less = NArrow::MakePredicateFilter(batch, ReadMetadata->LessPredicate->Batch, cmpType);
- }
-
- std::vector<bool> greater;
- if (ReadMetadata->GreaterPredicate) {
- Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->GreaterPredicate->Batch->schema()));
-
- auto cmpType = ReadMetadata->GreaterPredicate->Inclusive ?
- NArrow::ECompareType::GREATER_OR_EQUAL : NArrow::ECompareType::GREATER;
- greater = NArrow::MakePredicateFilter(batch, ReadMetadata->GreaterPredicate->Batch, cmpType);
- }
-
- std::vector<bool> bits = NArrow::CombineFilters(std::move(less), std::move(greater));
- if (bits.size()) {
- auto res = arrow::compute::Filter(batch, NArrow::MakeFilter(bits));
- Y_VERIFY_S(res.ok(), res.status().message());
- Y_VERIFY((*res).kind() == arrow::Datum::RECORD_BATCH);
- batch = (*res).record_batch();
- }
+ auto filtered = FilterNotIndexed(batch, *ReadMetadata);
+ if (!filtered.Batch) {
+ return {};
}
- batch = TIndexInfo::AddSpecialColumns(batch, planStep, txId);
- Y_VERIFY(batch);
+ filtered.Batch = TIndexInfo::AddSpecialColumns(filtered.Batch, planStep, txId);
+ Y_VERIFY(filtered.Batch);
- batch = NArrow::ExtractColumns(batch, ReadMetadata->LoadSchema);
- Y_VERIFY(batch);
- return batch;
+ filtered.Batch = NArrow::ExtractColumns(filtered.Batch, ReadMetadata->LoadSchema);
+ Y_VERIFY(filtered.Batch);
+
+ Y_VERIFY(filtered.Valid());
+ filtered.ApplyFilter();
+ return filtered.Batch;
}
TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) {
@@ -350,7 +332,7 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
}
    // Extract ready granules (they are ready themselves but probably not ready to go out)
- TVector<ui32> ready;
+ std::vector<ui32> ready;
for (auto& [batchNo, batch] : Indexed) {
ui64 granule = BatchGranule(batchNo);
if (ReadyGranules.count(granule)) {
@@ -477,7 +459,7 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch
{ // remove empty batches
size_t dst = 0;
for (size_t src = 0; src < batches.size(); ++src) {
- if (batches[src]->num_rows()) {
+ if (batches[src] && batches[src]->num_rows()) {
if (dst != src) {
batches[dst] = batches[src];
}