diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-05 16:04:54 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-05 16:04:54 +0300 |
commit | f46f5c0317224bbacf907298153f3a3225e9e5ec (patch) | |
tree | 6e0ee82ba37f02989255116dc52ceb2fa7c40191 | |
parent | e4bb8963cf5d91898a2aa4534a719d7bf0f96b1a (diff) | |
download | ydb-f46f5c0317224bbacf907298153f3a3225e9e5ec.tar.gz |
assemble into background conveyor
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.cpp | 58 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 47 |
4 files changed, 78 insertions, 44 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 050344eee48..2beab575c9b 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -118,7 +118,9 @@ bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() { /// @warning The replace logic is correct only in assumption that predicate is applyed over a part of ReplaceKey. /// It's not OK to apply predicate before replacing key duplicates otherwise. /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here - auto filtered = NOlap::FilterPortion(Batch, *ReadMetadata); + auto batch = BatchConstructor.Assemble(); + Y_VERIFY(batch); + auto filtered = NOlap::FilterPortion(batch, *ReadMetadata); if (filtered.Batch) { Y_VERIFY(filtered.Valid()); filtered.ApplyFilter(); @@ -131,6 +133,8 @@ bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() { Y_VERIFY(filtered.Valid()); filtered.ApplyFilter(); } +#else + Y_UNUSED(AllowEarlyFilter); #endif FilteredBatch = filtered.Batch; return true; @@ -276,15 +280,14 @@ NColumnShard::IDataPreparationTask::TPtr TIndexedReadData::AssembleIndexedBatch( auto& portionInfo = Portion(batchNo); Y_VERIFY(portionInfo.Produced()); - auto batch = portionInfo.AssembleInBatch(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data); - Y_VERIFY(batch); + auto batchConstructor = portionInfo.PrepareForAssemble(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data); for (auto& rec : portionInfo.Records) { auto& blobRange = rec.BlobRange; Data.erase(blobRange); } - return std::make_shared<TAssembledNotFiltered>(batch, ReadMetadata, batchNo, portionInfo.AllowEarlyFilter(), processor); + return std::make_shared<TAssembledNotFiltered>(std::move(batchConstructor), ReadMetadata, batchNo, portionInfo.AllowEarlyFilter(), processor); } void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index b815aa216b7..68cdc237530 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -279,7 +279,7 @@ private: class TAssembledNotFiltered: public NColumnShard::IDataPreparationTask { private: using TBase = NColumnShard::IDataPreparationTask; - std::shared_ptr<arrow::RecordBatch> Batch; + TPortionInfo::TPreparedBatchData BatchConstructor; std::shared_ptr<arrow::RecordBatch> FilteredBatch; NOlap::TReadMetadata::TConstPtr ReadMetadata; ui32 BatchNo = 0; @@ -288,10 +288,10 @@ private: virtual bool DoApply(TIndexedReadData& owner) const override; virtual bool DoExecuteImpl() override; public: - TAssembledNotFiltered(std::shared_ptr<arrow::RecordBatch> batch, NOlap::TReadMetadata::TConstPtr readMetadata, + TAssembledNotFiltered(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata, const ui32 batchNo, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor) : TBase(processor) - , Batch(batch) + , BatchConstructor(batchConstructor) , ReadMetadata(readMetadata) , BatchNo(batchNo) , AllowEarlyFilter(allowEarlyFilter) diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index 343587642cd..80475480293 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -14,26 +14,6 @@ TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array return NArrow::SerializeBatch(batch, writeOptions); } -namespace { - -std::shared_ptr<arrow::ChunkedArray> DeserializeBlobs(const TVector<TString>& blobs, std::shared_ptr<arrow::Field> field) { - Y_VERIFY(!blobs.empty()); - auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field}); - - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.reserve(blobs.size()); - for (auto& blob : blobs) { - batches.push_back(NArrow::DeserializeBatch(blob, schema)); - Y_VERIFY(batches.back()); - } - - auto res = arrow::Table::FromRecordBatches(batches); - Y_VERIFY_S(res.ok(), res.status().message()); - return (*res)->column(0); -} - -} - TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, TColumnRecord&& record, @@ -49,7 +29,7 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr return blob; } -std::shared_ptr<arrow::Table> TPortionInfo::Assemble(const TIndexInfo& indexInfo, +TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::Schema>& schema, const THashMap<TBlobRange, TString>& blobsData) const { // Correct records order @@ -68,7 +48,7 @@ std::shared_ptr<arrow::Table> TPortionInfo::Assemble(const TIndexInfo& indexInfo } // Make chunked arrays for columns - std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; + std::vector<TPreparedColumn> columns; columns.reserve(columnChunks.size()); for (auto& [pos, orderedChunks] : columnChunks) { @@ -81,24 +61,16 @@ std::shared_ptr<arrow::Table> TPortionInfo::Assemble(const TIndexInfo& indexInfo Y_VERIFY(chunk == expected); ++expected; - Y_VERIFY(blobsData.count(blobId)); - TString data = blobsData.find(blobId)->second; + auto it = blobsData.find(blobId); + Y_VERIFY(it != blobsData.end()); + TString data = it->second; blobs.push_back(data); } - columns.push_back(DeserializeBlobs(blobs, field)); + columns.emplace_back(TPreparedColumn(field, std::move(blobs))); } - return arrow::Table::Make(schema, columns); -} - -std::shared_ptr<arrow::RecordBatch> TPortionInfo::AssembleInBatch(const TIndexInfo& indexInfo, - const std::shared_ptr<arrow::Schema>& schema, - const THashMap<TBlobRange, TString>& data) const { - std::shared_ptr<arrow::Table> portion = Assemble(indexInfo, schema, data); - auto res = portion->CombineChunks(); - Y_VERIFY(res.ok()); - return NArrow::ToBatch(*res); + return TPreparedBatchData(std::move(columns), schema); } void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) { @@ -248,4 +220,20 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { return Meta.ColumnMeta.find(columnId)->second.Max; } +std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { + Y_VERIFY(!Blobs.empty()); + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ Field }); + + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + batches.reserve(Blobs.size()); + for (auto& blob : Blobs) { + batches.push_back(NArrow::DeserializeBatch(blob, schema)); + Y_VERIFY(batches.back()); + } + + auto res = arrow::Table::FromRecordBatches(batches); + Y_VERIFY_S(res.ok(), res.status().message()); + return (*res)->column(0); +} + } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 5a1af22138d..fc9912cc738 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -194,12 +194,55 @@ struct TPortionInfo { return Meta.ColumnMeta.find(columnId)->second.HasMinMax(); } - std::shared_ptr<arrow::Table> Assemble(const TIndexInfo& indexInfo, + class TPreparedColumn { + private: + std::shared_ptr<arrow::Field> Field; + std::vector<TString> Blobs; + + public: + TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TString>&& blobs) + : Field(field) + , Blobs(std::move(blobs)) + { + + } + + std::shared_ptr<arrow::ChunkedArray> Assemble() const; + }; + + class TPreparedBatchData { + private: + std::vector<TPreparedColumn> Columns; + std::shared_ptr<arrow::Schema> Schema; + public: + TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema) + : Columns(std::move(columns)) + , Schema(schema) + { + + } + + std::shared_ptr<arrow::RecordBatch> Assemble() { + std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; + for (auto&& i : Columns) { + columns.emplace_back(i.Assemble()); + } + + auto table = arrow::Table::Make(Schema, columns); + auto res = table->CombineChunks(); + Y_VERIFY(res.ok()); + return NArrow::ToBatch(*res); + } + }; + + TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::Schema>& schema, const THashMap<TBlobRange, TString>& data) const; std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::Schema>& schema, - const THashMap<TBlobRange, TString>& data) const; + const THashMap<TBlobRange, TString>& data) const { + return PrepareForAssemble(indexInfo, schema, data).Assemble(); + } static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, |