author     ivanmorozov <ivanmorozov@yandex-team.com>   2023-04-28 15:34:40 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>   2023-04-28 15:34:40 +0300
commit     252edae5dce1381309d136306c615141a18407e3 (patch)
tree       2985b31c9718562990cfdc4d9718b5a43d47db81
parent     feb79a8842a81474d42b9c460ffb7d93ef44184c (diff)
skip blobs not filtered
19 files changed, 327 insertions, 127 deletions
diff --git a/ydb/core/formats/arrow_filter.h b/ydb/core/formats/arrow_filter.h
index 8e1e60c57b..43aaed58d0 100644
--- a/ydb/core/formats/arrow_filter.h
+++ b/ydb/core/formats/arrow_filter.h
@@ -40,6 +40,46 @@ private:
 public:
+    class TIterator {
+    private:
+        ui32 InternalPosition = 0;
+        std::deque<ui32>::const_iterator It;
+        std::deque<ui32>::const_iterator ItEnd;
+        bool CurrentValue;
+    public:
+        TIterator(const std::deque<ui32>& filter, const bool startValue)
+            : It(filter.begin())
+            , ItEnd(filter.end())
+            , CurrentValue(startValue)
+        {
+
+        }
+
+        bool IsBatchForSkip(const ui32 size) const {
+            return !CurrentValue && (*It - InternalPosition) >= size;
+        }
+
+        bool Next(const ui32 size) {
+            ui32 sizeRemain = size;
+            while (It != ItEnd) {
+                if (*It - InternalPosition > sizeRemain) {
+                    InternalPosition += sizeRemain;
+                    return true;
+                } else {
+                    sizeRemain -= *It - InternalPosition;
+                    InternalPosition = 0;
+                    CurrentValue = !CurrentValue;
+                    ++It;
+                }
+            }
+            return false;
+        }
+    };
+
+    TIterator GetIterator() const {
+        return TIterator(Filter, GetStartValue());
+    }
+
     TColumnFilter(std::vector<bool>&& values) {
         const ui32 count = values.size();
         Reset(count, std::move(values));
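Note: TColumnFilter stores the filter as run lengths (a deque of run sizes whose boolean value flips run by run), and the new TIterator walks those runs against externally supplied chunk sizes. A rough usage sketch follows; the chunk row counts, the helper name and the include are illustrative, not part of the commit.

// Sketch only: walking a TColumnFilter over per-chunk row counts to decide
// which chunks can be skipped entirely. Chunk sizes are made up for the example.
#include <ydb/core/formats/arrow_filter.h>
#include <vector>

void SkipFullyFilteredChunks(const NArrow::TColumnFilter& filter,
                             const std::vector<ui32>& chunkRowCounts) {
    auto it = filter.GetIterator();
    for (const ui32 rows : chunkRowCounts) {
        if (it.IsBatchForSkip(rows)) {
            // The whole chunk falls inside a "false" run of the filter:
            // its blob does not need to be fetched or deserialized.
        } else {
            // At least one row may survive the filter: fetch the blob as usual.
        }
        if (!it.Next(rows)) {
            break; // filter exhausted
        }
    }
}

IsBatchForSkip only answers for the current position, so the caller still has to advance the iterator with Next() for every chunk, skipped or not.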
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index db47cb93bc..c1f2165b1d 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -225,12 +225,12 @@ std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const
     return *batch;
 }
 
-std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema) {
+std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema, const ui32 rowsCount) {
     std::vector<std::shared_ptr<arrow::Array>> columns;
     columns.reserve(schema->num_fields());
     for (auto& field : schema->fields()) {
-        auto result = arrow::MakeArrayOfNull(field->type(), 0);
+        auto result = arrow::MakeArrayOfNull(field->type(), rowsCount);
         Y_VERIFY_OK(result.status());
         columns.emplace_back(*result);
         Y_VERIFY(columns.back());
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 3d6f060171..d96fd11b0a 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -70,7 +70,7 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b
 std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema);
 
-std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema);
+std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::Schema>& schema, const ui32 rowsCount = 0);
 
 std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const std::vector<TString>& columnNames);
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index b408adb99a..56497ad8b0 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -346,7 +346,7 @@ private:
         }
 
         // Switch to the next range if the current one is finished
-        if (ScanIterator && ScanIterator->Finished() && !InFlightReads) {
+        if (ScanIterator && ScanIterator->Finished()) {
            NextReadMetadata();
         }
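Note: MakeEmptyBatch previously always produced a zero-row batch; the new rowsCount parameter lets the reader stand in a null-filled batch of the right length for a chunk whose blob was never fetched. A minimal sketch, assuming a hypothetical one-column schema; the wrapper name is illustrative.

// Sketch: building a null-filled placeholder batch for a chunk whose blob was skipped.
// The schema is hypothetical; in the read path it comes from the portion's column field.
#include <ydb/core/formats/arrow_helpers.h>

std::shared_ptr<arrow::RecordBatch> MakeNullPlaceholder(ui32 skippedRows) {
    auto schema = arrow::schema({arrow::field("value", arrow::uint64())});
    // Every column becomes MakeArrayOfNull(type, skippedRows), so num_rows() == skippedRows
    // and the placeholder lines up with the other chunks of the portion.
    return NArrow::MakeEmptyBatch(schema, skippedRows);
}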
diff --git a/ydb/core/tx/columnshard/engines/columns_table.h b/ydb/core/tx/columnshard/engines/columns_table.h
index 99021dcde3..8ad4b38abd 100644
--- a/ydb/core/tx/columnshard/engines/columns_table.h
+++ b/ydb/core/tx/columnshard/engines/columns_table.h
@@ -19,6 +19,10 @@ struct TColumnRecord {
     TBlobRange BlobRange;
     TString Metadata;
 
+    ui32 GetRowsCount() const {
+        return 0;
+    }
+
     bool operator == (const TColumnRecord& rec) const {
         return (Granule == rec.Granule) && (ColumnId == rec.ColumnId) && (PlanStep == rec.PlanStep)
             && (TxId == rec.TxId) && (Portion == rec.Portion) && (Chunk == rec.Chunk);
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index e2a041af03..441089d748 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -54,11 +54,18 @@ std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() {
 }
 
 bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) {
-    const auto& name = field.name();
-    return (name == SPEC_COL_PLAN_STEP)
-        || (name == SPEC_COL_TX_ID);
+    return IsSpecialColumn(field.name());
 }
 
+bool TIndexInfo::IsSpecialColumn(const std::string& fieldName) {
+    return fieldName == SPEC_COL_PLAN_STEP
+        || fieldName == SPEC_COL_TX_ID;
+}
+
+bool TIndexInfo::IsSpecialColumn(const ui32 fieldId) {
+    return fieldId == (ui32)ESpecialColumn::PLAN_STEP
+        || fieldId == (ui32)ESpecialColumn::TX_ID;
+}
 
 ui32 TIndexInfo::GetColumnId(const std::string& name) const {
     auto id = GetColumnIdOptional(name);
@@ -69,16 +76,15 @@ ui32 TIndexInfo::GetColumnId(const std::string& name) const {
 
 std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const {
     const auto ni = ColumnNames.find(name);
 
-    if (ni == ColumnNames.end()) {
-        if (name == SPEC_COL_PLAN_STEP) {
-            return ui32(ESpecialColumn::PLAN_STEP);
-        } else if (name == SPEC_COL_TX_ID) {
-            return ui32(ESpecialColumn::TX_ID);
-        }
-        return {};
+    if (ni != ColumnNames.end()) {
+        return ni->second;
     }
-
-    return ni->second;
+    if (name == SPEC_COL_PLAN_STEP) {
+        return ui32(ESpecialColumn::PLAN_STEP);
+    } else if (name == SPEC_COL_TX_ID) {
+        return ui32(ESpecialColumn::TX_ID);
+    }
+    return {};
 }
 
 TString TIndexInfo::GetColumnName(ui32 id, bool required) const {
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index d9b96bc902..c6af3b0d9b 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -48,7 +48,17 @@ public:
     /// Matches name of the filed with names of the special columns.
     static bool IsSpecialColumn(const arrow::Field& field);
-
+    static bool IsSpecialColumn(const ui32 field);
+    static bool IsSpecialColumn(const std::string& fieldName);
+    template <class TContainer>
+    static bool IsSpecialColumns(const TContainer& c) {
+        for (auto&& i : c) {
+            if (!IsSpecialColumn(i)) {
+                return false;
+            }
+        }
+        return true;
+    }
 public:
     TIndexInfo(const TString& name, ui32 id);
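Note: the IsSpecialColumn overloads (field, name, id) plus the IsSpecialColumns container helper make it cheap to ask whether an entire column set consists only of the snapshot columns PLAN_STEP/TX_ID. A small sketch, with an assumed caller-side helper name:

// Sketch: deciding whether an early-filter column set carries any user predicate at all.
#include <ydb/core/tx/columnshard/engines/index_info.h>
#include <set>

bool FilterIsOnlySnapshotColumns(const std::set<ui32>& earlyFilterColumnIds) {
    // True when every early-filter column is PLAN_STEP or TX_ID, i.e. the "filter"
    // is just the snapshot predicate and is unlikely to drop many rows.
    return NKikimr::NOlap::TIndexInfo::IsSpecialColumns(earlyFilterColumnIds);
}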
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 91e6d4d010..2fa4c30922 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -117,7 +117,7 @@ std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColum
     return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
 }
 
-std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) const {
+std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
     std::set<ui32> result;
     if (LessPredicate) {
         for (auto&& i : LessPredicate->ColumnNames()) {
@@ -140,9 +140,6 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons
             }
         }
     }
-    if (noTrivial && result.empty()) {
-        return result;
-    }
     if (PlanStep) {
         auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
         for (auto&& i : snapSchema->fields()) {
@@ -195,6 +192,19 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::
     }
 }
 
+bool TIndexedReadData::PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const {
+    if (!portionInfo.AllowEarlyFilter()) {
+        return true;
+    }
+    if (EarlyFilterColumns.empty()) {
+        return true;
+    }
+    if (TIndexInfo::IsSpecialColumns(EarlyFilterColumns)) {
+        return true;
+    }
+    return false;
+}
+
 void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
     Y_VERIFY(ReadMetadata->BlobSchema);
     Y_VERIFY(ReadMetadata->LoadSchema);
@@ -222,10 +232,10 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
         }
 
         NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo);
-        if (portionInfo.AllowEarlyFilter()) {
-            currentBatch.Reset(&EarlyFilterColumns);
+        if (!OnePhaseReadMode && !PredictManyResultsAfterFilter(portionInfo)) {
+            currentBatch.ResetNoFilter(&EarlyFilterColumns);
         } else {
-            currentBatch.Reset(&UsedColumns);
+            currentBatch.ResetNoFilter(&UsedColumns);
         }
         Batches[batchNo] = &currentBatch;
         ++batchNo;
@@ -573,17 +583,13 @@ TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata,
     : Counters(counters)
     , FetchBlobsQueue(fetchBlobsQueue)
     , ReadMetadata(readMetadata)
+    , OnePhaseReadMode(internalRead)
 {
     UsedColumns = ReadMetadata->GetUsedColumnIds();
     PostFilterColumns = ReadMetadata->GetUsedColumnIds();
-    EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(true);
-    if (internalRead || EarlyFilterColumns.empty()) {
-        EarlyFilterColumns = PostFilterColumns;
-        PostFilterColumns.clear();
-    } else {
-        for (auto&& i : EarlyFilterColumns) {
-            PostFilterColumns.erase(i);
-        }
+    EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds();
+    for (auto&& i : EarlyFilterColumns) {
+        PostFilterColumns.erase(i);
     }
     Y_VERIFY(ReadMetadata->SelectInfo);
 }
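Note: InitRead now chooses per portion between a one-phase read (assemble every used column at once) and a two-phase read (early-filter columns first, the rest only for surviving rows). The sketch below restates that decision as a free function; the name and guard order are illustrative, the real logic lives in PredictManyResultsAfterFilter and InitRead above.

// Sketch of the per-portion read-mode decision (not the verbatim implementation).
#include <ydb/core/tx/columnshard/engines/index_info.h>
#include <set>

bool UseTwoPhaseRead(bool onePhaseReadMode,
                     bool allowEarlyFilter,
                     const std::set<ui32>& earlyFilterColumns) {
    if (onePhaseReadMode) {
        return false;  // internal reads assemble everything in one pass
    }
    if (!allowEarlyFilter || earlyFilterColumns.empty()) {
        return false;  // nothing useful to filter on early
    }
    if (NKikimr::NOlap::TIndexInfo::IsSpecialColumns(earlyFilterColumns)) {
        return false;  // only PLAN_STEP/TX_ID: many rows will survive anyway
    }
    return true;       // fetch early-filter columns first, post-filter columns later
}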
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 3a4c3c430d..b33355b129 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -126,7 +126,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
         return result;
     }
 
-    std::set<ui32> GetEarlyFilterColumnIds(const bool noTrivial) const;
+    std::set<ui32> GetEarlyFilterColumnIds() const;
     std::set<ui32> GetUsedColumnIds() const;
 
     bool Empty() const {
@@ -225,15 +225,14 @@ struct TPartialReadResult {
 
 class TIndexedReadData {
 private:
-    std::set<ui32> EarlyFilterColumns;
-    std::set<ui32> UsedColumns;
+    YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns);
     YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
+    std::set<ui32> UsedColumns;
     bool AbortedFlag = false;
     YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
     std::vector<NIndexedReader::TBatch*> Batches;
     TFetchBlobsQueue& FetchBlobsQueue;
-    friend class NIndexedReader::TBatch;
-    friend class NIndexedReader::TGranule;
+    bool PredictManyResultsAfterFilter(const TPortionInfo& portionInfo) const;
 public:
     TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, const bool internalRead, const NColumnShard::TScanCounters& counters);
@@ -272,17 +271,11 @@ public:
         Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size());
         Y_VERIFY(!IsInProgress());
     }
-private:
-    NOlap::TReadMetadata::TConstPtr ReadMetadata;
-
-    std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
-
-    void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
-    void OnGranuleReady(NIndexedReader::TGranule& granule) {
-        Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
-        Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
+    NOlap::TReadMetadata::TConstPtr GetReadMetadata() const {
+        return ReadMetadata;
     }
 
+    void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
     void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
         if (batch && batch->num_rows()) {
             ReadMetadata->ReadStats->SelectedRows += batch->num_rows();
@@ -295,6 +288,17 @@ private:
         }
     }
 
+    void OnGranuleReady(NIndexedReader::TGranule& granule) {
+        Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
+        Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
+    }
+
+private:
+    NOlap::TReadMetadata::TConstPtr ReadMetadata;
+    bool OnePhaseReadMode = false;
+
+    std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
+
     THashSet<const void*> BatchesToDedup;
     THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch
     THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 73035b6c24..c0d34a36c5 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -29,62 +29,6 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr
     return blob;
 }
 
-TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexInfo& indexInfo,
-    const std::shared_ptr<arrow::Schema>& schema,
-    const THashMap<TBlobRange, TString>& blobsData, const std::optional<std::set<ui32>>& columnIds) const {
-    // Correct records order
-    TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
-
-    std::vector<std::shared_ptr<arrow::Field>> schemaFields;
-
-    for (auto&& i : schema->fields()) {
-        if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) {
-            continue;
-        }
-        schemaFields.emplace_back(i);
-    }
-
-    for (auto& rec : Records) {
-        if (columnIds && !columnIds->contains(rec.ColumnId)) {
-            continue;
-        }
-        ui32 columnId = rec.ColumnId;
-        TString columnName = indexInfo.GetColumnName(columnId);
-        std::string name(columnName.data(), columnName.size());
-        int pos = schema->GetFieldIndex(name);
-        if (pos < 0) {
-            continue; // no such column in schema - do not need it
-        }
-
-        columnChunks[pos][rec.Chunk] = rec.BlobRange;
-    }
-
-    // Make chunked arrays for columns
-    std::vector<TPreparedColumn> columns;
-    columns.reserve(columnChunks.size());
-
-    for (auto& [pos, orderedChunks] : columnChunks) {
-        auto field = schema->field(pos);
-
-        TVector<TString> blobs;
-        blobs.reserve(orderedChunks.size());
-        ui32 expected = 0;
-        for (auto& [chunk, blobId] : orderedChunks) {
-            Y_VERIFY(chunk == expected);
-            ++expected;
-
-            auto it = blobsData.find(blobId);
-            Y_VERIFY(it != blobsData.end());
-            TString data = it->second;
-            blobs.push_back(data);
-        }
-
-        columns.emplace_back(TPreparedColumn(field, std::move(blobs)));
-    }
-
-    return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields));
-}
-
 void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
     Y_VERIFY(column->length());
@@ -252,7 +196,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
     ui32 count = 0;
     if (!reverse) {
         for (auto& blob : Blobs) {
-            batches.push_back(NArrow::DeserializeBatch(blob, schema));
+            batches.push_back(blob.BuildRecordBatch(schema));
             Y_VERIFY(batches.back());
             if (count + batches.back()->num_rows() >= needCount) {
                 Y_VERIFY(count <= needCount);
@@ -266,7 +210,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
         }
     } else {
         for (auto it = Blobs.rbegin(); it != Blobs.rend(); ++it) {
-            batches.push_back(NArrow::DeserializeBatch(*it, schema));
+            batches.push_back(it->BuildRecordBatch(schema));
             Y_VERIFY(batches.back());
             if (count + batches.back()->num_rows() >= needCount) {
                 Y_VERIFY(count <= needCount);
@@ -288,11 +232,16 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
 
 std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
     std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
+    std::vector<std::shared_ptr<arrow::Field>> fields;
     for (auto&& i : Columns) {
+        if (!options.IsAcceptedColumn(i.GetColumnId())) {
+            continue;
+        }
         columns.emplace_back(i.Assemble(options.GetRecordsCountLimitDef(Max<ui32>()), !options.IsForwardAssemble()));
+        fields.emplace_back(i.GetField());
     }
 
-    auto table = arrow::Table::Make(Schema, columns);
+    auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
     auto res = table->CombineChunks();
     Y_VERIFY(res.ok());
     return NArrow::ToBatch(*res);
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index db2123bc94..ed6a188055 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -203,17 +203,49 @@ struct TPortionInfo {
         return Meta.ColumnMeta.find(columnId)->second.HasMinMax();
     }
 
+    class TAssembleBlobInfo {
+    private:
+        YDB_READONLY(ui32, NullRowsCount, 0);
+        YDB_READONLY_DEF(TString, Data);
+    public:
+        TAssembleBlobInfo(const ui32 rowsCount)
+            : NullRowsCount(rowsCount) {
+
+        }
+
+        TAssembleBlobInfo(const TString& data)
+            : Data(data) {
+
+        }
+
+        std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(std::shared_ptr<arrow::Schema> schema) const {
+            if (NullRowsCount) {
+                Y_VERIFY(!Data);
+                return NArrow::MakeEmptyBatch(schema, NullRowsCount);
+            } else {
+                Y_VERIFY(Data);
+                return NArrow::DeserializeBatch(Data, schema);
+            }
+        }
+    };
+
     class TPreparedColumn {
     private:
+        YDB_READONLY(ui32, ColumnId, 0);
         std::shared_ptr<arrow::Field> Field;
-        std::vector<TString> Blobs;
+        std::vector<TAssembleBlobInfo> Blobs;
     public:
         const std::string& GetName() const {
             return Field->name();
         }
 
-        TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TString>&& blobs)
-            : Field(field)
+        std::shared_ptr<arrow::Field> GetField() const {
+            return Field;
+        }
+
+        TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TAssembleBlobInfo>&& blobs, const ui32 columnId)
+            : ColumnId(columnId)
+            , Field(field)
             , Blobs(std::move(blobs))
         {
@@ -229,11 +261,26 @@ struct TPortionInfo {
 
     public:
+        std::vector<std::string> GetSchemaColumnNames() const {
+            return Schema->field_names();
+        }
+
         class TAssembleOptions {
         private:
             YDB_OPT(ui32, RecordsCountLimit);
             YDB_FLAG_ACCESSOR(ForwardAssemble, true);
+            YDB_OPT(std::set<ui32>, IncludedColumnIds);
+            YDB_OPT(std::set<ui32>, ExcludedColumnIds);
         public:
+            bool IsAcceptedColumn(const ui32 columnId) const {
+                if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) {
+                    return false;
+                }
+                if (ExcludedColumnIds && ExcludedColumnIds->contains(columnId)) {
+                    return false;
+                }
+                return true;
+            }
         };
 
         size_t GetColumnsCount() const {
@@ -250,9 +297,61 @@ struct TPortionInfo {
         std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = Default<TAssembleOptions>()) const;
     };
 
+    template <class TExternalBlobInfo>
     TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo,
-        const std::shared_ptr<arrow::Schema>& schema,
-        const THashMap<TBlobRange, TString>& data, const std::optional<std::set<ui32>>& columnIds) const;
+        const std::shared_ptr<arrow::Schema>& schema,
+        const THashMap<TBlobRange, TExternalBlobInfo>& blobsData, const std::optional<std::set<ui32>>& columnIds) const {
+        // Correct records order
+        TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
+
+        std::vector<std::shared_ptr<arrow::Field>> schemaFields;
+
+        for (auto&& i : schema->fields()) {
+            if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) {
+                continue;
+            }
+            schemaFields.emplace_back(i);
+        }
+
+        for (auto& rec : Records) {
+            if (columnIds && !columnIds->contains(rec.ColumnId)) {
+                continue;
+            }
+            ui32 columnId = rec.ColumnId;
+            TString columnName = indexInfo.GetColumnName(columnId);
+            std::string name(columnName.data(), columnName.size());
+            int pos = schema->GetFieldIndex(name);
+            if (pos < 0) {
+                continue; // no such column in schema - do not need it
+            }
+
+            columnChunks[pos][rec.Chunk] = rec.BlobRange;
+        }
+
+        // Make chunked arrays for columns
+        std::vector<TPreparedColumn> columns;
+        columns.reserve(columnChunks.size());
+
+        for (auto& [pos, orderedChunks] : columnChunks) {
+            auto field = schema->field(pos);
+            TVector<TAssembleBlobInfo> blobs;
+            blobs.reserve(orderedChunks.size());
+            ui32 expected = 0;
+            for (auto& [chunk, blobRange] : orderedChunks) {
+                Y_VERIFY(chunk == expected);
+                ++expected;
+
+                auto it = blobsData.find(blobRange);
+                Y_VERIFY(it != blobsData.end());
+                blobs.emplace_back(it->second);
+            }
+
+            columns.emplace_back(TPreparedColumn(field, std::move(blobs), indexInfo.GetColumnId(field->name())));
+        }
+
+        return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields));
+    }
+
     std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo,
         const std::shared_ptr<arrow::Schema>& schema,
         const THashMap<TBlobRange, TString>& data) const {
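Note: TAssembleBlobInfo lets a prepared column chunk carry either real serialized data or just a null-row count for a skipped blob, and TAssembleOptions gains included/excluded column-id sets that Assemble() honours through IsAcceptedColumn. A sketch of assembling only the filter columns, mirroring how the filter assembler uses it below; the helper name and the column-id set are illustrative.

// Sketch: materializing only a chosen subset of prepared columns.
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <set>

using NKikimr::NOlap::TPortionInfo;

std::shared_ptr<arrow::RecordBatch> AssembleFilterColumnsOnly(
        const TPortionInfo::TPreparedBatchData& batchConstructor,
        const std::set<ui32>& filterColumnIds) {
    TPortionInfo::TPreparedBatchData::TAssembleOptions options;
    options.SetIncludedColumnIds(filterColumnIds);  // columns outside the set are not assembled
    return batchConstructor.Assemble(options);      // result schema contains only accepted fields
}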
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index c6baead8e9..3ac9a61cfe 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -29,7 +29,7 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard
     auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds);
     Data.clear();
     if (!Filter) {
-        return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), processor);
+        return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), Owner->GetEarlyFilterColumns(), processor);
     } else {
         Y_VERIFY(FilterBatch);
         return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor);
@@ -65,7 +65,7 @@ ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) {
     return result;
 }
 
-void TBatch::Reset(const std::set<ui32>* columnIds) {
+void TBatch::ResetCommon(const std::set<ui32>* columnIds) {
     if (!columnIds) {
         CurrentColumnIds.reset();
     } else {
@@ -80,13 +80,18 @@ void TBatch::Reset(const std::set<ui32>* columnIds) {
     Y_VERIFY(Data.empty());
     WaitingBytes = 0;
     FetchedBytes = 0;
+}
+
+void TBatch::ResetNoFilter(const std::set<ui32>* columnIds) {
+    Y_VERIFY(!Filter);
+    ResetCommon(columnIds);
     for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
         if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
             continue;
         }
         AskedColumnIds.emplace(rec.ColumnId);
         Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
-        Owner->Owner->AddBlobForFetch(rec.BlobRange, *this);
+        Owner->AddBlobForFetch(rec.BlobRange, *this);
         Y_VERIFY(rec.Portion == Portion);
         Y_VERIFY(rec.Valid());
         Y_VERIFY(Granule == rec.Granule);
@@ -94,6 +99,46 @@ void TBatch::Reset(const std::set<ui32>* columnIds) {
     }
 }
 
+void TBatch::ResetWithFilter(const std::set<ui32>* columnIds) {
+    Y_VERIFY(Filter);
+    ResetCommon(columnIds);
+    std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects;
+    for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+        if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
+            continue;
+        }
+        AskedColumnIds.emplace(rec.ColumnId);
+        orderedObjects[rec.ColumnId][rec.Chunk] = &rec;
+        Y_VERIFY(rec.Valid());
+        Y_VERIFY(Portion == rec.Portion);
+        Y_VERIFY(Granule == rec.Granule);
+    }
+
+    for (auto&& columnInfo : orderedObjects) {
+        ui32 expected = 0;
+        auto it = Filter->GetIterator();
+        bool undefinedShift = false;
+        bool itFinished = false;
+        for (auto&& [chunk, rec] : columnInfo.second) {
+            Y_VERIFY(!itFinished);
+            Y_VERIFY(expected++ == chunk);
+            if (!rec->GetRowsCount()) {
+                undefinedShift = true;
+            }
+            if (!undefinedShift && it.IsBatchForSkip(rec->GetRowsCount())) {
+                Data.emplace(rec->BlobRange, TPortionInfo::TAssembleBlobInfo(rec->GetRowsCount()));
+            } else {
+                Y_VERIFY(WaitIndexed.emplace(rec->BlobRange).second);
+                Owner->AddBlobForFetch(rec->BlobRange, *this);
+                WaitingBytes += rec->BlobRange.Size;
+            }
+            if (!undefinedShift) {
+                itFinished = !it.Next(rec->GetRowsCount());
+            }
+        }
+    }
+}
+
 void TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch) {
     Y_VERIFY(filter);
     Y_VERIFY(!Filter);
@@ -110,11 +155,12 @@ void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
 
 bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) {
     if (!WaitIndexed.erase(bRange)) {
+        Y_ASSERT(false);
         return false;
     }
     WaitingBytes -= bRange.Size;
     FetchedBytes += bRange.Size;
-    Data.emplace(bRange, blobData);
+    Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData));
     return true;
 }
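Note: ResetWithFilter is where skipping actually happens: for each column the chunks are walked in order against the filter iterator, and a chunk whose rows all fall inside a filtered-out run is stored as TAssembleBlobInfo(rowsCount) instead of being queued for fetch. Since TColumnRecord::GetRowsCount() is stubbed to return 0 in this commit, every chunk currently takes the undefinedShift branch and is still fetched; skipping becomes effective once per-chunk row counts are persisted. Condensed sketch of the per-chunk decision, not the verbatim code:

// Sketch: one chunk's fate in ResetWithFilter-like logic (callbacks stand in for the real actions).
#include <ydb/core/formats/arrow_filter.h>
#include <functional>

void HandleChunk(NArrow::TColumnFilter::TIterator& it, ui32 chunkRows,
                 bool& undefinedShift,
                 const std::function<void(ui32)>& skipAsNulls,
                 const std::function<void()>& fetchBlob) {
    if (chunkRows == 0) {
        undefinedShift = true;            // row count unknown: filter offset cannot be tracked
    }
    if (!undefinedShift && it.IsBatchForSkip(chunkRows)) {
        skipAsNulls(chunkRows);           // whole chunk filtered out: remember only its length
    } else {
        fetchBlob();                      // some rows may survive: fetch and assemble normally
    }
    if (!undefinedShift) {
        it.Next(chunkRows);               // advance the filter by this chunk's rows
    }
}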
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index d6dd76809c..55f3417d26 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -30,20 +30,20 @@ private:
     YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
     YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
     YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
-    THashMap<TBlobRange, TString> Data;
-    TGranule* Owner = nullptr;
+    THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
+    TGranule* Owner;
     const TPortionInfo* PortionInfo = nullptr;
-    friend class TGranule;
-    TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
-
     YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
     std::set<ui32> AskedColumnIds;
+    void ResetCommon(const std::set<ui32>* columnIds);
 public:
+    TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
     bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
     bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
 
-    void Reset(const std::set<ui32>* columnIds);
+    void ResetNoFilter(const std::set<ui32>* columnIds);
+    void ResetWithFilter(const std::set<ui32>* columnIds);
 
     ui64 GetFetchBytes(const std::set<ui32>* columnIds);
     void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch);
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index c013266d6f..c3738a9c66 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -22,6 +22,10 @@ bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead)
     return DoApply(indexedDataRead);
 }
 
+TDataTasksProcessorContainer IDataTasksProcessor::ITask::GetTasksProcessorContainer() const {
+    return TDataTasksProcessorContainer(OwnerOperator);
+}
+
 bool IDataTasksProcessor::Add(ITask::TPtr task) {
     if (IsStopped()) {
         return false;
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
index efb808cdac..90a8f98a10 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -8,7 +8,7 @@ class TIndexedReadData;
 
 namespace NKikimr::NColumnShard {
 
-class IDataTasksProcessor;
+class TDataTasksProcessorContainer;
 
 class IDataTasksProcessor {
 private:
@@ -22,6 +22,7 @@ public:
         std::shared_ptr<IDataTasksProcessor> OwnerOperator;
         YDB_READONLY_FLAG(DataProcessed, false);
     protected:
+        TDataTasksProcessorContainer GetTasksProcessorContainer() const;
         virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0;
         virtual bool DoExecuteImpl() = 0;
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 5039b0405d..1ae809aa49 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -7,7 +7,10 @@ bool TAssembleFilter::DoExecuteImpl() {
     /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
     /// It's not OK to apply predicate before replacing key duplicates otherwise.
     /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
-    auto batch = BatchConstructor.Assemble();
+
+    TPortionInfo::TPreparedBatchData::TAssembleOptions options;
+    options.SetIncludedColumnIds(FilterColumnIds);
+    auto batch = BatchConstructor.Assemble(options);
     Y_VERIFY(batch);
     Y_VERIFY(batch->num_rows());
     OriginalCount = batch->num_rows();
@@ -26,8 +29,18 @@ bool TAssembleFilter::DoExecuteImpl() {
             return true;
         }
     }
+
+    if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) {
+        TPortionInfo::TPreparedBatchData::TAssembleOptions options;
+        options.SetExcludedColumnIds(FilterColumnIds);
+        auto addBatch = BatchConstructor.Assemble(options);
+        Y_VERIFY(addBatch);
+        Y_VERIFY(Filter->Apply(addBatch));
+        Y_VERIFY(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true));
+    }
     AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")
-        ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter);
+        ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter)
+        ("filter_columns", FilterColumnIds.size());
 
     FilteredBatch = batch;
     return true;
@@ -57,7 +70,13 @@ bool TAssembleFilter::DoApply(TIndexedReadData& owner) const {
     owner.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
     owner.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
 
-    batch.Reset(&owner.GetPostFilterColumns());
+    batch.ResetWithFilter(&owner.GetPostFilterColumns());
+    if (batch.IsFetchingReady()) {
+        auto processor = GetTasksProcessorContainer();
+        if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), owner.GetReadMetadata())) {
+            processor.Add(owner, assembleBatchTask);
+        }
+    }
 
     owner.GetCounters().GetTwoPhasesCount()->Add(1);
     owner.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index 902276896f..7ec1b9af0c 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -19,17 +19,19 @@ namespace NKikimr::NOlap::NIndexedReader {
         const ui32 BatchNo;
         ui32 OriginalCount = 0;
         bool AllowEarlyFilter = false;
+        std::set<ui32> FilterColumnIds;
     protected:
         virtual bool DoApply(TIndexedReadData& owner) const override;
         virtual bool DoExecuteImpl() override;
     public:
         TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
-            TBatch& batch, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor)
+            TBatch& batch, const bool allowEarlyFilter, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor)
            : TBase(processor)
            , BatchConstructor(batchConstructor)
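Note: TAssembleFilter now assembles in two steps: it materializes only the filter columns, applies the filter, and, if more columns were prepared, materializes the rest separately, filters them the same way and merges the two column sets. A rough sketch of that flow; the free-function form, the ready-made filter argument and the includes are simplifications of the real DoExecuteImpl.

// Sketch: two-step assemble of a prepared batch around an early filter (includes approximate).
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/formats/arrow_helpers.h>
#include <set>

using NKikimr::NOlap::TPortionInfo;

std::shared_ptr<arrow::RecordBatch> AssembleFiltered(
        const TPortionInfo::TPreparedBatchData& constructor,
        const std::set<ui32>& filterColumnIds,
        NArrow::TColumnFilter& filter /* assumed to be already built from the predicate */) {
    TPortionInfo::TPreparedBatchData::TAssembleOptions filterOnly;
    filterOnly.SetIncludedColumnIds(filterColumnIds);
    auto batch = constructor.Assemble(filterOnly);       // cheap: filter columns only
    filter.Apply(batch);                                  // drop rows failing the predicate

    if ((size_t)batch->schema()->num_fields() < constructor.GetColumnsCount()) {
        TPortionInfo::TPreparedBatchData::TAssembleOptions rest;
        rest.SetExcludedColumnIds(filterColumnIds);
        auto addBatch = constructor.Assemble(rest);       // the remaining requested columns
        filter.Apply(addBatch);                           // keep them row-aligned with `batch`
        NArrow::MergeBatchColumns({batch, addBatch}, batch,
                                  constructor.GetSchemaColumnNames(), true);
    }
    return batch;
}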
            , ReadMetadata(readMetadata)
            , BatchNo(batch.GetBatchNo())
            , AllowEarlyFilter(allowEarlyFilter)
+            , FilterColumnIds(filterColumnIds)
         {
             TBase::SetPriority(TBase::EPriority::Normal);
         }
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index f92bf1b482..e7c4d80f82 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -25,4 +25,12 @@ NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, c
     return infoEmplace.first->second;
 }
 
+void TGranule::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const {
+    Owner->AddBlobForFetch(range, batch);
+}
+
+const std::set<ui32>& TGranule::GetEarlyFilterColumns() const {
+    return Owner->GetEarlyFilterColumns();
+}
+
 }
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index cad0a9318e..6c32ea426e 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -17,8 +17,6 @@ private:
     THashMap<ui32, TBatch> Batches;
     std::set<ui32> WaitBatches;
     TIndexedReadData* Owner = nullptr;
-    void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
-    friend class NIndexedReader::TBatch;
 public:
     TGranule(const ui64 granuleId, TIndexedReadData& owner)
         : GranuleId(granuleId)
@@ -26,7 +24,11 @@ public:
     }
 
+    const std::set<ui32>& GetEarlyFilterColumns() const;
+    void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
     TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo);
+    void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;
+
 };
 
 }