aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-05 16:04:54 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-05 16:04:54 +0300
commitf46f5c0317224bbacf907298153f3a3225e9e5ec (patch)
tree6e0ee82ba37f02989255116dc52ceb2fa7c40191
parente4bb8963cf5d91898a2aa4534a719d7bf0f96b1a (diff)
downloadydb-f46f5c0317224bbacf907298153f3a3225e9e5ec.tar.gz
assemble into background conveyor
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h6
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp58
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h47
4 files changed, 78 insertions, 44 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 050344eee48..2beab575c9b 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -118,7 +118,9 @@ bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() {
/// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
/// It's not OK to apply predicate before replacing key duplicates otherwise.
/// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
- auto filtered = NOlap::FilterPortion(Batch, *ReadMetadata);
+ auto batch = BatchConstructor.Assemble();
+ Y_VERIFY(batch);
+ auto filtered = NOlap::FilterPortion(batch, *ReadMetadata);
if (filtered.Batch) {
Y_VERIFY(filtered.Valid());
filtered.ApplyFilter();
@@ -131,6 +133,8 @@ bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() {
Y_VERIFY(filtered.Valid());
filtered.ApplyFilter();
}
+#else
+ Y_UNUSED(AllowEarlyFilter);
#endif
FilteredBatch = filtered.Batch;
return true;
@@ -276,15 +280,14 @@ NColumnShard::IDataPreparationTask::TPtr TIndexedReadData::AssembleIndexedBatch(
auto& portionInfo = Portion(batchNo);
Y_VERIFY(portionInfo.Produced());
- auto batch = portionInfo.AssembleInBatch(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
- Y_VERIFY(batch);
+ auto batchConstructor = portionInfo.PrepareForAssemble(ReadMetadata->IndexInfo, ReadMetadata->LoadSchema, Data);
for (auto& rec : portionInfo.Records) {
auto& blobRange = rec.BlobRange;
Data.erase(blobRange);
}
- return std::make_shared<TAssembledNotFiltered>(batch, ReadMetadata, batchNo, portionInfo.AllowEarlyFilter(), processor);
+ return std::make_shared<TAssembledNotFiltered>(std::move(batchConstructor), ReadMetadata, batchNo, portionInfo.AllowEarlyFilter(), processor);
}
void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index b815aa216b7..68cdc237530 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -279,7 +279,7 @@ private:
class TAssembledNotFiltered: public NColumnShard::IDataPreparationTask {
private:
using TBase = NColumnShard::IDataPreparationTask;
- std::shared_ptr<arrow::RecordBatch> Batch;
+ TPortionInfo::TPreparedBatchData BatchConstructor;
std::shared_ptr<arrow::RecordBatch> FilteredBatch;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
ui32 BatchNo = 0;
@@ -288,10 +288,10 @@ private:
virtual bool DoApply(TIndexedReadData& owner) const override;
virtual bool DoExecuteImpl() override;
public:
- TAssembledNotFiltered(std::shared_ptr<arrow::RecordBatch> batch, NOlap::TReadMetadata::TConstPtr readMetadata,
+ TAssembledNotFiltered(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
const ui32 batchNo, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor)
: TBase(processor)
- , Batch(batch)
+ , BatchConstructor(batchConstructor)
, ReadMetadata(readMetadata)
, BatchNo(batchNo)
, AllowEarlyFilter(allowEarlyFilter)
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 343587642cd..80475480293 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -14,26 +14,6 @@ TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array
return NArrow::SerializeBatch(batch, writeOptions);
}
-namespace {
-
-std::shared_ptr<arrow::ChunkedArray> DeserializeBlobs(const TVector<TString>& blobs, std::shared_ptr<arrow::Field> field) {
- Y_VERIFY(!blobs.empty());
- auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{field});
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(blobs.size());
- for (auto& blob : blobs) {
- batches.push_back(NArrow::DeserializeBatch(blob, schema));
- Y_VERIFY(batches.back());
- }
-
- auto res = arrow::Table::FromRecordBatches(batches);
- Y_VERIFY_S(res.ok(), res.status().message());
- return (*res)->column(0);
-}
-
-}
-
TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
TColumnRecord&& record,
@@ -49,7 +29,7 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr
return blob;
}
-std::shared_ptr<arrow::Table> TPortionInfo::Assemble(const TIndexInfo& indexInfo,
+TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexInfo& indexInfo,
const std::shared_ptr<arrow::Schema>& schema,
const THashMap<TBlobRange, TString>& blobsData) const {
// Correct records order
@@ -68,7 +48,7 @@ std::shared_ptr<arrow::Table> TPortionInfo::Assemble(const TIndexInfo& indexInfo
}
// Make chunked arrays for columns
- std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
+ std::vector<TPreparedColumn> columns;
columns.reserve(columnChunks.size());
for (auto& [pos, orderedChunks] : columnChunks) {
@@ -81,24 +61,16 @@ std::shared_ptr<arrow::Table> TPortionInfo::Assemble(const TIndexInfo& indexInfo
Y_VERIFY(chunk == expected);
++expected;
- Y_VERIFY(blobsData.count(blobId));
- TString data = blobsData.find(blobId)->second;
+ auto it = blobsData.find(blobId);
+ Y_VERIFY(it != blobsData.end());
+ TString data = it->second;
blobs.push_back(data);
}
- columns.push_back(DeserializeBlobs(blobs, field));
+ columns.emplace_back(TPreparedColumn(field, std::move(blobs)));
}
- return arrow::Table::Make(schema, columns);
-}
-
-std::shared_ptr<arrow::RecordBatch> TPortionInfo::AssembleInBatch(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TString>& data) const {
- std::shared_ptr<arrow::Table> portion = Assemble(indexInfo, schema, data);
- auto res = portion->CombineChunks();
- Y_VERIFY(res.ok());
- return NArrow::ToBatch(*res);
+ return TPreparedBatchData(std::move(columns), schema);
}
void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
@@ -248,4 +220,20 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
return Meta.ColumnMeta.find(columnId)->second.Max;
}
+std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
+ Y_VERIFY(!Blobs.empty());
+ auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{ Field });
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.reserve(Blobs.size());
+ for (auto& blob : Blobs) {
+ batches.push_back(NArrow::DeserializeBatch(blob, schema));
+ Y_VERIFY(batches.back());
+ }
+
+ auto res = arrow::Table::FromRecordBatches(batches);
+ Y_VERIFY_S(res.ok(), res.status().message());
+ return (*res)->column(0);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 5a1af22138d..fc9912cc738 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -194,12 +194,55 @@ struct TPortionInfo {
return Meta.ColumnMeta.find(columnId)->second.HasMinMax();
}
- std::shared_ptr<arrow::Table> Assemble(const TIndexInfo& indexInfo,
+ class TPreparedColumn {
+ private:
+ std::shared_ptr<arrow::Field> Field;
+ std::vector<TString> Blobs;
+
+ public:
+ TPreparedColumn(const std::shared_ptr<arrow::Field>& field, std::vector<TString>&& blobs)
+ : Field(field)
+ , Blobs(std::move(blobs))
+ {
+
+ }
+
+ std::shared_ptr<arrow::ChunkedArray> Assemble() const;
+ };
+
+ class TPreparedBatchData {
+ private:
+ std::vector<TPreparedColumn> Columns;
+ std::shared_ptr<arrow::Schema> Schema;
+ public:
+ TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema)
+ : Columns(std::move(columns))
+ , Schema(schema)
+ {
+
+ }
+
+ std::shared_ptr<arrow::RecordBatch> Assemble() {
+ std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
+ for (auto&& i : Columns) {
+ columns.emplace_back(i.Assemble());
+ }
+
+ auto table = arrow::Table::Make(Schema, columns);
+ auto res = table->CombineChunks();
+ Y_VERIFY(res.ok());
+ return NArrow::ToBatch(*res);
+ }
+ };
+
+ TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo,
const std::shared_ptr<arrow::Schema>& schema,
const THashMap<TBlobRange, TString>& data) const;
std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo,
const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TString>& data) const;
+ const THashMap<TBlobRange, TString>& data) const {
+ return PrepareForAssemble(indexInfo, schema, data).Assemble();
+ }
static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,