diff options
| author | Daniil Timizhev <[email protected]> | 2025-10-24 17:55:02 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-24 17:55:02 +0300 |
| commit | 890dd192ecf76a57f470b5f7edf49ac0ec802689 (patch) | |
| tree | 3b716d6c89b1d1b021cad0e178855978c7082728 | |
| parent | 03c8f256665d13eb2b083f446eeebe10f62db64e (diff) | |
Impl DataType public mapping from MKQL to Arrow (#27295)
6 files changed, 1114 insertions, 596 deletions
diff --git a/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.cpp b/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.cpp index 62a0a397f3e..51db4942a44 100644 --- a/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.cpp +++ b/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.cpp @@ -2,6 +2,9 @@ #include <ydb/public/lib/scheme_types/scheme_type_id.h> #include <yql/essentials/minikql/mkql_string_util.h> +#include <yql/essentials/types/binary_json/read.h> +#include <yql/essentials/types/binary_json/write.h> +#include <yql/essentials/types/dynumber/dynumber.h> #include <yql/essentials/utils/yql_panic.h> namespace NKikimr::NKqp::NFormats { @@ -57,11 +60,11 @@ bool SwitchMiniKQLDataTypeToArrowType(NUdf::EDataSlot type, TFunc &&callback) { return callback(TTypeWrapper<arrow::DoubleType>()); case NUdf::EDataSlot::Utf8: case NUdf::EDataSlot::Json: + case NUdf::EDataSlot::DyNumber: + case NUdf::EDataSlot::JsonDocument: return callback(TTypeWrapper<arrow::StringType>()); case NUdf::EDataSlot::String: - case NUdf::EDataSlot::DyNumber: case NUdf::EDataSlot::Yson: - case NUdf::EDataSlot::JsonDocument: return callback(TTypeWrapper<arrow::BinaryType>()); case NUdf::EDataSlot::Decimal: case NUdf::EDataSlot::Uuid: @@ -76,26 +79,37 @@ bool SwitchMiniKQLDataTypeToArrowType(NUdf::EDataSlot type, TFunc &&callback) { } } -bool NeedWrapByExternalOptional(const NMiniKQL::TType *type) { +bool NeedWrapByExternalOptional(const NMiniKQL::TType* type) { switch (type->GetKind()) { case NMiniKQL::TType::EKind::Void: case NMiniKQL::TType::EKind::Null: case NMiniKQL::TType::EKind::Variant: case NMiniKQL::TType::EKind::Optional: - return true; case NMiniKQL::TType::EKind::EmptyList: case NMiniKQL::TType::EKind::EmptyDict: + return true; case NMiniKQL::TType::EKind::Data: case NMiniKQL::TType::EKind::Struct: case NMiniKQL::TType::EKind::Tuple: case NMiniKQL::TType::EKind::List: case NMiniKQL::TType::EKind::Dict: + case NMiniKQL::TType::EKind::Tagged: return false; - default: + case NMiniKQL::TType::EKind::Type: + case NMiniKQL::TType::EKind::Stream: + case NMiniKQL::TType::EKind::Callable: + case NMiniKQL::TType::EKind::Any: + case NMiniKQL::TType::EKind::Resource: + case NMiniKQL::TType::EKind::Flow: + case NMiniKQL::TType::EKind::ReservedKind: + case NMiniKQL::TType::EKind::Block: + case NMiniKQL::TType::EKind::Pg: + case NMiniKQL::TType::EKind::Multi: + case NMiniKQL::TType::EKind::Linear: YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); + return false; } - - return true; + return false; } template <typename TType> @@ -139,12 +153,12 @@ std::shared_ptr<arrow::DataType> CreateEmptyArrowImpl<arrow::StructType>(NUdf::E std::vector<std::shared_ptr<arrow::Field>> fields{ std::make_shared<arrow::Field>("datetime", type, false), - std::make_shared<arrow::Field>("timezoneId", arrow::uint16(), false), + std::make_shared<arrow::Field>("timezone", arrow::utf8(), false), }; return arrow::struct_(fields); } -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TDataType *dataType) { +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TDataType* dataType) { std::shared_ptr<arrow::DataType> result; bool success = SwitchMiniKQLDataTypeToArrowType(*dataType->GetDataSlot().Get(), [&]<typename TType>(TTypeWrapper<TType> typeHolder) { @@ -158,7 +172,7 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TDataType *dataTyp return std::make_shared<arrow::NullType>(); } -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TStructType *structType) { +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TStructType* structType) { std::vector<std::shared_ptr<arrow::Field>> fields; fields.reserve(structType->GetMembersCount()); for (ui32 index = 0; index < structType->GetMembersCount(); ++index) { @@ -171,7 +185,7 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TStructType *struc return arrow::struct_(fields); } -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TTupleType *tupleType) { +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TTupleType* tupleType) { std::vector<std::shared_ptr<arrow::Field>> fields; fields.reserve(tupleType->GetElementsCount()); for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) { @@ -184,14 +198,14 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TTupleType *tupleT return arrow::struct_(fields); } -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TListType *listType) { +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TListType* listType) { auto itemType = listType->GetItemType(); auto itemArrowType = NFormats::GetArrowType(itemType); auto field = std::make_shared<arrow::Field>("item", itemArrowType, itemType->IsOptional()); return arrow::list(field); } -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TDictType *dictType) { +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TDictType* dictType) { auto keyType = dictType->GetKeyType(); auto payloadType = dictType->GetPayloadType(); @@ -213,17 +227,17 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TDictType *dictTyp return arrow::struct_({fieldMap, custom}); } -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TVariantType *variantType) { - NMiniKQL::TType *innerType = variantType->GetUnderlyingType(); +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TVariantType* variantType) { + NMiniKQL::TType* innerType = variantType->GetUnderlyingType(); arrow::FieldVector types; - NMiniKQL::TStructType *structType = nullptr; - NMiniKQL::TTupleType *tupleType = nullptr; + NMiniKQL::TStructType* structType = nullptr; + NMiniKQL::TTupleType* tupleType = nullptr; if (innerType->IsStruct()) { - structType = static_cast<NMiniKQL::TStructType *>(innerType); + structType = static_cast<NMiniKQL::TStructType*>(innerType); } else { YQL_ENSURE(innerType->IsTuple(), "Unexpected underlying variant type: " << innerType->GetKindAsStr()); - tupleType = static_cast<NMiniKQL::TTupleType *>(innerType); + tupleType = static_cast<NMiniKQL::TTupleType*>(innerType); } // Create Union of unions if there are more types then arrow::dense_union supports. @@ -272,12 +286,12 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TVariantType *vari return arrow::dense_union(types); } -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TOptionalType *optionalType) { +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TOptionalType* optionalType) { auto currentType = optionalType->GetItemType(); ui32 depth = 1; while (currentType->IsOptional()) { - currentType = static_cast<const NMiniKQL::TOptionalType *>(currentType)->GetItemType(); + currentType = static_cast<const NMiniKQL::TOptionalType*>(currentType)->GetItemType(); ++depth; } @@ -296,8 +310,9 @@ std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TOptionalType *opt } template <typename TArrowType> -void AppendDataValue(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value) { - auto typedBuilder = reinterpret_cast<typename arrow::TypeTraits<TArrowType>::BuilderType *>(builder); +void AppendDataValue(arrow::ArrayBuilder* builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); + auto typedBuilder = reinterpret_cast<typename arrow::TypeTraits<TArrowType>::BuilderType*>(builder); arrow::Status status; if (!value.HasValue()) { status = typedBuilder->AppendNull(); @@ -308,9 +323,10 @@ void AppendDataValue(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value) { } template <> -void AppendDataValue<arrow::UInt64Type>(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value) { +void AppendDataValue<arrow::UInt64Type>(arrow::ArrayBuilder* builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); YQL_ENSURE(builder->type()->id() == arrow::Type::UINT64, "Unexpected builder type"); - auto typedBuilder = reinterpret_cast<arrow::UInt64Builder *>(builder); + auto typedBuilder = reinterpret_cast<arrow::UInt64Builder*>(builder); arrow::Status status; if (!value.HasValue()) { status = typedBuilder->AppendNull(); @@ -321,9 +337,10 @@ void AppendDataValue<arrow::UInt64Type>(arrow::ArrayBuilder *builder, NUdf::TUnb } template <> -void AppendDataValue<arrow::Int64Type>(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value) { +void AppendDataValue<arrow::Int64Type>(arrow::ArrayBuilder* builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); YQL_ENSURE(builder->type()->id() == arrow::Type::INT64, "Unexpected builder type"); - auto typedBuilder = reinterpret_cast<arrow::Int64Builder *>(builder); + auto typedBuilder = reinterpret_cast<arrow::Int64Builder*>(builder); arrow::Status status; if (!value.HasValue()) { status = typedBuilder->AppendNull(); @@ -334,23 +351,48 @@ void AppendDataValue<arrow::Int64Type>(arrow::ArrayBuilder *builder, NUdf::TUnbo } template <> -void AppendDataValue<arrow::StringType>(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value) { +void AppendDataValue<arrow::StringType>(arrow::ArrayBuilder* builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { YQL_ENSURE(builder->type()->id() == arrow::Type::STRING, "Unexpected builder type"); - auto typedBuilder = reinterpret_cast<arrow::StringBuilder *>(builder); + auto typedBuilder = reinterpret_cast<arrow::StringBuilder*>(builder); arrow::Status status; if (!value.HasValue()) { status = typedBuilder->AppendNull(); } else { - auto data = value.AsStringRef(); - status = typedBuilder->Append(data.Data(), data.Size()); + switch (dataSlot) { + case NUdf::EDataSlot::Utf8: + case NUdf::EDataSlot::Json: { + auto data = value.AsStringRef(); + status = typedBuilder->Append(data.Data(), data.Size()); + break; + } + + case NUdf::EDataSlot::JsonDocument: { + YQL_ENSURE(NBinaryJson::IsValidBinaryJson(value.AsStringRef())); + auto textJson = NBinaryJson::SerializeToJson(value.AsStringRef()); + status = typedBuilder->Append(textJson.data(), textJson.size()); + break; + } + + case NUdf::EDataSlot::DyNumber: { + auto number = NDyNumber::DyNumberToString(value.AsStringRef()); + YQL_ENSURE(number.Defined(), "Failed to convert DyNumber to string"); + status = typedBuilder->Append(number->data(), number->size()); + break; + } + + default: { + YQL_ENSURE(false, "Unexpected data slot"); + } + } } YQL_ENSURE(status.ok(), "Failed to append data value: " << status.ToString()); } template <> -void AppendDataValue<arrow::BinaryType>(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value) { +void AppendDataValue<arrow::BinaryType>(arrow::ArrayBuilder* builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); YQL_ENSURE(builder->type()->id() == arrow::Type::BINARY, "Unexpected builder type"); - auto typedBuilder = reinterpret_cast<arrow::BinaryBuilder *>(builder); + auto typedBuilder = reinterpret_cast<arrow::BinaryBuilder*>(builder); arrow::Status status; if (!value.HasValue()) { status = typedBuilder->AppendNull(); @@ -363,9 +405,10 @@ void AppendDataValue<arrow::BinaryType>(arrow::ArrayBuilder *builder, NUdf::TUnb // Only for timezone datetime types template <> -void AppendDataValue<arrow::StructType>(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value) { +void AppendDataValue<arrow::StructType>(arrow::ArrayBuilder* builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); YQL_ENSURE(builder->type()->id() == arrow::Type::STRUCT, "Unexpected builder type"); - auto typedBuilder = reinterpret_cast<arrow::StructBuilder *>(builder); + auto typedBuilder = reinterpret_cast<arrow::StructBuilder*>(builder); YQL_ENSURE(typedBuilder->num_fields() == 2, "StructBuilder of timezone datetime types should have 2 fields"); if (!value.HasValue()) { @@ -378,50 +421,56 @@ void AppendDataValue<arrow::StructType>(arrow::ArrayBuilder *builder, NUdf::TUnb YQL_ENSURE(status.ok(), "Failed to append data value: " << status.ToString()); auto datetimeArray = typedBuilder->field_builder(0); - auto timezoneArray = reinterpret_cast<arrow::UInt16Builder *>(typedBuilder->field_builder(1)); + auto timezoneArray = reinterpret_cast<arrow::StringBuilder*>(typedBuilder->field_builder(1)); - switch (datetimeArray->type()->id()) { - // NUdf::EDataSlot::TzDate - case arrow::Type::UINT16: { - status = reinterpret_cast<arrow::UInt16Builder *>(datetimeArray)->Append(value.Get<ui16>()); + switch (dataSlot) { + case NUdf::EDataSlot::TzDate: { + YQL_ENSURE(datetimeArray->type()->id() == arrow::Type::UINT16); + status = reinterpret_cast<arrow::UInt16Builder*>(datetimeArray)->Append(value.Get<ui16>()); break; } - // NUdf::EDataSlot::TzDatetime - case arrow::Type::UINT32: { - status = reinterpret_cast<arrow::UInt32Builder *>(datetimeArray)->Append(value.Get<ui32>()); + + case NUdf::EDataSlot::TzDatetime: { + YQL_ENSURE(datetimeArray->type()->id() == arrow::Type::UINT32); + status = reinterpret_cast<arrow::UInt32Builder*>(datetimeArray)->Append(value.Get<ui32>()); break; } - // NUdf::EDataSlot::TzTimestamp - case arrow::Type::UINT64: { - status = reinterpret_cast<arrow::UInt64Builder *>(datetimeArray)->Append(value.Get<ui64>()); + + case NUdf::EDataSlot::TzTimestamp: { + YQL_ENSURE(datetimeArray->type()->id() == arrow::Type::UINT64); + status = reinterpret_cast<arrow::UInt64Builder*>(datetimeArray)->Append(value.Get<ui64>()); break; } - // NUdf::EDataSlot::TzDate32 - case arrow::Type::INT32: { - status = reinterpret_cast<arrow::Int32Builder *>(datetimeArray)->Append(value.Get<i32>()); + + case NUdf::EDataSlot::TzDate32: { + YQL_ENSURE(datetimeArray->type()->id() == arrow::Type::INT32); + status = reinterpret_cast<arrow::Int32Builder*>(datetimeArray)->Append(value.Get<i32>()); break; } - // NUdf::EDataSlot::TzDatetime64, NUdf::EDataSlot::TzTimestamp64 - case arrow::Type::INT64: { - status = reinterpret_cast<arrow::Int64Builder *>(datetimeArray)->Append(value.Get<i64>()); + + case NUdf::EDataSlot::TzDatetime64: + case NUdf::EDataSlot::TzTimestamp64: { + YQL_ENSURE(datetimeArray->type()->id() == arrow::Type::INT64); + status = reinterpret_cast<arrow::Int64Builder*>(datetimeArray)->Append(value.Get<i64>()); break; } - default: + + default: { YQL_ENSURE(false, "Unexpected timezone datetime slot"); return; + } } YQL_ENSURE(status.ok(), "Failed to append data value: " << status.ToString()); - status = timezoneArray->Append(value.GetTimezoneId()); + auto tzName = NMiniKQL::GetTimezoneIANAName(value.GetTimezoneId()); + status = timezoneArray->Append(tzName.Data(), tzName.size()); YQL_ENSURE(status.ok(), "Failed to append data value: " << status.ToString()); } -template <typename TArrowType> -void AppendFixedSizeDataValue(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { - static_assert(std::is_same_v<TArrowType, arrow::FixedSizeBinaryType>, "This function is only for FixedSizeBinaryType"); - +template <> +void AppendDataValue<arrow::FixedSizeBinaryType>(arrow::ArrayBuilder* builder, NUdf::TUnboxedValue value, NUdf::EDataSlot dataSlot) { YQL_ENSURE(builder->type()->id() == arrow::Type::FIXED_SIZE_BINARY, "Unexpected builder type"); - auto typedBuilder = reinterpret_cast<arrow::FixedSizeBinaryBuilder *>(builder); + auto typedBuilder = reinterpret_cast<arrow::FixedSizeBinaryBuilder*>(builder); arrow::Status status; if (!value.HasValue()) { @@ -432,7 +481,7 @@ void AppendFixedSizeDataValue(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue status = typedBuilder->Append(data.Data()); } else if (dataSlot == NUdf::EDataSlot::Decimal) { auto intVal = value.GetInt128(); - status = typedBuilder->Append(reinterpret_cast<const char *>(&intVal)); + status = typedBuilder->Append(reinterpret_cast<const char*>(&intVal)); } else { YQL_ENSURE(false, "Unexpected data slot"); } @@ -442,51 +491,148 @@ void AppendFixedSizeDataValue(arrow::ArrayBuilder *builder, NUdf::TUnboxedValue } // namespace -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TType *type) { +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TType* type) { switch (type->GetKind()) { - case NMiniKQL::TType::EKind::Void: - case NMiniKQL::TType::EKind::Null: + case NMiniKQL::TType::EKind::Null: { return arrow::null(); + } + + case NMiniKQL::TType::EKind::Void: case NMiniKQL::TType::EKind::EmptyList: - case NMiniKQL::TType::EKind::EmptyDict: + case NMiniKQL::TType::EKind::EmptyDict: { return arrow::struct_({}); + } + case NMiniKQL::TType::EKind::Data: { - auto dataType = static_cast<const NMiniKQL::TDataType *>(type); + auto dataType = static_cast<const NMiniKQL::TDataType*>(type); return GetArrowType(dataType); } + case NMiniKQL::TType::EKind::Struct: { - auto structType = static_cast<const NMiniKQL::TStructType *>(type); + auto structType = static_cast<const NMiniKQL::TStructType*>(type); return GetArrowType(structType); } + case NMiniKQL::TType::EKind::Tuple: { - auto tupleType = static_cast<const NMiniKQL::TTupleType *>(type); + auto tupleType = static_cast<const NMiniKQL::TTupleType*>(type); return GetArrowType(tupleType); } + case NMiniKQL::TType::EKind::Optional: { - auto optionalType = static_cast<const NMiniKQL::TOptionalType *>(type); + auto optionalType = static_cast<const NMiniKQL::TOptionalType*>(type); return GetArrowType(optionalType); } + case NMiniKQL::TType::EKind::List: { - auto listType = static_cast<const NMiniKQL::TListType *>(type); + auto listType = static_cast<const NMiniKQL::TListType*>(type); return GetArrowType(listType); } + case NMiniKQL::TType::EKind::Dict: { - auto dictType = static_cast<const NMiniKQL::TDictType *>(type); + auto dictType = static_cast<const NMiniKQL::TDictType*>(type); return GetArrowType(dictType); } + case NMiniKQL::TType::EKind::Variant: { - auto variantType = static_cast<const NMiniKQL::TVariantType *>(type); + auto variantType = static_cast<const NMiniKQL::TVariantType*>(type); return GetArrowType(variantType); } - default: + + default: { YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); + } } return arrow::null(); } -void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, const NMiniKQL::TType *type) { +bool IsArrowCompatible(const NKikimr::NMiniKQL::TType* type) { switch (type->GetKind()) { case NMiniKQL::TType::EKind::Void: + case NMiniKQL::TType::EKind::Null: + case NMiniKQL::TType::EKind::EmptyList: + case NMiniKQL::TType::EKind::EmptyDict: + case NMiniKQL::TType::EKind::Data: { + return true; + } + + case NMiniKQL::TType::EKind::Struct: { + auto structType = static_cast<const NMiniKQL::TStructType*>(type); + bool isCompatible = true; + for (ui32 index = 0; index < structType->GetMembersCount(); ++index) { + auto memberType = structType->GetMemberType(index); + isCompatible = isCompatible && IsArrowCompatible(memberType); + } + return isCompatible; + } + + case NMiniKQL::TType::EKind::Tuple: { + auto tupleType = static_cast<const NMiniKQL::TTupleType*>(type); + bool isCompatible = true; + for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) { + auto elementType = tupleType->GetElementType(index); + isCompatible = isCompatible && IsArrowCompatible(elementType); + } + return isCompatible; + } + + case NMiniKQL::TType::EKind::Optional: { + auto optionalType = static_cast<const NMiniKQL::TOptionalType*>(type); + return IsArrowCompatible(optionalType->GetItemType()); + } + + case NMiniKQL::TType::EKind::List: { + auto listType = static_cast<const NMiniKQL::TListType*>(type); + auto itemType = listType->GetItemType(); + return IsArrowCompatible(itemType); + } + + case NMiniKQL::TType::EKind::Variant: { + auto variantType = static_cast<const NMiniKQL::TVariantType*>(type); + ui32 maxTypesCount = (arrow::UnionType::kMaxTypeCode + 1) * (arrow::UnionType::kMaxTypeCode + 1); + if (variantType->GetAlternativesCount() > maxTypesCount) { + return false; + } + + NMiniKQL::TType* innerType = variantType->GetUnderlyingType(); + if (innerType->IsStruct() || innerType->IsTuple()) { + return IsArrowCompatible(innerType); + } + + YQL_ENSURE(false, "Unexpected underlying variant type: " << innerType->GetKindAsStr()); + return false; + } + + case NMiniKQL::TType::EKind::Dict: { + auto dictType = static_cast<const NMiniKQL::TDictType*>(type); + auto keyType = dictType->GetKeyType(); + auto payloadType = dictType->GetPayloadType(); + return IsArrowCompatible(keyType) && IsArrowCompatible(payloadType); + } + + case NMiniKQL::TType::EKind::Tagged: { + auto taggedType = static_cast<const NMiniKQL::TTaggedType*>(type); + return IsArrowCompatible(taggedType->GetBaseType()); + } + + case NMiniKQL::TType::EKind::Type: + case NMiniKQL::TType::EKind::Stream: + case NMiniKQL::TType::EKind::Callable: + case NMiniKQL::TType::EKind::Any: + case NMiniKQL::TType::EKind::Resource: + case NMiniKQL::TType::EKind::Flow: + case NMiniKQL::TType::EKind::ReservedKind: + case NMiniKQL::TType::EKind::Block: + case NMiniKQL::TType::EKind::Pg: + case NMiniKQL::TType::EKind::Multi: + case NMiniKQL::TType::EKind::Linear: { + return false; + } + } + return true; +} + +void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, const NMiniKQL::TType* type) { + switch (type->GetKind()) { case NMiniKQL::TType::EKind::Null: { YQL_ENSURE(builder->type()->id() == arrow::Type::NA, "Unexpected builder type"); auto status = builder->AppendNull(); @@ -494,25 +640,22 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons break; } + case NMiniKQL::TType::EKind::Void: case NMiniKQL::TType::EKind::EmptyList: case NMiniKQL::TType::EKind::EmptyDict: { YQL_ENSURE(builder->type()->id() == arrow::Type::STRUCT, "Unexpected builder type"); - auto structBuilder = reinterpret_cast<arrow::StructBuilder *>(builder); + auto structBuilder = reinterpret_cast<arrow::StructBuilder*>(builder); auto status = structBuilder->Append(); - YQL_ENSURE(status.ok(), "Failed to append empty dict/list value: " << status.ToString()); + YQL_ENSURE(status.ok(), "Failed to append struct value of a singular type: " << status.ToString()); break; } case NMiniKQL::TType::EKind::Data: { - auto dataType = static_cast<const NMiniKQL::TDataType *>(type); + auto dataType = static_cast<const NMiniKQL::TDataType*>(type); auto slot = *dataType->GetDataSlot().Get(); bool success = SwitchMiniKQLDataTypeToArrowType( slot, [&]<typename TType>(TTypeWrapper<TType> typeHolder) { Y_UNUSED(typeHolder); - if constexpr (std::is_same_v<TType, arrow::FixedSizeBinaryType>) { - AppendFixedSizeDataValue<TType>(builder, value, slot); - } else { - AppendDataValue<TType>(builder, value); - } + AppendDataValue<TType>(builder, value, slot); return true; }); YQL_ENSURE(success, "Failed to append data value to arrow builder"); @@ -520,11 +663,11 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons } case NMiniKQL::TType::EKind::Optional: { - auto innerType = static_cast<const NMiniKQL::TOptionalType *>(type)->GetItemType(); + auto innerType = static_cast<const NMiniKQL::TOptionalType*>(type)->GetItemType(); ui32 depth = 1; while (innerType->IsOptional()) { - innerType = static_cast<const NMiniKQL::TOptionalType *>(innerType) ->GetItemType(); + innerType = static_cast<const NMiniKQL::TOptionalType*>(innerType) ->GetItemType(); ++depth; } @@ -537,7 +680,7 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons for (ui32 i = 1; i < depth; ++i) { YQL_ENSURE(innerBuilder->type()->id() == arrow::Type::STRUCT, "Unexpected builder type"); - auto structBuilder = reinterpret_cast<arrow::StructBuilder *>(innerBuilder); + auto structBuilder = reinterpret_cast<arrow::StructBuilder*>(innerBuilder); YQL_ENSURE(structBuilder->num_fields() == 1, "Unexpected number of fields"); if (!innerValue) { @@ -563,11 +706,11 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons } case NMiniKQL::TType::EKind::List: { - auto listType = static_cast<const NMiniKQL::TListType *>(type); + auto listType = static_cast<const NMiniKQL::TListType*>(type); auto itemType = listType->GetItemType(); YQL_ENSURE(builder->type()->id() == arrow::Type::LIST, "Unexpected builder type"); - auto listBuilder = reinterpret_cast<arrow::ListBuilder *>(builder); + auto listBuilder = reinterpret_cast<arrow::ListBuilder*>(builder); auto status = listBuilder->Append(); YQL_ENSURE(status.ok(), "Failed to append list value: " << status.ToString()); @@ -589,10 +732,10 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons } case NMiniKQL::TType::EKind::Struct: { - auto structType = static_cast<const NMiniKQL::TStructType *>(type); + auto structType = static_cast<const NMiniKQL::TStructType*>(type); YQL_ENSURE(builder->type()->id() == arrow::Type::STRUCT, "Unexpected builder type"); - auto structBuilder = reinterpret_cast<arrow::StructBuilder *>(builder); + auto structBuilder = reinterpret_cast<arrow::StructBuilder*>(builder); auto status = structBuilder->Append(); YQL_ENSURE(status.ok(), "Failed to append struct value: " << status.ToString()); @@ -607,10 +750,10 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons } case NMiniKQL::TType::EKind::Tuple: { - auto tupleType = static_cast<const NMiniKQL::TTupleType *>(type); + auto tupleType = static_cast<const NMiniKQL::TTupleType*>(type); YQL_ENSURE(builder->type()->id() == arrow::Type::STRUCT, "Unexpected builder type"); - auto structBuilder = reinterpret_cast<arrow::StructBuilder *>(builder); + auto structBuilder = reinterpret_cast<arrow::StructBuilder*>(builder); auto status = structBuilder->Append(); YQL_ENSURE(status.ok(), "Failed to append tuple value: " << status.ToString()); @@ -625,16 +768,16 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons } case NMiniKQL::TType::EKind::Dict: { - auto dictType = static_cast<const NMiniKQL::TDictType *>(type); + auto dictType = static_cast<const NMiniKQL::TDictType*>(type); auto keyType = dictType->GetKeyType(); auto payloadType = dictType->GetPayloadType(); - arrow::ArrayBuilder *keyBuilder = nullptr; - arrow::ArrayBuilder *itemBuilder = nullptr; - arrow::StructBuilder *structBuilder = nullptr; + arrow::ArrayBuilder* keyBuilder = nullptr; + arrow::ArrayBuilder* itemBuilder = nullptr; + arrow::StructBuilder* structBuilder = nullptr; YQL_ENSURE(builder->type()->id() == arrow::Type::STRUCT, "Unexpected builder type"); - arrow::StructBuilder *wrapBuilder = reinterpret_cast<arrow::StructBuilder *>(builder); + arrow::StructBuilder* wrapBuilder = reinterpret_cast<arrow::StructBuilder*>(builder); YQL_ENSURE(wrapBuilder->num_fields() == 2, "Unexpected number of fields"); auto status = wrapBuilder->Append(); @@ -642,13 +785,13 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons if (keyType->GetKind() == NMiniKQL::TType::EKind::Optional) { YQL_ENSURE(wrapBuilder->field_builder(0)->type()->id() == arrow::Type::LIST, "Unexpected builder type"); - auto listBuilder = reinterpret_cast<arrow::ListBuilder *>(wrapBuilder->field_builder(0)); + auto listBuilder = reinterpret_cast<arrow::ListBuilder*>(wrapBuilder->field_builder(0)); auto status = listBuilder->Append(); YQL_ENSURE(status.ok(), "Failed to append dict value: " << status.ToString()); YQL_ENSURE(listBuilder->value_builder()->type()->id() == arrow::Type::STRUCT, "Unexpected builder type"); - structBuilder = reinterpret_cast<arrow::StructBuilder *>( + structBuilder = reinterpret_cast<arrow::StructBuilder*>( listBuilder->value_builder()); YQL_ENSURE(structBuilder->num_fields() == 2, "Unexpected number of fields"); @@ -656,7 +799,7 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons itemBuilder = structBuilder->field_builder(1); } else { YQL_ENSURE(wrapBuilder->field_builder(0)->type()->id() == arrow::Type::MAP, "Unexpected builder type"); - auto mapBuilder = reinterpret_cast<arrow::MapBuilder *>(wrapBuilder->field_builder(0)); + auto mapBuilder = reinterpret_cast<arrow::MapBuilder*>(wrapBuilder->field_builder(0)); auto status = mapBuilder->Append(); YQL_ENSURE(status.ok(), "Failed to append dict value: " << status.ToString()); @@ -665,7 +808,7 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons itemBuilder = mapBuilder->item_builder(); } - arrow::UInt64Builder *customBuilder = reinterpret_cast<arrow::UInt64Builder *>(wrapBuilder->field_builder(1)); + arrow::UInt64Builder* customBuilder = reinterpret_cast<arrow::UInt64Builder*>(wrapBuilder->field_builder(1)); status = customBuilder->Append(0); YQL_ENSURE(status.ok(), "Failed to append dict value: " << status.ToString()); @@ -686,19 +829,19 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons case NMiniKQL::TType::EKind::Variant: { // TODO Need to properly convert variants containing more than 127*127 // types? - auto variantType = static_cast<const NMiniKQL::TVariantType *>(type); + auto variantType = static_cast<const NMiniKQL::TVariantType*>(type); YQL_ENSURE(builder->type()->id() == arrow::Type::DENSE_UNION, "Unexpected builder type"); - auto unionBuilder = reinterpret_cast<arrow::DenseUnionBuilder *>(builder); + auto unionBuilder = reinterpret_cast<arrow::DenseUnionBuilder*>(builder); ui32 variantIndex = value.GetVariantIndex(); - NMiniKQL::TType *innerType = variantType->GetUnderlyingType(); + NMiniKQL::TType* innerType = variantType->GetUnderlyingType(); if (innerType->IsStruct()) { - innerType = static_cast<NMiniKQL::TStructType *>(innerType)->GetMemberType(variantIndex); + innerType = static_cast<NMiniKQL::TStructType*>(innerType)->GetMemberType(variantIndex); } else { YQL_ENSURE(innerType->IsTuple(), "Unexpected underlying variant type: " << innerType->GetKindAsStr()); - innerType = static_cast<NMiniKQL::TTupleType *>(innerType)->GetElementType(variantIndex); + innerType = static_cast<NMiniKQL::TTupleType*>(innerType)->GetElementType(variantIndex); } if (variantType->GetAlternativesCount() > arrow::UnionType::kMaxTypeCode) { @@ -711,7 +854,7 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons auto innerBuilder = unionBuilder->child_builder(groupIndex); YQL_ENSURE(innerBuilder->type()->id() == arrow::Type::DENSE_UNION, "Unexpected builder type"); - auto innerUnionBuilder = reinterpret_cast<arrow::DenseUnionBuilder *>(innerBuilder.get()); + auto innerUnionBuilder = reinterpret_cast<arrow::DenseUnionBuilder*>(innerBuilder.get()); ui32 innerVariantIndex = variantIndex % arrow::UnionType::kMaxTypeCode; status = innerUnionBuilder->Append(innerVariantIndex); @@ -729,8 +872,9 @@ void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, cons break; } - default: + default: { YQL_ENSURE(false, "Unsupported type: " << type->GetKindAsStr()); + } } } @@ -739,170 +883,159 @@ namespace NTestUtils { namespace { template <typename TArrowType> -NUdf::TUnboxedValue GetUnboxedValue(std::shared_ptr<arrow::Array> column, ui32 row) { +NUdf::TUnboxedValue GetUnboxedValue(std::shared_ptr<arrow::Array> column, ui32 row, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); using TArrayType = typename arrow::TypeTraits<TArrowType>::ArrayType; auto array = std::static_pointer_cast<TArrayType>(column); return NUdf::TUnboxedValuePod(static_cast<typename TArrowType::c_type>(array->Value(row))); } template <> // For darwin build -NUdf::TUnboxedValue GetUnboxedValue<arrow::UInt64Type>(std::shared_ptr<arrow::Array> column, ui32 row) { +NUdf::TUnboxedValue GetUnboxedValue<arrow::UInt64Type>(std::shared_ptr<arrow::Array> column, ui32 row, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); auto array = std::static_pointer_cast<arrow::UInt64Array>(column); return NUdf::TUnboxedValuePod(static_cast<ui64>(array->Value(row))); } template <> // For darwin build -NUdf::TUnboxedValue GetUnboxedValue<arrow::Int64Type>(std::shared_ptr<arrow::Array> column, ui32 row) { +NUdf::TUnboxedValue GetUnboxedValue<arrow::Int64Type>(std::shared_ptr<arrow::Array> column, ui32 row, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); auto array = std::static_pointer_cast<arrow::Int64Array>(column); return NUdf::TUnboxedValuePod(static_cast<i64>(array->Value(row))); } template <> -NUdf::TUnboxedValue GetUnboxedValue<arrow::StructType>(std::shared_ptr<arrow::Array> column, ui32 row) { +NUdf::TUnboxedValue GetUnboxedValue<arrow::StructType>(std::shared_ptr<arrow::Array> column, ui32 row, NUdf::EDataSlot dataSlot) { auto array = std::static_pointer_cast<arrow::StructArray>(column); YQL_ENSURE(array->num_fields() == 2, "StructArray of some TzDate type should have 2 fields"); auto datetimeArray = array->field(0); - auto timezoneArray = std::static_pointer_cast<arrow::UInt16Array>(array->field(1)); + auto timezoneArray = std::static_pointer_cast<arrow::StringArray>(array->field(1)); NUdf::TUnboxedValuePod value; + auto typeId = datetimeArray->type_id(); - switch (datetimeArray->type()->id()) { - // NUdf::EDataSlot::TzDate - case arrow::Type::UINT16: { + switch (dataSlot) { + case NUdf::EDataSlot::TzDate: { + YQL_ENSURE(typeId == arrow::Type::UINT16); value = NUdf::TUnboxedValuePod(static_cast<ui16>( std::static_pointer_cast<arrow::UInt16Array>(datetimeArray)->Value(row))); break; } - // NUdf::EDataSlot::TzDatetime - case arrow::Type::UINT32: { + + case NUdf::EDataSlot::TzDatetime: { + YQL_ENSURE(typeId == arrow::Type::UINT32); value = NUdf::TUnboxedValuePod(static_cast<ui32>( std::static_pointer_cast<arrow::UInt32Array>(datetimeArray)->Value(row))); break; } - // NUdf::EDataSlot::TzTimestamp - case arrow::Type::UINT64: { + + case NUdf::EDataSlot::TzTimestamp: { + YQL_ENSURE(typeId == arrow::Type::UINT64); value = NUdf::TUnboxedValuePod(static_cast<ui64>( std::static_pointer_cast<arrow::UInt64Array>(datetimeArray)->Value(row))); break; } - // NUdf::EDataSlot::TzDate32 - case arrow::Type::INT32: { + + case NUdf::EDataSlot::TzDate32: { + YQL_ENSURE(typeId == arrow::Type::INT32); value = NUdf::TUnboxedValuePod(static_cast<i32>( std::static_pointer_cast<arrow::Int32Array>(datetimeArray)->Value(row))); break; } - // NUdf::EDataSlot::TzDatetime64, NUdf::EDataSlot::TzTimestamp64 - case arrow::Type::INT64: { + + case NUdf::EDataSlot::TzDatetime64: + case NUdf::EDataSlot::TzTimestamp64: { + YQL_ENSURE(typeId == arrow::Type::INT64); value = NUdf::TUnboxedValuePod(static_cast<i64>( std::static_pointer_cast<arrow::Int64Array>(datetimeArray)->Value(row))); break; } - default: - YQL_ENSURE(false, "Unexpected timezone datetime slot"); + + default: { + YQL_ENSURE(false, "Unexpected timezone datetime data type"); return NUdf::TUnboxedValuePod(); + } } - value.SetTimezoneId(timezoneArray->Value(row)); + auto view = timezoneArray->Value(row); + value.SetTimezoneId(NMiniKQL::GetTimezoneId(NUdf::TStringRef(view.data(), view.size()))); return value; } template <> -NUdf::TUnboxedValue GetUnboxedValue<arrow::BinaryType>(std::shared_ptr<arrow::Array> column, ui32 row) { +NUdf::TUnboxedValue GetUnboxedValue<arrow::BinaryType>(std::shared_ptr<arrow::Array> column, ui32 row, NUdf::EDataSlot dataSlot) { + Y_UNUSED(dataSlot); auto array = std::static_pointer_cast<arrow::BinaryArray>(column); auto data = array->GetView(row); return NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size())); } template <> -NUdf::TUnboxedValue GetUnboxedValue<arrow::StringType>(std::shared_ptr<arrow::Array> column, ui32 row) { +NUdf::TUnboxedValue GetUnboxedValue<arrow::StringType>(std::shared_ptr<arrow::Array> column, ui32 row, NUdf::EDataSlot dataSlot) { auto array = std::static_pointer_cast<arrow::StringArray>(column); auto data = array->GetView(row); - return NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size())); -} - -template <> -NUdf::TUnboxedValue GetUnboxedValue<arrow::FixedSizeBinaryType>(std::shared_ptr<arrow::Array> column, ui32 row) { - auto array = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(column); - auto data = array->GetView(row); - return NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size())); -} -} // namespace - -bool IsArrowCompatible(const NKikimr::NMiniKQL::TType *type) { - switch (type->GetKind()) { - case NMiniKQL::TType::EKind::Void: - case NMiniKQL::TType::EKind::Null: - case NMiniKQL::TType::EKind::EmptyList: - case NMiniKQL::TType::EKind::EmptyDict: - case NMiniKQL::TType::EKind::Data: - return true; + switch (dataSlot) { + case NUdf::EDataSlot::Utf8: + case NUdf::EDataSlot::Json: { + return NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size())); + } - case NMiniKQL::TType::EKind::Struct: { - auto structType = static_cast<const NMiniKQL::TStructType *>(type); - bool isCompatible = true; - for (ui32 index = 0; index < structType->GetMembersCount(); ++index) { - auto memberType = structType->GetMemberType(index); - isCompatible = isCompatible && IsArrowCompatible(memberType); + case NUdf::EDataSlot::JsonDocument: { + auto variant = NBinaryJson::SerializeToBinaryJson(TStringBuf(data.data(), data.size())); + if (std::holds_alternative<NBinaryJson::TBinaryJson>(variant)) { + const auto& json = std::get<NBinaryJson::TBinaryJson>(variant); + return NMiniKQL::MakeString(NUdf::TStringRef(json.Data(), json.Size())); } - return isCompatible; + + YQL_ENSURE(false, "Cannot serialize to binary json"); + break; } - case NMiniKQL::TType::EKind::Tuple: { - auto tupleType = static_cast<const NMiniKQL::TTupleType *>(type); - bool isCompatible = true; - for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) { - auto elementType = tupleType->GetElementType(index); - isCompatible = isCompatible && IsArrowCompatible(elementType); + case NUdf::EDataSlot::DyNumber: { + auto number = NDyNumber::ParseDyNumberString(TStringBuf(data.data(), data.size())); + if (number.Defined()) { + return NMiniKQL::MakeString(*number); } - return isCompatible; + + YQL_ENSURE(false, "Failed to convert string to DyNumber"); + break; } - case NMiniKQL::TType::EKind::Optional: { - auto optionalType = static_cast<const NMiniKQL::TOptionalType *>(type); - auto innerOptionalType = optionalType->GetItemType(); - if (NeedWrapByExternalOptional(innerOptionalType)) { - return false; - } - return IsArrowCompatible(innerOptionalType); + default: { + YQL_ENSURE(false, "Unexpected data slot"); } + } + return NUdf::TUnboxedValuePod(); +} - case NMiniKQL::TType::EKind::List: { - auto listType = static_cast<const NMiniKQL::TListType *>(type); - auto itemType = listType->GetItemType(); - return IsArrowCompatible(itemType); +template <> +NUdf::TUnboxedValue GetUnboxedValue<arrow::FixedSizeBinaryType>(std::shared_ptr<arrow::Array> column, ui32 row, NUdf::EDataSlot dataSlot) { + auto array = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(column); + auto data = array->GetView(row); + + switch (dataSlot) { + case NUdf::EDataSlot::Uuid: { + return NMiniKQL::MakeString(NUdf::TStringRef(data.data(), data.size())); } - case NMiniKQL::TType::EKind::Variant: { - auto variantType = static_cast<const NMiniKQL::TVariantType *>(type); - if (variantType->GetAlternativesCount() > arrow::UnionType::kMaxTypeCode) { - return false; - } - NMiniKQL::TType *innerType = variantType->GetUnderlyingType(); - YQL_ENSURE(innerType->IsTuple() || innerType->IsStruct(), "Unexpected underlying variant type: " << innerType->GetKindAsStr()); - return IsArrowCompatible(innerType); + case NUdf::EDataSlot::Decimal: { + NYql::NDecimal::TInt128 value; + std::memcpy(&value, data.data(), data.size()); + return NUdf::TUnboxedValuePod(value); } - case NMiniKQL::TType::EKind::Dict: - case NMiniKQL::TType::EKind::Block: - case NMiniKQL::TType::EKind::Type: - case NMiniKQL::TType::EKind::Stream: - case NMiniKQL::TType::EKind::Callable: - case NMiniKQL::TType::EKind::Any: - case NMiniKQL::TType::EKind::Resource: - case NMiniKQL::TType::EKind::ReservedKind: - case NMiniKQL::TType::EKind::Flow: - case NMiniKQL::TType::EKind::Tagged: - case NMiniKQL::TType::EKind::Pg: - case NMiniKQL::TType::EKind::Multi: - case NMiniKQL::TType::EKind::Linear: - return false; + default: { + YQL_ENSURE(false, "Unexpected data slot"); + } } - return false; + return NUdf::TUnboxedValuePod(); } -std::unique_ptr<arrow::ArrayBuilder> MakeArrowBuilder(const NMiniKQL::TType *type) { +} // namespace + +std::unique_ptr<arrow::ArrayBuilder> MakeArrowBuilder(const NMiniKQL::TType* type) { auto arrayType = GetArrowType(type); std::unique_ptr<arrow::ArrayBuilder> builder; auto status = arrow::MakeBuilder(arrow::default_memory_pool(), arrayType, &builder); @@ -910,11 +1043,11 @@ std::unique_ptr<arrow::ArrayBuilder> MakeArrowBuilder(const NMiniKQL::TType *typ return builder; } -std::shared_ptr<arrow::Array> MakeArray(NMiniKQL::TUnboxedValueVector &values, const NMiniKQL::TType *itemType) { +std::shared_ptr<arrow::Array> MakeArray(NMiniKQL::TUnboxedValueVector& values, const NMiniKQL::TType* itemType) { auto builder = MakeArrowBuilder(itemType); auto status = builder->Reserve(values.size()); YQL_ENSURE(status.ok(), "Failed to reserve space for array: " << status.ToString()); - for (auto &value : values) { + for (auto& value : values) { AppendElement(value, builder.get(), itemType); } std::shared_ptr<arrow::Array> result; @@ -923,8 +1056,8 @@ std::shared_ptr<arrow::Array> MakeArray(NMiniKQL::TUnboxedValueVector &values, c return result; } -NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &array, ui64 row, const NMiniKQL::TType *itemType, - const NMiniKQL::THolderFactory &holderFactory) +NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array>& array, ui64 row, const NMiniKQL::TType* itemType, + const NMiniKQL::THolderFactory& holderFactory) { if (array->IsNull(row)) { return NUdf::TUnboxedValuePod(); @@ -934,30 +1067,32 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr case NMiniKQL::TType::EKind::Void: case NMiniKQL::TType::EKind::Null: case NMiniKQL::TType::EKind::EmptyList: - case NMiniKQL::TType::EKind::EmptyDict: + case NMiniKQL::TType::EKind::EmptyDict: { break; + } case NMiniKQL::TType::EKind::Data: { - auto dataType = static_cast<const NMiniKQL::TDataType *>(itemType); + auto dataType = static_cast<const NMiniKQL::TDataType*>(itemType); NUdf::TUnboxedValue result; - bool success = SwitchMiniKQLDataTypeToArrowType(*dataType->GetDataSlot().Get(), + auto dataSlot = *dataType->GetDataSlot().Get(); + bool success = SwitchMiniKQLDataTypeToArrowType(dataSlot, [&]<typename TType>(TTypeWrapper<TType> typeHolder) { Y_UNUSED(typeHolder); - result = GetUnboxedValue<TType>(array, row); + result = GetUnboxedValue<TType>(array, row, dataSlot); return true; }); - Y_ENSURE(success, "Failed to extract unboxed value from arrow array"); + YQL_ENSURE(success, "Failed to extract unboxed value from arrow array"); return result; } case NMiniKQL::TType::EKind::Struct: { - auto structType = static_cast<const NMiniKQL::TStructType *>(itemType); + auto structType = static_cast<const NMiniKQL::TStructType*>(itemType); YQL_ENSURE(array->type_id() == arrow::Type::STRUCT, "Unexpected array type"); auto typedArray = static_pointer_cast<arrow::StructArray>(array); YQL_ENSURE(static_cast<ui32>(typedArray->num_fields()) == structType->GetMembersCount(), "Unexpected count of fields"); - NUdf::TUnboxedValue *itemsPtr = nullptr; + NUdf::TUnboxedValue* itemsPtr = nullptr; auto result = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), itemsPtr); for (ui32 index = 0; index < structType->GetMembersCount(); ++index) { @@ -968,13 +1103,13 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr } case NMiniKQL::TType::EKind::Tuple: { - auto tupleType = static_cast<const NMiniKQL::TTupleType *>(itemType); + auto tupleType = static_cast<const NMiniKQL::TTupleType*>(itemType); YQL_ENSURE(array->type_id() == arrow::Type::STRUCT, "Unexpected array type"); auto typedArray = static_pointer_cast<arrow::StructArray>(array); YQL_ENSURE(static_cast<ui32>(typedArray->num_fields()) == tupleType->GetElementsCount(), "Unexpected count of fields"); - NUdf::TUnboxedValue *itemsPtr = nullptr; + NUdf::TUnboxedValue* itemsPtr = nullptr; auto result = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), itemsPtr); for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) { @@ -985,7 +1120,7 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr } case NMiniKQL::TType::EKind::Optional: { - auto optionalType = static_cast<const NMiniKQL::TOptionalType *>(itemType); + auto optionalType = static_cast<const NMiniKQL::TOptionalType*>(itemType); auto innerOptionalType = optionalType->GetItemType(); if (NeedWrapByExternalOptional(innerOptionalType)) { @@ -1006,7 +1141,7 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr break; } - innerType = static_cast<const NMiniKQL::TOptionalType *>(innerType)->GetItemType(); + innerType = static_cast<const NMiniKQL::TOptionalType*>(innerType)->GetItemType(); innerArray = structArray->field(0); ++depth; } @@ -1029,7 +1164,7 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr } case NMiniKQL::TType::EKind::List: { - auto listType = static_cast<const NMiniKQL::TListType *>(itemType); + auto listType = static_cast<const NMiniKQL::TListType*>(itemType); YQL_ENSURE(array->type_id() == arrow::Type::LIST, "Unexpected array type"); auto typedArray = static_pointer_cast<arrow::ListArray>(array); @@ -1038,7 +1173,7 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr auto itemType = listType->GetItemType(); const auto len = arraySlice->length(); - NUdf::TUnboxedValue *items = nullptr; + NUdf::TUnboxedValue* items = nullptr; auto list = holderFactory.CreateDirectArrayHolder(len, items); for (ui64 i = 0; i < static_cast<ui64>(len); ++i) { *items++ = ExtractUnboxedValue(arraySlice, i, itemType, holderFactory); @@ -1047,7 +1182,7 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr } case NMiniKQL::TType::EKind::Dict: { - auto dictType = static_cast<const NMiniKQL::TDictType *>(itemType); + auto dictType = static_cast<const NMiniKQL::TDictType*>(itemType); auto keyType = dictType->GetKeyType(); auto payloadType = dictType->GetPayloadType(); @@ -1097,7 +1232,7 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr case NMiniKQL::TType::EKind::Variant: { // TODO Need to properly convert variants containing more than 127*127 // types? - auto variantType = static_cast<const NMiniKQL::TVariantType *>(itemType); + auto variantType = static_cast<const NMiniKQL::TVariantType*>(itemType); YQL_ENSURE(array->type_id() == arrow::Type::DENSE_UNION, "Unexpected array type"); auto unionArray = static_pointer_cast<arrow::DenseUnionArray>(array); @@ -1117,25 +1252,26 @@ NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &arr variantIndex =variantIndex * arrow::UnionType::kMaxTypeCode + innerVariantIndex; } - NMiniKQL::TType *innerType = variantType->GetUnderlyingType(); + NMiniKQL::TType* innerType = variantType->GetUnderlyingType(); if (innerType->IsStruct()) { - innerType =static_cast<NMiniKQL::TStructType *>(innerType)->GetMemberType(variantIndex); + innerType =static_cast<NMiniKQL::TStructType*>(innerType)->GetMemberType(variantIndex); } else { YQL_ENSURE(innerType->IsTuple(), "Unexpected underlying variant type: " << innerType->GetKindAsStr()); - innerType = static_cast<NMiniKQL::TTupleType *>(innerType)->GetElementType(variantIndex); + innerType = static_cast<NMiniKQL::TTupleType*>(innerType)->GetElementType(variantIndex); } NUdf::TUnboxedValue value = ExtractUnboxedValue(valuesArray, rowInChild, innerType, holderFactory); return holderFactory.CreateVariantHolder(value.Release(), variantIndex); } - default: + default: { YQL_ENSURE(false, "Unsupported type: " << itemType->GetKindAsStr()); + } } return NUdf::TUnboxedValuePod(); } -NMiniKQL::TUnboxedValueVector ExtractUnboxedValues(const std::shared_ptr<arrow::Array> &array, const NMiniKQL::TType *itemType, - const NMiniKQL::THolderFactory &holderFactory) +NMiniKQL::TUnboxedValueVector ExtractUnboxedValues(const std::shared_ptr<arrow::Array>& array, const NMiniKQL::TType* itemType, + const NMiniKQL::THolderFactory& holderFactory) { NMiniKQL::TUnboxedValueVector values; values.reserve(array->length()); diff --git a/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.h b/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.h index e5f1b7decb2..0c3c62a0e2d 100644 --- a/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.h +++ b/ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.h @@ -68,23 +68,23 @@ namespace NKikimr::NKqp::NFormats { * @return std::shared_ptr<arrow::DataType> arrow type of the same structure as * type */ -std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TType *type); +std::shared_ptr<arrow::DataType> GetArrowType(const NMiniKQL::TType* type); -void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder *builder, const NMiniKQL::TType *type); +bool IsArrowCompatible(const NMiniKQL::TType* type); -namespace NTestUtils { +void AppendElement(NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, const NMiniKQL::TType* type); -bool IsArrowCompatible(const NMiniKQL::TType *type); +namespace NTestUtils { -std::unique_ptr<arrow::ArrayBuilder> MakeArrowBuilder(const NMiniKQL::TType *type); +std::unique_ptr<arrow::ArrayBuilder> MakeArrowBuilder(const NMiniKQL::TType* type); -std::shared_ptr<arrow::Array> MakeArray(NMiniKQL::TUnboxedValueVector &values, const NMiniKQL::TType *itemType); +std::shared_ptr<arrow::Array> MakeArray(NMiniKQL::TUnboxedValueVector& values, const NMiniKQL::TType* itemType); -NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array> &array, ui64 row, - const NMiniKQL::TType *itemType, const NMiniKQL::THolderFactory &holderFactory); +NUdf::TUnboxedValue ExtractUnboxedValue(const std::shared_ptr<arrow::Array>& array, ui64 row, + const NMiniKQL::TType* itemType, const NMiniKQL::THolderFactory& holderFactory); -NMiniKQL::TUnboxedValueVector ExtractUnboxedValues(const std::shared_ptr<arrow::Array> &array, - const NMiniKQL::TType *itemType, const NMiniKQL::THolderFactory &holderFactory); +NMiniKQL::TUnboxedValueVector ExtractUnboxedValues(const std::shared_ptr<arrow::Array>& array, + const NMiniKQL::TType* itemType, const NMiniKQL::THolderFactory& holderFactory); } // namespace NTestUtils diff --git a/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.cpp b/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.cpp index 8e8cfbad054..e67360a0f4e 100644 --- a/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.cpp +++ b/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.cpp @@ -12,23 +12,23 @@ namespace NKikimr::NKqp::NFormats { -using TArrowSchemaColumns = std::vector<std::pair<TString, NMiniKQL::TType *>>; +using TArrowSchemaColumns = std::vector<std::pair<TString, NMiniKQL::TType*>>; using TArrowNotNullColumns = std::set<std::string>; using TArrowSchema = std::pair<TArrowSchemaColumns, TArrowNotNullColumns>; namespace { -TArrowSchema GetArrowSchema(const NMiniKQL::TType *mkqlItemType, const TVector<ui32> *columnOrder, const TVector<TString> *columnHints) { +TArrowSchema GetArrowSchema(const NMiniKQL::TType* mkqlItemType, const TVector<ui32>* columnOrder, const TVector<TString>* columnHints) { TArrowSchemaColumns arrowSchemaColumns; TArrowNotNullColumns arrowNotNullColumns; - const auto *mkqlSrcRowStructType = static_cast<const NMiniKQL::TStructType *>(mkqlItemType); + const auto* mkqlSrcRowStructType = static_cast<const NMiniKQL::TStructType*>(mkqlItemType); for (ui32 idx = 0; idx < mkqlSrcRowStructType->GetMembersCount(); ++idx) { ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? idx : (*columnOrder)[idx]; auto columnName = columnHints && columnHints->size() ? columnHints->at(idx) : TString(mkqlSrcRowStructType->GetMemberName(memberIndex)); - auto *columnType = mkqlSrcRowStructType->GetMemberType(memberIndex); + auto* columnType = mkqlSrcRowStructType->GetMemberType(memberIndex); if (columnType->GetKind() != NMiniKQL::TType::EKind::Optional) { arrowNotNullColumns.insert(columnName); @@ -40,8 +40,8 @@ TArrowSchema GetArrowSchema(const NMiniKQL::TType *mkqlItemType, const TVector<u return {arrowSchemaColumns, arrowNotNullColumns}; } -std::shared_ptr<arrow::RecordBatch> BuildArrowFromUnboxedValue(Ydb::ResultSet *ydbResult, const NMiniKQL::TUnboxedValueBatch &rows, - const NMiniKQL::TType *mkqlItemType, const TVector<ui32> *columnOrder, const TVector<TString> *columnHints, TMaybe<ui64> rowsLimitPerWrite) +std::shared_ptr<arrow::RecordBatch> BuildArrowFromUnboxedValue(Ydb::ResultSet* ydbResult, const NMiniKQL::TUnboxedValueBatch& rows, + const NMiniKQL::TType* mkqlItemType, const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, TMaybe<ui64> rowsLimitPerWrite) { auto [arrowSchemaColumns, arrowNotNullColumns] = GetArrowSchema(mkqlItemType, columnOrder, columnHints); @@ -49,7 +49,7 @@ std::shared_ptr<arrow::RecordBatch> BuildArrowFromUnboxedValue(Ydb::ResultSet *y batchBuilder.Reserve(rows.RowCount()); YQL_ENSURE(batchBuilder.Start(arrowSchemaColumns).ok()); - rows.ForEachRow([&](const NUdf::TUnboxedValue &row) -> bool { + rows.ForEachRow([&](const NUdf::TUnboxedValue& row) -> bool { if (rowsLimitPerWrite) { if (*rowsLimitPerWrite == 0) { ydbResult->set_truncated(true); @@ -64,14 +64,14 @@ std::shared_ptr<arrow::RecordBatch> BuildArrowFromUnboxedValue(Ydb::ResultSet *y return batchBuilder.FlushBatch(false, /* flushEmpty */ true); } -std::shared_ptr<arrow::RecordBatch> BuildArrowFromSerializedBatches(const NYql::NDq::TDqDataSerializer &dataSerializer, - TVector<NYql::NDq::TDqSerializedBatch> &&data, const NMiniKQL::TType *mkqlItemType, const TVector<ui32> *columnOrder, - const TVector<TString> *columnHints) +std::shared_ptr<arrow::RecordBatch> BuildArrowFromSerializedBatches(const NYql::NDq::TDqDataSerializer& dataSerializer, + TVector<NYql::NDq::TDqSerializedBatch>&& data, const NMiniKQL::TType* mkqlItemType, const TVector<ui32>* columnOrder, + const TVector<TString>* columnHints) { auto [arrowSchemaColumns, arrowNotNullColumns] = GetArrowSchema(mkqlItemType, columnOrder, columnHints); ui32 rowsCount = 0; - for (const auto &part : data) { + for (const auto& part : data) { if (part.ChunkCount()) { rowsCount += part.RowCount(); } @@ -81,7 +81,7 @@ std::shared_ptr<arrow::RecordBatch> BuildArrowFromSerializedBatches(const NYql:: batchBuilder.Reserve(rowsCount); YQL_ENSURE(batchBuilder.Start(arrowSchemaColumns).ok()); - for (auto &part : data) { + for (auto& part : data) { if (!part.ChunkCount()) { continue; } @@ -89,7 +89,7 @@ std::shared_ptr<arrow::RecordBatch> BuildArrowFromSerializedBatches(const NYql:: NMiniKQL::TUnboxedValueBatch rows(mkqlItemType); dataSerializer.Deserialize(std::move(part), mkqlItemType, rows); - rows.ForEachRow([&](const NUdf::TUnboxedValue &value) { + rows.ForEachRow([&](const NUdf::TUnboxedValue& value) { batchBuilder.AddRow(value, arrowSchemaColumns.size(), columnOrder); }); } @@ -97,26 +97,26 @@ std::shared_ptr<arrow::RecordBatch> BuildArrowFromSerializedBatches(const NYql:: return batchBuilder.FlushBatch(false, /* flushEmpty */ true); } -void FillValueSchema(Ydb::ResultSet *ydbResult, const NMiniKQL::TType *mkqlItemType, const TVector<ui32> *columnOrder, - const TVector<TString> *columnHints) +void FillValueSchema(Ydb::ResultSet* ydbResult, const NMiniKQL::TType* mkqlItemType, const TVector<ui32>* columnOrder, + const TVector<TString>* columnHints) { - const auto *mkqlSrcRowStructType = static_cast<const NMiniKQL::TStructType *>(mkqlItemType); + const auto* mkqlSrcRowStructType = static_cast<const NMiniKQL::TStructType* >(mkqlItemType); for (ui32 idx = 0; idx < mkqlSrcRowStructType->GetMembersCount(); ++idx) { - auto *column = ydbResult->add_columns(); + auto* column = ydbResult->add_columns(); ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? idx : (*columnOrder)[idx]; auto columnName = TString(columnHints && columnHints->size() ? columnHints->at(idx) : mkqlSrcRowStructType->GetMemberName(memberIndex)); - auto *columnType = mkqlSrcRowStructType->GetMemberType(memberIndex); + auto* columnType = mkqlSrcRowStructType->GetMemberType(memberIndex); column->set_name(columnName); - ExportTypeToProto(columnType, *column->mutable_type()); + ExportTypeToProto(columnType,*column->mutable_type()); } } -void FillValueResultSet(Ydb::ResultSet *ydbResult, const NMiniKQL::TUnboxedValueBatch &rows, NMiniKQL::TType *mkqlItemType, bool fillSchema, - const TVector<ui32> *columnOrder, const TVector<TString> *columnHints, TMaybe<ui64> rowsLimitPerWrite) +void FillValueResultSet(Ydb::ResultSet* ydbResult, const NMiniKQL::TUnboxedValueBatch& rows, NMiniKQL::TType* mkqlItemType, bool fillSchema, + const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, TMaybe<ui64> rowsLimitPerWrite) { ydbResult->set_format(Ydb::ResultSet::FORMAT_VALUE); @@ -124,7 +124,7 @@ void FillValueResultSet(Ydb::ResultSet *ydbResult, const NMiniKQL::TUnboxedValue FillValueSchema(ydbResult, mkqlItemType, columnOrder, columnHints); } - rows.ForEachRow([&](const NUdf::TUnboxedValue &value) -> bool { + rows.ForEachRow([&](const NUdf::TUnboxedValue& value) -> bool { if (rowsLimitPerWrite) { if (*rowsLimitPerWrite == 0) { ydbResult->set_truncated(true); @@ -137,9 +137,9 @@ void FillValueResultSet(Ydb::ResultSet *ydbResult, const NMiniKQL::TUnboxedValue }); } -void FillValueResultSet(Ydb::ResultSet *ydbResult, const NYql::NDq::TDqDataSerializer &dataSerializer, - TVector<NYql::NDq::TDqSerializedBatch> &&data, NMiniKQL::TType *mkqlItemType, bool fillSchema, const TVector<ui32> *columnOrder, - const TVector<TString> *columnHints) +void FillValueResultSet(Ydb::ResultSet* ydbResult, const NYql::NDq::TDqDataSerializer& dataSerializer, + TVector<NYql::NDq::TDqSerializedBatch>&& data, NMiniKQL::TType* mkqlItemType, bool fillSchema, const TVector<ui32>* columnOrder, + const TVector<TString>* columnHints) { ydbResult->set_format(Ydb::ResultSet::FORMAT_VALUE); @@ -147,7 +147,7 @@ void FillValueResultSet(Ydb::ResultSet *ydbResult, const NYql::NDq::TDqDataSeria FillValueSchema(ydbResult, mkqlItemType, columnOrder, columnHints); } - for (auto &part : data) { + for (auto& part : data) { if (!part.ChunkCount()) { continue; } @@ -155,14 +155,14 @@ void FillValueResultSet(Ydb::ResultSet *ydbResult, const NYql::NDq::TDqDataSeria NMiniKQL::TUnboxedValueBatch rows(mkqlItemType); dataSerializer.Deserialize(std::move(part), mkqlItemType, rows); - rows.ForEachRow([&](const NUdf::TUnboxedValue &value) { + rows.ForEachRow([&](const NUdf::TUnboxedValue& value) { ExportValueToProto(mkqlItemType, value, *ydbResult->add_rows(), columnOrder); }); } } -void FillArrowResultSet(Ydb::ResultSet *ydbResult, std::shared_ptr<arrow::RecordBatch> batch, const NFormats::TFormatsSettings &settings, - const NMiniKQL::TType *mkqlItemType, bool fillSchema, const TVector<ui32> *columnOrder, const TVector<TString> *columnHints) +void FillArrowResultSet(Ydb::ResultSet* ydbResult, std::shared_ptr<arrow::RecordBatch> batch, const NFormats::TFormatsSettings& settings, + const NMiniKQL::TType* mkqlItemType, bool fillSchema, const TVector<ui32>* columnOrder, const TVector<TString>* columnHints) { ydbResult->set_format(Ydb::ResultSet::FORMAT_ARROW); @@ -188,9 +188,9 @@ void FillArrowResultSet(Ydb::ResultSet *ydbResult, std::shared_ptr<arrow::Record } // namespace -void BuildResultSetFromRows(Ydb::ResultSet *ydbResult, const NFormats::TFormatsSettings &settings, bool fillSchema, - NMiniKQL::TType *mkqlItemType, const NMiniKQL::TUnboxedValueBatch &rows, const TVector<ui32> *columnOrder, - const TVector<TString> *columnHints,TMaybe<ui64> rowsLimitPerWrite) +void BuildResultSetFromRows(Ydb::ResultSet* ydbResult, const NFormats::TFormatsSettings& settings, bool fillSchema, + NMiniKQL::TType* mkqlItemType, const NMiniKQL::TUnboxedValueBatch& rows, const TVector<ui32>* columnOrder, + const TVector<TString>* columnHints,TMaybe<ui64> rowsLimitPerWrite) { YQL_ENSURE(ydbResult); YQL_ENSURE(!rows.IsWide()); @@ -208,9 +208,9 @@ void BuildResultSetFromRows(Ydb::ResultSet *ydbResult, const NFormats::TFormatsS YQL_ENSURE(false, "Unknown output format"); } -void BuildResultSetFromBatches(Ydb::ResultSet *ydbResult, const NFormats::TFormatsSettings &settings, bool fillSchema, - NMiniKQL::TType *mkqlItemType, const NYql::NDq::TDqDataSerializer &dataSerializer, TVector<NYql::NDq::TDqSerializedBatch> &&data, - const TVector<ui32> *columnOrder, const TVector<TString> *columnHints) +void BuildResultSetFromBatches(Ydb::ResultSet* ydbResult, const NFormats::TFormatsSettings& settings, bool fillSchema, + NMiniKQL::TType* mkqlItemType, const NYql::NDq::TDqDataSerializer& dataSerializer, TVector<NYql::NDq::TDqSerializedBatch>&& data, + const TVector<ui32>* columnOrder, const TVector<TString>* columnHints) { YQL_ENSURE(ydbResult); YQL_ENSURE(mkqlItemType && mkqlItemType->GetKind() == NMiniKQL::TType::EKind::Struct); diff --git a/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.h b/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.h index 7879ed0cf79..fe9e0c5fb1f 100644 --- a/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.h +++ b/ydb/core/kqp/common/result_set_format/kqp_result_set_builders.h @@ -9,12 +9,12 @@ namespace NKikimr::NKqp::NFormats { /** * High-level helpers to build YDB result sets for any selected format */ -void BuildResultSetFromRows(Ydb::ResultSet *ydbResult, const NFormats::TFormatsSettings &settings, bool fillSchema, - NMiniKQL::TType *mkqlItemType, const NMiniKQL::TUnboxedValueBatch &rows, const TVector<ui32> *columnOrder, - const TVector<TString> *columnHints, TMaybe<ui64> rowsLimitPerWrite); +void BuildResultSetFromRows(Ydb::ResultSet* ydbResult, const NFormats::TFormatsSettings& settings, bool fillSchema, + NMiniKQL::TType* mkqlItemType, const NMiniKQL::TUnboxedValueBatch& rows, const TVector<ui32>* columnOrder, + const TVector<TString>* columnHints, TMaybe<ui64> rowsLimitPerWrite); -void BuildResultSetFromBatches(Ydb::ResultSet *ydbResult, const NFormats::TFormatsSettings &settings, bool fillSchema, - NMiniKQL::TType *mkqlItemType, const NYql::NDq::TDqDataSerializer &dataSerializer, TVector<NYql::NDq::TDqSerializedBatch> &&data, - const TVector<ui32> *columnOrder, const TVector<TString> *columnHints); +void BuildResultSetFromBatches(Ydb::ResultSet* ydbResult, const NFormats::TFormatsSettings& settings, bool fillSchema, + NMiniKQL::TType* mkqlItemType, const NYql::NDq::TDqDataSerializer& dataSerializer, TVector<NYql::NDq::TDqSerializedBatch>&& data, + const TVector<ui32>* columnOrder, const TVector<TString>* columnHints); } // namespace NKikimr::NKqp::NFormats diff --git a/ydb/core/kqp/common/result_set_format/ut/kqp_result_set_arrow_ut.cpp b/ydb/core/kqp/common/result_set_format/ut/kqp_result_set_arrow_ut.cpp index 1c0e3beb8ca..2339e1fdd98 100644 --- a/ydb/core/kqp/common/result_set_format/ut/kqp_result_set_arrow_ut.cpp +++ b/ydb/core/kqp/common/result_set_format/ut/kqp_result_set_arrow_ut.cpp @@ -8,54 +8,83 @@ #include <ydb/library/formats/arrow/simple_builder/filler.h> #include <ydb/library/yverify_stream/yverify_stream.h> +#include <ydb/public/lib/scheme_types/scheme_type_id.h> #include <yql/essentials/minikql/computation/mkql_block_reader.h> #include <yql/essentials/minikql/computation/mkql_value_builder.h> #include <yql/essentials/minikql/mkql_string_util.h> #include <yql/essentials/public/udf/arrow/defs.h> +#include <yql/essentials/types/binary_json/read.h> +#include <yql/essentials/types/binary_json/write.h> +#include <yql/essentials/types/dynumber/dynumber.h> + +#include <library/cpp/type_info/tz/tz.h> using namespace NKikimr::NMiniKQL; using namespace NKikimr::NArrow; using namespace NYql; +inline static constexpr size_t TEST_ARRAY_SIZE = 1 << 16; +inline static constexpr ui8 DECIMAL_PRECISION = 35; +inline static constexpr ui8 DECIMAL_SCALE = 10; + +static_assert(DECIMAL_PRECISION >= DECIMAL_SCALE, "Decimal precision must be greater than or equal to scale"); + namespace { +ui16 GetTimezoneIdSkipEmpty(ui16 index) { + auto size = NTi::GetTimezones().size(); + while (NTi::GetTimezones()[index % size].empty()) { + index = (index + 1) % size; + } + return GetTimezoneId(NTi::GetTimezones()[index % size]); +} + +std::string SerializeToBinaryJson(const TStringBuf json) { + auto variant = NKikimr::NBinaryJson::SerializeToBinaryJson(json); + if (std::holds_alternative<NKikimr::NBinaryJson::TBinaryJson>(variant)) { + const auto binaryJson = std::get<NKikimr::NBinaryJson::TBinaryJson>(variant); + return std::string(binaryJson.Data(), binaryJson.Size()); + } + UNIT_ASSERT_C(false, "Cannot serialize binary json"); + return {}; +} + NUdf::TUnboxedValue GetValueOfBasicType(TType* type, ui64 value) { Y_ABORT_UNLESS(type->GetKind() == TType::EKind::Data); auto dataType = static_cast<const TDataType*>(type); auto slot = *dataType->GetDataSlot().Get(); - switch(slot) { + switch (slot) { case NUdf::EDataSlot::Bool: return NUdf::TUnboxedValuePod(static_cast<bool>(value % 2 == 0)); case NUdf::EDataSlot::Int8: - return NUdf::TUnboxedValuePod(static_cast<i8>(-(value % 126))); + return NUdf::TUnboxedValuePod(static_cast<i8>(-(value % ((1 << 7) - 1)))); case NUdf::EDataSlot::Uint8: - return NUdf::TUnboxedValuePod(static_cast<ui8>(value % 255)); + return NUdf::TUnboxedValuePod(static_cast<ui8>(value % ((1 << 8)))); case NUdf::EDataSlot::Int16: return NUdf::TUnboxedValuePod(static_cast<i16>(-(value % ((1 << 15) - 1)))); case NUdf::EDataSlot::Uint16: - return NUdf::TUnboxedValuePod(static_cast<ui16>(value % (1 << 16))); + return NUdf::TUnboxedValuePod(static_cast<ui16>(value % (1 << 15))); case NUdf::EDataSlot::Int32: return NUdf::TUnboxedValuePod(static_cast<i32>(-(value % ((1ULL << 31) - 1)))); case NUdf::EDataSlot::Uint32: - return NUdf::TUnboxedValuePod(static_cast<ui32>(value % (1 << 31))); + return NUdf::TUnboxedValuePod(static_cast<ui32>(value % (1ULL << 31))); case NUdf::EDataSlot::Int64: - return NUdf::TUnboxedValuePod(static_cast<i64>(- (value / 2))); + return NUdf::TUnboxedValuePod(static_cast<i64>(-(value % ((1ULL << 63) - 1)))); case NUdf::EDataSlot::Uint64: - return NUdf::TUnboxedValuePod(static_cast<ui64>(value)); + return NUdf::TUnboxedValuePod(static_cast<ui64>(value % (1ULL << 63))); case NUdf::EDataSlot::Float: return NUdf::TUnboxedValuePod(static_cast<float>(value) / 1234); case NUdf::EDataSlot::Double: return NUdf::TUnboxedValuePod(static_cast<double>(value) / 12345); - case NUdf::EDataSlot::String: { - std::string string = TStringBuilder() << value; - return MakeString(NUdf::TStringRef(string.data(), string.size())); + case NUdf::EDataSlot::Decimal: { + auto decimal = NYql::NDecimal::FromString(TStringBuilder() << value << ".123", DECIMAL_PRECISION, DECIMAL_SCALE); + return NUdf::TUnboxedValuePod(decimal); } - case NUdf::EDataSlot::Utf8: { - std::string string = TStringBuilder() << value << "utf8"; - return MakeString(NUdf::TStringRef(string.data(), string.size())); + case NUdf::EDataSlot::DyNumber: { + auto number = NKikimr::NDyNumber::ParseDyNumberString(TStringBuilder() << value); + UNIT_ASSERT_C(number.Defined(), "Failed to convert string to DyNumber"); + return MakeString(*number); } - // case NUdf::EDataSlot::Decimal: // TODO: yql/essentials/minikql/mkql_node.cpp:486 - // return NUdf::TUnboxedValuePod(NDecimal::FromHalfs(value, value / 2 - 1)); case NUdf::EDataSlot::Date: return NUdf::TUnboxedValuePod(static_cast<ui16>(value % NUdf::MAX_DATE)); case NUdf::EDataSlot::Datetime: @@ -64,47 +93,74 @@ NUdf::TUnboxedValue GetValueOfBasicType(TType* type, ui64 value) { return NUdf::TUnboxedValuePod(static_cast<ui64>(value % NUdf::MAX_TIMESTAMP)); case NUdf::EDataSlot::Interval: return NUdf::TUnboxedValuePod(static_cast<i64>(value / 2 - 1)); - case NUdf::EDataSlot::Date32: - return NUdf::TUnboxedValuePod(static_cast<i32>(value % NUdf::MAX_DATE32)); - case NUdf::EDataSlot::Datetime64: - return NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_DATETIME64)); - case NUdf::EDataSlot::Timestamp64: - return NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_TIMESTAMP64)); - case NUdf::EDataSlot::Interval64: - return NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_INTERVAL64)); case NUdf::EDataSlot::TzDate: { auto ret = NUdf::TUnboxedValuePod(static_cast<ui16>(value % NUdf::MAX_DATE)); - ret.SetTimezoneId(NKikimr::NMiniKQL::GetTimezoneId("Europe/Moscow")); + ret.SetTimezoneId(GetTimezoneIdSkipEmpty(value)); return ret; } case NUdf::EDataSlot::TzDatetime: { auto ret = NUdf::TUnboxedValuePod(static_cast<ui32>(value % NUdf::MAX_DATETIME)); - ret.SetTimezoneId(NKikimr::NMiniKQL::GetTimezoneId("Asia/Omsk")); + ret.SetTimezoneId(GetTimezoneIdSkipEmpty(value)); return ret; } case NUdf::EDataSlot::TzTimestamp: { auto ret = NUdf::TUnboxedValuePod(static_cast<ui64>(value % NUdf::MAX_TIMESTAMP)); - ret.SetTimezoneId(NKikimr::NMiniKQL::GetTimezoneId("Europe/Tallinn")); + ret.SetTimezoneId(GetTimezoneIdSkipEmpty(value)); return ret; } + case NUdf::EDataSlot::Date32: + return NUdf::TUnboxedValuePod(static_cast<i32>(value % NUdf::MAX_DATE32)); + case NUdf::EDataSlot::Datetime64: + return NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_DATETIME64)); + case NUdf::EDataSlot::Timestamp64: + return NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_TIMESTAMP64)); + case NUdf::EDataSlot::Interval64: + return NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_INTERVAL64)); case NUdf::EDataSlot::TzDate32: { auto ret = NUdf::TUnboxedValuePod(static_cast<i32>(value % NUdf::MAX_DATE32)); - ret.SetTimezoneId(NKikimr::NMiniKQL::GetTimezoneId("US/Eastern")); + ret.SetTimezoneId(GetTimezoneIdSkipEmpty(value)); return ret; } case NUdf::EDataSlot::TzDatetime64: { auto ret = NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_DATETIME64)); - ret.SetTimezoneId(NKikimr::NMiniKQL::GetTimezoneId("America/Nuuk")); + ret.SetTimezoneId(GetTimezoneIdSkipEmpty(value)); return ret; } case NUdf::EDataSlot::TzTimestamp64: { auto ret = NUdf::TUnboxedValuePod(static_cast<i64>(value % NUdf::MAX_TIMESTAMP64)); - ret.SetTimezoneId(NKikimr::NMiniKQL::GetTimezoneId("Atlantic/Faroe")); + ret.SetTimezoneId(GetTimezoneIdSkipEmpty(value)); return ret; } - default: - Y_ABORT("Not implemented creation value for such type"); + case NUdf::EDataSlot::String: { + std::string string = TStringBuilder() << value; + return MakeString(NUdf::TStringRef(string.data(), string.size())); + } + case NUdf::EDataSlot::Utf8: { + std::string string = TStringBuilder() << value << "utf8"; + return MakeString(NUdf::TStringRef(string.data(), string.size())); + } + case NUdf::EDataSlot::Yson: { + std::string yson = TStringBuilder() << '[' << value << ']'; + return MakeString(NUdf::TStringRef(yson.data(), yson.size())); + } + case NUdf::EDataSlot::Json: { + std::string json = TStringBuilder() << '[' << value << ']'; + return MakeString(NUdf::TStringRef(json.data(), json.size())); + } + case NUdf::EDataSlot::JsonDocument: { + std::string json = SerializeToBinaryJson(TStringBuilder() << "{\"b\": " << value << ", \"a\": " << value / 2 << "}"); + return MakeString(NUdf::TStringRef(json.data(), json.size())); + } + case NUdf::EDataSlot::Uuid: { + std::string uuid; + for (size_t i = 0; i < NKikimr::NScheme::FSB_SIZE / 2; ++i) { + uuid += "a" + std::to_string((i + value) % 10); + } + return MakeString(NUdf::TStringRef(uuid)); + } } + + return NUdf::TUnboxedValuePod(); } struct TTestContext { @@ -115,8 +171,7 @@ struct TTestContext { TDefaultValueBuilder Vb; ui16 VariantSize = 0; - // Used to create LargeVariantType - TVector<TType *> BasicTypes = { + TVector<TType*> BasicTypes = { TDataType::Create(NUdf::TDataType<bool>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<i8>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<ui8>::Id, TypeEnv), @@ -128,23 +183,29 @@ struct TTestContext { TDataType::Create(NUdf::TDataType<ui64>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<float>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<double>::Id, TypeEnv), - TDataType::Create(NUdf::TDataType<char*>::Id, TypeEnv), - TDataType::Create(NUdf::TDataType<NUdf::TUtf8>::Id, TypeEnv), - // TDataType::Create(NUdf::TDataType<NUdf::TDecimal>::Id, TypeEnv), // TODO: yql/essentials/minikql/mkql_node.cpp:486 + TDataDecimalType::Create(DECIMAL_PRECISION, DECIMAL_SCALE, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TDyNumber>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TDate>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TDatetime>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TTimestamp>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TInterval>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TTzDate>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TTzDatetime>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TTzTimestamp>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TDate32>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TDatetime64>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TTimestamp64>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TInterval64>::Id, TypeEnv), - TDataType::Create(NUdf::TDataType<NUdf::TTzDate>::Id, TypeEnv), - TDataType::Create(NUdf::TDataType<NUdf::TTzDatetime>::Id, TypeEnv), - TDataType::Create(NUdf::TDataType<NUdf::TTzTimestamp>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TTzDate32>::Id, TypeEnv), TDataType::Create(NUdf::TDataType<NUdf::TTzDatetime64>::Id, TypeEnv), - TDataType::Create(NUdf::TDataType<NUdf::TTzTimestamp64>::Id, TypeEnv)}; + TDataType::Create(NUdf::TDataType<NUdf::TTzTimestamp64>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<char*>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TUtf8>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TYson>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TJson>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TJsonDocument>::Id, TypeEnv), + TDataType::Create(NUdf::TDataType<NUdf::TUuid>::Id, TypeEnv) + }; TTestContext() : Alloc(__LOCATION__) @@ -289,8 +350,8 @@ struct TTestContext { std::string data = TStringBuilder() << "{value=" << value << "}"; item = MakeString(NUdf::TStringRef(data.data(), data.size())); } else if (typeIndex == 1) { - std::string data = TStringBuilder() << "{value:" << value << "}"; - item = MakeString(NUdf::TStringRef(data.data(), data.size())); + std::string data = TStringBuilder() << "{\"value\":" << value << "}"; + item = MakeString(SerializeToBinaryJson(data)); } else if (typeIndex == 2) { std::string sample = "7856341212905634789012345678901"; std::string data = TStringBuilder() << HexDecode(sample + static_cast<char>('0' + (value % 10))); @@ -323,8 +384,8 @@ struct TTestContext { std::string data = TStringBuilder() << "{value=" << value << "}"; item = MakeString(NUdf::TStringRef(data.data(), data.size())); } else if (typeIndex == 1) { - std::string data = TStringBuilder() << "{value:" << value << "}"; - item = MakeString(NUdf::TStringRef(data.data(), data.size())); + std::string data = TStringBuilder() << "{\"value\":" << value << "}"; + item = MakeString(SerializeToBinaryJson(data)); } else if (typeIndex == 2) { std::string sample = "7856341212905634789012345678901"; std::string data = TStringBuilder() << HexDecode(sample + static_cast<char>('0' + (value % 10))); @@ -353,8 +414,8 @@ struct TTestContext { std::string data = TStringBuilder() << "{value=" << value << "}"; item = MakeString(NUdf::TStringRef(data.data(), data.size())); } else if (typeIndex == 1) { - std::string data = TStringBuilder() << "{value:" << value << "}"; - item = MakeString(NUdf::TStringRef(data.data(), data.size())); + std::string data = TStringBuilder() << "{\"value\":" << value << "}"; + item = MakeString(SerializeToBinaryJson(data)); } else if (typeIndex == 2) { std::string sample = "7856341212905634789012345678901"; std::string data = TStringBuilder() << HexDecode(sample + static_cast<char>('0' + (value % 10))); @@ -574,7 +635,6 @@ struct TTestContext { } }; -// Note this equality check is not fully valid. But it is sufficient for UnboxedValues used in tests. void AssertUnboxedValuesAreEqual(NUdf::TUnboxedValue& left, NUdf::TUnboxedValue& right, TType* type) { switch (type->GetKind()) { case TType::EKind::Void: @@ -706,12 +766,300 @@ void AssertUnboxedValuesAreEqual(NUdf::TUnboxedValue& left, NUdf::TUnboxedValue& namespace NKikimr::NKqp::NFormats { +namespace { + +template <typename TMiniKQLType, typename TPhysicalType, typename TArrowArrayType, bool IsStringType = false, bool IsTimezoneType = false> +void TestDataTypeConversion(arrow::Type::type arrowTypeId) { + TTestContext context; + + auto type = TDataType::Create(NUdf::TDataType<TMiniKQLType>::Id, context.TypeEnv); + UNIT_ASSERT(IsArrowCompatible(type)); + + TUnboxedValueVector values; + values.reserve(TEST_ARRAY_SIZE); + + for (size_t i = 0; i < TEST_ARRAY_SIZE; ++i) { + values.emplace_back(GetValueOfBasicType(type, i)); + } + + auto array = NTestUtils::MakeArray(values, type); + UNIT_ASSERT_C(array->ValidateFull().ok(), array->ValidateFull().ToString()); + UNIT_ASSERT(array->length() == static_cast<i64>(values.size())); + + std::shared_ptr<TArrowArrayType> typedArray; + std::shared_ptr<arrow::StringArray> timezoneArray; + + if constexpr (IsTimezoneType) { + UNIT_ASSERT(array->type_id() == arrow::Type::STRUCT); + auto structArray = static_pointer_cast<arrow::StructArray>(array); + UNIT_ASSERT(structArray->num_fields() == 2); + UNIT_ASSERT(structArray->field(0)->type_id() == arrowTypeId); + UNIT_ASSERT(structArray->field(1)->type_id() == arrow::Type::STRING); + + typedArray = static_pointer_cast<TArrowArrayType>(structArray->field(0)); + timezoneArray = static_pointer_cast<arrow::StringArray>(structArray->field(1)); + } else { + UNIT_ASSERT(array->type_id() == arrowTypeId); + typedArray = static_pointer_cast<TArrowArrayType>(array); + } + + for (size_t i = 0; i < TEST_ARRAY_SIZE; ++i) { + if constexpr (IsStringType) { + if constexpr (std::is_same_v<NUdf::TJsonDocument, TMiniKQLType>) { + auto val = NBinaryJson::SerializeToJson(values[i].AsStringRef()); + UNIT_ASSERT(static_cast<TPhysicalType>(typedArray->Value(i)) == val); + } else { + auto value = NTestUtils::ExtractUnboxedValue(array, i, type, context.HolderFactory); + AssertUnboxedValuesAreEqual(value, values[i], type); + } + } else { + UNIT_ASSERT(static_cast<TPhysicalType>(typedArray->Value(i)) == values[i].Get<TPhysicalType>()); + } + + if constexpr (IsTimezoneType) { + auto view = timezoneArray->Value(i); + UNIT_ASSERT(values[i].GetTimezoneId() == GetTimezoneId(NUdf::TStringRef(view.data(), view.size()))); + } + } +} + +template <typename TMiniKQLType, bool IsDecimalType = false> +void TestFixedSizeBinaryDataTypeConversion() { + TTestContext context; + TType* type; + + if constexpr (IsDecimalType) { + type = TDataDecimalType::Create(35, 10, context.TypeEnv); + } else { + type = TDataType::Create(NUdf::TDataType<TMiniKQLType>::Id, context.TypeEnv); + } + + UNIT_ASSERT(IsArrowCompatible(type)); + + TUnboxedValueVector values; + values.reserve(TEST_ARRAY_SIZE); + + for (size_t i = 0; i < TEST_ARRAY_SIZE; ++i) { + values.emplace_back(GetValueOfBasicType(type, i)); + } + + auto array = NTestUtils::MakeArray(values, type); + UNIT_ASSERT_C(array->ValidateFull().ok(), array->ValidateFull().ToString()); + UNIT_ASSERT(array->length() == static_cast<i64>(values.size())); + + std::shared_ptr<arrow::FixedSizeBinaryArray> typedArray; + + UNIT_ASSERT(array->type_id() == arrow::Type::FIXED_SIZE_BINARY); + typedArray = static_pointer_cast<arrow::FixedSizeBinaryArray>(array); + UNIT_ASSERT(typedArray->byte_width() == NScheme::FSB_SIZE); + + for (size_t i = 0; i < TEST_ARRAY_SIZE; ++i) { + auto view = typedArray->GetView(i); + if constexpr (IsDecimalType) { + NYql::NDecimal::TInt128 actual; + std::memcpy(&actual, view.data(), view.size()); + + NYql::NDecimal::TInt128 expected = values[i].GetInt128(); + UNIT_ASSERT(actual == expected); + } else { + auto expected = values[i].AsStringRef(); + UNIT_ASSERT_STRINGS_EQUAL(std::string(view.data(), view.size()), std::string(expected.Data(), expected.Size())); + } + } +} + +template <TType::EKind SingularKind> +void TestSingularTypeConversion() { + TTestContext context; + + TType* type = GetTypeOfSingular<SingularKind>(context.TypeEnv); + UNIT_ASSERT(IsArrowCompatible(type)); + + TUnboxedValueVector values; + values.reserve(TEST_ARRAY_SIZE); + + for (size_t i = 0; i < TEST_ARRAY_SIZE; ++i) { + values.emplace_back(); + } + + auto array = NTestUtils::MakeArray(values, type); + UNIT_ASSERT_C(array->ValidateFull().ok(), array->ValidateFull().ToString()); + UNIT_ASSERT(array->length() == static_cast<i64>(TEST_ARRAY_SIZE)); + + if (SingularKind == TType::EKind::Null) { + UNIT_ASSERT(array->type_id() == arrow::Type::NA); + } else { + UNIT_ASSERT(array->type_id() == arrow::Type::STRUCT); + auto structArray = static_pointer_cast<arrow::StructArray>(array); + UNIT_ASSERT(structArray->num_fields() == 0); + } +} + +} // namespace + +Y_UNIT_TEST_SUITE(KqpFormats_Arrow_Conversion) { + + // Integral types + Y_UNIT_TEST(DataType_Bool) { + TestDataTypeConversion<bool, bool, arrow::UInt8Array>(arrow::Type::UINT8); + } + + Y_UNIT_TEST(DataType_Int8) { + TestDataTypeConversion<i8, i8, arrow::Int8Array>(arrow::Type::INT8); + } + + Y_UNIT_TEST(DataType_UInt8) { + TestDataTypeConversion<ui8, ui8, arrow::UInt8Array>(arrow::Type::UINT8); + } + + Y_UNIT_TEST(DataType_Int16) { + TestDataTypeConversion<i16, i16, arrow::Int16Array>(arrow::Type::INT16); + } + + Y_UNIT_TEST(DataType_UInt16) { + TestDataTypeConversion<ui16, ui16, arrow::UInt16Array>(arrow::Type::UINT16); + } + + Y_UNIT_TEST(DataType_Int32) { + TestDataTypeConversion<i32, i32, arrow::Int32Array>(arrow::Type::INT32); + } + + Y_UNIT_TEST(DataType_UInt32) { + TestDataTypeConversion<ui32, ui32, arrow::UInt32Array>(arrow::Type::UINT32); + } + + Y_UNIT_TEST(DataType_Int64) { + TestDataTypeConversion<i64, i64, arrow::Int64Array>(arrow::Type::INT64); + } + + Y_UNIT_TEST(DataType_UInt64) { + TestDataTypeConversion<ui64, ui64, arrow::UInt64Array>(arrow::Type::UINT64); + } + + // Binary number types + Y_UNIT_TEST(DataType_Decimal) { + TestFixedSizeBinaryDataTypeConversion<NUdf::TDecimal, /* IsDecimalType */ true>(); + } + + Y_UNIT_TEST(DataType_DyNumber) { + TestDataTypeConversion<NUdf::TDyNumber, std::string, arrow::StringArray, /* IsStringType */ true>(arrow::Type::STRING); + } + + // Floating point types + Y_UNIT_TEST(DataType_Float) { + TestDataTypeConversion<float, float, arrow::FloatArray>(arrow::Type::FLOAT); + } + + Y_UNIT_TEST(DataType_Double) { + TestDataTypeConversion<double, double, arrow::DoubleArray>(arrow::Type::DOUBLE); + } + + // Datetime types + Y_UNIT_TEST(DataType_Date) { + TestDataTypeConversion<NUdf::TDate, ui16, arrow::UInt16Array>(arrow::Type::UINT16); + } + + Y_UNIT_TEST(DataType_Datetime) { + TestDataTypeConversion<NUdf::TDatetime, ui32, arrow::UInt32Array>(arrow::Type::UINT32); + } + + Y_UNIT_TEST(DataType_Timestamp) { + TestDataTypeConversion<NUdf::TTimestamp, ui64, arrow::UInt64Array>(arrow::Type::UINT64); + } + + Y_UNIT_TEST(DataType_Interval) { + TestDataTypeConversion<NUdf::TInterval, i64, arrow::Int64Array>(arrow::Type::INT64); + } + + Y_UNIT_TEST(DataType_TzDate) { + TestDataTypeConversion<NUdf::TTzDate, ui16, arrow::UInt16Array, /* IsStringType */ false, /* HasTimezone */ true>(arrow::Type::UINT16); + } + + Y_UNIT_TEST(DataType_TzDatetime) { + TestDataTypeConversion<NUdf::TTzDatetime, ui32, arrow::UInt32Array, /* IsStringType */ false, /* HasTimezone */ true>(arrow::Type::UINT32); + } + + Y_UNIT_TEST(DataType_TzTimestamp) { + TestDataTypeConversion<NUdf::TTzTimestamp, ui64, arrow::UInt64Array, /* IsStringType */ false, /* HasTimezone */ true>(arrow::Type::UINT64); + } + + Y_UNIT_TEST(DataType_Date32) { + TestDataTypeConversion<NUdf::TDate32, i32, arrow::Int32Array>(arrow::Type::INT32); + } + + Y_UNIT_TEST(DataType_Datetime64) { + TestDataTypeConversion<NUdf::TDatetime64, i64, arrow::Int64Array>(arrow::Type::INT64); + } + + Y_UNIT_TEST(DataType_Timestamp64) { + TestDataTypeConversion<NUdf::TTimestamp64, i64, arrow::Int64Array>(arrow::Type::INT64); + } + + Y_UNIT_TEST(DataType_Interval64) { + TestDataTypeConversion<NUdf::TInterval64, i64, arrow::Int64Array>(arrow::Type::INT64); + } + + Y_UNIT_TEST(DataType_TzDate32) { + TestDataTypeConversion<NUdf::TTzDate32, i32, arrow::Int32Array, /* IsStringType */ false, /* HasTimezone */ true>(arrow::Type::INT32); + } + + Y_UNIT_TEST(DataType_TzDatetime64) { + TestDataTypeConversion<NUdf::TTzDatetime64, i64, arrow::Int64Array, /* IsStringType */ false, /* HasTimezone */ true>(arrow::Type::INT64); + } + + Y_UNIT_TEST(DataType_TzTimestamp64) { + TestDataTypeConversion<NUdf::TTzTimestamp64, i64, arrow::Int64Array, /* IsStringType */ false, /* HasTimezone */ true>(arrow::Type::INT64); + } + + // String types + Y_UNIT_TEST(DataType_String) { + TestDataTypeConversion<char*, std::string, arrow::BinaryArray, /* IsStringType */ true>(arrow::Type::BINARY); + } + + Y_UNIT_TEST(DataType_Utf8) { + TestDataTypeConversion<NUdf::TUtf8, std::string, arrow::StringArray, /* IsStringType */ true>(arrow::Type::STRING); + } + + Y_UNIT_TEST(DataType_Yson) { + TestDataTypeConversion<NUdf::TYson, std::string, arrow::BinaryArray, /* IsStringType */ true>(arrow::Type::BINARY); + } + + Y_UNIT_TEST(DataType_Json) { + TestDataTypeConversion<NUdf::TJson, std::string, arrow::StringArray, /* IsStringType */ true>(arrow::Type::STRING); + } + + Y_UNIT_TEST(DataType_JsonDocument) { + TestDataTypeConversion<NUdf::TJsonDocument, std::string, arrow::StringArray, /* IsStringType */ true>(arrow::Type::STRING); + } + + Y_UNIT_TEST(DataType_Uuid) { + TestFixedSizeBinaryDataTypeConversion<NUdf::TUuid>(); + } + + // Singular types + Y_UNIT_TEST(DataType_Null) { + TestSingularTypeConversion<TType::EKind::Null>(); + } + + Y_UNIT_TEST(DataType_Void) { + TestSingularTypeConversion<TType::EKind::Void>(); + } + + Y_UNIT_TEST(DataType_EmptyList) { + TestSingularTypeConversion<TType::EKind::EmptyList>(); + } + + Y_UNIT_TEST(DataType_EmptyDict) { + TestSingularTypeConversion<TType::EKind::EmptyDict>(); + } +} + Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) { Y_UNIT_TEST(Struct) { TTestContext context; auto structType = context.GetStructType(); - UNIT_ASSERT(NFormats::NTestUtils::IsArrowCompatible(structType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(structType)); auto values = context.CreateStructs(100); auto array = NFormats::NTestUtils::MakeArray(values, structType); @@ -752,7 +1100,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) { TTestContext context; auto tupleType = context.GetTupleType(); - UNIT_ASSERT(NFormats::NTestUtils::IsArrowCompatible(tupleType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(tupleType)); auto values = context.CreateTuples(100); auto array = NFormats::NTestUtils::MakeArray(values, tupleType); @@ -792,7 +1140,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) { TTestContext context; auto listType = context.GetListOfJsonsType(); - Y_ABORT_UNLESS(NFormats::NTestUtils::IsArrowCompatible(listType)); + Y_ABORT_UNLESS(NFormats::IsArrowCompatible(listType)); auto values = context.CreateListOfJsons(100); auto array = NFormats::NTestUtils::MakeArray(values, listType); @@ -825,7 +1173,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) { TTestContext context; auto listType = context.GetOptionalListOfOptional(); - Y_ABORT_UNLESS(NFormats::NTestUtils::IsArrowCompatible(listType)); + Y_ABORT_UNLESS(NFormats::IsArrowCompatible(listType)); auto values = context.CreateOptionalListOfOptional(100); auto array = NFormats::NTestUtils::MakeArray(values, listType); @@ -863,196 +1211,196 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) { } } - Y_UNIT_TEST(VariantOverStruct) { - TTestContext context; - - auto variantType = context.GetVariantOverStructType(); - UNIT_ASSERT(NFormats::NTestUtils::IsArrowCompatible(variantType)); - - auto values = context.CreateVariantOverStruct(100); - auto array = NFormats::NTestUtils::MakeArray(values, variantType); - UNIT_ASSERT(array->ValidateFull().ok()); - UNIT_ASSERT(static_cast<ui64>(array->length()) == values.size()); - UNIT_ASSERT(array->type_id() == arrow::Type::DENSE_UNION); - auto unionArray = static_pointer_cast<arrow::DenseUnionArray>(array); - - UNIT_ASSERT(unionArray->num_fields() == 4); - UNIT_ASSERT(unionArray->field(0)->type_id() == arrow::Type::BINARY); - UNIT_ASSERT(unionArray->field(1)->type_id() == arrow::Type::BINARY); - UNIT_ASSERT(unionArray->field(2)->type_id() == arrow::Type::FIXED_SIZE_BINARY); - UNIT_ASSERT(unionArray->field(3)->type_id() == arrow::Type::FLOAT); - - auto ysonArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(0)); - auto jsonDocArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(1)); - auto uuidArray = static_pointer_cast<arrow::FixedSizeBinaryArray>(unionArray->field(2)); - auto floatArray = static_pointer_cast<arrow::FloatArray>(unionArray->field(3)); - - for (ui64 index = 0; index < values.size(); ++index) { - auto value = values[index]; - UNIT_ASSERT(value.GetVariantIndex() == static_cast<ui32>(unionArray->child_id(index))); - auto fieldIndex = unionArray->value_offset(index); - if (value.GetVariantIndex() == 3) { - auto valueArrow = floatArray->Value(fieldIndex); - auto valueInner = value.GetVariantItem().Get<float>(); - UNIT_ASSERT(valueArrow == valueInner); - } else { - arrow::util::string_view viewArrow; - if (value.GetVariantIndex() == 0) { - viewArrow = ysonArray->GetView(fieldIndex); - } else if (value.GetVariantIndex() == 1) { - viewArrow = jsonDocArray->GetView(fieldIndex); - } else if (value.GetVariantIndex() == 2) { - viewArrow = uuidArray->GetView(fieldIndex); - } - std::string valueArrow(viewArrow.data(), viewArrow.size()); - auto innerItem = value.GetVariantItem(); - auto refInner = innerItem.AsStringRef(); - std::string valueInner(refInner.Data(), refInner.Size()); - UNIT_ASSERT(valueArrow == valueInner); - } - } - } - - Y_UNIT_TEST(OptionalVariantOverStruct) { - TTestContext context; - - auto variantType = context.GetOptionalVariantOverStructType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(variantType)); - - auto values = context.CreateOptionalVariantOverStruct(100); - auto array = NFormats::NTestUtils::MakeArray(values, variantType); - UNIT_ASSERT(array->ValidateFull().ok()); - UNIT_ASSERT(static_cast<ui64>(array->length()) == values.size()); - UNIT_ASSERT(array->type_id() == arrow::Type::STRUCT); - - auto structArray = static_pointer_cast<arrow::StructArray>(array); - UNIT_ASSERT(structArray->num_fields() == 1); - UNIT_ASSERT(structArray->field(0)->type_id() == arrow::Type::DENSE_UNION); - - auto unionArray = static_pointer_cast<arrow::DenseUnionArray>(structArray->field(0)); - - UNIT_ASSERT(unionArray->num_fields() == 4); - UNIT_ASSERT(unionArray->field(0)->type_id() == arrow::Type::BINARY); - UNIT_ASSERT(unionArray->field(1)->type_id() == arrow::Type::BINARY); - UNIT_ASSERT(unionArray->field(2)->type_id() == arrow::Type::FIXED_SIZE_BINARY); - UNIT_ASSERT(unionArray->field(3)->type_id() == arrow::Type::FLOAT); - - auto ysonArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(0)); - auto jsonDocArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(1)); - auto uuidArray = static_pointer_cast<arrow::FixedSizeBinaryArray>(unionArray->field(2)); - auto floatArray = static_pointer_cast<arrow::FloatArray>(unionArray->field(3)); - - for (ui64 index = 0; index < values.size(); ++index) { - auto value = values[index]; - if (!value.HasValue()) { - // NULL - UNIT_ASSERT(structArray->IsNull(index)); - continue; - } - - UNIT_ASSERT(!structArray->IsNull(index)); - - UNIT_ASSERT(value.GetVariantIndex() == static_cast<ui32>(unionArray->child_id(index))); - auto fieldIndex = unionArray->value_offset(index); - if (value.GetVariantIndex() == 3) { - auto valueArrow = floatArray->Value(fieldIndex); - auto valueInner = value.GetVariantItem().Get<float>(); - UNIT_ASSERT(valueArrow == valueInner); - } else { - arrow::util::string_view viewArrow; - if (value.GetVariantIndex() == 0) { - viewArrow = ysonArray->GetView(fieldIndex); - } else if (value.GetVariantIndex() == 1) { - viewArrow = jsonDocArray->GetView(fieldIndex); - } else if (value.GetVariantIndex() == 2) { - viewArrow = uuidArray->GetView(fieldIndex); - } - std::string valueArrow(viewArrow.data(), viewArrow.size()); - auto innerItem = value.GetVariantItem(); - auto refInner = innerItem.AsStringRef(); - std::string valueInner(refInner.Data(), refInner.Size()); - UNIT_ASSERT(valueArrow == valueInner); - } - } - } - - Y_UNIT_TEST(DoubleOptionalVariantOverStruct) { - TTestContext context; - - auto variantType = context.GetDoubleOptionalVariantOverStructType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(variantType)); - - auto values = context.CreateDoubleOptionalVariantOverStruct(100); - auto array = NFormats::NTestUtils::MakeArray(values, variantType); - UNIT_ASSERT(array->ValidateFull().ok()); - UNIT_ASSERT(static_cast<ui64>(array->length()) == values.size()); - UNIT_ASSERT(array->type_id() == arrow::Type::STRUCT); - - auto firstStructArray = static_pointer_cast<arrow::StructArray>(array); - UNIT_ASSERT(firstStructArray->num_fields() == 1); - UNIT_ASSERT(firstStructArray->field(0)->type_id() == arrow::Type::STRUCT); - - auto secondStructArray = static_pointer_cast<arrow::StructArray>(firstStructArray->field(0)); - UNIT_ASSERT(secondStructArray->num_fields() == 1); - UNIT_ASSERT(secondStructArray->field(0)->type_id() == arrow::Type::DENSE_UNION); - - auto unionArray = static_pointer_cast<arrow::DenseUnionArray>(secondStructArray->field(0)); - - UNIT_ASSERT(unionArray->num_fields() == 4); - UNIT_ASSERT(unionArray->field(0)->type_id() == arrow::Type::BINARY); - UNIT_ASSERT(unionArray->field(1)->type_id() == arrow::Type::BINARY); - UNIT_ASSERT(unionArray->field(2)->type_id() == arrow::Type::FIXED_SIZE_BINARY); - UNIT_ASSERT(unionArray->field(3)->type_id() == arrow::Type::FLOAT); - - auto ysonArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(0)); - auto jsonDocArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(1)); - auto uuidArray = static_pointer_cast<arrow::FixedSizeBinaryArray>(unionArray->field(2)); - auto floatArray = static_pointer_cast<arrow::FloatArray>(unionArray->field(3)); - - for (ui64 index = 0; index < values.size(); ++index) { - auto value = values[index]; - if (!value.HasValue()) { - if (value) { - // Optional(NULL) - UNIT_ASSERT(secondStructArray->IsNull(index)); - } else { - // NULL - UNIT_ASSERT(firstStructArray->IsNull(index)); - } - continue; - } - - UNIT_ASSERT(!firstStructArray->IsNull(index) && !secondStructArray->IsNull(index)); - - UNIT_ASSERT(value.GetVariantIndex() == static_cast<ui32>(unionArray->child_id(index))); - auto fieldIndex = unionArray->value_offset(index); - if (value.GetVariantIndex() == 3) { - auto valueArrow = floatArray->Value(fieldIndex); - auto valueInner = value.GetVariantItem().Get<float>(); - UNIT_ASSERT_VALUES_EQUAL(valueArrow, valueInner); - } else { - arrow::util::string_view viewArrow; - if (value.GetVariantIndex() == 0) { - viewArrow = ysonArray->GetView(fieldIndex); - } else if (value.GetVariantIndex() == 1) { - viewArrow = jsonDocArray->GetView(fieldIndex); - } else if (value.GetVariantIndex() == 2) { - viewArrow = uuidArray->GetView(fieldIndex); - } - std::string valueArrow(viewArrow.data(), viewArrow.size()); - auto innerItem = value.GetVariantItem(); - auto refInner = innerItem.AsStringRef(); - std::string valueInner(refInner.Data(), refInner.Size()); - UNIT_ASSERT_VALUES_EQUAL(valueArrow, valueInner); - } - } - } + // Y_UNIT_TEST(VariantOverStruct) { + // TTestContext context; + + // auto variantType = context.GetVariantOverStructType(); + // UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); + + // auto values = context.CreateVariantOverStruct(100); + // auto array = NFormats::NTestUtils::MakeArray(values, variantType); + // UNIT_ASSERT(array->ValidateFull().ok()); + // UNIT_ASSERT(static_cast<ui64>(array->length()) == values.size()); + // UNIT_ASSERT(array->type_id() == arrow::Type::DENSE_UNION); + // auto unionArray = static_pointer_cast<arrow::DenseUnionArray>(array); + + // UNIT_ASSERT(unionArray->num_fields() == 4); + // UNIT_ASSERT(unionArray->field(0)->type_id() == arrow::Type::BINARY); + // UNIT_ASSERT(unionArray->field(1)->type_id() == arrow::Type::STRING); + // UNIT_ASSERT(unionArray->field(2)->type_id() == arrow::Type::FIXED_SIZE_BINARY); + // UNIT_ASSERT(unionArray->field(3)->type_id() == arrow::Type::FLOAT); + + // auto ysonArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(0)); + // auto jsonDocArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(1)); + // auto uuidArray = static_pointer_cast<arrow::FixedSizeBinaryArray>(unionArray->field(2)); + // auto floatArray = static_pointer_cast<arrow::FloatArray>(unionArray->field(3)); + + // for (ui64 index = 0; index < values.size(); ++index) { + // auto value = values[index]; + // UNIT_ASSERT(value.GetVariantIndex() == static_cast<ui32>(unionArray->child_id(index))); + // auto fieldIndex = unionArray->value_offset(index); + // if (value.GetVariantIndex() == 3) { + // auto valueArrow = floatArray->Value(fieldIndex); + // auto valueInner = value.GetVariantItem().Get<float>(); + // UNIT_ASSERT(valueArrow == valueInner); + // } else { + // arrow::util::string_view viewArrow; + // if (value.GetVariantIndex() == 0) { + // viewArrow = ysonArray->GetView(fieldIndex); + // } else if (value.GetVariantIndex() == 1) { + // viewArrow = jsonDocArray->GetView(fieldIndex); + // } else if (value.GetVariantIndex() == 2) { + // viewArrow = uuidArray->GetView(fieldIndex); + // } + // std::string valueArrow(viewArrow.data(), viewArrow.size()); + // auto innerItem = value.GetVariantItem(); + // auto refInner = innerItem.AsStringRef(); + // std::string valueInner(refInner.Data(), refInner.Size()); + // UNIT_ASSERT(valueArrow == valueInner); + // } + // } + // } + + // Y_UNIT_TEST(OptionalVariantOverStruct) { + // TTestContext context; + + // auto variantType = context.GetOptionalVariantOverStructType(); + // UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); + + // auto values = context.CreateOptionalVariantOverStruct(100); + // auto array = NFormats::NTestUtils::MakeArray(values, variantType); + // UNIT_ASSERT(array->ValidateFull().ok()); + // UNIT_ASSERT(static_cast<ui64>(array->length()) == values.size()); + // UNIT_ASSERT(array->type_id() == arrow::Type::STRUCT); + + // auto structArray = static_pointer_cast<arrow::StructArray>(array); + // UNIT_ASSERT(structArray->num_fields() == 1); + // UNIT_ASSERT(structArray->field(0)->type_id() == arrow::Type::DENSE_UNION); + + // auto unionArray = static_pointer_cast<arrow::DenseUnionArray>(structArray->field(0)); + + // UNIT_ASSERT(unionArray->num_fields() == 4); + // UNIT_ASSERT(unionArray->field(0)->type_id() == arrow::Type::BINARY); + // UNIT_ASSERT(unionArray->field(1)->type_id() == arrow::Type::STRING); + // UNIT_ASSERT(unionArray->field(2)->type_id() == arrow::Type::FIXED_SIZE_BINARY); + // UNIT_ASSERT(unionArray->field(3)->type_id() == arrow::Type::FLOAT); + + // auto ysonArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(0)); + // auto jsonDocArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(1)); + // auto uuidArray = static_pointer_cast<arrow::FixedSizeBinaryArray>(unionArray->field(2)); + // auto floatArray = static_pointer_cast<arrow::FloatArray>(unionArray->field(3)); + + // for (ui64 index = 0; index < values.size(); ++index) { + // auto value = values[index]; + // if (!value.HasValue()) { + // // NULL + // UNIT_ASSERT(structArray->IsNull(index)); + // continue; + // } + + // UNIT_ASSERT(!structArray->IsNull(index)); + + // UNIT_ASSERT(value.GetVariantIndex() == static_cast<ui32>(unionArray->child_id(index))); + // auto fieldIndex = unionArray->value_offset(index); + // if (value.GetVariantIndex() == 3) { + // auto valueArrow = floatArray->Value(fieldIndex); + // auto valueInner = value.GetVariantItem().Get<float>(); + // UNIT_ASSERT(valueArrow == valueInner); + // } else { + // arrow::util::string_view viewArrow; + // if (value.GetVariantIndex() == 0) { + // viewArrow = ysonArray->GetView(fieldIndex); + // } else if (value.GetVariantIndex() == 1) { + // viewArrow = jsonDocArray->GetView(fieldIndex); + // } else if (value.GetVariantIndex() == 2) { + // viewArrow = uuidArray->GetView(fieldIndex); + // } + // std::string valueArrow(viewArrow.data(), viewArrow.size()); + // auto innerItem = value.GetVariantItem(); + // auto refInner = innerItem.AsStringRef(); + // std::string valueInner(refInner.Data(), refInner.Size()); + // UNIT_ASSERT(valueArrow == valueInner); + // } + // } + // } + + // Y_UNIT_TEST(DoubleOptionalVariantOverStruct) { + // TTestContext context; + + // auto variantType = context.GetDoubleOptionalVariantOverStructType(); + // UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); + + // auto values = context.CreateDoubleOptionalVariantOverStruct(100); + // auto array = NFormats::NTestUtils::MakeArray(values, variantType); + // UNIT_ASSERT(array->ValidateFull().ok()); + // UNIT_ASSERT(static_cast<ui64>(array->length()) == values.size()); + // UNIT_ASSERT(array->type_id() == arrow::Type::STRUCT); + + // auto firstStructArray = static_pointer_cast<arrow::StructArray>(array); + // UNIT_ASSERT(firstStructArray->num_fields() == 1); + // UNIT_ASSERT(firstStructArray->field(0)->type_id() == arrow::Type::STRUCT); + + // auto secondStructArray = static_pointer_cast<arrow::StructArray>(firstStructArray->field(0)); + // UNIT_ASSERT(secondStructArray->num_fields() == 1); + // UNIT_ASSERT(secondStructArray->field(0)->type_id() == arrow::Type::DENSE_UNION); + + // auto unionArray = static_pointer_cast<arrow::DenseUnionArray>(secondStructArray->field(0)); + + // UNIT_ASSERT(unionArray->num_fields() == 4); + // UNIT_ASSERT(unionArray->field(0)->type_id() == arrow::Type::BINARY); + // UNIT_ASSERT(unionArray->field(1)->type_id() == arrow::Type::STRING); + // UNIT_ASSERT(unionArray->field(2)->type_id() == arrow::Type::FIXED_SIZE_BINARY); + // UNIT_ASSERT(unionArray->field(3)->type_id() == arrow::Type::FLOAT); + + // auto ysonArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(0)); + // auto jsonDocArray = static_pointer_cast<arrow::BinaryArray>(unionArray->field(1)); + // auto uuidArray = static_pointer_cast<arrow::FixedSizeBinaryArray>(unionArray->field(2)); + // auto floatArray = static_pointer_cast<arrow::FloatArray>(unionArray->field(3)); + + // for (ui64 index = 0; index < values.size(); ++index) { + // auto value = values[index]; + // if (!value.HasValue()) { + // if (value) { + // // Optional(NULL) + // UNIT_ASSERT(secondStructArray->IsNull(index)); + // } else { + // // NULL + // UNIT_ASSERT(firstStructArray->IsNull(index)); + // } + // continue; + // } + + // UNIT_ASSERT(!firstStructArray->IsNull(index) && !secondStructArray->IsNull(index)); + + // UNIT_ASSERT(value.GetVariantIndex() == static_cast<ui32>(unionArray->child_id(index))); + // auto fieldIndex = unionArray->value_offset(index); + // if (value.GetVariantIndex() == 3) { + // auto valueArrow = floatArray->Value(fieldIndex); + // auto valueInner = value.GetVariantItem().Get<float>(); + // UNIT_ASSERT_VALUES_EQUAL(valueArrow, valueInner); + // } else { + // arrow::util::string_view viewArrow; + // if (value.GetVariantIndex() == 0) { + // viewArrow = ysonArray->GetView(fieldIndex); + // } else if (value.GetVariantIndex() == 1) { + // viewArrow = jsonDocArray->GetView(fieldIndex); + // } else if (value.GetVariantIndex() == 2) { + // viewArrow = uuidArray->GetView(fieldIndex); + // } + // std::string valueArrow(viewArrow.data(), viewArrow.size()); + // auto innerItem = value.GetVariantItem(); + // auto refInner = innerItem.AsStringRef(); + // std::string valueInner(refInner.Data(), refInner.Size()); + // UNIT_ASSERT_VALUES_EQUAL(valueArrow, valueInner); + // } + // } + // } Y_UNIT_TEST(VariantOverTupleWithOptionals) { TTestContext context; auto variantType = context.GetVariantOverTupleWithOptionalsType(); - UNIT_ASSERT(NFormats::NTestUtils::IsArrowCompatible(variantType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); auto values = context.CreateVariantOverTupleWithOptionals(100); auto array = NFormats::NTestUtils::MakeArray(values, variantType); @@ -1110,7 +1458,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) { TTestContext context; auto variantType = context.GetOptionalVariantOverTupleWithOptionalsType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(variantType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); auto values = context.CreateOptionalVariantOverTupleWithOptionals(100); auto array = NFormats::NTestUtils::MakeArray(values, variantType); @@ -1180,7 +1528,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) { TTestContext context; auto variantType = context.GetDoubleOptionalVariantOverTupleWithOptionalsType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(variantType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); auto values = context.CreateDoubleOptionalVariantOverTupleWithOptionals(100); auto array = NFormats::NTestUtils::MakeArray(values, variantType); @@ -1259,7 +1607,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueDoNotFitToArrow) { TTestContext context; auto dictType = context.GetDictUtf8ToIntervalType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(dictType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(dictType)); auto values = context.CreateDictUtf8ToInterval(100); auto array = NFormats::NTestUtils::MakeArray(values, dictType); @@ -1304,7 +1652,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueDoNotFitToArrow) { TTestContext context; auto dictType = context.GetDictOptionalToTupleType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(dictType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(dictType)); auto values = context.CreateDictOptionalToTuple(100); auto array = NFormats::NTestUtils::MakeArray(values, dictType); @@ -1359,7 +1707,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueDoNotFitToArrow) { TTestContext context; auto doubleOptionalType = context.GetOptionalOfOptionalType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(doubleOptionalType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(doubleOptionalType)); auto values = context.CreateOptionalOfOptional(100); auto array = NFormats::NTestUtils::MakeArray(values, doubleOptionalType); @@ -1413,8 +1761,7 @@ Y_UNIT_TEST_SUITE(DqUnboxedValueDoNotFitToArrow) { ui32 numberOfTypes = 500; auto variantType = context.GetLargeVariantType(numberOfTypes); - bool isCompatible = NFormats::NTestUtils::IsArrowCompatible(variantType); - UNIT_ASSERT(!isCompatible); + UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); auto values = context.CreateLargeVariant(1000); auto array = NFormats::NTestUtils::MakeArray(values, variantType); @@ -1451,7 +1798,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto tupleType = context.GetTupleType(); - UNIT_ASSERT(NFormats::NTestUtils::IsArrowCompatible(tupleType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(tupleType)); auto values = context.CreateTuples(100); auto array = NFormats::NTestUtils::MakeArray(values, tupleType); @@ -1466,7 +1813,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto dictType = context.GetDictUtf8ToIntervalType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(dictType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(dictType)); auto values = context.CreateDictUtf8ToInterval(100); auto array = NFormats::NTestUtils::MakeArray(values, dictType); @@ -1481,7 +1828,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto listType = context.GetListOfJsonsType(); - Y_ABORT_UNLESS(NFormats::NTestUtils::IsArrowCompatible(listType)); + Y_ABORT_UNLESS(NFormats::IsArrowCompatible(listType)); auto values = context.CreateListOfJsons(100); auto array = NFormats::NTestUtils::MakeArray(values, listType); @@ -1496,7 +1843,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto listType = context.GetOptionalListOfOptional(); - Y_ABORT_UNLESS(NFormats::NTestUtils::IsArrowCompatible(listType)); + Y_ABORT_UNLESS(NFormats::IsArrowCompatible(listType)); auto values = context.CreateOptionalListOfOptional(100); auto array = NFormats::NTestUtils::MakeArray(values, listType); @@ -1507,56 +1854,56 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ } } - Y_UNIT_TEST(VariantOverStruct) { - TTestContext context; - - auto variantType = context.GetVariantOverStructType(); - UNIT_ASSERT(NFormats::NTestUtils::IsArrowCompatible(variantType)); - - auto values = context.CreateVariantOverStruct(100); - auto array = NFormats::NTestUtils::MakeArray(values, variantType); - auto restoredValues = NFormats::NTestUtils::ExtractUnboxedValues(array, variantType, context.HolderFactory); - UNIT_ASSERT_EQUAL(values.size(), restoredValues.size()); - for (ui64 index = 0; index < values.size(); ++index) { - AssertUnboxedValuesAreEqual(values[index], restoredValues[index], variantType); - } - } - - Y_UNIT_TEST(OptionalVariantOverStruct) { - TTestContext context; - - auto optionalVariantType = context.GetOptionalVariantOverStructType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(optionalVariantType)); - - auto values = context.CreateOptionalVariantOverStruct(100); - auto array = NFormats::NTestUtils::MakeArray(values, optionalVariantType); - auto restoredValues = NFormats::NTestUtils::ExtractUnboxedValues(array, optionalVariantType, context.HolderFactory); - UNIT_ASSERT_EQUAL(values.size(), restoredValues.size()); - for (ui64 index = 0; index < values.size(); ++index) { - AssertUnboxedValuesAreEqual(values[index], restoredValues[index], optionalVariantType); - } - } - - Y_UNIT_TEST(DoubleOptionalVariantOverStruct) { - TTestContext context; - - auto doubleOptionalVariantType = context.GetDoubleOptionalVariantOverStructType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(doubleOptionalVariantType)); - - auto values = context.CreateDoubleOptionalVariantOverStruct(100); - auto array = NFormats::NTestUtils::MakeArray(values, doubleOptionalVariantType); - auto restoredValues = NFormats::NTestUtils::ExtractUnboxedValues(array, doubleOptionalVariantType, context.HolderFactory); - UNIT_ASSERT_EQUAL(values.size(), restoredValues.size()); - for (ui64 index = 0; index < values.size(); ++index) { - AssertUnboxedValuesAreEqual(values[index], restoredValues[index], doubleOptionalVariantType); - } - } + // Y_UNIT_TEST(VariantOverStruct) { + // TTestContext context; + + // auto variantType = context.GetVariantOverStructType(); + // UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); + + // auto values = context.CreateVariantOverStruct(100); + // auto array = NFormats::NTestUtils::MakeArray(values, variantType); + // auto restoredValues = NFormats::NTestUtils::ExtractUnboxedValues(array, variantType, context.HolderFactory); + // UNIT_ASSERT_EQUAL(values.size(), restoredValues.size()); + // for (ui64 index = 0; index < values.size(); ++index) { + // AssertUnboxedValuesAreEqual(values[index], restoredValues[index], variantType); + // } + // } + + // Y_UNIT_TEST(OptionalVariantOverStruct) { + // TTestContext context; + + // auto optionalVariantType = context.GetOptionalVariantOverStructType(); + // UNIT_ASSERT(NFormats::IsArrowCompatible(optionalVariantType)); + + // auto values = context.CreateOptionalVariantOverStruct(100); + // auto array = NFormats::NTestUtils::MakeArray(values, optionalVariantType); + // auto restoredValues = NFormats::NTestUtils::ExtractUnboxedValues(array, optionalVariantType, context.HolderFactory); + // UNIT_ASSERT_EQUAL(values.size(), restoredValues.size()); + // for (ui64 index = 0; index < values.size(); ++index) { + // AssertUnboxedValuesAreEqual(values[index], restoredValues[index], optionalVariantType); + // } + // } + + // Y_UNIT_TEST(DoubleOptionalVariantOverStruct) { + // TTestContext context; + + // auto doubleOptionalVariantType = context.GetDoubleOptionalVariantOverStructType(); + // UNIT_ASSERT(NFormats::IsArrowCompatible(doubleOptionalVariantType)); + + // auto values = context.CreateDoubleOptionalVariantOverStruct(100); + // auto array = NFormats::NTestUtils::MakeArray(values, doubleOptionalVariantType); + // auto restoredValues = NFormats::NTestUtils::ExtractUnboxedValues(array, doubleOptionalVariantType, context.HolderFactory); + // UNIT_ASSERT_EQUAL(values.size(), restoredValues.size()); + // for (ui64 index = 0; index < values.size(); ++index) { + // AssertUnboxedValuesAreEqual(values[index], restoredValues[index], doubleOptionalVariantType); + // } + // } Y_UNIT_TEST(VariantOverTupleWithOptionals) { TTestContext context; auto variantType = context.GetVariantOverTupleWithOptionalsType(); - UNIT_ASSERT(NFormats::NTestUtils::IsArrowCompatible(variantType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); auto values = context.CreateVariantOverTupleWithOptionals(100); auto array = NFormats::NTestUtils::MakeArray(values, variantType); @@ -1571,7 +1918,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto optionalVariantType = context.GetOptionalVariantOverTupleWithOptionalsType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(optionalVariantType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(optionalVariantType)); auto values = context.CreateOptionalVariantOverTupleWithOptionals(100); auto array = NFormats::NTestUtils::MakeArray(values, optionalVariantType); @@ -1586,7 +1933,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto doubleOptionalVariantType = context.GetDoubleOptionalVariantOverTupleWithOptionalsType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(doubleOptionalVariantType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(doubleOptionalVariantType)); auto values = context.CreateDoubleOptionalVariantOverTupleWithOptionals(100); auto array = NFormats::NTestUtils::MakeArray(values, doubleOptionalVariantType); @@ -1601,7 +1948,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto dictType = context.GetDictOptionalToTupleType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(dictType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(dictType)); auto values = context.CreateDictOptionalToTuple(100); auto array = NFormats::NTestUtils::MakeArray(values, dictType); @@ -1616,7 +1963,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto doubleOptionalType = context.GetOptionalOfOptionalType(); - UNIT_ASSERT(!NFormats::NTestUtils::IsArrowCompatible(doubleOptionalType)); + UNIT_ASSERT(NFormats::IsArrowCompatible(doubleOptionalType)); auto values = context.CreateOptionalOfOptional(100); auto array = NFormats::NTestUtils::MakeArray(values, doubleOptionalType); @@ -1631,8 +1978,7 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){ TTestContext context; auto variantType = context.GetLargeVariantType(500); - bool isCompatible = NFormats::NTestUtils::IsArrowCompatible(variantType); - UNIT_ASSERT(!isCompatible); + UNIT_ASSERT(NFormats::IsArrowCompatible(variantType)); auto values = context.CreateLargeVariant(1000); auto array = NFormats::NTestUtils::MakeArray(values, variantType); diff --git a/ydb/core/kqp/ut/arrow/kqp_result_set_formats_ut.cpp b/ydb/core/kqp/ut/arrow/kqp_result_set_formats_ut.cpp index 92414804a2b..f2d15a3a21f 100644 --- a/ydb/core/kqp/ut/arrow/kqp_result_set_formats_ut.cpp +++ b/ydb/core/kqp/ut/arrow/kqp_result_set_formats_ut.cpp @@ -184,12 +184,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> ExecuteAndCombineBatches(TQuery return resultBatches; } -std::string SerializeToBinaryJsonString(const TStringBuf json) { - const auto binaryJson = std::get<NBinaryJson::TBinaryJson>(NBinaryJson::SerializeToBinaryJson(json)); - const TStringBuf buffer(binaryJson.Data(), binaryJson.Size()); - return TString(buffer); -} - void CompareCompressedAndDefaultBatches(TQueryClient& client, std::optional<TArrowFormatSettings::TCompressionCodec> codec, bool assertEqual = false) { std::shared_ptr<arrow::Schema> schemaCompressedBatch; TString compressedBatch; @@ -920,27 +914,69 @@ Y_UNIT_TEST_SUITE(KqpResultSetFormats) { UNIT_ASSERT_C(!batches.empty(), "Batches must not be empty"); - NColumnShard::TTableUpdatesBuilder builder(NArrow::MakeArrowSchema({ - std::make_pair("StringValue", TTypeInfo(NTypeIds::String)), - std::make_pair("YsonValue", TTypeInfo(NTypeIds::Yson)), - std::make_pair("DyNumberValue", TTypeInfo(NTypeIds::DyNumber)), - std::make_pair("JsonDocumentValue", TTypeInfo(NTypeIds::JsonDocument)), - std::make_pair("UuidValue", TTypeInfo(NTypeIds::Uuid)), - std::make_pair("StringNotNullValue", TTypeInfo(NTypeIds::String)), - std::make_pair("YsonNotNullValue", TTypeInfo(NTypeIds::Yson)), - std::make_pair("JsonDocumentNotNullValue", TTypeInfo(NTypeIds::JsonDocument)), - std::make_pair("DyNumberNotNullValue", TTypeInfo(NTypeIds::DyNumber)), - std::make_pair("UuidNotNullValue", TTypeInfo(NTypeIds::Uuid)) - })); - - builder.AddRow().AddNull().Add("[4]").AddNull().AddNull().AddNull().Add("Maria").Add("[5]").Add(SerializeToBinaryJsonString("[6]")).Add(NDyNumber::ParseDyNumberString("7.0")->c_str()).Add<NYdb::TUuidValue>(NYdb::TUuidValue("5b99a330-04ef-4f1a-9b64-ba6d5f44eafe")); - builder.AddRow().Add("John").Add("[1]").Add(NDyNumber::ParseDyNumberString("1.0")->c_str()).Add(SerializeToBinaryJsonString("{\"a\": 1}")).Add(NYdb::TUuidValue("5b99a330-04ef-4f1a-9b64-ba6d5f44eafe")).Add("Mark").Add("[2]").Add(SerializeToBinaryJsonString("{\"b\": 2}")).Add(NDyNumber::ParseDyNumberString("4.0")->c_str()).Add(NYdb::TUuidValue("5b99a330-04ef-4f1a-9b64-ba6d5f44eafe")); - builder.AddRow().Add("Leo").Add("[10]").Add(NDyNumber::ParseDyNumberString("11.0")->c_str()).Add(SerializeToBinaryJsonString("[12]")).Add(NYdb::TUuidValue("5b99a330-04ef-4f1a-9b64-ba6d5f44eafe")).Add("Maria").Add("[13]").Add(SerializeToBinaryJsonString("[14]")).Add(NDyNumber::ParseDyNumberString("15.0")->c_str()).Add(NYdb::TUuidValue("5b99a330-04ef-4f1a-9b64-ba6d5f44eafe")); - builder.AddRow().Add("Mark").AddNull().AddNull().AddNull().AddNull().Add("Michael").Add("[7]").Add(SerializeToBinaryJsonString("[8]")).Add(NDyNumber::ParseDyNumberString("9.0")->c_str()).Add(NYdb::TUuidValue("5b99a330-04ef-4f1a-9b64-ba6d5f44eafe")); - - - auto expected = builder.BuildArrow(); - UNIT_ASSERT_VALUES_EQUAL(batches.front()->ToString(), expected->ToString()); + const TString expected = +R"(StringValue: [ + null, + 4A6F686E, + 4C656F, + 4D61726B + ] +YsonValue: [ + 5B345D, + 5B315D, + 5B31305D, + null + ] +DyNumberValue: [ + null, + ".1e1", + ".11e2", + null + ] +JsonDocumentValue: [ + null, + "{"a":1}", + "[12]", + null + ] +UuidValue: [ + null, + 30A3995BEF041A4F9B64BA6D5F44EAFE, + 30A3995BEF041A4F9B64BA6D5F44EAFE, + null + ] +StringNotNullValue: [ + 4D61726961, + 4D61726B, + 4D61726961, + 4D69636861656C + ] +YsonNotNullValue: [ + 5B355D, + 5B325D, + 5B31335D, + 5B375D + ] +JsonDocumentNotNullValue: [ + "[6]", + "{"b":2}", + "[14]", + "[8]" + ] +DyNumberNotNullValue: [ + ".7e1", + ".4e1", + ".15e2", + ".9e1" + ] +UuidNotNullValue: [ + 30A3995BEF041A4F9B64BA6D5F44EAFE, + 30A3995BEF041A4F9B64BA6D5F44EAFE, + 30A3995BEF041A4F9B64BA6D5F44EAFE, + 30A3995BEF041A4F9B64BA6D5F44EAFE + ] +)"; + UNIT_ASSERT_VALUES_EQUAL(batches.front()->ToString(), expected); } } |
