diff options
author | aneporada <aneporada@ydb.tech> | 2023-02-21 16:54:58 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-02-21 16:54:58 +0300 |
commit | cf5c82b0e7c3741a363c782402c1082db9dbffad (patch) | |
tree | be92aadf47d0813287897ab70e7cf5d2036a6316 | |
parent | a6e5108959804baccfc15da911a9e868b834e15b (diff) | |
download | ydb-cf5c82b0e7c3741a363c782402c1082db9dbffad.tar.gz |
Fix usage of IArrayBuilder in UDFs - take into account max output block size
-rw-r--r-- | ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h | 19 |
1 file changed, 13 insertions, 6 deletions
diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h index e15f63d36e..266e0f8a12 100644 --- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -299,14 +299,21 @@ struct TUnaryKernelExec { else { auto& array = *arg.array(); auto& builder = state.GetArrayBuilder(); - for (int64_t i = 0; i < array.length; ++i) { - auto item = reader.GetItem(array, i); - TDerived::Process(item, [&](TBlockItem out) { - builder.Add(out); - }); + size_t maxBlockLength = builder.MaxLength(); + Y_ENSURE(maxBlockLength > 0); + TVector<std::shared_ptr<arrow::ArrayData>> outputArrays; + for (int64_t i = 0; i < array.length;) { + for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) { + auto item = reader.GetItem(array, i); + TDerived::Process(item, [&](TBlockItem out) { + builder.Add(out); + }); + } + auto outputDatum = builder.Build(false); + ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); }); } - *res = builder.Build(false); + *res = MakeArray(outputArrays); } return arrow::Status::OK(); |