diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-28 12:36:51 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-28 12:36:51 +0300 |
commit | a86f9ad8d083e89a2137769078aff95bd4c6827a (patch) | |
tree | 67a8d87b557c5f2421afd114bbd2059f2f5cc85a | |
parent | 1a3c51ece49c8f974c6bd252cae6811317e700af (diff) | |
download | ydb-a86f9ad8d083e89a2137769078aff95bd4c6827a.tar.gz |
reserve data for builders
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 28 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 5 | ||||
-rw-r--r-- | ydb/core/formats/merging_sorted_input_stream.cpp | 10 | ||||
-rw-r--r-- | ydb/core/formats/merging_sorted_input_stream.h | 1 |
4 files changed, 34 insertions, 10 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 57aa4798a39..31d6259ec36 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -522,7 +522,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::v return out; } -// Check if the pertumation doesn't reoder anything +// Check if the permutation doesn't reorder anything bool IsNoOp(const arrow::UInt64Array& permutation) { for (i64 i = 0; i < permutation.length(); ++i) { if (permutation.Value(i) != (ui64)i) { @@ -677,7 +677,7 @@ bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std:: } std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema, - size_t reserve) { + size_t reserve, const std::map<std::string, ui64>& sizeByColumn) { std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders; builders.reserve(schema->num_fields()); @@ -685,12 +685,19 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared std::unique_ptr<arrow::ArrayBuilder> builder; auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder); Y_VERIFY_OK(status); - builders.emplace_back(std::move(builder)); + if (sizeByColumn.size()) { + auto it = sizeByColumn.find(field->name()); + if (it != sizeByColumn.end()) { + Y_VERIFY(NArrow::ReserveData(*builder, it->second)); + } + } if (reserve) { - status = builders.back()->Reserve(reserve); - Y_VERIFY_OK(status); + Y_VERIFY_OK(builder->Reserve(reserve)); } + + builders.emplace_back(std::move(builder)); + } return builders; } @@ -1406,4 +1413,15 @@ bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shar return res; } +bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) { + if (builder.type()->id() == arrow::Type::BINARY) { + arrow::BaseBinaryBuilder<arrow::BinaryType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::BinaryType>&>(builder); + return bBuilder.ReserveData(size).ok(); + } else if (builder.type()->id() == arrow::Type::STRING) { + arrow::BaseBinaryBuilder<arrow::StringType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::StringType>&>(builder); + return bBuilder.ReserveData(size).ok(); + } + return true; +} + } diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index 3d34da44871..282e59b896a 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -94,7 +94,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared ui32 numShards); std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema, - size_t reserve = 0); + size_t reserve = 0, const std::map<std::string, ui64>& sizeByColumn = {}); std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders); std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size); @@ -107,9 +107,8 @@ TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema); ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch); // Return size in bytes *not* including size of bitmap mask ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column); - i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0); - +bool ReserveData(arrow::ArrayBuilder& builder, const size_t size); enum class ECompareType { LESS = 1, LESS_OR_EQUAL, diff --git a/ydb/core/formats/merging_sorted_input_stream.cpp b/ydb/core/formats/merging_sorted_input_stream.cpp index e5b1ea3b03b..6002d269a58 100644 --- a/ydb/core/formats/merging_sorted_input_stream.cpp +++ b/ydb/core/formats/merging_sorted_input_stream.cpp @@ -123,7 +123,6 @@ void TMergingSortedInputStream::Init() { Y_VERIFY(First); First = false; size_t totalRows = 0; - for (size_t i = 0; i < SourceBatches.size(); ++i) { auto& batch = SourceBatches[i]; if (batch) { @@ -135,11 +134,18 @@ void TMergingSortedInputStream::Init() { continue; } + for (i32 i = 0; i < batch->num_columns(); ++i) { + ColumnSize[batch->column_name(i)] += NArrow::GetArrayDataSize(batch->column(i)); + } + totalRows += batch->num_rows(); Cursors[i] = TSortCursorImpl(batch, Description, i); } ExpectedBatchSize = MaxBatchSize ? std::min(totalRows, MaxBatchSize) : totalRows; + if (MaxBatchSize && MaxBatchSize < totalRows) { + ColumnSize.clear(); + } Queue = TSortingHeap(Cursors, Description->NotNull); @@ -176,7 +182,7 @@ std::shared_ptr<arrow::RecordBatch> TMergingSortedInputStream::ReadImpl() { } return batch; } else { - auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize); + auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize, ColumnSize); if (builders.empty()) { return {}; } diff --git a/ydb/core/formats/merging_sorted_input_stream.h b/ydb/core/formats/merging_sorted_input_stream.h index b5250a7f337..2a3a2f72277 100644 --- a/ydb/core/formats/merging_sorted_input_stream.h +++ b/ydb/core/formats/merging_sorted_input_stream.h @@ -35,6 +35,7 @@ private: bool First = true; bool Finished = false; ui64 ExpectedBatchSize = 0; /// May be smaller or equal to max_block_size. To do 'reserve' for columns. + std::map<std::string, ui64> ColumnSize; std::vector<std::shared_ptr<arrow::RecordBatch>> SourceBatches; std::shared_ptr<TReplaceKey> PrevKey; |