aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-01-16 20:51:16 +0300
committervvvv <vvvv@ydb.tech>2023-01-16 20:51:16 +0300
commitfed916cd134d1dec999fbc3e1ebcac15bf5cffec (patch)
tree4aa18ad06e994d875aa36af444f5d61cd8e49b74
parentf24214276985020c32243e123b73cae38b6ae093 (diff)
downloadydb-fed916cd134d1dec999fbc3e1ebcac15bf5cffec.tar.gz
abi for chunked arrays, updated CH udf
-rw-r--r--ydb/library/yql/minikql/computation/mkql_value_builder.cpp75
-rw-r--r--ydb/library/yql/minikql/computation/mkql_value_builder.h6
-rw-r--r--ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp82
-rw-r--r--ydb/library/yql/public/udf/udf_value_builder.h10
-rw-r--r--ydb/library/yql/public/udf/udf_value_builder_ut.cpp2
5 files changed, 142 insertions, 33 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder.cpp
index 1d18acc73c7..281c781e836 100644
--- a/ydb/library/yql/minikql/computation/mkql_value_builder.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_value_builder.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/parser/pg_wrapper/interface/utils.h>
#include <library/cpp/yson/node/node_io.h>
+#include <arrow/chunked_array.h>
#include <arrow/array/array_base.h>
#include <arrow/array/util.h>
#include <arrow/c/bridge.h>
@@ -208,12 +209,14 @@ NUdf::TUnboxedValue TDefaultValueBuilder::Run(const NUdf::TSourcePosition& calle
return ret;
}
-
-void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const {
- const auto datum = TArrowBlock::From(value).GetDatum();
+void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, ui32 chunk, ArrowArray* out) const {
+ const auto& datum = TArrowBlock::From(value).GetDatum();
std::shared_ptr<arrow::Array> arr;
if (datum.is_scalar()) {
- isScalar = true;
+ if (chunk != 0) {
+ UdfTerminate("Bad chunk index");
+ }
+
auto arrRes = arrow::MakeArrayFromScalar(*datum.scalar(), 1);
if (!arrRes.status().ok()) {
UdfTerminate(arrRes.status().ToString().c_str());
@@ -221,8 +224,18 @@ void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, bool&
arr = std::move(arrRes).ValueOrDie();
} else if (datum.is_array()) {
- isScalar = false;
+ if (chunk != 0) {
+ UdfTerminate("Bad chunk index");
+ }
+
arr = datum.make_array();
+ } else if (datum.is_arraylike()) {
+ const auto& chunks = datum.chunks();
+ if (chunk >= chunks.size()) {
+ UdfTerminate("Bad chunk index");
+ }
+
+ arr = chunks[chunk];
} else {
UdfTerminate("Unexpected kind of arrow::Datum");
}
@@ -233,15 +246,15 @@ void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, bool&
}
}
-NUdf::TUnboxedValue TDefaultValueBuilder::ImportArrowBlock(ArrowArray* array, const NUdf::IArrowType& type, bool isScalar) const {
- const auto dataType = static_cast<const TArrowType&>(type).GetType();
- auto arrRes = arrow::ImportArray(array, dataType);
- if (!arrRes.status().ok()) {
- UdfTerminate(arrRes.status().ToString().c_str());
- }
-
- auto arr = std::move(arrRes).ValueOrDie();
+NUdf::TUnboxedValue TDefaultValueBuilder::ImportArrowBlock(ArrowArray* arrays, ui32 chunkCount, bool isScalar, const NUdf::IArrowType& type) const {
+ const auto dataType = static_cast<const TArrowType&>(type).GetType();
if (isScalar) {
+ if (chunkCount != 1) {
+ UdfTerminate("Bad chunkCount value");
+ }
+
+ auto arrRes = arrow::ImportArray(arrays, dataType);
+ auto arr = std::move(arrRes).ValueOrDie();
if (arr->length() != 1) {
UdfTerminate("Expected array with one element");
}
@@ -254,12 +267,42 @@ NUdf::TUnboxedValue TDefaultValueBuilder::ImportArrowBlock(ArrowArray* array, co
auto scalar = std::move(scalarRes).ValueOrDie();
return HolderFactory_.CreateArrowBlock(std::move(scalar));
} else {
- return HolderFactory_.CreateArrowBlock(std::move(arr));
+ if (chunkCount < 1) {
+ UdfTerminate("Bad chunkCount value");
+ }
+
+ TVector<std::shared_ptr<arrow::Array>> imported(chunkCount);
+ for (ui32 i = 0; i < chunkCount; ++i) {
+ auto arrRes = arrow::ImportArray(arrays + i, dataType);
+ if (!arrRes.status().ok()) {
+ UdfTerminate(arrRes.status().ToString().c_str());
+ }
+
+ imported[i] = std::move(arrRes).ValueOrDie();
+ }
+
+ if (chunkCount == 1) {
+ return HolderFactory_.CreateArrowBlock(imported.front());
+ } else {
+ return HolderFactory_.CreateArrowBlock(arrow::ChunkedArray::Make(std::move(imported), dataType).ValueOrDie());
+ }
}
}
-void TDefaultValueBuilder::Unused3() const {
- Y_FAIL("Not implemented");
+ui32 TDefaultValueBuilder::GetArrowBlockChunks(NUdf::TUnboxedValuePod value, bool& isScalar, ui64& length) const {
+ const auto& datum = TArrowBlock::From(value).GetDatum();
+ isScalar = false;
+ length = datum.length();
+ if (datum.is_scalar()) {
+ isScalar = true;
+ return 1;
+ } else if (datum.is_array()) {
+ return 1;
+ } else if (datum.is_arraylike()) {
+ return datum.chunks().size();
+ } else {
+ UdfTerminate("Unexpected kind of arrow::Datum");
+ }
}
bool TDefaultValueBuilder::FindTimezoneName(ui32 id, NUdf::TStringRef& name) const {
diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder.h b/ydb/library/yql/minikql/computation/mkql_value_builder.h
index 9955a52da66..c66fee73aa1 100644
--- a/ydb/library/yql/minikql/computation/mkql_value_builder.h
+++ b/ydb/library/yql/minikql/computation/mkql_value_builder.h
@@ -57,9 +57,9 @@ public:
bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef &value) const final;
const NUdf::TSourcePosition* CalleePosition() const final;
NUdf::TUnboxedValue Run(const NUdf::TSourcePosition& callee, const NUdf::IBoxedValue& value, const NUdf::TUnboxedValuePod* args) const final;
- void ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const final;
- NUdf::TUnboxedValue ImportArrowBlock(ArrowArray* array, const NUdf::IArrowType& type, bool isScalar) const final;
- void Unused3() const final;
+ void ExportArrowBlock(NUdf::TUnboxedValuePod value, ui32 chunk, ArrowArray* out) const final;
+ NUdf::TUnboxedValue ImportArrowBlock(ArrowArray* arrays, ui32 chunkCount, bool isScalar, const NUdf::IArrowType& type) const final;
+ ui32 GetArrowBlockChunks(NUdf::TUnboxedValuePod value, bool& isScalar, ui64& length) const final;
bool MakeDate(ui32 year, ui32 month, ui32 day, ui16& value) const final;
bool SplitDate(ui16 value, ui32& year, ui32& month, ui32& day) const final;
diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
index 9504c720676..5921c8bbe48 100644
--- a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
@@ -10,6 +10,7 @@
#include <arrow/array/builder_primitive.h>
#include <arrow/c/abi.h>
#include <arrow/scalar.h>
+#include <arrow/chunked_array.h>
namespace NYql {
namespace NCommon {
@@ -267,13 +268,18 @@ private:
{
arrow::Datum d1(std::make_shared<arrow::UInt64Scalar>(123));
- auto val1 = HolderFactory.CreateArrowBlock(std::move(d1));
- ArrowArray arr1;
+ NUdf::TUnboxedValue val1 = HolderFactory.CreateArrowBlock(std::move(d1));
bool isScalar;
- Builder.ExportArrowBlock(val1, isScalar, &arr1);
+ ui64 length;
+ auto chunks = Builder.GetArrowBlockChunks(val1, isScalar, length);
+ UNIT_ASSERT_VALUES_EQUAL(chunks, 1);
UNIT_ASSERT(isScalar);
- auto val2 = Builder.ImportArrowBlock(&arr1, *atype, isScalar);
- const auto d2 = TArrowBlock::From(val2).GetDatum();
+ UNIT_ASSERT_VALUES_EQUAL(length, 1);
+
+ ArrowArray arr1;
+ Builder.ExportArrowBlock(val1, 0, &arr1);
+ NUdf::TUnboxedValue val2 = Builder.ImportArrowBlock(&arr1, 1, isScalar, *atype);
+ const auto& d2 = TArrowBlock::From(val2).GetDatum();
UNIT_ASSERT(d2.is_scalar());
UNIT_ASSERT_VALUES_EQUAL(d2.scalar_as<arrow::UInt64Scalar>().value, 123);
}
@@ -287,13 +293,19 @@ private:
std::shared_ptr<arrow::ArrayData> builderResult;
UNIT_ASSERT(builder.FinishInternal(&builderResult).ok());
arrow::Datum d1(builderResult);
- auto val1 = HolderFactory.CreateArrowBlock(std::move(d1));
- ArrowArray arr1;
+ NUdf::TUnboxedValue val1 = HolderFactory.CreateArrowBlock(std::move(d1));
+
bool isScalar;
- Builder.ExportArrowBlock(val1, isScalar, &arr1);
+ ui64 length;
+ auto chunks = Builder.GetArrowBlockChunks(val1, isScalar, length);
+ UNIT_ASSERT_VALUES_EQUAL(chunks, 1);
UNIT_ASSERT(!isScalar);
- auto val2 = Builder.ImportArrowBlock(&arr1, *atype, isScalar);
- const auto d2 = TArrowBlock::From(val2).GetDatum();
+ UNIT_ASSERT_VALUES_EQUAL(length, 3);
+
+ ArrowArray arr1;
+ Builder.ExportArrowBlock(val1, 0, &arr1);
+ NUdf::TUnboxedValue val2 = Builder.ImportArrowBlock(&arr1, 1, isScalar, *atype);
+ const auto& d2 = TArrowBlock::From(val2).GetDatum();
UNIT_ASSERT(d2.is_array());
UNIT_ASSERT_VALUES_EQUAL(d2.array()->length, 3);
UNIT_ASSERT_VALUES_EQUAL(d2.array()->GetNullCount(), 0);
@@ -302,6 +314,56 @@ private:
UNIT_ASSERT_VALUES_EQUAL(flat[1], 20);
UNIT_ASSERT_VALUES_EQUAL(flat[2], 30);
}
+
+ {
+ arrow::UInt64Builder builder1;
+ UNIT_ASSERT(builder1.Reserve(3).ok());
+ builder1.UnsafeAppend(ui64(10));
+ builder1.UnsafeAppend(ui64(20));
+ builder1.UnsafeAppend(ui64(30));
+ std::shared_ptr<arrow::Array> builder1Result;
+ UNIT_ASSERT(builder1.Finish(&builder1Result).ok());
+
+ arrow::UInt64Builder builder2;
+ UNIT_ASSERT(builder2.Reserve(2).ok());
+ builder2.UnsafeAppend(ui64(40));
+ builder2.UnsafeAppend(ui64(50));
+ std::shared_ptr<arrow::Array> builder2Result;
+ UNIT_ASSERT(builder2.Finish(&builder2Result).ok());
+
+ auto chunked = arrow::ChunkedArray::Make({ builder1Result, builder2Result }).ValueOrDie();
+ arrow::Datum d1(chunked);
+ NUdf::TUnboxedValue val1 = HolderFactory.CreateArrowBlock(std::move(d1));
+
+ bool isScalar;
+ ui64 length;
+ auto chunks = Builder.GetArrowBlockChunks(val1, isScalar, length);
+ UNIT_ASSERT_VALUES_EQUAL(chunks, 2);
+ UNIT_ASSERT(!isScalar);
+ UNIT_ASSERT_VALUES_EQUAL(length, 5);
+
+ ArrowArray arrs[2];
+ Builder.ExportArrowBlock(val1, 0, &arrs[0]);
+ Builder.ExportArrowBlock(val1, 1, &arrs[1]);
+ NUdf::TUnboxedValue val2 = Builder.ImportArrowBlock(arrs, 2, isScalar, *atype);
+ const auto& d2 = TArrowBlock::From(val2).GetDatum();
+ UNIT_ASSERT(d2.is_arraylike() && !d2.is_array());
+ UNIT_ASSERT_VALUES_EQUAL(d2.length(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(d2.chunks().size(), 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[0]->data()->length, 3);
+ UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[0]->data()->GetNullCount(), 0);
+ auto flat = d2.chunks()[0]->data()->GetValues<ui64>(1);
+ UNIT_ASSERT_VALUES_EQUAL(flat[0], 10);
+ UNIT_ASSERT_VALUES_EQUAL(flat[1], 20);
+ UNIT_ASSERT_VALUES_EQUAL(flat[2], 30);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[1]->data()->length, 2);
+ UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[1]->data()->GetNullCount(), 0);
+ flat = d2.chunks()[1]->data()->GetValues<ui64>(1);
+ UNIT_ASSERT_VALUES_EQUAL(flat[0], 40);
+ UNIT_ASSERT_VALUES_EQUAL(flat[1], 50);
+ }
}
};
diff --git a/ydb/library/yql/public/udf/udf_value_builder.h b/ydb/library/yql/public/udf/udf_value_builder.h
index 22e18a6cbc8..d04498b7085 100644
--- a/ydb/library/yql/public/udf/udf_value_builder.h
+++ b/ydb/library/yql/public/udf/udf_value_builder.h
@@ -200,11 +200,15 @@ public:
#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 19)
class IValueBuilder5: public IValueBuilder4 {
public:
+ // exports one Array or Scalar to out. should be called for each chunk of ChunkedArray
// returns array with one element for scalars
- virtual void ExportArrowBlock(TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const = 0;
+ virtual void ExportArrowBlock(TUnboxedValuePod value, ui32 chunk, ArrowArray* out) const = 0;
+ // imports all chunks. returns Scalar, ChunkedArray if chunkCount > 1, otherwise Array
+ // arrays should be a pointer to array of chunkCount structs
// the ArrowArray struct has its contents moved to a private object held alive by the result.
- virtual TUnboxedValue ImportArrowBlock(ArrowArray* array, const IArrowType& type, bool isScalar) const = 0;
- virtual void Unused3() const = 0;
+ virtual TUnboxedValue ImportArrowBlock(ArrowArray* arrays, ui32 chunkCount, bool isScalar, const IArrowType& type) const = 0;
+ // always returns 1 for Scalar and Array, >= 1 for ChunkedArray
+ virtual ui32 GetArrowBlockChunks(TUnboxedValuePod value, bool& isScalar, ui64& length) const = 0;
};
#endif
diff --git a/ydb/library/yql/public/udf/udf_value_builder_ut.cpp b/ydb/library/yql/public/udf/udf_value_builder_ut.cpp
index 527c4f77f87..e94e13bcc90 100644
--- a/ydb/library/yql/public/udf/udf_value_builder_ut.cpp
+++ b/ydb/library/yql/public/udf/udf_value_builder_ut.cpp
@@ -28,7 +28,7 @@ Y_UNIT_TEST_SUITE(TUdfValueBuilder) {
UNIT_ASSERT_VALUES_EQUAL(19, GetMethodIndex(&IValueBuilder::Run));
UNIT_ASSERT_VALUES_EQUAL(20, GetMethodIndex(&IValueBuilder::ExportArrowBlock));
UNIT_ASSERT_VALUES_EQUAL(21, GetMethodIndex(&IValueBuilder::ImportArrowBlock));
- UNIT_ASSERT_VALUES_EQUAL(22, GetMethodIndex(&IValueBuilder::Unused3));
+ UNIT_ASSERT_VALUES_EQUAL(22, GetMethodIndex(&IValueBuilder::GetArrowBlockChunks));
UNIT_ASSERT_VALUES_EQUAL(23, GetMethodIndex(&IValueBuilder::GetPgBuilder));
UNIT_ASSERT_VALUES_EQUAL(24, GetMethodIndex(&IValueBuilder::NewArray64));
}