aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-03-28 12:36:51 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-03-28 12:36:51 +0300
commita86f9ad8d083e89a2137769078aff95bd4c6827a (patch)
tree67a8d87b557c5f2421afd114bbd2059f2f5cc85a
parent1a3c51ece49c8f974c6bd252cae6811317e700af (diff)
downloadydb-a86f9ad8d083e89a2137769078aff95bd4c6827a.tar.gz
reserve data for builders
-rw-r--r--ydb/core/formats/arrow_helpers.cpp28
-rw-r--r--ydb/core/formats/arrow_helpers.h5
-rw-r--r--ydb/core/formats/merging_sorted_input_stream.cpp10
-rw-r--r--ydb/core/formats/merging_sorted_input_stream.h1
4 files changed, 34 insertions, 10 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 57aa4798a39..31d6259ec36 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -522,7 +522,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::v
return out;
}
-// Check if the pertumation doesn't reoder anything
+// Check if the permutation doesn't reorder anything
bool IsNoOp(const arrow::UInt64Array& permutation) {
for (i64 i = 0; i < permutation.length(); ++i) {
if (permutation.Value(i) != (ui64)i) {
@@ -677,7 +677,7 @@ bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::
}
std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema,
- size_t reserve) {
+ size_t reserve, const std::map<std::string, ui64>& sizeByColumn) {
std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders;
builders.reserve(schema->num_fields());
@@ -685,12 +685,19 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared
std::unique_ptr<arrow::ArrayBuilder> builder;
auto status = arrow::MakeBuilder(arrow::default_memory_pool(), field->type(), &builder);
Y_VERIFY_OK(status);
- builders.emplace_back(std::move(builder));
+ if (sizeByColumn.size()) {
+ auto it = sizeByColumn.find(field->name());
+ if (it != sizeByColumn.end()) {
+ Y_VERIFY(NArrow::ReserveData(*builder, it->second));
+ }
+ }
if (reserve) {
- status = builders.back()->Reserve(reserve);
- Y_VERIFY_OK(status);
+ Y_VERIFY_OK(builder->Reserve(reserve));
}
+
+ builders.emplace_back(std::move(builder));
+
}
return builders;
}
@@ -1406,4 +1413,15 @@ bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shar
return res;
}
+bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) {
+ if (builder.type()->id() == arrow::Type::BINARY) {
+ arrow::BaseBinaryBuilder<arrow::BinaryType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::BinaryType>&>(builder);
+ return bBuilder.ReserveData(size).ok();
+ } else if (builder.type()->id() == arrow::Type::STRING) {
+ arrow::BaseBinaryBuilder<arrow::StringType>& bBuilder = static_cast<arrow::BaseBinaryBuilder<arrow::StringType>&>(builder);
+ return bBuilder.ReserveData(size).ok();
+ }
+ return true;
+}
+
}
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 3d34da44871..282e59b896a 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -94,7 +94,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared
ui32 numShards);
std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema,
- size_t reserve = 0);
+ size_t reserve = 0, const std::map<std::string, ui64>& sizeByColumn = {});
std::vector<std::shared_ptr<arrow::Array>> Finish(std::vector<std::unique_ptr<arrow::ArrayBuilder>>&& builders);
std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size);
@@ -107,9 +107,8 @@ TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
// Return size in bytes *not* including size of bitmap mask
ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column);
-
i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0);
-
+bool ReserveData(arrow::ArrayBuilder& builder, const size_t size);
enum class ECompareType {
LESS = 1,
LESS_OR_EQUAL,
diff --git a/ydb/core/formats/merging_sorted_input_stream.cpp b/ydb/core/formats/merging_sorted_input_stream.cpp
index e5b1ea3b03b..6002d269a58 100644
--- a/ydb/core/formats/merging_sorted_input_stream.cpp
+++ b/ydb/core/formats/merging_sorted_input_stream.cpp
@@ -123,7 +123,6 @@ void TMergingSortedInputStream::Init() {
Y_VERIFY(First);
First = false;
size_t totalRows = 0;
-
for (size_t i = 0; i < SourceBatches.size(); ++i) {
auto& batch = SourceBatches[i];
if (batch) {
@@ -135,11 +134,18 @@ void TMergingSortedInputStream::Init() {
continue;
}
+ for (i32 i = 0; i < batch->num_columns(); ++i) {
+ ColumnSize[batch->column_name(i)] += NArrow::GetArrayDataSize(batch->column(i));
+ }
+
totalRows += batch->num_rows();
Cursors[i] = TSortCursorImpl(batch, Description, i);
}
ExpectedBatchSize = MaxBatchSize ? std::min(totalRows, MaxBatchSize) : totalRows;
+ if (MaxBatchSize && MaxBatchSize < totalRows) {
+ ColumnSize.clear();
+ }
Queue = TSortingHeap(Cursors, Description->NotNull);
@@ -176,7 +182,7 @@ std::shared_ptr<arrow::RecordBatch> TMergingSortedInputStream::ReadImpl() {
}
return batch;
} else {
- auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize);
+ auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize, ColumnSize);
if (builders.empty()) {
return {};
}
diff --git a/ydb/core/formats/merging_sorted_input_stream.h b/ydb/core/formats/merging_sorted_input_stream.h
index b5250a7f337..2a3a2f72277 100644
--- a/ydb/core/formats/merging_sorted_input_stream.h
+++ b/ydb/core/formats/merging_sorted_input_stream.h
@@ -35,6 +35,7 @@ private:
bool First = true;
bool Finished = false;
ui64 ExpectedBatchSize = 0; /// May be smaller or equal to max_block_size. To do 'reserve' for columns.
+ std::map<std::string, ui64> ColumnSize;
std::vector<std::shared_ptr<arrow::RecordBatch>> SourceBatches;
std::shared_ptr<TReplaceKey> PrevKey;