diff options
author | vvvv <vvvv@ydb.tech> | 2022-09-22 15:04:55 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-09-22 15:04:55 +0300 |
commit | a82a5db38dc17a3d5836b8156d7a8d4a7c9d3516 (patch) | |
tree | 7ada02729185466565ca89a13105f11136980285 | |
parent | 362cf51461c0554f3c10f407b66a73cf4f243d5a (diff) | |
download | ydb-a82a5db38dc17a3d5836b8156d7a8d4a7c9d3516.tar.gz |
[abi] BlockType in type inspection/type builder, Import/Export of arrow blocks and types
22 files changed, 489 insertions, 68 deletions
diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.cpp b/ydb/library/yql/minikql/arrow/mkql_functions.cpp index 226108956d..0d1f045c3f 100644 --- a/ydb/library/yql/minikql/arrow/mkql_functions.cpp +++ b/ydb/library/yql/minikql/arrow/mkql_functions.cpp @@ -1,6 +1,7 @@ #include "mkql_functions.h" #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> #include <arrow/datum.h> #include <arrow/visitor.h> @@ -10,50 +11,6 @@ namespace NKikimr::NMiniKQL { -bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) { - auto unpacked = UnpackOptional(itemType, isOptional); - if (!unpacked->IsData()) { - return false; - } - - auto slot = AS_TYPE(TDataType, unpacked)->GetDataSlot(); - if (!slot) { - return false; - } - - switch (*slot) { - case NUdf::EDataSlot::Bool: - type = arrow::boolean(); - return true; - case NUdf::EDataSlot::Uint8: - type = arrow::uint8(); - return true; - case NUdf::EDataSlot::Int8: - type = arrow::int8(); - return true; - case NUdf::EDataSlot::Uint16: - type = arrow::uint16(); - return true; - case NUdf::EDataSlot::Int16: - type = arrow::int16(); - return true; - case NUdf::EDataSlot::Uint32: - type = arrow::uint32(); - return true; - case NUdf::EDataSlot::Int32: - type = arrow::int32(); - return true; - case NUdf::EDataSlot::Int64: - type = arrow::int64(); - return true; - case NUdf::EDataSlot::Uint64: - type = arrow::uint64(); - return true; - default: - return false; - } -} - bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr) { auto asBlockType = AS_TYPE(TBlockType, blockType); descr.shape = asBlockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY; diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.h b/ydb/library/yql/minikql/arrow/mkql_functions.h index 5dffb01a59..1ac4edb578 100644 --- a/ydb/library/yql/minikql/arrow/mkql_functions.h +++ b/ydb/library/yql/minikql/arrow/mkql_functions.h @@ -7,6 +7,5 @@ namespace NKikimr::NMiniKQL { bool FindArrowFunction(TStringBuf name, const TArrayRef<TType*>& inputTypes, TType*& outputType, TTypeEnvironment& env); bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr); -bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type); bool HasArrowCast(TType* from, TType* to); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp index 4fe047e43f..5497a89009 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp @@ -1,11 +1,12 @@ #include "mkql_block_func.h" #include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/arrow/mkql_functions.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> -#include <ydb/library/yql/minikql/arrow/mkql_functions.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> #include <arrow/array/builder_primitive.h> #include <arrow/compute/cast.h> diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index ee68f268c3..eb90882ab5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -1,7 +1,7 @@ #include "mkql_blocks.h" #include <ydb/library/yql/minikql/arrow/arrow_defs.h> -#include <ydb/library/yql/minikql/arrow/mkql_functions.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder.cpp index 5b6869a243..bb6d6d3399 100644 --- a/ydb/library/yql/minikql/computation/mkql_value_builder.cpp +++ b/ydb/library/yql/minikql/computation/mkql_value_builder.cpp @@ -3,8 +3,13 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> #include <library/cpp/yson/node/node_io.h> +#include <arrow/array/array_base.h> +#include <arrow/array/util.h> +#include <arrow/c/bridge.h> + #include <util/system/env.h> namespace NYql { @@ -202,12 +207,54 @@ NUdf::TUnboxedValue TDefaultValueBuilder::Run(const NUdf::TSourcePosition& calle return ret; } -void TDefaultValueBuilder::Unused1() const { - Y_FAIL("Not implemented"); + +void TDefaultValueBuilder::ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const { + const auto datum = TArrowBlock::From(value).GetDatum(); + std::shared_ptr<arrow::Array> arr; + if (datum.is_scalar()) { + isScalar = true; + auto arrRes = arrow::MakeArrayFromScalar(*datum.scalar(), 1); + if (!arrRes.status().ok()) { + UdfTerminate(arrRes.status().ToString().c_str()); + } + + arr = std::move(arrRes).ValueOrDie(); + } else if (datum.is_array()) { + isScalar = false; + arr = datum.make_array(); + } else { + UdfTerminate("Unexpected kind of arrow::Datum"); + } + + auto status = arrow::ExportArray(*arr, out); + if (!status.ok()) { + UdfTerminate(status.ToString().c_str()); + } } -void TDefaultValueBuilder::Unused2() const { - Y_FAIL("Not implemented"); +NUdf::TUnboxedValue TDefaultValueBuilder::ImportArrowBlock(ArrowArray* array, const NUdf::IArrowType& type, bool isScalar) const { + const auto dataType = ((const TArrowType&)type).GetType(); + auto arrRes = arrow::ImportArray(array, dataType); + if (!arrRes.status().ok()) { + UdfTerminate(arrRes.status().ToString().c_str()); + } + + auto arr = std::move(arrRes).ValueOrDie(); + if (isScalar) { + if (arr->length() != 1) { + UdfTerminate("Expected array with one element"); + } + + auto scalarRes = arr->GetScalar(0); + if (!scalarRes.status().ok()) { + UdfTerminate(scalarRes.status().ToString().c_str()); + } + + auto scalar = std::move(scalarRes).ValueOrDie(); + return HolderFactory_.CreateArrowBlock(std::move(scalar)); + } else { + return HolderFactory_.CreateArrowBlock(std::move(arr)); + }
} void TDefaultValueBuilder::Unused3() const { diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder.h b/ydb/library/yql/minikql/computation/mkql_value_builder.h index dea2ef0205..370f1ff24a 100644 --- a/ydb/library/yql/minikql/computation/mkql_value_builder.h +++ b/ydb/library/yql/minikql/computation/mkql_value_builder.h @@ -57,8 +57,8 @@ public: bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef &value) const final; const NUdf::TSourcePosition* CalleePosition() const final; NUdf::TUnboxedValue Run(const NUdf::TSourcePosition& callee, const NUdf::IBoxedValue& value, const NUdf::TUnboxedValuePod* args) const final; - void Unused1() const final; - void Unused2() const final; + void ExportArrowBlock(NUdf::TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const final; + NUdf::TUnboxedValue ImportArrowBlock(ArrowArray* array, const NUdf::IArrowType& type, bool isScalar) const final; void Unused3() const final; bool MakeDate(ui32 year, ui32 month, ui32 day, ui16& value) const final; bool SplitDate(ui16 value, ui32& year, ui32& month, ui32& day) const final; diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp index 11fd59e5f3..9282538948 100644 --- a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp @@ -7,6 +7,10 @@ #include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <library/cpp/testing/unittest/registar.h> +#include <arrow/array/builder_primitive.h> +#include <arrow/c/abi.h> +#include <arrow/scalar.h> + namespace NYql { namespace NCommon { @@ -68,6 +72,7 @@ private: UNIT_TEST(TestConvertToFromPg); UNIT_TEST(TestConvertToFromPgNulls); UNIT_TEST(TestPgNewString); + UNIT_TEST(TestArrowBlock); UNIT_TEST_SUITE_END(); @@ -254,6 +259,49 @@ private: UNIT_ASSERT_VALUES_EQUAL((TStringBuf)from.AsStringRef(), "ABC"sv); } } + + void TestArrowBlock() { + auto type = FunctionTypeInfoBuilder.SimpleType<ui64>(); + auto atype = FunctionTypeInfoBuilder.MakeArrowType(type); + + { + arrow::Datum d1(std::make_shared<arrow::UInt64Scalar>(123)); + auto val1 = HolderFactory.CreateArrowBlock(std::move(d1)); + ArrowArray arr1; + bool isScalar; + Builder.ExportArrowBlock(val1, isScalar, &arr1); + UNIT_ASSERT(isScalar); + auto val2 = Builder.ImportArrowBlock(&arr1, *atype, isScalar); + const auto d2 = TArrowBlock::From(val2).GetDatum(); + UNIT_ASSERT(d2.is_scalar()); + UNIT_ASSERT_VALUES_EQUAL(d2.scalar_as<arrow::UInt64Scalar>().value, 123); + } + + { + arrow::UInt64Builder builder; + UNIT_ASSERT(builder.Reserve(3).ok()); + builder.UnsafeAppend(ui64(10)); + builder.UnsafeAppend(ui64(20)); + builder.UnsafeAppend(ui64(30)); + std::shared_ptr<arrow::ArrayData> builderResult; + UNIT_ASSERT(builder.FinishInternal(&builderResult).ok()); + arrow::Datum d1(builderResult); + auto val1 = HolderFactory.CreateArrowBlock(std::move(d1)); + ArrowArray arr1; + bool isScalar; + Builder.ExportArrowBlock(val1, isScalar, &arr1); + UNIT_ASSERT(!isScalar); + auto val2 = Builder.ImportArrowBlock(&arr1, *atype, isScalar); + const auto d2 = TArrowBlock::From(val2).GetDatum(); + UNIT_ASSERT(d2.is_array()); + UNIT_ASSERT_VALUES_EQUAL(d2.array()->length, 3); + UNIT_ASSERT_VALUES_EQUAL(d2.array()->GetNullCount(), 0); + auto flat = d2.array()->GetValues<ui64>(1); + UNIT_ASSERT_VALUES_EQUAL(flat[0], 10); + UNIT_ASSERT_VALUES_EQUAL(flat[1], 20); + UNIT_ASSERT_VALUES_EQUAL(flat[2], 30); + } + } }; UNIT_TEST_SUITE_REGISTRATION(TMiniKQLValueBuilderTest); diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index 6ead464302..426f4ff6ec 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -9,6 +9,8 @@ #include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <array> +#include <arrow/c/bridge.h> + // TODO: remove const_casts namespace NKikimr { @@ -1266,10 +1268,102 @@ private: const NUdf::ICompare::TPtr Compare_; }; +////////////////////////////////////////////////////////////////////////////// +// TBlockTypeBuilder +////////////////////////////////////////////////////////////////////////////// +class TBlockTypeBuilder: public NUdf::IBlockTypeBuilder +{ +public: + TBlockTypeBuilder(const NMiniKQL::TFunctionTypeInfoBuilder& parent, bool isScalar) + : NUdf::IBlockTypeBuilder(isScalar) + , Parent_(parent) + { + } + + NUdf::IBlockTypeBuilder& Item(NUdf::TDataTypeId typeId) override { + ItemType_ = NMiniKQL::TDataType::Create(typeId, Parent_.Env()); + return *this; + } + + NUdf::IBlockTypeBuilder& Item(const NUdf::TType* type) override { + ItemType_ = static_cast<const NMiniKQL::TType*>(type); + return *this; + } + + NUdf::IBlockTypeBuilder& Item( + const NUdf::ITypeBuilder& typeBuilder) override + { + ItemType_ = static_cast<NMiniKQL::TType*>(typeBuilder.Build()); + return *this; + } + + NUdf::TType* Build() const override { + return NMiniKQL::TBlockType::Create( + const_cast<NMiniKQL::TType*>(ItemType_), + (IsScalar_ ? NMiniKQL::TBlockType::EShape::Scalar : NMiniKQL::TBlockType::EShape::Many), + Parent_.Env()); + } + +private: + const NMiniKQL::TFunctionTypeInfoBuilder& Parent_; + const NMiniKQL::TType* ItemType_ = nullptr; +}; + } // namespace namespace NMiniKQL { +bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) { + auto unpacked = UnpackOptional(itemType, isOptional); + if (!unpacked->IsData()) { + return false; + } + + auto slot = AS_TYPE(TDataType, unpacked)->GetDataSlot(); + if (!slot) { + return false; + } + + switch (*slot) { + case NUdf::EDataSlot::Bool: + type = arrow::boolean(); + return true; + case NUdf::EDataSlot::Uint8: + type = arrow::uint8(); + return true; + case NUdf::EDataSlot::Int8: + type = arrow::int8(); + return true; + case NUdf::EDataSlot::Uint16: + type = arrow::uint16(); + return true; + case NUdf::EDataSlot::Int16: + type = arrow::int16(); + return true; + case NUdf::EDataSlot::Uint32: + type = arrow::uint32(); + return true; + case NUdf::EDataSlot::Int32: + type = arrow::int32(); + return true; + case NUdf::EDataSlot::Int64: + type = arrow::int64(); + return true; + case NUdf::EDataSlot::Uint64: + type = arrow::uint64(); + return true; + default: + return false; + } +} + +void TArrowType::Export(ArrowSchema* out) const { + auto status = arrow::ExportType(*Type, out); + if (!status.ok()) { + UdfTerminate(status.ToString().c_str()); + } +} + ////////////////////////////////////////////////////////////////////////////// // TFunctionTypeInfoBuilder ////////////////////////////////////////////////////////////////////////////// @@ -1341,10 +1435,34 @@ NUdf::TType* TFunctionTypeInfoBuilder::Tagged(const NUdf::TType* baseType, const return TTaggedType::Create(const_cast<TType*>(static_cast<const TType*>(baseType)), tag, Env_); } -NUdf::TType* TFunctionTypeInfoBuilder::TFunctionTypeInfoBuilder::Pg(ui32 typeId) const { +NUdf::TType* TFunctionTypeInfoBuilder::Pg(ui32 typeId) const { return TPgType::Create(typeId, Env_); } +NUdf::IBlockTypeBuilder::TPtr TFunctionTypeInfoBuilder::Block(bool isScalar) const { + return new TBlockTypeBuilder(*this, isScalar); +} + +NUdf::IArrowType::TPtr TFunctionTypeInfoBuilder::MakeArrowType(const NUdf::TType* type) const { + bool isOptional; + std::shared_ptr<arrow::DataType> arrowType; + if (!ConvertArrowType(const_cast<TType*>(static_cast<const TType*>(type)), isOptional, arrowType)) { + return nullptr; + } + + return new TArrowType(arrowType); +} + +NUdf::IArrowType::TPtr TFunctionTypeInfoBuilder::ImportArrowType(ArrowSchema* schema) const { + auto res = arrow::ImportType(schema); + auto status = res.status(); + if (!status.ok()) { + UdfTerminate(status.ToString().c_str()); + } + + return new TArrowType(std::move(res).ValueOrDie()); +} + bool TFunctionTypeInfoBuilder::GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef& value) const { if (SecureParamsProvider_) return SecureParamsProvider_->GetSecureParam(key, value); @@ -1607,6 +1725,7 @@ NUdf::ETypeKind TTypeInfoHelper::GetTypeKind(const NUdf::TType* type) const { case NMiniKQL::TType::EKind::EmptyDict: return NUdf::ETypeKind::EmptyDict; case NMiniKQL::TType::EKind::Tagged: return NUdf::ETypeKind::Tagged; case NMiniKQL::TType::EKind::Pg: return NUdf::ETypeKind::Pg; + case NMiniKQL::TType::EKind::Block: return NUdf::ETypeKind::Block; default: Y_VERIFY_DEBUG(false, "Wrong MQKL type kind %s", mkqlType->GetKindAsStr().data()); return NUdf::ETypeKind::Unknown; @@ -1642,6 +1761,7 @@ case NMiniKQL::TType::EKind::TypeKind: { \ MKQL_HANDLE_UDF_TYPE(Resource) MKQL_HANDLE_UDF_TYPE(Tagged) MKQL_HANDLE_UDF_TYPE(Pg) + MKQL_HANDLE_UDF_TYPE(Block) default: Y_VERIFY_DEBUG(false, "Wrong MQKL type kind %s", mkqlType->GetKindAsStr().data()); } @@ -1762,6 +1882,12 @@ void TTypeInfoHelper::DoPg(const NMiniKQL::TPgType* tt, NUdf::ITypeVisitor* v) { } } +void TTypeInfoHelper::DoBlock(const NMiniKQL::TBlockType* tt, NUdf::ITypeVisitor* v) { + if (v->IsCompatibleTo(NUdf::MakeAbiCompatibilityVersion(2, 26))) { + v->OnBlock(tt->GetItemType(), tt->GetShape() == TBlockType::EShape::Scalar); + } +} + NUdf::IHash::TPtr MakeHashImpl(const NMiniKQL::TType* type) { switch (type->GetKind()) { case NMiniKQL::TType::EKind::Data: { diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h index baf7b4106e..68fd699b2d 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.h +++ b/ydb/library/yql/minikql/mkql_type_builder.h @@ -4,9 +4,29 @@ #include <ydb/library/yql/public/udf/udf_type_builder.h> +#include <arrow/datum.h> + namespace NKikimr { namespace NMiniKQL { +bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type); + +class TArrowType : public NUdf::IArrowType { +public: + TArrowType(const std::shared_ptr<arrow::DataType>& type) + : Type(type) + {} + + std::shared_ptr<arrow::DataType> GetType() const { + return Type; + } + + void Export(ArrowSchema* out) const final; + +private: + const std::shared_ptr<arrow::DataType> Type; +}; + ////////////////////////////////////////////////////////////////////////////// // TFunctionTypeInfo ////////////////////////////////////////////////////////////////////////////// @@ -118,6 +138,10 @@ public: NUdf::IEnumTypeBuilder::TPtr Enum(ui32 expectedItems = 10) const override; NUdf::TType* Tagged(const NUdf::TType* baseType, const NUdf::TStringRef& tag) const override; NUdf::TType* Pg(ui32 typeId) const override; + NUdf::IBlockTypeBuilder::TPtr Block(bool isScalar) const override; + NUdf::IArrowType::TPtr MakeArrowType(const NUdf::TType* type) const override; + NUdf::IArrowType::TPtr ImportArrowType(ArrowSchema* schema) const override; + bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef& value) const override; private: @@ -162,6 +186,7 @@ private: static void DoResource(const NMiniKQL::TResourceType* rt, NUdf::ITypeVisitor* v); static void DoTagged(const NMiniKQL::TTaggedType* tt, NUdf::ITypeVisitor* v); static void DoPg(const NMiniKQL::TPgType* tt, NUdf::ITypeVisitor* v); + static void DoBlock(const NMiniKQL::TBlockType* tt, NUdf::ITypeVisitor* v); }; NUdf::IHash::TPtr MakeHashImpl(const NMiniKQL::TType* type); diff --git a/ydb/library/yql/minikql/mkql_type_builder_ut.cpp b/ydb/library/yql/minikql/mkql_type_builder_ut.cpp index c8d23b9416..cc18d49835 100644 --- a/ydb/library/yql/minikql/mkql_type_builder_ut.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder_ut.cpp @@ -4,6 +4,8 @@ #include <ydb/library/yql/public/udf/udf_type_printer.h> #include <library/cpp/testing/unittest/registar.h> +#include <arrow/c/abi.h> + namespace NKikimr { namespace NMiniKQL { @@ -35,6 +37,8 @@ private: UNIT_TEST(TestVariantTypeFormat); UNIT_TEST(TestCallableTypeFormat); UNIT_TEST(TestDataTypeFormat); + UNIT_TEST(TestBlockTypeFormat); + UNIT_TEST(TestArrowType); UNIT_TEST_SUITE_END(); TString FormatType(NUdf::TType* t) { @@ -55,6 +59,17 @@ private: } } + void TestBlockTypeFormat() { + { + auto s = FormatType(FunctionTypeInfoBuilder.Block(true)->Item<ui32>().Build()); + UNIT_ASSERT_VALUES_EQUAL(s, "Scalar<Uint32>"); + } + { + auto s = FormatType(FunctionTypeInfoBuilder.Block(false)->Item<ui32>().Build()); + UNIT_ASSERT_VALUES_EQUAL(s, "Block<Uint32>"); + } + } + void TestSingularTypeFormat() { { auto s = FormatType(FunctionTypeInfoBuilder.Null()); @@ -321,6 +336,17 @@ private: UNIT_ASSERT_VALUES_EQUAL(s, "Decimal(7,3)"); } } + + void TestArrowType() { + auto type = FunctionTypeInfoBuilder.SimpleType<ui64>(); + auto atype1 = FunctionTypeInfoBuilder.MakeArrowType(type); + UNIT_ASSERT(atype1); + UNIT_ASSERT_VALUES_EQUAL(static_cast<TArrowType*>(atype1.Get())->GetType()->ToString(), std::string("uint64")); + ArrowSchema s; + atype1->Export(&s); + auto atype2 = FunctionTypeInfoBuilder.ImportArrowType(&s); + UNIT_ASSERT_VALUES_EQUAL(static_cast<TArrowType*>(atype2.Get())->GetType()->ToString(), std::string("uint64")); + } }; UNIT_TEST_SUITE_REGISTRATION(TMiniKQLTypeBuilderTest); diff --git a/ydb/library/yql/minikql/ut/CMakeLists.darwin.txt b/ydb/library/yql/minikql/ut/CMakeLists.darwin.txt index 6943a02d37..701d89f26a 100644 --- a/ydb/library/yql/minikql/ut/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/ut/CMakeLists.darwin.txt @@ -20,6 +20,7 @@ target_link_libraries(ydb-library-yql-minikql-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main library-yql-minikql + libs-apache-arrow yql-minikql-computation yql-minikql-invoke_builtins yql-parser-pg_wrapper diff --git a/ydb/library/yql/minikql/ut/CMakeLists.linux.txt b/ydb/library/yql/minikql/ut/CMakeLists.linux.txt index 72e421f3c1..b28e9b8def 100644 --- a/ydb/library/yql/minikql/ut/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/ut/CMakeLists.linux.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-library-yql-minikql-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main library-yql-minikql + libs-apache-arrow yql-minikql-computation yql-minikql-invoke_builtins yql-parser-pg_wrapper diff --git a/ydb/library/yql/public/udf/udf_type_builder.h b/ydb/library/yql/public/udf/udf_type_builder.h index 5954430577..7355e380a0 100644 --- a/ydb/library/yql/public/udf/udf_type_builder.h +++ b/ydb/library/yql/public/udf/udf_type_builder.h @@ -9,6 +9,8 @@ #include <type_traits> +struct ArrowSchema; + namespace NYql { namespace NUdf { @@ -61,6 +63,12 @@ struct TTagged { using BaseType = T; }; template <ui32 TypeId> struct TPg; +template <typename T> +struct TBlockType { using ItemType = T; }; + +template <typename T> +struct TScalarType { using ItemType = T; }; + ////////////////////////////////////////////////////////////////////////////// // ITypeBuilder ////////////////////////////////////////////////////////////////////////////// @@ -413,6 +421,33 @@ public: UDF_ASSERT_TYPE_SIZE(ICompare, 16); ////////////////////////////////////////////////////////////////////////////// +// IBlockTypeBuilder +////////////////////////////////////////////////////////////////////////////// +class IBlockTypeBuilder: public ITypeBuilder +{ +public: + using TPtr = TUniquePtr<IBlockTypeBuilder>; + + explicit IBlockTypeBuilder(bool isScalar) + : IsScalar_(isScalar) + {} +public: + template <typename T, typename = std::enable_if_t<TKnownDataType<T>::Result>> + inline IBlockTypeBuilder& Item() { + return Item(TDataType<T>::Id); + } + + virtual IBlockTypeBuilder& Item(TDataTypeId type) = 0; + virtual IBlockTypeBuilder& Item(const TType* type) = 0; + virtual IBlockTypeBuilder& Item(const ITypeBuilder& type) = 0; + +protected: + bool IsScalar_; +}; + +UDF_ASSERT_TYPE_SIZE(IListTypeBuilder, 8); + +////////////////////////////////////////////////////////////////////////////// // IFunctionTypeInfoBuilder ////////////////////////////////////////////////////////////////////////////// namespace NImpl { @@ -594,7 +629,36 @@ public: }; #endif -#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) + +////////////////////////////////////////////////////////////////////////////// +// IArrowType +////////////////////////////////////////////////////////////////////////////// +class IArrowType +{ +public: + using TPtr = TUniquePtr<IArrowType>; + + virtual ~IArrowType() = default; + + virtual void Export(ArrowSchema* out) const = 0; +}; + +UDF_ASSERT_TYPE_SIZE(IArrowType, 8); + +class IFunctionTypeInfoBuilder14: public IFunctionTypeInfoBuilder13 { +public: + virtual IBlockTypeBuilder::TPtr Block(bool isScalar) const = 0; + // returns nullptr if type isn't supported + virtual IArrowType::TPtr MakeArrowType(const TType* type) const = 0; + // The given ArrowSchema struct is released, even if this function fails. + virtual IArrowType::TPtr ImportArrowType(ArrowSchema* schema) const = 0; +}; +#endif + +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +using IFunctionTypeInfoBuilderImpl = IFunctionTypeInfoBuilder14; +#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) using IFunctionTypeInfoBuilderImpl = IFunctionTypeInfoBuilder13; #elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 22) using IFunctionTypeInfoBuilderImpl = IFunctionTypeInfoBuilder12; @@ -831,6 +895,26 @@ struct TTypeBuilderHelper<TPg<TypeId>> { }; #endif +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +template <typename T> +struct TTypeBuilderHelper<TBlockType<T>> { + static TType* Build(const IFunctionTypeInfoBuilder& builder) { + return builder.Block(false)-> + Item(TTypeBuilderHelper<T>::Build(builder)) + .Build(); + } +}; + +template <typename T> +struct TTypeBuilderHelper<TScalarType<T>> { + static TType* Build(const IFunctionTypeInfoBuilder& builder) { + return builder.Block(true)-> + Item(TTypeBuilderHelper<T>::Build(builder)) + .Build(); + } +}; +#endif + template <> struct TCallableArgsHelper<> { static void Arg( diff --git a/ydb/library/yql/public/udf/udf_type_inspection.h b/ydb/library/yql/public/udf/udf_type_inspection.h index 4363d6ad84..c1fb661d03 100644 --- a/ydb/library/yql/public/udf/udf_type_inspection.h +++ b/ydb/library/yql/public/udf/udf_type_inspection.h @@ -67,7 +67,17 @@ public: }; #endif -#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +class TStubTypeVisitor6: public TStubTypeVisitor5 +{ +public: + void OnBlock(const TType* itemType, bool isScalar) override; +}; +#endif + +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +using TStubTypeVisitor = TStubTypeVisitor6; +#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) using TStubTypeVisitor = TStubTypeVisitor5; #elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 21) using TStubTypeVisitor = TStubTypeVisitor4; @@ -483,6 +493,35 @@ private: }; #endif +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +////////////////////////////////////////////////////////////////////////////// +// TBlockTypeInspector +////////////////////////////////////////////////////////////////////////////// +class TBlockTypeInspector: public TStubTypeVisitor +{ +public: + TBlockTypeInspector(const ITypeInfoHelper1& typeHelper, const TType* type) { + if (typeHelper.GetTypeKind(type) == ETypeKind::Block) { + typeHelper.VisitType(type, this); + } + } + + explicit operator bool() const { return ItemType_ != 0; } + const TType* GetItemType() const { return ItemType_; } + bool IsScalar() const { return IsScalar_; } + +private: + void OnBlock(const TType* itemType, bool isScalar) override { + ItemType_ = itemType; + IsScalar_ = isScalar; + } + +private: + const TType* ItemType_ = nullptr; + bool IsScalar_ = false; +}; +#endif + inline void TStubTypeVisitor1::OnDataType(TDataTypeId typeId) { Y_UNUSED(typeId); @@ -563,6 +602,13 @@ inline void TStubTypeVisitor5::OnPg(ui32) { Y_FAIL("Not implemented"); } #endif +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +inline void TStubTypeVisitor6::OnBlock(const TType* itemType, bool isScalar) { + Y_UNUSED(itemType); + Y_UNUSED(isScalar); + Y_FAIL("Not implemented"); +} +#endif } // namspace NUdf } // namspace NYql diff --git a/ydb/library/yql/public/udf/udf_type_printer.cpp b/ydb/library/yql/public/udf/udf_type_printer.cpp index 8d994650c3..672bdf7e70 100644 --- a/ydb/library/yql/public/udf/udf_type_printer.cpp +++ b/ydb/library/yql/public/udf/udf_type_printer.cpp @@ -184,5 +184,11 @@ void TTypePrinter5::OnPgImpl(ui32 typeId) { *Output_ << "pg" << name; } +void TTypePrinter6::OnBlockImpl(const TType* itemType, bool isScalar) { + *Output_ << (isScalar ? "Scalar<" : "Block<"); + OutImpl(itemType); + *Output_ << '>'; +} + } } diff --git a/ydb/library/yql/public/udf/udf_type_printer.h b/ydb/library/yql/public/udf/udf_type_printer.h index cc51b0c442..67c3ae7abd 100644 --- a/ydb/library/yql/public/udf/udf_type_printer.h +++ b/ydb/library/yql/public/udf/udf_type_printer.h @@ -2,11 +2,12 @@ #include "udf_string_ref.h" #include "udf_types.h" +#include "udf_type_inspection.h" namespace NYql { namespace NUdf { -class TTypePrinter1 : private ITypeVisitor +class TTypePrinter1 : private TStubTypeVisitor { public: TTypePrinter1(const ITypeInfoHelper1& typeHelper, const TType* type); @@ -78,7 +79,6 @@ protected: #if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) class TTypePrinter5 : public TTypePrinter4 { public: - using TTypePrinter4::TTypePrinter4; TTypePrinter5(const ITypeInfoHelper2& typeHelper, const TType* type); protected: @@ -94,7 +94,24 @@ private: }; #endif -#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +class TTypePrinter6 : public TTypePrinter5 { +public: + using TTypePrinter5::TTypePrinter5; + +protected: + void OnBlock(const TType* itemType, bool isScalar) final { + OnBlockImpl(itemType, isScalar); + } + +private: + void OnBlockImpl(const TType* itemType, bool isScalar); +}; +#endif + +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +using TTypePrinter = TTypePrinter6; +#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) using TTypePrinter = TTypePrinter5; #elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 21) using TTypePrinter = TTypePrinter4; diff --git a/ydb/library/yql/public/udf/udf_types.h b/ydb/library/yql/public/udf/udf_types.h index 11298a7882..1da1cd8844 100644 --- a/ydb/library/yql/public/udf/udf_types.h +++ b/ydb/library/yql/public/udf/udf_types.h @@ -12,7 +12,29 @@ namespace NUdf { // opaque type info using TType = void; -#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) + +#define UDF_TYPE_KIND_MAP(XX) \ + XX(Unknown) \ + XX(Data) \ + XX(Struct) \ + XX(List) \ + XX(Optional) \ + XX(Tuple) \ + XX(Dict) \ + XX(Callable) \ + XX(Resource) \ + XX(Void) \ + XX(Variant) \ + XX(Stream) \ + XX(Null) \ + XX(EmptyList) \ + XX(EmptyDict) \ + XX(Tagged) \ + XX(Pg) \ + XX(Block) + +#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) #define UDF_TYPE_KIND_MAP(XX) \ XX(Unknown) \ @@ -183,7 +205,16 @@ public: }; #endif -#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +class ITypeVisitor6: public ITypeVisitor5 { +public: + virtual void OnBlock(const TType* itemType, bool isScalar) = 0; +}; +#endif + +#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 26) +using ITypeVisitor = ITypeVisitor6; +#elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 25) using ITypeVisitor = ITypeVisitor5; #elif UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 21) using ITypeVisitor = ITypeVisitor4; diff --git a/ydb/library/yql/public/udf/udf_value_builder.h b/ydb/library/yql/public/udf/udf_value_builder.h index 9249377ee9..557b481faa 100644 --- a/ydb/library/yql/public/udf/udf_value_builder.h +++ b/ydb/library/yql/public/udf/udf_value_builder.h @@ -9,9 +9,13 @@ #include <array> +struct ArrowArray; + namespace NYql { namespace NUdf { +class IArrowType; + /////////////////////////////////////////////////////////////////////////////// // IDictValueBuilder /////////////////////////////////////////////////////////////////////////////// @@ -192,8 +196,10 @@ public: #if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 19) class IValueBuilder5: public IValueBuilder4 { public: - virtual void Unused1() const = 0; - virtual void Unused2() const = 0; + // returns array with one element for scalars + virtual void ExportArrowBlock(TUnboxedValuePod value, bool& isScalar, ArrowArray* out) const = 0; + // the ArrowArray struct has its contents moved to a private object held alive by the result. + virtual TUnboxedValue ImportArrowBlock(ArrowArray* array, const IArrowType& type, bool isScalar) const = 0; virtual void Unused3() const = 0; }; #endif diff --git a/ydb/library/yql/public/udf/udf_value_builder_ut.cpp b/ydb/library/yql/public/udf/udf_value_builder_ut.cpp index afbf6c1e01..94dec78cbc 100644 --- a/ydb/library/yql/public/udf/udf_value_builder_ut.cpp +++ b/ydb/library/yql/public/udf/udf_value_builder_ut.cpp @@ -25,8 +25,8 @@ Y_UNIT_TEST_SUITE(TUdfValueBuilder) { UNIT_ASSERT_VALUES_EQUAL(17, GetMethodIndex(&IValueBuilder::GetSecureParam)); UNIT_ASSERT_VALUES_EQUAL(18, GetMethodIndex(&IValueBuilder::CalleePosition)); UNIT_ASSERT_VALUES_EQUAL(19, GetMethodIndex(&IValueBuilder::Run)); - UNIT_ASSERT_VALUES_EQUAL(20, GetMethodIndex(&IValueBuilder::Unused1)); - UNIT_ASSERT_VALUES_EQUAL(21, GetMethodIndex(&IValueBuilder::Unused2)); + UNIT_ASSERT_VALUES_EQUAL(20, GetMethodIndex(&IValueBuilder::ExportArrowBlock)); + UNIT_ASSERT_VALUES_EQUAL(21, GetMethodIndex(&IValueBuilder::ImportArrowBlock)); UNIT_ASSERT_VALUES_EQUAL(22, GetMethodIndex(&IValueBuilder::Unused3)); UNIT_ASSERT_VALUES_EQUAL(23, GetMethodIndex(&IValueBuilder::GetPgBuilder)); } diff --git a/ydb/library/yql/public/udf/udf_version.h b/ydb/library/yql/public/udf/udf_version.h index 3573592843..39fde6b891 100644 --- a/ydb/library/yql/public/udf/udf_version.h +++ b/ydb/library/yql/public/udf/udf_version.h @@ -7,7 +7,7 @@ namespace NYql { namespace NUdf { #define CURRENT_UDF_ABI_VERSION_MAJOR 2 -#define CURRENT_UDF_ABI_VERSION_MINOR 25 +#define CURRENT_UDF_ABI_VERSION_MINOR 26 #define CURRENT_UDF_ABI_VERSION_PATCH 0 #ifdef USE_CURRENT_UDF_ABI_VERSION diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.darwin.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.darwin.txt index 1f4be5d5c5..623f424687 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.darwin.txt +++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.darwin.txt @@ -57,7 +57,7 @@ target_compile_options(clickhouse_client_udf.global PRIVATE -DDBMS_VERSION_PATCH=0 -Wno-unused-parameter -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=25 + -DUDF_ABI_VERSION_MINOR=26 -DUDF_ABI_VERSION_PATCH=0 ) target_include_directories(clickhouse_client_udf.global PUBLIC diff --git a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt index e9cd1e0586..9c7972634f 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt +++ b/ydb/library/yql/udfs/common/clickhouse/client/CMakeLists.linux.txt @@ -57,7 +57,7 @@ target_compile_options(clickhouse_client_udf.global PRIVATE -DDBMS_VERSION_PATCH=0 -Wno-unused-parameter -DUDF_ABI_VERSION_MAJOR=2 - -DUDF_ABI_VERSION_MINOR=25 + -DUDF_ABI_VERSION_MINOR=26 -DUDF_ABI_VERSION_PATCH=0 ) target_include_directories(clickhouse_client_udf.global PUBLIC |