diff options
author | aneporada <aneporada@ydb.tech> | 2023-08-02 21:43:40 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-08-02 21:43:40 +0300 |
commit | b1a8535e42e076174d1585dcd2d463748d75a436 (patch) | |
tree | f0bb6733079956f6e9afa969cd5b03b1aceee6d7 | |
parent | f18ede359572d7547fb94817d84ca94522a0b922 (diff) | |
download | ydb-b1a8535e42e076174d1585dcd2d463748d75a436.tar.gz |
Fix chunked array building, fix output block len in output consumer
5 files changed, 19 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index dee5aeb508..9a473bb914 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -563,7 +563,8 @@ private: } std::vector<arrow::Datum> chunk; - while (!Output_ || !Output_->Next(chunk)) { + ui64 blockLen = 0; + while (!Output_ || !Output_->Next(chunk, blockLen)) { auto status = DoMerge(); if (status != NUdf::EFetchStatus::Ok) { IsFinished_ = status == NUdf::EFetchStatus::Finish; @@ -572,24 +573,13 @@ private: YQL_ENSURE(Output_); } - TMaybe<ui64> blockLen; YQL_ENSURE(width == chunk.size() + 1); for (ui32 i = 0; i < chunk.size(); ++i) { - auto& item = chunk[i]; - if (item.is_array()) { - if (blockLen.Defined()) { - YQL_ENSURE(*blockLen == (ui64)item.array()->length); - } else { - blockLen = item.array()->length; - } - } else { - YQL_ENSURE(item.is_scalar()); - } - result[i] = Factory_.CreateArrowBlock(std::move(item)); + result[i] = Factory_.CreateArrowBlock(std::move(chunk[i])); } - YQL_ENSURE(blockLen.Defined()); - result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(*blockLen))); + YQL_ENSURE(blockLen > 0); + result[chunk.size()] = Factory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); // TODO: support stats for blocks //if (Stats_) { // Stats_.Add(result, width); diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp index 5b640f16e2..38b866ed18 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp @@ -421,7 +421,6 @@ private: output.emplace_back(Builders_[j]->Build(false)); } } - output.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(outputBlockLen))); outputData.emplace_back(std::make_unique<TArgsDechunker>(std::move(output))); } @@ -439,12 +438,15 @@ private: } std::vector<arrow::Datum> chunk; - if (outputData[i] && outputData[i]->Next(chunk)) { + ui64 blockLen = 0; + if (outputData[i] && outputData[i]->Next(chunk, blockLen)) { + YQL_ENSURE(blockLen > 0); hasData = true; TUnboxedValueVector outputValues; for (auto& datum : chunk) { outputValues.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum))); } + outputValues.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)))); Outputs_[i]->WidePush(outputValues.data(), outputValues.size()); } } diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.cpp b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp index 6f00a19b07..1104c0e51b 100644 --- a/ydb/library/yql/public/udf/arrow/args_dechunker.cpp +++ b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp @@ -20,6 +20,12 @@ TArgsDechunker::TArgsDechunker(std::vector<arrow::Datum>&& args) } bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk) { + ui64 chunkLen; + return Next(chunk, chunkLen); +} + +bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk, ui64& chunkLen) { + chunkLen = 0; if (Finish) { return false; } @@ -55,6 +61,7 @@ bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk) { chunk[i] = arrow::Datum(Chop(Arrays[i].front(), minSize)); } } + chunkLen = minSize; return true; } diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.h b/ydb/library/yql/public/udf/arrow/args_dechunker.h index dd5e8302ca..af267a8c9a 100644 --- a/ydb/library/yql/public/udf/arrow/args_dechunker.h +++ b/ydb/library/yql/public/udf/arrow/args_dechunker.h @@ -13,6 +13,8 @@ class TArgsDechunker { public: explicit TArgsDechunker(std::vector<arrow::Datum>&& args); bool Next(std::vector<arrow::Datum>& chunk); + // chunkLen will be zero if no arrays are present in chunk + bool Next(std::vector<arrow::Datum>& chunk, ui64& chunkLen); private: const std::vector<arrow::Datum> Args; diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index a9d06df0c3..29ac47431c 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -258,11 +258,7 @@ private: return result; } - int64_t result = std::numeric_limits<int64_t>::max(); - for (auto& data : tree.Payload) { - result = std::min(result, data->length); - } - + int64_t result = tree.Payload.front()->length; Y_VERIFY(result > 0); return static_cast<size_t>(result); } |