diff options
author | vvvv <vvvv@ydb.tech> | 2023-01-16 20:51:16 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-01-16 20:51:16 +0300 |
commit | fed916cd134d1dec999fbc3e1ebcac15bf5cffec (patch) | |
tree | 4aa18ad06e994d875aa36af444f5d61cd8e49b74 | |
parent | f24214276985020c32243e123b73cae38b6ae093 (diff) | |
download | ydb-fed916cd134d1dec999fbc3e1ebcac15bf5cffec.tar.gz |
abi for chunked arrays, updated CH udf
5 files changed, 142 insertions, 33 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder.cpp index 1d18acc73c7..281c781e836 100644 --- a/ydb/library/yql/minikql/computation/mkql_value_builder.cpp +++ b/ydb/library/yql/minikql/computation/mkql_value_builder.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/parser/pg_wrapper/interface/utils.h> #include <library/cpp/yson/node/node_io.h> +#include <arrow/chunked_array.h> #include <arrow/array/array_base.h> #include <arrow/array/util.h> #include <arrow/c/bridge.h> @@ -208,12 +209,14 @@ NUdf::TUnboxedValue TDefaultValueBuilder::Run(const NUdf::TSourcePosition& calle return ret; } - -void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const { - const auto datum = TArrowBlock::From(value).GetDatum(); +void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, ui32 chunk, ArrowArray* out) const { + const auto& datum = TArrowBlock::From(value).GetDatum(); std::shared_ptr<arrow::Array> arr; if (datum.is_scalar()) { - isScalar = true; + if (chunk != 0) { + UdfTerminate("Bad chunk index"); + } + auto arrRes = arrow::MakeArrayFromScalar(*datum.scalar(), 1); if (!arrRes.status().ok()) { UdfTerminate(arrRes.status().ToString().c_str()); @@ -221,8 +224,18 @@ void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& arr = std::move(arrRes).ValueOrDie(); } else if (datum.is_array()) { - isScalar = false; + if (chunk != 0) { + UdfTerminate("Bad chunk index"); + } + arr = datum.make_array(); + } else if (datum.is_arraylike()) { + const auto& chunks = datum.chunks(); + if (chunk >= chunks.size()) { + UdfTerminate("Bad chunk index"); + } + + arr = chunks[chunk]; } else { UdfTerminate("Unexpected kind of arrow::Datum"); } @@ -233,15 +246,15 @@ void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& } } -NUdf::TUnboxedValue TDefaultValueBuilder::ImportArrowBlock(ArrowArray* array, const NUdf::IArrowType& type, bool isScalar) const { - const auto dataType = static_cast<const TArrowType&>(type).GetType(); - auto arrRes = arrow::ImportArray(array, dataType); - if (!arrRes.status().ok()) { - UdfTerminate(arrRes.status().ToString().c_str()); - } - - auto arr = std::move(arrRes).ValueOrDie(); +NUdf::TUnboxedValue TDefaultValueBuilder::ImportArrowBlock(ArrowArray* arrays, ui32 chunkCount, bool isScalar, const NUdf::IArrowType& type) const { + const auto dataType = static_cast<const TArrowType&>(type).GetType(); if (isScalar) { + if (chunkCount != 1) { + UdfTerminate("Bad chunkCount value"); + } + + auto arrRes = arrow::ImportArray(arrays, dataType); + auto arr = std::move(arrRes).ValueOrDie(); if (arr->length() != 1) { UdfTerminate("Expected array with one element"); } @@ -254,12 +267,42 @@ NUdf::TUnboxedValue TDefaultValueBuilder::ImportArrowBlock(ArrowArray* array, co auto scalar = std::move(scalarRes).ValueOrDie(); return HolderFactory_.CreateArrowBlock(std::move(scalar)); } else { - return HolderFactory_.CreateArrowBlock(std::move(arr)); + if (chunkCount < 1) { + UdfTerminate("Bad chunkCount value"); + } + + TVector<std::shared_ptr<arrow::Array>> imported(chunkCount); + for (ui32 i = 0; i < chunkCount; ++i) { + auto arrRes = arrow::ImportArray(arrays + i, dataType); + if (!arrRes.status().ok()) { + UdfTerminate(arrRes.status().ToString().c_str()); + } + + imported[i] = std::move(arrRes).ValueOrDie(); + } + + if (chunkCount == 1) { + return HolderFactory_.CreateArrowBlock(imported.front()); + } else { + return HolderFactory_.CreateArrowBlock(arrow::ChunkedArray::Make(std::move(imported), dataType).ValueOrDie()); + } }
} -void TDefaultValueBuilder::Unused3() const { - Y_FAIL("Not implemented"); +ui32 TDefaultValueBuilder::GetArrowBlockChunks(NUdf::TUnboxedValuePod value, bool& isScalar, ui64& length) const { + const auto& datum = TArrowBlock::From(value).GetDatum(); + isScalar = false; + length = datum.length(); + if (datum.is_scalar()) { + isScalar = true; + return 1; + } else if (datum.is_array()) { + return 1; + } else if (datum.is_arraylike()) { + return datum.chunks().size(); + } else { + UdfTerminate("Unexpected kind of arrow::Datum"); + } } bool TDefaultValueBuilder::FindTimezoneName(ui32 id, NUdf::TStringRef& name) const { diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder.h b/ydb/library/yql/minikql/computation/mkql_value_builder.h index 9955a52da66..c66fee73aa1 100644 --- a/ydb/library/yql/minikql/computation/mkql_value_builder.h +++ b/ydb/library/yql/minikql/computation/mkql_value_builder.h @@ -57,9 +57,9 @@ public: bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef &value) const final; const NUdf::TSourcePosition* CalleePosition() const final; NUdf::TUnboxedValue Run(const NUdf::TSourcePosition& callee, const NUdf::IBoxedValue& value, const NUdf::TUnboxedValuePod* args) const final; - void ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const final; - NUdf::TUnboxedValue ImportArrowBlock(ArrowArray* array, const NUdf::IArrowType& type, bool isScalar) const final; - void Unused3() const final; + void ExportArrowBlock(NUdf::TUnboxedValuePod value, ui32 chunk, ArrowArray* out) const final; + NUdf::TUnboxedValue ImportArrowBlock(ArrowArray* arrays, ui32 chunkCount, bool isScalar, const NUdf::IArrowType& type) const final; + ui32 GetArrowBlockChunks(NUdf::TUnboxedValuePod value, bool& isScalar, ui64& length) const final; bool MakeDate(ui32 year, ui32 month, ui32 day, ui16& value) const final; bool SplitDate(ui16 value, ui32& year, ui32& month, ui32& day) const final; diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp index 9504c720676..5921c8bbe48 100644 --- a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp @@ -10,6 +10,7 @@ #include <arrow/array/builder_primitive.h> #include <arrow/c/abi.h> #include <arrow/scalar.h> +#include <arrow/chunked_array.h> namespace NYql { namespace NCommon { @@ -267,13 +268,18 @@ private: { arrow::Datum d1(std::make_shared<arrow::UInt64Scalar>(123)); - auto val1 = HolderFactory.CreateArrowBlock(std::move(d1)); - ArrowArray arr1; + NUdf::TUnboxedValue val1 = HolderFactory.CreateArrowBlock(std::move(d1)); bool isScalar; - Builder.ExportArrowBlock(val1, isScalar, &arr1); + ui64 length; + auto chunks = Builder.GetArrowBlockChunks(val1, isScalar, length); + UNIT_ASSERT_VALUES_EQUAL(chunks, 1); UNIT_ASSERT(isScalar); - auto val2 = Builder.ImportArrowBlock(&arr1, *atype, isScalar); - const auto d2 = TArrowBlock::From(val2).GetDatum(); + UNIT_ASSERT_VALUES_EQUAL(length, 1); + + ArrowArray arr1; + Builder.ExportArrowBlock(val1, 0, &arr1); + NUdf::TUnboxedValue val2 = Builder.ImportArrowBlock(&arr1, 1, isScalar, *atype); + const auto& d2 = TArrowBlock::From(val2).GetDatum(); UNIT_ASSERT(d2.is_scalar()); UNIT_ASSERT_VALUES_EQUAL(d2.scalar_as<arrow::UInt64Scalar>().value, 123); } @@ -287,13 +293,19 @@ private: std::shared_ptr<arrow::ArrayData> builderResult; UNIT_ASSERT(builder.FinishInternal(&builderResult).ok()); arrow::Datum d1(builderResult); - auto val1 = HolderFactory.CreateArrowBlock(std::move(d1)); - ArrowArray arr1; + NUdf::TUnboxedValue val1 = HolderFactory.CreateArrowBlock(std::move(d1)); + bool isScalar; - Builder.ExportArrowBlock(val1, isScalar, &arr1); + ui64 length; + auto chunks = Builder.GetArrowBlockChunks(val1, isScalar, length); + UNIT_ASSERT_VALUES_EQUAL(chunks, 1); UNIT_ASSERT(!isScalar); - auto val2 = Builder.ImportArrowBlock(&arr1, *atype, isScalar); - const auto d2 = TArrowBlock::From(val2).GetDatum(); + UNIT_ASSERT_VALUES_EQUAL(length, 3); + + ArrowArray arr1; + Builder.ExportArrowBlock(val1, 0, &arr1); + NUdf::TUnboxedValue val2 = Builder.ImportArrowBlock(&arr1, 1, isScalar, *atype); + const auto& d2 = TArrowBlock::From(val2).GetDatum(); UNIT_ASSERT(d2.is_array()); UNIT_ASSERT_VALUES_EQUAL(d2.array()->length, 3); UNIT_ASSERT_VALUES_EQUAL(d2.array()->GetNullCount(), 0); @@ -302,6 +314,56 @@ private: UNIT_ASSERT_VALUES_EQUAL(flat[1], 20); UNIT_ASSERT_VALUES_EQUAL(flat[2], 30); } + + { + arrow::UInt64Builder builder1; + UNIT_ASSERT(builder1.Reserve(3).ok()); + builder1.UnsafeAppend(ui64(10)); + builder1.UnsafeAppend(ui64(20)); + builder1.UnsafeAppend(ui64(30)); + std::shared_ptr<arrow::Array> builder1Result; + UNIT_ASSERT(builder1.Finish(&builder1Result).ok()); + + arrow::UInt64Builder builder2; + UNIT_ASSERT(builder2.Reserve(2).ok()); + builder2.UnsafeAppend(ui64(40)); + builder2.UnsafeAppend(ui64(50)); + std::shared_ptr<arrow::Array> builder2Result; + UNIT_ASSERT(builder2.Finish(&builder2Result).ok()); + + auto chunked = arrow::ChunkedArray::Make({ builder1Result, builder2Result }).ValueOrDie(); + arrow::Datum d1(chunked); + NUdf::TUnboxedValue val1 = HolderFactory.CreateArrowBlock(std::move(d1)); + + bool isScalar; + ui64 length; + auto chunks = Builder.GetArrowBlockChunks(val1, isScalar, length); + UNIT_ASSERT_VALUES_EQUAL(chunks, 2); + UNIT_ASSERT(!isScalar); + UNIT_ASSERT_VALUES_EQUAL(length, 5); + + ArrowArray arrs[2]; + Builder.ExportArrowBlock(val1, 0, &arrs[0]); + Builder.ExportArrowBlock(val1, 1, &arrs[1]); + NUdf::TUnboxedValue val2 = Builder.ImportArrowBlock(arrs, 2, isScalar, *atype); + const auto& d2 = TArrowBlock::From(val2).GetDatum(); + UNIT_ASSERT(d2.is_arraylike() && !d2.is_array()); + UNIT_ASSERT_VALUES_EQUAL(d2.length(), 5); + UNIT_ASSERT_VALUES_EQUAL(d2.chunks().size(), 2); + + UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[0]->data()->length, 3); + UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[0]->data()->GetNullCount(), 0); + auto flat = d2.chunks()[0]->data()->GetValues<ui64>(1); + UNIT_ASSERT_VALUES_EQUAL(flat[0], 10); + UNIT_ASSERT_VALUES_EQUAL(flat[1], 20); + UNIT_ASSERT_VALUES_EQUAL(flat[2], 30); + + UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[1]->data()->length, 2); + UNIT_ASSERT_VALUES_EQUAL(d2.chunks()[1]->data()->GetNullCount(), 0); + flat = d2.chunks()[1]->data()->GetValues<ui64>(1); + UNIT_ASSERT_VALUES_EQUAL(flat[0], 40); + UNIT_ASSERT_VALUES_EQUAL(flat[1], 50); + } } }; diff --git a/ydb/library/yql/public/udf/udf_value_builder.h b/ydb/library/yql/public/udf/udf_value_builder.h index 22e18a6cbc8..d04498b7085 100644 --- a/ydb/library/yql/public/udf/udf_value_builder.h +++ b/ydb/library/yql/public/udf/udf_value_builder.h @@ -200,11 +200,15 @@ public: #if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 19) class IValueBuilder5: public IValueBuilder4 { public: + // exports one Array or Scalar to out. should be called for each chunk of ChunkedArray // returns array with one element for scalars - virtual void ExportArrowBlock(TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const = 0; + virtual void ExportArrowBlock(TUnboxedValuePod value, ui32 chunk, ArrowArray* out) const = 0; + // imports all chunks. returns Scalar, ChunkedArray if chunkCount > 1, otherwise Array + // arrays should be a pointer to array of chunkCount structs // the ArrowArray struct has its contents moved to a private object held alive by the result. - virtual TUnboxedValue ImportArrowBlock(ArrowArray* array, const IArrowType& type, bool isScalar) const = 0; - virtual void Unused3() const = 0; + virtual TUnboxedValue ImportArrowBlock(ArrowArray* arrays, ui32 chunkCount, bool isScalar, const IArrowType& type) const = 0; + // always returns 1 for Scalar and Array, >= 1 for ChunkedArray + virtual ui32 GetArrowBlockChunks(TUnboxedValuePod value, bool& isScalar, ui64& length) const = 0; }; #endif diff --git a/ydb/library/yql/public/udf/udf_value_builder_ut.cpp b/ydb/library/yql/public/udf/udf_value_builder_ut.cpp index 527c4f77f87..e94e13bcc90 100644 --- a/ydb/library/yql/public/udf/udf_value_builder_ut.cpp +++ b/ydb/library/yql/public/udf/udf_value_builder_ut.cpp @@ -28,7 +28,7 @@ Y_UNIT_TEST_SUITE(TUdfValueBuilder) { UNIT_ASSERT_VALUES_EQUAL(19, GetMethodIndex(&IValueBuilder::Run)); UNIT_ASSERT_VALUES_EQUAL(20, GetMethodIndex(&IValueBuilder::ExportArrowBlock)); UNIT_ASSERT_VALUES_EQUAL(21, GetMethodIndex(&IValueBuilder::ImportArrowBlock)); - UNIT_ASSERT_VALUES_EQUAL(22, GetMethodIndex(&IValueBuilder::Unused3)); + UNIT_ASSERT_VALUES_EQUAL(22, GetMethodIndex(&IValueBuilder::GetArrowBlockChunks)); UNIT_ASSERT_VALUES_EQUAL(23, GetMethodIndex(&IValueBuilder::GetPgBuilder)); UNIT_ASSERT_VALUES_EQUAL(24, GetMethodIndex(&IValueBuilder::NewArray64)); } |