diff options
author | vvvv <vvvv@ydb.tech> | 2023-01-25 12:37:19 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-01-25 12:37:19 +0300 |
commit | 72e9af3baa230f3e1b63832986ba6e2c92b9ac9b (patch) | |
tree | 5f2e9462601c6b3278d654b31de1f2dcbd13504e | |
parent | ce1b7ebdaf9751df49f242c3dbc748fbfc8e49dc (diff) | |
download | ydb-72e9af3baa230f3e1b63832986ba6e2c92b9ac9b.tar.gz |
simple udfs executor
20 files changed, 289 insertions, 193 deletions
diff --git a/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt b/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt index 080541694f..5bb17a45a4 100644 --- a/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt @@ -16,6 +16,7 @@ target_link_libraries(yql-minikql-arrow PUBLIC yutil libs-apache-arrow library-yql-minikql + public-udf-arrow ) target_sources(yql-minikql-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/arrow/arrow_util.cpp diff --git a/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt index edb0b73378..7321351567 100644 --- a/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(yql-minikql-arrow PUBLIC yutil libs-apache-arrow library-yql-minikql + public-udf-arrow ) target_sources(yql-minikql-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/arrow/arrow_util.cpp diff --git a/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt b/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt index edb0b73378..7321351567 100644 --- a/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt @@ -17,6 +17,7 @@ target_link_libraries(yql-minikql-arrow PUBLIC yutil libs-apache-arrow library-yql-minikql + public-udf-arrow ) target_sources(yql-minikql-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/arrow/arrow_util.cpp diff --git a/ydb/library/yql/minikql/arrow/arrow_defs.h b/ydb/library/yql/minikql/arrow/arrow_defs.h index 97b26e0345..8607cae612 100644 --- a/ydb/library/yql/minikql/arrow/arrow_defs.h +++ b/ydb/library/yql/minikql/arrow/arrow_defs.h @@ -1,24 +1,5 @@ #pragma once #include <ydb/library/yql/minikql/defs.h> +#include <ydb/library/yql/public/udf/arrow/defs.h> -#define ARROW_CHECK_STATUS(s, op, ...) \ - MKQL_ENSURE(s.ok(), "Operation failed: [" << #op << "]\n" \ - << "" __VA_ARGS__ << ": [" << s.ToString() << "]") \ - -#define ARROW_OK_S(op, ...) \ -do { \ - ::arrow::Status _s = (op); \ - ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \ - } while (false) - -#define ARROW_OK(op) ARROW_OK_S(op, "Bad status") - -#define ARROW_RESULT_S(op, ...) \ - [&]() { \ - auto result = (op); \ - ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \ - return std::move(result).ValueOrDie(); \ - }() - -#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status") diff --git a/ydb/library/yql/minikql/arrow/arrow_util.cpp b/ydb/library/yql/minikql/arrow/arrow_util.cpp index ff8015734c..942ae6ef95 100644 --- a/ydb/library/yql/minikql/arrow/arrow_util.cpp +++ b/ydb/library/yql/minikql/arrow/arrow_util.cpp @@ -10,38 +10,6 @@ namespace NKikimr::NMiniKQL { -std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) { - Y_VERIFY(data->length >= 0); - Y_VERIFY(offset + len <= (size_t)data->length); - if (offset == 0 && len == (size_t)data->length) { - return data; - } - - std::shared_ptr<arrow::ArrayData> result = data->Copy(); - result->offset = data->offset + offset; - result->length = len; - - if (data->null_count == data->length) { - result->null_count = len; - } else if (len == 0) { - result->null_count = 0; - } else { - result->null_count = data->null_count != 0 ? arrow::kUnknownNullCount : 0; - } - - for (size_t i = 0; i < data->child_data.size(); ++i) { - result->child_data[i] = DeepSlice(data->child_data[i], offset, len); - } - - return result; -} - -std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len) { - auto first = DeepSlice(data, 0, len); - data = DeepSlice(data, len, data->length - len); - return first; -} - std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType) { bool isOptional; auto unpacked = UnpackOptional(itemType, isOptional); @@ -71,30 +39,4 @@ std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, return bitmap; } -void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func) { - MKQL_ENSURE(datum.is_arraylike(), "Expected array"); - if (datum.is_array()) { - func(datum.array()); - } else { - for (auto& chunk : datum.chunks()) { - func(chunk->data()); - } - } -} - -arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks) { - MKQL_ENSURE(!chunks.empty(), "Expected non empty chunks"); - arrow::ArrayVector resultChunks; - for (auto& chunk : chunks) { - resultChunks.push_back(arrow::Datum(chunk).make_array()); - } - - if (resultChunks.size() > 1) { - auto type = resultChunks.front()->type(); - auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(resultChunks), type)); - return arrow::Datum(chunked); - } - return arrow::Datum(resultChunks.front()); -} - } diff --git a/ydb/library/yql/minikql/arrow/arrow_util.h b/ydb/library/yql/minikql/arrow/arrow_util.h index 86e0890732..6623fc4384 100644 --- a/ydb/library/yql/minikql/arrow/arrow_util.h +++ b/ydb/library/yql/minikql/arrow/arrow_util.h @@ -9,14 +9,12 @@ #include <arrow/util/bitmap.h> #include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/public/udf/arrow/util.h> namespace NKikimr::NMiniKQL { -/// \brief Recursive version of ArrayData::Slice() method -std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len); - -/// \brief Chops first len items of `data` as new ArrayData object -std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len); +using NYql::NUdf::DeepSlice; +using NYql::NUdf::Chop; /// \brief Remove optional from `data` as new ArrayData object std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType); @@ -28,8 +26,8 @@ inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length }; } -void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func); -arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks); +using NYql::NUdf::ForEachArrayData; +using NYql::NUdf::MakeArray; template <typename T> T GetPrimitiveScalarValue(const arrow::Scalar& scalar) { diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt index 1167e76e2c..0584fa1f2e 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt @@ -20,6 +20,7 @@ target_link_libraries(yql-minikql-comp_nodes PUBLIC library-yql-minikql yql-minikql-arrow yql-minikql-invoke_builtins + public-udf-arrow parser-pg_wrapper-interface library-yql-utils cpp-actors-core diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt index fe47fb92fa..870b944bfe 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt @@ -21,6 +21,7 @@ target_link_libraries(yql-minikql-comp_nodes PUBLIC library-yql-minikql yql-minikql-arrow yql-minikql-invoke_builtins + public-udf-arrow parser-pg_wrapper-interface library-yql-utils cpp-actors-core diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt index fe47fb92fa..870b944bfe 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt @@ -21,6 +21,7 @@ target_link_libraries(yql-minikql-comp_nodes PUBLIC library-yql-minikql yql-minikql-arrow yql-minikql-invoke_builtins + public-udf-arrow parser-pg_wrapper-interface library-yql-utils cpp-actors-core diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp index f0b709d949..ce7803fe0e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/minikql/arrow/mkql_functions.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/public/udf/arrow/args_dechunker.h> #include <arrow/compute/exec_internal.h> @@ -12,65 +13,6 @@ namespace NKikimr::NMiniKQL { namespace { -class TArgsDechunker { -public: - explicit TArgsDechunker(std::vector<arrow::Datum>&& args) - : Args(std::move(args)) - , Arrays(Args.size()) - { - for (size_t i = 0; i < Args.size(); ++i) { - if (Args[i].is_arraylike()) { - ForEachArrayData(Args[i], [&](const auto& data) { - Arrays[i].push_back(data); - }); - } - } - } - - bool Next(std::vector<arrow::Datum>& chunk) { - if (Finish) { - return false; - } - - size_t minSize = Max<size_t>(); - bool haveData = false; - chunk.resize(Args.size()); - for (size_t i = 0; i < Args.size(); ++i) { - if (Args[i].is_scalar()) { - chunk[i] = Args[i]; - continue; - } - while (!Arrays[i].empty() && Arrays[i].front()->length == 0) { - Arrays[i].pop_front(); - } - if (!Arrays[i].empty()) { - haveData = true; - minSize = std::min<size_t>(minSize, Arrays[i].front()->length); - } else { - minSize = 0; - } - } - - MKQL_ENSURE(!haveData || minSize > 0, "Block length mismatch"); - if (!haveData) { - Finish = true; - return false; - } - - for (size_t i = 0; i < Args.size(); ++i) { - if (!Args[i].is_scalar()) { - MKQL_ENSURE(!Arrays[i].empty(), "Block length mismatch"); - chunk[i] = arrow::Datum(Chop(Arrays[i].front(), minSize)); - } - } - return true; - } -private: - const std::vector<arrow::Datum> Args; - std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Arrays; - bool Finish = false; -}; - std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types) { std::vector<arrow::ValueDescr> res; res.reserve(types.size()); @@ -149,7 +91,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con return ctx.HolderFactory.CreateArrowBlock(std::move(output)); } - TArgsDechunker dechunker(std::move(argDatums)); + NYql::NUdf::TArgsDechunker dechunker(std::move(argDatums)); std::vector<arrow::Datum> chunk; TVector<std::shared_ptr<arrow::ArrayData>> arrays; diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt index 62f5a45d7f..deda564cfd 100644 --- a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt @@ -18,5 +18,7 @@ target_link_libraries(public-udf-arrow PUBLIC libs-apache-arrow ) target_sources(public-udf-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp ) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt index d3d13ca78a..a87a49d25a 100644 --- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt @@ -19,5 +19,7 @@ target_link_libraries(public-udf-arrow PUBLIC libs-apache-arrow ) target_sources(public-udf-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp ) diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt index d3d13ca78a..a87a49d25a 100644 --- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt +++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt @@ -19,5 +19,7 @@ target_link_libraries(public-udf-arrow PUBLIC libs-apache-arrow ) target_sources(public-udf-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp ) diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.cpp b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp new file mode 100644 index 0000000000..6f00a19b07 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp @@ -0,0 +1,63 @@ +#include "args_dechunker.h" + +#include <util/generic/yexception.h> +#include <util/generic/ylimits.h> + +namespace NYql { +namespace NUdf { + +TArgsDechunker::TArgsDechunker(std::vector<arrow::Datum>&& args) + : Args(std::move(args)) + , Arrays(Args.size()) +{ + for (size_t i = 0; i < Args.size(); ++i) { + if (Args[i].is_arraylike()) { + ForEachArrayData(Args[i], [&](const auto& data) { + Arrays[i].push_back(data); + }); + } + } +} + +bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk) { + if (Finish) { + return false; + } + + size_t minSize = Max<size_t>(); + bool haveData = false; + chunk.resize(Args.size()); + for (size_t i = 0; i < Args.size(); ++i) { + if (Args[i].is_scalar()) { + chunk[i] = Args[i]; + continue; + } + while (!Arrays[i].empty() && Arrays[i].front()->length == 0) { + Arrays[i].pop_front(); + } + if (!Arrays[i].empty()) { + haveData = true; + minSize = std::min<size_t>(minSize, Arrays[i].front()->length); + } else { + minSize = 0; + } + } + + Y_ENSURE(!haveData || minSize > 0, "Block length mismatch"); + if (!haveData) { + Finish = true; + return false; + } + + for (size_t i = 0; i < Args.size(); ++i) { + if (!Args[i].is_scalar()) { + Y_ENSURE(!Arrays[i].empty(), "Block length mismatch"); + chunk[i] = arrow::Datum(Chop(Arrays[i].front(), minSize)); + } + } + return true; +} + + +} +} diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.h b/ydb/library/yql/public/udf/arrow/args_dechunker.h new file mode 100644 index 0000000000..dd5e8302ca --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/args_dechunker.h @@ -0,0 +1,24 @@ +#pragma once + +#include <arrow/datum.h> +#include <vector> +#include <deque> + +#include "util.h" + +namespace NYql { +namespace NUdf { + +class TArgsDechunker { +public: + explicit TArgsDechunker(std::vector<arrow::Datum>&& args); + bool Next(std::vector<arrow::Datum>& chunk); + +private: + const std::vector<arrow::Datum> Args; + std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Arrays; + bool Finish = false; +}; + +} +} diff --git a/ydb/library/yql/public/udf/arrow/defs.h b/ydb/library/yql/public/udf/arrow/defs.h new file mode 100644 index 0000000000..f3f14f56d1 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/defs.h @@ -0,0 +1,27 @@ +#pragma once +#include <arrow/status.h> +#include <arrow/result.h> +#include <util/generic/yexception.h> + +#define ARROW_CHECK_STATUS(s, op, ...) \ + if (!s.ok()) { \ + ythrow yexception() << "Operation failed: [" << #op << "]\n" \ + << "" __VA_ARGS__ << ": [" << s.ToString() << "]"; \ + } + +#define ARROW_OK_S(op, ...) \ +do { \ + ::arrow::Status _s = (op); \ + ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \ + } while (false) + +#define ARROW_OK(op) ARROW_OK_S(op, "Bad status") + +#define ARROW_RESULT_S(op, ...) \ + [&]() { \ + auto result = (op); \ + ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \ + return std::move(result).ValueOrDie(); \ + }() + +#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status") diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h index 80c3525f24..cfdc9030cf 100644 --- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -1,14 +1,20 @@ +#pragma once #include <ydb/library/yql/public/udf/udf_type_builder.h> #include <ydb/library/yql/public/udf/udf_value.h> #include <ydb/library/yql/public/udf/udf_helpers.h> #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/public/udf/udf_type_inspection.h> +#include "defs.h" +#include "util.h" +#include "args_dechunker.h" + #include <arrow/array/array_base.h> #include <arrow/array/util.h> #include <arrow/c/bridge.h> #include <arrow/chunked_array.h> #include <arrow/compute/kernel.h> +#include <arrow/compute/exec_internal.h> namespace NYql { namespace NUdf { @@ -26,11 +32,25 @@ public: , Pos_(GetSourcePosition(builder)) , Name_(name) , KernelContext_(&ExecContext_) - {} + { + Kernel_.null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; + Kernel_.exec = Exec_; + std::vector<arrow::compute::InputType> inTypes; + for (const auto& t : ArgTypes_) { + inTypes.emplace_back(t); + ArgsValuesDescr_.emplace_back(t); + } + + ArrowSchema s; + ReturnType_->Export(&s); + arrow::compute::OutputType outType = ARROW_RESULT(arrow::ImportType(&s)); + + Kernel_.signature = arrow::compute::KernelSignature::Make(std::move(inTypes), std::move(outType)); + } TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final { try { - TVector<arrow::Datum> datums(ArgTypes_.size()); + TVector<arrow::Datum> argDatums(ArgTypes_.size()); for (ui32 i = 0; i < ArgTypes_.size(); ++i) { bool isScalar; ui64 length; @@ -38,75 +58,65 @@ public: if (isScalar) { ArrowArray a; valueBuilder->ExportArrowBlock(args[i], 0, &a); - auto res = arrow::ImportArray(&a, ArgTypes_[i]); - if (!res.status().ok()) { - throw yexception() << res.status().ToString(); - } - - auto arr = std::move(res).ValueOrDie(); - auto scalarRes = arr->GetScalar(0); - if (!scalarRes.status().ok()) { - throw yexception() << scalarRes.status().ToString(); - } - - auto scalar = std::move(scalarRes).ValueOrDie(); - datums[i] = scalar; + auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i])); + auto scalar = ARROW_RESULT(arr->GetScalar(0)); + argDatums[i] = scalar; } else { TVector<std::shared_ptr<arrow::Array>> imported(chunkCount); for (ui32 i = 0; i < chunkCount; ++i) { ArrowArray a; valueBuilder->ExportArrowBlock(args[i], i, &a); - auto arrRes = arrow::ImportArray(&a, ArgTypes_[i]); - if (!arrRes.status().ok()) { - UdfTerminate(arrRes.status().ToString().c_str()); - } - - imported[i] = std::move(arrRes).ValueOrDie(); + auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i])); + imported[i] = arr; } if (chunkCount == 1) { - datums[i] = imported.front(); + argDatums[i] = imported.front(); } else { - datums[i] = arrow::ChunkedArray::Make(std::move(imported), ArgTypes_[i]).ValueOrDie(); + argDatums[i] = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(imported), ArgTypes_[i])); } } } - // TODO dechunking, scalar executor - Y_ENSURE(false); - Y_UNUSED(Exec_); - Y_UNUSED(KernelContext_); + auto executor = arrow::compute::detail::KernelExecutor::MakeScalar(); + ARROW_OK(executor->Init(&KernelContext_, { &Kernel_, ArgsValuesDescr_, nullptr })); + arrow::Datum res; if (OnlyScalars_) { - auto arrRes = arrow::MakeArrayFromScalar(*res.scalar(), 1); - if (!arrRes.status().ok()) { - throw yexception() << arrRes.status().ToString(); - } + auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>(); + ARROW_OK(executor->Execute(argDatums, listener.get())); + res = executor->WrapResults(argDatums, listener->values()); + } else { + TArgsDechunker dechunker(std::move(argDatums)); + std::vector<arrow::Datum> chunk; + TVector<std::shared_ptr<arrow::ArrayData>> arrays; - auto arr = std::move(arrRes).ValueOrDie(); - ArrowArray a; - auto status = arrow::ExportArray(*arr, &a); - if (!status.ok()) { - throw yexception() << status.ToString(); + while (dechunker.Next(chunk)) { + arrow::compute::detail::DatumAccumulator listener; + ARROW_OK(executor->Execute(chunk, &listener)); + auto output = executor->WrapResults(chunk, listener.values()); + + ForEachArrayData(output, [&](const auto& arr) { arrays.push_back(arr); }); } + res = MakeArray(arrays); + } + + if (OnlyScalars_) { + auto arr = ARROW_RESULT(arrow::MakeArrayFromScalar(*res.scalar(), 1)); + ArrowArray a; + ARROW_OK(arrow::ExportArray(*arr, &a)); return valueBuilder->ImportArrowBlock(&a, 1, true, *ReturnType_); } else { TVector<ArrowArray> a; if (res.is_array()) { a.resize(1); - auto status = arrow::ExportArray(*res.make_array(), &a[0]); - if (!status.ok()) { - throw yexception() << status.ToString(); - } + ARROW_OK(arrow::ExportArray(*res.make_array(), &a[0])); } else { Y_ENSURE(res.is_arraylike()); a.resize(res.chunks().size()); for (ui32 i = 0; i < res.chunks().size(); ++i) { - auto status = arrow::ExportArray(*res.chunks()[i], &a[i]); - if (!status.ok()) { - throw yexception() << status.ToString(); - } + ARROW_OK(arrow::ExportArray(*res.chunks()[i], &a[i])); } } @@ -131,6 +141,8 @@ private: arrow::compute::ExecContext ExecContext_; mutable arrow::compute::KernelContext KernelContext_; + arrow::compute::ScalarKernel Kernel_; + std::vector<arrow::ValueDescr> ArgsValuesDescr_; }; inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* signature, TType* userType, TExec exec, bool typesOnly, @@ -179,7 +191,12 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign argsBuilder->Flags(callableInspector.GetArgumentFlags(i)); } - // TODO fill argTypes + auto arrowTypeHandle = typeInfoHelper->MakeArrowType(initalType); + Y_ENSURE(arrowTypeHandle); + ArrowSchema s; + arrowTypeHandle->Export(&s); + auto type = ARROW_RESULT(arrow::ImportType(&s)); + argTypes.emplace_back(type); } builder.Returns(builder.Block(onlyScalars)->Item(callableInspector.GetReturnType()).Build()); diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp new file mode 100644 index 0000000000..d8b8c6b6bb --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/util.cpp @@ -0,0 +1,69 @@ +#include "util.h" +#include "defs.h" + +#include <arrow/array/array_base.h> +#include <arrow/chunked_array.h> + +namespace NYql { +namespace NUdf { + +std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) { + Y_ENSURE(data->length >= 0); + Y_ENSURE(offset + len <= (size_t)data->length); + if (offset == 0 && len == (size_t)data->length) { + return data; + } + + std::shared_ptr<arrow::ArrayData> result = data->Copy(); + result->offset = data->offset + offset; + result->length = len; + + if (data->null_count == data->length) { + result->null_count = len; + } else if (len == 0) { + result->null_count = 0; + } else { + result->null_count = data->null_count != 0 ? arrow::kUnknownNullCount : 0; + } + + for (size_t i = 0; i < data->child_data.size(); ++i) { + result->child_data[i] = DeepSlice(data->child_data[i], offset, len); + } + + return result; +} + +std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len) { + auto first = DeepSlice(data, 0, len); + data = DeepSlice(data, len, data->length - len); + return first; +} + +void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func) { + Y_ENSURE(datum.is_arraylike(), "Expected array"); + if (datum.is_array()) { + func(datum.array()); + } else { + for (auto& chunk : datum.chunks()) { + func(chunk->data()); + } + } +} + +arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks) { + Y_ENSURE(!chunks.empty(), "Expected non empty chunks"); + arrow::ArrayVector resultChunks; + for (auto& chunk : chunks) { + resultChunks.push_back(arrow::Datum(chunk).make_array()); + } + + if (resultChunks.size() > 1) { + auto type = resultChunks.front()->type(); + auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(resultChunks), type)); + return arrow::Datum(chunked); + } + return arrow::Datum(resultChunks.front()); +} + +} +} diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h new file mode 100644 index 0000000000..ec8a0ebb11 --- /dev/null +++ b/ydb/library/yql/public/udf/arrow/util.h @@ -0,0 +1,22 @@ +#pragma once + +#include <util/generic/vector.h> + +#include <arrow/datum.h> + +#include <functional> + +namespace NYql { +namespace NUdf { + +/// \brief Recursive version of ArrayData::Slice() method +std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len); + +/// \brief Chops first len items of `data` as new ArrayData object +std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len); + +void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func); +arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks); + +} +} diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h index 832b61a228..5fcf17c914 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h +++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h @@ -43,9 +43,8 @@ SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) { inline arrow::Status GetHostKernelExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { Y_UNUSED(ctx); - Y_UNUSED(batch); - Y_UNUSED(res); - Y_ENSURE(false); + *res = batch.values[0]; + return arrow::Status::OK(); } BEGIN_SIMPLE_ARROW_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) { |