aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-08-02 21:43:40 +0300
committeraneporada <aneporada@ydb.tech>2023-08-02 21:43:40 +0300
commitb1a8535e42e076174d1585dcd2d463748d75a436 (patch)
treef0bb6733079956f6e9afa969cd5b03b1aceee6d7
parentf18ede359572d7547fb94817d84ca94522a0b922 (diff)
downloadydb-b1a8535e42e076174d1585dcd2d463748d75a436.tar.gz
Fix chunked array building, fix output block len in output consumer
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_producer.cpp20
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.cpp6
-rw-r--r--ydb/library/yql/public/udf/arrow/args_dechunker.cpp7
-rw-r--r--ydb/library/yql/public/udf/arrow/args_dechunker.h2
-rw-r--r--ydb/library/yql/public/udf/arrow/block_builder.h6
5 files changed, 19 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index dee5aeb508..9a473bb914 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -563,7 +563,8 @@ private:
}
std::vector<arrow::Datum> chunk;
- while (!Output_ || !Output_->Next(chunk)) {
+ ui64 blockLen = 0;
+ while (!Output_ || !Output_->Next(chunk, blockLen)) {
auto status = DoMerge();
if (status != NUdf::EFetchStatus::Ok) {
IsFinished_ = status == NUdf::EFetchStatus::Finish;
@@ -572,24 +573,13 @@ private:
YQL_ENSURE(Output_);
}
- TMaybe<ui64> blockLen;
YQL_ENSURE(width == chunk.size() + 1);
for (ui32 i = 0; i < chunk.size(); ++i) {
- auto& item = chunk[i];
- if (item.is_array()) {
- if (blockLen.Defined()) {
- YQL_ENSURE(*blockLen == (ui64)item.array()->length);
- } else {
- blockLen = item.array()->length;
- }
- } else {
- YQL_ENSURE(item.is_scalar());
- }
- result[i] = Factory_.CreateArrowBlock(std::move(item));
+ result[i] = Factory_.CreateArrowBlock(std::move(chunk[i]));
}
- YQL_ENSURE(blockLen.Defined());
- result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(*blockLen)));
+ YQL_ENSURE(blockLen > 0);
+ result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
// TODO: support stats for blocks
//if (Stats_) {
// Stats_.Add(result, width);
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index 5b640f16e2..38b866ed18 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -421,7 +421,6 @@ private:
output.emplace_back(Builders_[j]->Build(false));
}
}
- output.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(outputBlockLen)));
outputData.emplace_back(std::make_unique<TArgsDechunker>(std::move(output)));
}
@@ -439,12 +438,15 @@ private:
}
std::vector<arrow::Datum> chunk;
- if (outputData[i] && outputData[i]->Next(chunk)) {
+ ui64 blockLen = 0;
+ if (outputData[i] && outputData[i]->Next(chunk, blockLen)) {
+ YQL_ENSURE(blockLen > 0);
hasData = true;
TUnboxedValueVector outputValues;
for (auto& datum : chunk) {
outputValues.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum)));
}
+ outputValues.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))));
Outputs_[i]->WidePush(outputValues.data(), outputValues.size());
}
}
diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.cpp b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
index 6f00a19b07..1104c0e51b 100644
--- a/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
+++ b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
@@ -20,6 +20,12 @@ TArgsDechunker::TArgsDechunker(std::vector<arrow::Datum>&& args)
}
bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk) {
+ ui64 chunkLen;
+ return Next(chunk, chunkLen);
+}
+
+bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk, ui64& chunkLen) {
+ chunkLen = 0;
if (Finish) {
return false;
}
@@ -55,6 +61,7 @@ bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk) {
chunk[i] = arrow::Datum(Chop(Arrays[i].front(), minSize));
}
}
+ chunkLen = minSize;
return true;
}
diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.h b/ydb/library/yql/public/udf/arrow/args_dechunker.h
index dd5e8302ca..af267a8c9a 100644
--- a/ydb/library/yql/public/udf/arrow/args_dechunker.h
+++ b/ydb/library/yql/public/udf/arrow/args_dechunker.h
@@ -13,6 +13,8 @@ class TArgsDechunker {
public:
explicit TArgsDechunker(std::vector<arrow::Datum>&& args);
bool Next(std::vector<arrow::Datum>& chunk);
+ // chunkLen will be zero if no arrays are present in chunk
+ bool Next(std::vector<arrow::Datum>& chunk, ui64& chunkLen);
private:
const std::vector<arrow::Datum> Args;
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index a9d06df0c3..29ac47431c 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -258,11 +258,7 @@ private:
return result;
}
- int64_t result = std::numeric_limits<int64_t>::max();
- for (auto& data : tree.Payload) {
- result = std::min(result, data->length);
- }
-
+ int64_t result = tree.Payload.front()->length;
Y_VERIFY(result > 0);
return static_cast<size_t>(result);
}