diff options
| author | aneporada <[email protected]> | 2023-02-21 16:54:58 +0300 | 
|---|---|---|
| committer | aneporada <[email protected]> | 2023-02-21 16:54:58 +0300 | 
| commit | cf5c82b0e7c3741a363c782402c1082db9dbffad (patch) | |
| tree | be92aadf47d0813287897ab70e7cf5d2036a6316 | |
| parent | a6e5108959804baccfc15da911a9e868b834e15b (diff) | |
Fix usage of IArrayBuilder in UDFs - take into account max output block size
| -rw-r--r-- | ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h | 19 | 
1 files changed, 13 insertions, 6 deletions
| diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h index e15f63d36e8..266e0f8a12d 100644 --- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -299,14 +299,21 @@ struct TUnaryKernelExec {          else {              auto& array = *arg.array();              auto& builder = state.GetArrayBuilder(); -            for (int64_t i = 0; i < array.length; ++i) { -                auto item = reader.GetItem(array, i); -                TDerived::Process(item, [&](TBlockItem out) { -                    builder.Add(out); -                }); +            size_t maxBlockLength = builder.MaxLength(); +            Y_ENSURE(maxBlockLength > 0); +            TVector<std::shared_ptr<arrow::ArrayData>> outputArrays; +            for (int64_t i = 0; i < array.length;) { +                for (size_t j = 0; j < maxBlockLength && i < array.length; ++j, ++i) { +                    auto item = reader.GetItem(array, i); +                    TDerived::Process(item, [&](TBlockItem out) { +                        builder.Add(out); +                    }); +                } +                auto outputDatum = builder.Build(false); +                ForEachArrayData(outputDatum, [&](const auto& arr) { outputArrays.push_back(arr); });              } -            *res = builder.Build(false); +            *res = MakeArray(outputArrays);          }          return arrow::Status::OK(); | 
