aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-01-25 12:37:19 +0300
committervvvv <vvvv@ydb.tech>2023-01-25 12:37:19 +0300
commit72e9af3baa230f3e1b63832986ba6e2c92b9ac9b (patch)
tree5f2e9462601c6b3278d654b31de1f2dcbd13504e
parentce1b7ebdaf9751df49f242c3dbc748fbfc8e49dc (diff)
downloadydb-72e9af3baa230f3e1b63832986ba6e2c92b9ac9b.tar.gz
simple udfs executor
-rw-r--r--ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt1
-rw-r--r--ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/arrow/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_defs.h21
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_util.cpp58
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_util.h12
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp62
-rw-r--r--ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt2
-rw-r--r--ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/public/udf/arrow/args_dechunker.cpp63
-rw-r--r--ydb/library/yql/public/udf/arrow/args_dechunker.h24
-rw-r--r--ydb/library/yql/public/udf/arrow/defs.h27
-rw-r--r--ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h107
-rw-r--r--ydb/library/yql/public/udf/arrow/util.cpp69
-rw-r--r--ydb/library/yql/public/udf/arrow/util.h22
-rw-r--r--ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h5
20 files changed, 289 insertions, 193 deletions
diff --git a/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt b/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt
index 080541694f..5bb17a45a4 100644
--- a/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt
+++ b/ydb/library/yql/minikql/arrow/CMakeLists.darwin.txt
@@ -16,6 +16,7 @@ target_link_libraries(yql-minikql-arrow PUBLIC
yutil
libs-apache-arrow
library-yql-minikql
+ public-udf-arrow
)
target_sources(yql-minikql-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/arrow/arrow_util.cpp
diff --git a/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt
index edb0b73378..7321351567 100644
--- a/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/arrow/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(yql-minikql-arrow PUBLIC
yutil
libs-apache-arrow
library-yql-minikql
+ public-udf-arrow
)
target_sources(yql-minikql-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/arrow/arrow_util.cpp
diff --git a/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt b/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt
index edb0b73378..7321351567 100644
--- a/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt
+++ b/ydb/library/yql/minikql/arrow/CMakeLists.linux.txt
@@ -17,6 +17,7 @@ target_link_libraries(yql-minikql-arrow PUBLIC
yutil
libs-apache-arrow
library-yql-minikql
+ public-udf-arrow
)
target_sources(yql-minikql-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/arrow/arrow_util.cpp
diff --git a/ydb/library/yql/minikql/arrow/arrow_defs.h b/ydb/library/yql/minikql/arrow/arrow_defs.h
index 97b26e0345..8607cae612 100644
--- a/ydb/library/yql/minikql/arrow/arrow_defs.h
+++ b/ydb/library/yql/minikql/arrow/arrow_defs.h
@@ -1,24 +1,5 @@
#pragma once
#include <ydb/library/yql/minikql/defs.h>
+#include <ydb/library/yql/public/udf/arrow/defs.h>
-#define ARROW_CHECK_STATUS(s, op, ...) \
- MKQL_ENSURE(s.ok(), "Operation failed: [" << #op << "]\n" \
- << "" __VA_ARGS__ << ": [" << s.ToString() << "]") \
-
-#define ARROW_OK_S(op, ...) \
-do { \
- ::arrow::Status _s = (op); \
- ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \
- } while (false)
-
-#define ARROW_OK(op) ARROW_OK_S(op, "Bad status")
-
-#define ARROW_RESULT_S(op, ...) \
- [&]() { \
- auto result = (op); \
- ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \
- return std::move(result).ValueOrDie(); \
- }()
-
-#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status")
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.cpp b/ydb/library/yql/minikql/arrow/arrow_util.cpp
index ff8015734c..942ae6ef95 100644
--- a/ydb/library/yql/minikql/arrow/arrow_util.cpp
+++ b/ydb/library/yql/minikql/arrow/arrow_util.cpp
@@ -10,38 +10,6 @@
namespace NKikimr::NMiniKQL {
-std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) {
- Y_VERIFY(data->length >= 0);
- Y_VERIFY(offset + len <= (size_t)data->length);
- if (offset == 0 && len == (size_t)data->length) {
- return data;
- }
-
- std::shared_ptr<arrow::ArrayData> result = data->Copy();
- result->offset = data->offset + offset;
- result->length = len;
-
- if (data->null_count == data->length) {
- result->null_count = len;
- } else if (len == 0) {
- result->null_count = 0;
- } else {
- result->null_count = data->null_count != 0 ? arrow::kUnknownNullCount : 0;
- }
-
- for (size_t i = 0; i < data->child_data.size(); ++i) {
- result->child_data[i] = DeepSlice(data->child_data[i], offset, len);
- }
-
- return result;
-}
-
-std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len) {
- auto first = DeepSlice(data, 0, len);
- data = DeepSlice(data, len, data->length - len);
- return first;
-}
-
std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType) {
bool isOptional;
auto unpacked = UnpackOptional(itemType, isOptional);
@@ -71,30 +39,4 @@ std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len,
return bitmap;
}
-void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func) {
- MKQL_ENSURE(datum.is_arraylike(), "Expected array");
- if (datum.is_array()) {
- func(datum.array());
- } else {
- for (auto& chunk : datum.chunks()) {
- func(chunk->data());
- }
- }
-}
-
-arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks) {
- MKQL_ENSURE(!chunks.empty(), "Expected non empty chunks");
- arrow::ArrayVector resultChunks;
- for (auto& chunk : chunks) {
- resultChunks.push_back(arrow::Datum(chunk).make_array());
- }
-
- if (resultChunks.size() > 1) {
- auto type = resultChunks.front()->type();
- auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(resultChunks), type));
- return arrow::Datum(chunked);
- }
- return arrow::Datum(resultChunks.front());
-}
-
}
diff --git a/ydb/library/yql/minikql/arrow/arrow_util.h b/ydb/library/yql/minikql/arrow/arrow_util.h
index 86e0890732..6623fc4384 100644
--- a/ydb/library/yql/minikql/arrow/arrow_util.h
+++ b/ydb/library/yql/minikql/arrow/arrow_util.h
@@ -9,14 +9,12 @@
#include <arrow/util/bitmap.h>
#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/public/udf/arrow/util.h>
namespace NKikimr::NMiniKQL {
-/// \brief Recursive version of ArrayData::Slice() method
-std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len);
-
-/// \brief Chops first len items of `data` as new ArrayData object
-std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len);
+using NYql::NUdf::DeepSlice;
+using NYql::NUdf::Chop;
/// \brief Remove optional from `data` as new ArrayData object
std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* itemType);
@@ -28,8 +26,8 @@ inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index)
return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length };
}
-void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func);
-arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks);
+using NYql::NUdf::ForEachArrayData;
+using NYql::NUdf::MakeArray;
template <typename T>
T GetPrimitiveScalarValue(const arrow::Scalar& scalar) {
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
index 1167e76e2c..0584fa1f2e 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
@@ -20,6 +20,7 @@ target_link_libraries(yql-minikql-comp_nodes PUBLIC
library-yql-minikql
yql-minikql-arrow
yql-minikql-invoke_builtins
+ public-udf-arrow
parser-pg_wrapper-interface
library-yql-utils
cpp-actors-core
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
index fe47fb92fa..870b944bfe 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
@@ -21,6 +21,7 @@ target_link_libraries(yql-minikql-comp_nodes PUBLIC
library-yql-minikql
yql-minikql-arrow
yql-minikql-invoke_builtins
+ public-udf-arrow
parser-pg_wrapper-interface
library-yql-utils
cpp-actors-core
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
index fe47fb92fa..870b944bfe 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
@@ -21,6 +21,7 @@ target_link_libraries(yql-minikql-comp_nodes PUBLIC
library-yql-minikql
yql-minikql-arrow
yql-minikql-invoke_builtins
+ public-udf-arrow
parser-pg_wrapper-interface
library-yql-utils
cpp-actors-core
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp
index f0b709d949..ce7803fe0e 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.cpp
@@ -5,6 +5,7 @@
#include <ydb/library/yql/minikql/arrow/mkql_functions.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+#include <ydb/library/yql/public/udf/arrow/args_dechunker.h>
#include <arrow/compute/exec_internal.h>
@@ -12,65 +13,6 @@ namespace NKikimr::NMiniKQL {
namespace {
-class TArgsDechunker {
-public:
- explicit TArgsDechunker(std::vector<arrow::Datum>&& args)
- : Args(std::move(args))
- , Arrays(Args.size())
- {
- for (size_t i = 0; i < Args.size(); ++i) {
- if (Args[i].is_arraylike()) {
- ForEachArrayData(Args[i], [&](const auto& data) {
- Arrays[i].push_back(data);
- });
- }
- }
- }
-
- bool Next(std::vector<arrow::Datum>& chunk) {
- if (Finish) {
- return false;
- }
-
- size_t minSize = Max<size_t>();
- bool haveData = false;
- chunk.resize(Args.size());
- for (size_t i = 0; i < Args.size(); ++i) {
- if (Args[i].is_scalar()) {
- chunk[i] = Args[i];
- continue;
- }
- while (!Arrays[i].empty() && Arrays[i].front()->length == 0) {
- Arrays[i].pop_front();
- }
- if (!Arrays[i].empty()) {
- haveData = true;
- minSize = std::min<size_t>(minSize, Arrays[i].front()->length);
- } else {
- minSize = 0;
- }
- }
-
- MKQL_ENSURE(!haveData || minSize > 0, "Block length mismatch");
- if (!haveData) {
- Finish = true;
- return false;
- }
-
- for (size_t i = 0; i < Args.size(); ++i) {
- if (!Args[i].is_scalar()) {
- MKQL_ENSURE(!Arrays[i].empty(), "Block length mismatch");
- chunk[i] = arrow::Datum(Chop(Arrays[i].front(), minSize));
- }
- }
- return true;
- }
-private:
- const std::vector<arrow::Datum> Args;
- std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Arrays;
- bool Finish = false;
-};
-
std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types) {
std::vector<arrow::ValueDescr> res;
res.reserve(types.size());
@@ -149,7 +91,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con
return ctx.HolderFactory.CreateArrowBlock(std::move(output));
}
- TArgsDechunker dechunker(std::move(argDatums));
+ NYql::NUdf::TArgsDechunker dechunker(std::move(argDatums));
std::vector<arrow::Datum> chunk;
TVector<std::shared_ptr<arrow::ArrayData>> arrays;
diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt
index 62f5a45d7f..deda564cfd 100644
--- a/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt
+++ b/ydb/library/yql/public/udf/arrow/CMakeLists.darwin.txt
@@ -18,5 +18,7 @@ target_link_libraries(public-udf-arrow PUBLIC
libs-apache-arrow
)
target_sources(public-udf-arrow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp
)
diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt
index d3d13ca78a..a87a49d25a 100644
--- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux-aarch64.txt
@@ -19,5 +19,7 @@ target_link_libraries(public-udf-arrow PUBLIC
libs-apache-arrow
)
target_sources(public-udf-arrow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp
)
diff --git a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt
index d3d13ca78a..a87a49d25a 100644
--- a/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt
+++ b/ydb/library/yql/public/udf/arrow/CMakeLists.linux.txt
@@ -19,5 +19,7 @@ target_link_libraries(public-udf-arrow PUBLIC
libs-apache-arrow
)
target_sources(public-udf-arrow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/arrow/util.cpp
)
diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.cpp b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
new file mode 100644
index 0000000000..6f00a19b07
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/args_dechunker.cpp
@@ -0,0 +1,63 @@
+#include "args_dechunker.h"
+
+#include <util/generic/yexception.h>
+#include <util/generic/ylimits.h>
+
+namespace NYql {
+namespace NUdf {
+
+/// Takes ownership of the argument datums and prepares one queue per argument.
+/// For every array-like argument its ArrayData pieces (the array itself, or each
+/// chunk of a chunked array) are appended to that argument's queue in order.
+/// Queues of arguments that are not array-like stay empty.
+TArgsDechunker::TArgsDechunker(std::vector<arrow::Datum>&& args)
+    : Args(std::move(args))
+    , Arrays(Args.size())
+{
+    for (size_t argIdx = 0; argIdx != Args.size(); ++argIdx) {
+        const arrow::Datum& arg = Args[argIdx];
+        if (!arg.is_arraylike()) {
+            continue;
+        }
+
+        auto& pending = Arrays[argIdx];
+        ForEachArrayData(arg, [&pending](const std::shared_ptr<arrow::ArrayData>& piece) {
+            pending.push_back(piece);
+        });
+    }
+}
+
+/// Produces the next group of argument pieces that all have the same length.
+///
+/// \param chunk resized to the number of arguments; scalar arguments are copied
+///              into it as is, every other argument receives the first `minSize`
+///              items chopped off the front of its queue.
+/// \return true when `chunk` was filled, false once no queue has data left
+///         (and on every later call).
+///
+/// Fails a Y_ENSURE check with "Block length mismatch" when some non-scalar
+/// argument has run out of data while another one still has items.
+bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk) {
+    if (Finish) {
+        return false;
+    }
+
+    const size_t argCount = Args.size();
+    chunk.resize(argCount);
+
+    bool haveData = false;
+    size_t minSize = Max<size_t>();
+    for (size_t i = 0; i != argCount; ++i) {
+        if (Args[i].is_scalar()) {
+            // Scalars are handed out unchanged on every call.
+            chunk[i] = Args[i];
+            continue;
+        }
+
+        auto& pending = Arrays[i];
+        // Drop fully consumed pieces so that front() is the next non-empty one.
+        while (!pending.empty() && pending.front()->length == 0) {
+            pending.pop_front();
+        }
+
+        if (pending.empty()) {
+            // Forces the mismatch check below to fire if any other argument has data.
+            minSize = 0;
+            continue;
+        }
+
+        haveData = true;
+        minSize = std::min<size_t>(minSize, pending.front()->length);
+    }
+
+    Y_ENSURE(!haveData || minSize > 0, "Block length mismatch");
+    if (!haveData) {
+        Finish = true;
+        return false;
+    }
+
+    for (size_t i = 0; i != argCount; ++i) {
+        if (Args[i].is_scalar()) {
+            continue;
+        }
+
+        Y_ENSURE(!Arrays[i].empty(), "Block length mismatch");
+        // Chop() also replaces the queue head with the remaining tail.
+        chunk[i] = arrow::Datum(Chop(Arrays[i].front(), minSize));
+    }
+    return true;
+}
+
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/args_dechunker.h b/ydb/library/yql/public/udf/arrow/args_dechunker.h
new file mode 100644
index 0000000000..dd5e8302ca
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/args_dechunker.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <arrow/datum.h>
+#include <vector>
+#include <deque>
+
+#include "util.h"
+
+namespace NYql {
+namespace NUdf {
+
+/// Splits a set of argument datums (scalars, arrays, chunked arrays) into
+/// successive groups in which every non-scalar argument has the same length.
+class TArgsDechunker {
+public:
+ /// Takes ownership of `args`; the pieces of every array-like argument are queued for Next().
+ explicit TArgsDechunker(std::vector<arrow::Datum>&& args);
+ /// Fills `chunk` with the next group of equally sized pieces (scalars are copied as is).
+ /// Returns false when there is nothing left to produce.
+ bool Next(std::vector<arrow::Datum>& chunk);
+
+private:
+ // Original arguments, kept to tell scalars from arrays and to hand scalars out.
+ const std::vector<arrow::Datum> Args;
+ // Per-argument queue of not yet consumed ArrayData pieces.
+ std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Arrays;
+ // Set once Next() has found no remaining data.
+ bool Finish = false;
+};
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/defs.h b/ydb/library/yql/public/udf/arrow/defs.h
new file mode 100644
index 0000000000..f3f14f56d1
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/defs.h
@@ -0,0 +1,27 @@
+#pragma once
+#include <arrow/status.h>
+#include <arrow/result.h>
+#include <util/generic/yexception.h>
+
+// Throws yexception when the arrow::Status expression `s` is not OK.
+//   s   - expression yielding an arrow::Status; evaluated exactly once.
+//   op  - the operation that produced the status; only stringified into the message.
+//   ... - optional string-literal prefix placed before the status text.
+// Wrapped in do/while(false) so that the macro behaves as a single statement
+// (safe inside an unbraced if/else); it must be followed by a semicolon.
+#define ARROW_CHECK_STATUS(s, op, ...) \
+    do { \
+        const ::arrow::Status& arrowCheckedStatus_ = (s); \
+        if (!arrowCheckedStatus_.ok()) { \
+            ythrow yexception() << "Operation failed: [" << #op << "]\n" \
+                << "" __VA_ARGS__ << ": [" << arrowCheckedStatus_.ToString() << "]"; \
+        } \
+    } while (false)
+
+// Evaluates `op` once into an ::arrow::Status and passes it to ARROW_CHECK_STATUS,
+// which throws yexception if the status is not OK. The variadic part is the
+// message prefix forwarded to ARROW_CHECK_STATUS.
+#define ARROW_OK_S(op, ...) \
+do { \
+ ::arrow::Status _s = (op); \
+ ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \
+ } while (false)
+
+// ARROW_OK_S with the default "Bad status" message prefix.
+#define ARROW_OK(op) ARROW_OK_S(op, "Bad status")
+
+// Expression form for operations returning a result object with status() and
+// ValueOrDie(): an immediately invoked lambda (capturing by reference) evaluates
+// `op` once, checks result.status() with ARROW_CHECK_STATUS and yields the
+// moved-out value.
+#define ARROW_RESULT_S(op, ...) \
+ [&]() { \
+ auto result = (op); \
+ ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \
+ return std::move(result).ValueOrDie(); \
+ }()
+
+// ARROW_RESULT_S with the default "Bad status" message prefix.
+#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status")
diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
index 80c3525f24..cfdc9030cf 100644
--- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
+++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
@@ -1,14 +1,20 @@
+#pragma once
#include <ydb/library/yql/public/udf/udf_type_builder.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/public/udf/udf_helpers.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/public/udf/udf_type_inspection.h>
+#include "defs.h"
+#include "util.h"
+#include "args_dechunker.h"
+
#include <arrow/array/array_base.h>
#include <arrow/array/util.h>
#include <arrow/c/bridge.h>
#include <arrow/chunked_array.h>
#include <arrow/compute/kernel.h>
+#include <arrow/compute/exec_internal.h>
namespace NYql {
namespace NUdf {
@@ -26,11 +32,25 @@ public:
, Pos_(GetSourcePosition(builder))
, Name_(name)
, KernelContext_(&ExecContext_)
- {}
+ {
+ Kernel_.null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
+ Kernel_.exec = Exec_;
+ std::vector<arrow::compute::InputType> inTypes;
+ for (const auto& t : ArgTypes_) {
+ inTypes.emplace_back(t);
+ ArgsValuesDescr_.emplace_back(t);
+ }
+
+ ArrowSchema s;
+ ReturnType_->Export(&s);
+ arrow::compute::OutputType outType = ARROW_RESULT(arrow::ImportType(&s));
+
+ Kernel_.signature = arrow::compute::KernelSignature::Make(std::move(inTypes), std::move(outType));
+ }
TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final {
try {
- TVector<arrow::Datum> datums(ArgTypes_.size());
+ TVector<arrow::Datum> argDatums(ArgTypes_.size());
for (ui32 i = 0; i < ArgTypes_.size(); ++i) {
bool isScalar;
ui64 length;
@@ -38,75 +58,65 @@ public:
if (isScalar) {
ArrowArray a;
valueBuilder->ExportArrowBlock(args[i], 0, &a);
- auto res = arrow::ImportArray(&a, ArgTypes_[i]);
- if (!res.status().ok()) {
- throw yexception() << res.status().ToString();
- }
-
- auto arr = std::move(res).ValueOrDie();
- auto scalarRes = arr->GetScalar(0);
- if (!scalarRes.status().ok()) {
- throw yexception() << scalarRes.status().ToString();
- }
-
- auto scalar = std::move(scalarRes).ValueOrDie();
- datums[i] = scalar;
+ auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i]));
+ auto scalar = ARROW_RESULT(arr->GetScalar(0));
+ argDatums[i] = scalar;
} else {
TVector<std::shared_ptr<arrow::Array>> imported(chunkCount);
for (ui32 i = 0; i < chunkCount; ++i) {
ArrowArray a;
valueBuilder->ExportArrowBlock(args[i], i, &a);
- auto arrRes = arrow::ImportArray(&a, ArgTypes_[i]);
- if (!arrRes.status().ok()) {
- UdfTerminate(arrRes.status().ToString().c_str());
- }
-
- imported[i] = std::move(arrRes).ValueOrDie();
+ auto arr = ARROW_RESULT(arrow::ImportArray(&a, ArgTypes_[i]));
+ imported[i] = arr;
}
if (chunkCount == 1) {
- datums[i] = imported.front();
+ argDatums[i] = imported.front();
} else {
- datums[i] = arrow::ChunkedArray::Make(std::move(imported), ArgTypes_[i]).ValueOrDie();
+ argDatums[i] = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(imported), ArgTypes_[i]));
}
}
}
- // TODO dechunking, scalar executor
- Y_ENSURE(false);
- Y_UNUSED(Exec_);
- Y_UNUSED(KernelContext_);
+ auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
+ ARROW_OK(executor->Init(&KernelContext_, { &Kernel_, ArgsValuesDescr_, nullptr }));
+
arrow::Datum res;
if (OnlyScalars_) {
- auto arrRes = arrow::MakeArrayFromScalar(*res.scalar(), 1);
- if (!arrRes.status().ok()) {
- throw yexception() << arrRes.status().ToString();
- }
+ auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
+ ARROW_OK(executor->Execute(argDatums, listener.get()));
+ res = executor->WrapResults(argDatums, listener->values());
+ } else {
+ TArgsDechunker dechunker(std::move(argDatums));
+ std::vector<arrow::Datum> chunk;
+ TVector<std::shared_ptr<arrow::ArrayData>> arrays;
- auto arr = std::move(arrRes).ValueOrDie();
- ArrowArray a;
- auto status = arrow::ExportArray(*arr, &a);
- if (!status.ok()) {
- throw yexception() << status.ToString();
+ while (dechunker.Next(chunk)) {
+ arrow::compute::detail::DatumAccumulator listener;
+ ARROW_OK(executor->Execute(chunk, &listener));
+ auto output = executor->WrapResults(chunk, listener.values());
+
+ ForEachArrayData(output, [&](const auto& arr) { arrays.push_back(arr); });
}
+ res = MakeArray(arrays);
+ }
+
+ if (OnlyScalars_) {
+ auto arr = ARROW_RESULT(arrow::MakeArrayFromScalar(*res.scalar(), 1));
+ ArrowArray a;
+ ARROW_OK(arrow::ExportArray(*arr, &a));
return valueBuilder->ImportArrowBlock(&a, 1, true, *ReturnType_);
} else {
TVector<ArrowArray> a;
if (res.is_array()) {
a.resize(1);
- auto status = arrow::ExportArray(*res.make_array(), &a[0]);
- if (!status.ok()) {
- throw yexception() << status.ToString();
- }
+ ARROW_OK(arrow::ExportArray(*res.make_array(), &a[0]));
} else {
Y_ENSURE(res.is_arraylike());
a.resize(res.chunks().size());
for (ui32 i = 0; i < res.chunks().size(); ++i) {
- auto status = arrow::ExportArray(*res.chunks()[i], &a[i]);
- if (!status.ok()) {
- throw yexception() << status.ToString();
- }
+ ARROW_OK(arrow::ExportArray(*res.chunks()[i], &a[i]));
}
}
@@ -131,6 +141,8 @@ private:
arrow::compute::ExecContext ExecContext_;
mutable arrow::compute::KernelContext KernelContext_;
+ arrow::compute::ScalarKernel Kernel_;
+ std::vector<arrow::ValueDescr> ArgsValuesDescr_;
};
inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* signature, TType* userType, TExec exec, bool typesOnly,
@@ -179,7 +191,12 @@ inline void PrepareSimpleArrowUdf(IFunctionTypeInfoBuilder& builder, TType* sign
argsBuilder->Flags(callableInspector.GetArgumentFlags(i));
}
- // TODO fill argTypes
+ auto arrowTypeHandle = typeInfoHelper->MakeArrowType(initalType);
+ Y_ENSURE(arrowTypeHandle);
+ ArrowSchema s;
+ arrowTypeHandle->Export(&s);
+ auto type = ARROW_RESULT(arrow::ImportType(&s));
+ argTypes.emplace_back(type);
}
builder.Returns(builder.Block(onlyScalars)->Item(callableInspector.GetReturnType()).Build());
diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp
new file mode 100644
index 0000000000..d8b8c6b6bb
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/util.cpp
@@ -0,0 +1,69 @@
+#include "util.h"
+#include "defs.h"
+
+#include <arrow/array/array_base.h>
+#include <arrow/chunked_array.h>
+
+namespace NYql {
+namespace NUdf {
+
+/// \brief Recursive version of ArrayData::Slice(): slices `data` and all of its children.
+///
+/// \param data   array data to slice; its length must be non-negative.
+/// \param offset index of the first item of the slice, relative to `data`.
+/// \param len    number of items in the slice.
+/// \return `data` itself when the slice covers the whole array, otherwise a copy
+///         made with ArrayData::Copy() whose offset, length, null_count and
+///         child_data are adjusted to the requested range.
+///
+/// Fails a Y_ENSURE check when the requested range does not fit into `data`.
+std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) {
+    Y_ENSURE(data->length >= 0);
+    const size_t total = static_cast<size_t>(data->length);
+    // Two comparisons instead of `offset + len <= total`: the sum may wrap around
+    // for huge arguments and let an out-of-range slice through.
+    Y_ENSURE(offset <= total && len <= total - offset);
+    if (offset == 0 && len == total) {
+        return data;
+    }
+
+    std::shared_ptr<arrow::ArrayData> result = data->Copy();
+    result->offset = data->offset + offset;
+    result->length = len;
+
+    if (data->null_count == data->length) {
+        // Every item is null, so every item of the slice is null as well.
+        result->null_count = len;
+    } else if (len == 0) {
+        result->null_count = 0;
+    } else {
+        // No nulls stay no nulls; otherwise the count for the slice is not known.
+        result->null_count = data->null_count != 0 ? arrow::kUnknownNullCount : 0;
+    }
+
+    for (size_t i = 0; i < data->child_data.size(); ++i) {
+        result->child_data[i] = DeepSlice(data->child_data[i], offset, len);
+    }
+
+    return result;
+}
+
+/// \brief Chops the first `len` items off `data`.
+///
+/// \param data in/out: replaced with the slice that follows the first `len` items.
+/// \param len  number of leading items to cut off.
+/// \return the slice holding the first `len` items.
+std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len) {
+    std::shared_ptr<arrow::ArrayData> head = DeepSlice(data, 0, len);
+    const size_t tailLen = data->length - len;
+    data = DeepSlice(data, len, tailLen);
+    return head;
+}
+
+/// Calls `func` with the ArrayData of `datum`: once for a plain array, or once
+/// per chunk, in order, for any other array-like datum.
+/// Fails a Y_ENSURE check ("Expected array") when `datum` is not array-like.
+void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func) {
+    Y_ENSURE(datum.is_arraylike(), "Expected array");
+    if (!datum.is_array()) {
+        for (const auto& piece : datum.chunks()) {
+            func(piece->data());
+        }
+        return;
+    }
+
+    func(datum.array());
+}
+
+/// Builds a Datum from a non-empty list of ArrayData pieces.
+///
+/// \param chunks pieces to wrap; must not be empty (checked with Y_ENSURE).
+/// \return an array Datum when there is exactly one piece, otherwise a Datum
+///         holding a ChunkedArray made of all pieces, typed after the first one.
+arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks) {
+    Y_ENSURE(!chunks.empty(), "Expected non empty chunks");
+
+    arrow::ArrayVector resultChunks;
+    // The final size is known up front, so allocate once.
+    resultChunks.reserve(chunks.size());
+    for (const auto& chunk : chunks) {
+        resultChunks.push_back(arrow::Datum(chunk).make_array());
+    }
+
+    if (resultChunks.size() == 1) {
+        return arrow::Datum(resultChunks.front());
+    }
+
+    // Taken before the vector is moved into ChunkedArray::Make().
+    auto type = resultChunks.front()->type();
+    auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(resultChunks), type));
+    return arrow::Datum(chunked);
+}
+
+}
+}
diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h
new file mode 100644
index 0000000000..ec8a0ebb11
--- /dev/null
+++ b/ydb/library/yql/public/udf/arrow/util.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <util/generic/vector.h>
+
+#include <arrow/datum.h>
+
+#include <functional>
+
+namespace NYql {
+namespace NUdf {
+
+/// \brief Recursive version of ArrayData::Slice() method
+/// The slice covers `len` items starting at `offset` and is applied to child data too.
+std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len);
+
+/// \brief Chops first len items of `data` as new ArrayData object
+/// `data` itself is replaced with the remaining items.
+std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len);
+
+/// \brief Calls `func` for the ArrayData of an array datum, or for each chunk of any other array-like datum
+void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func);
+/// \brief Wraps non-empty `chunks` into a Datum: a plain array for one chunk, a chunked array otherwise
+arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks);
+
+}
+}
diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h
index 832b61a228..5fcf17c914 100644
--- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h
+++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h
@@ -43,9 +43,8 @@ SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) {
inline arrow::Status GetHostKernelExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
Y_UNUSED(ctx);
- Y_UNUSED(batch);
- Y_UNUSED(res);
- Y_ENSURE(false);
+ *res = batch.values[0];
+ return arrow::Status::OK();
}
BEGIN_SIMPLE_ARROW_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) {