aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-02-21 16:54:58 +0300
committeraneporada <aneporada@ydb.tech>2023-02-21 16:54:58 +0300
commitcf5c82b0e7c3741a363c782402c1082db9dbffad (patch)
treebe92aadf47d0813287897ab70e7cf5d2036a6316
parenta6e5108959804baccfc15da911a9e868b834e15b (diff)
downloadydb-cf5c82b0e7c3741a363c782402c1082db9dbffad.tar.gz
Fix usage of IArrayBuilder in UDFs - take into account max output block size
-rw-r--r--ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h19
1 files changed, 13 insertions, 6 deletions
diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
index e15f63d36e..266e0f8a12 100644
--- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
+++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
@@ -299,14 +299,21 @@ struct TUnaryKernelExec {
else {
auto& array = *arg.array();
auto& builder = state.GetArrayBuilder();
- for (int64_t i = 0; i < array.length; ++i) {
- auto item = reader.GetItem(array, i);
- TDerived::Process(item, [&](TBlockItem out) {
- builder.Add(out);
- });
+ size_t maxBlockLength = builder.MaxLength();
+ Y_ENSURE(maxBlockLength > 0);
+ TVector<std::shared_ptr<arrow::ArrayData>> outputArrays;
+ for (int64_t i = 0; i < array.length;) {
+ for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) {
+ auto item = reader.GetItem(array, i);
+ TDerived::Process(item, [&](TBlockItem out) {
+ builder.Add(out);
+ });
+ }
+ auto outputDatum = builder.Build(false);
+ ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });
}
- *res = builder.Build(false);
+ *res = MakeArray(outputArrays);
}
return arrow::Status::OK();