diff options
author | aneporada <aneporada@ydb.tech> | 2023-05-02 21:16:46 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-05-02 21:16:46 +0300 |
commit | f58a59b3ab6a6bdbd3feae3007b14426b2bdd7ad (patch) | |
tree | 8fcaa37851c1fb7f8ba4db368a7ba38f686e4797 | |
parent | 71b550e662169207c2fcc129078ec815c819052b (diff) | |
download | ydb-f58a59b3ab6a6bdbd3feae3007b14426b2bdd7ad.tar.gz |
Support BlockCoalesce for all arrow types
initial
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_blocks.cpp | 24 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp | 266 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_program_builder.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/block_builder.h | 34 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/util.cpp | 12 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/util.h | 3 |
6 files changed, 139 insertions, 214 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index dc0739e50e..21a11d2b9f 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -100,6 +100,7 @@ IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& inpu } IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); if (!EnsureArgsCount(*input, 2U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -114,29 +115,24 @@ IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TE bool firstIsScalar; auto firstItemType = GetBlockItemType(*first->GetTypeAnn(), firstIsScalar); - bool firstIsOptional = firstItemType->GetKind() == ETypeAnnotationKind::Optional; - firstItemType = RemoveOptionalType(firstItemType); + if (firstItemType->GetKind() != ETypeAnnotationKind::Optional && firstItemType->GetKind() != ETypeAnnotationKind::Pg) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(first->Pos()), TStringBuilder() << + "Expecting Optional or Pg type as first argument, but got: " << *firstItemType)); + return IGraphTransformer::TStatus::Error; + } bool secondIsScalar; auto secondItemType = GetBlockItemType(*second->GetTypeAnn(), secondIsScalar); - bool secondIsOptional = secondItemType->GetKind() == ETypeAnnotationKind::Optional; - secondItemType = RemoveOptionalType(secondItemType); - if (!IsSameAnnotation(*firstItemType, *secondItemType)) { + if (!IsSameAnnotation(*firstItemType, *secondItemType) && + !IsSameAnnotation(*RemoveOptionalType(firstItemType), *secondItemType)) + { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() << - "Mismatch item types: first is " << *firstItemType << ", second is " << *secondItemType)); + "Uncompatible coalesce types: first is " << *firstItemType << ", second is " << *secondItemType)); return IGraphTransformer::TStatus::Error; } - if (!firstIsOptional) { - output = input->HeadPtr(); - return IGraphTransformer::TStatus::Repeat; - } - auto outputItemType = secondItemType; - if (secondIsOptional) { - outputItemType = ctx.Expr.MakeType<TOptionalExprType>(outputItemType); - } if (firstIsScalar && secondIsScalar) { input->SetTypeAnn(ctx.Expr.MakeType<TScalarExprType>(outputItemType)); } else { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp index c18400e15a..7210f11396 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp @@ -5,6 +5,9 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/public/udf/arrow/block_builder.h> +#include <ydb/library/yql/public/udf/arrow/block_reader.h> +#include <ydb/library/yql/public/udf/arrow/util.h> #include <arrow/util/bitmap_ops.h> @@ -16,13 +19,14 @@ namespace { class TCoalesceBlockWrapper : public TMutableComputationNode<TCoalesceBlockWrapper> { public: - TCoalesceBlockWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, NUdf::EDataSlot slot) + TCoalesceBlockWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TType* firstType, TType* secondType, bool unwrapFirst) : TMutableComputationNode(mutables) , First(first) , Second(second) - , Slot(slot) + , FirstType(firstType) + , SecondType(secondType) + , UnwrapFirst(unwrapFirst) { - MKQL_ENSURE(ConvertArrowType(slot, Type), "Unsupported type of data"); } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { @@ -33,210 +37,90 @@ public: const auto& secondDatum = TArrowBlock::From(second).GetDatum(); if (firstDatum.is_scalar() && secondDatum.is_scalar()) { - return (firstDatum.null_count() == 0) ? first.Release() : second.Release(); + if (!firstDatum.scalar()->is_valid) { + return second.Release(); + } else if (!UnwrapFirst) { + return first.Release(); + } + auto reader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstType); + auto builder = NYql::NUdf::MakeScalarBuilder(TTypeInfoHelper(), SecondType); + auto firstItem = reader->GetScalarItem(*firstDatum.scalar()); + return ctx.HolderFactory.CreateArrowBlock(builder->Build(firstItem.GetOptionalValue())); } size_t len = firstDatum.is_scalar() ? (size_t)secondDatum.length() : (size_t)firstDatum.length(); if (firstDatum.null_count() == firstDatum.length()) { - return secondDatum.is_scalar() ? CopyAsArray(ctx, secondDatum, len) : second.Release(); - } else if (firstDatum.null_count() == 0) { - return firstDatum.is_scalar() ? CopyAsArray(ctx, firstDatum, len) : first.Release(); + return secondDatum.is_scalar() ? CopyAsArray(ctx, secondDatum, SecondType, false, len) : second.Release(); + } else if (firstDatum.null_count() == 0 || secondDatum.null_count() == secondDatum.length()) { + if (firstDatum.is_scalar()) { + return CopyAsArray(ctx, firstDatum, FirstType, UnwrapFirst, len); + } else if (!UnwrapFirst) { + return first.Release(); + } + bool isNestedOptional = static_cast<TOptionalType*>(FirstType)->GetItemType()->IsOptional(); + return ctx.HolderFactory.CreateArrowBlock(NYql::NUdf::Unwrap(*firstDatum.array(), isNestedOptional)); } Y_VERIFY(firstDatum.is_array()); - if (secondDatum.null_count() == secondDatum.length()) { - return first.Release(); - } - return secondDatum.is_scalar() ? - Coalesce(ctx, firstDatum.array(), secondDatum) : - Coalesce(ctx, firstDatum.array(), secondDatum.array()); + Coalesce(ctx, *firstDatum.array(), secondDatum) : + Coalesce(ctx, *firstDatum.array(), *secondDatum.array()); } private: - NUdf::TUnboxedValuePod CopyAsArray(TComputationContext& ctx, const arrow::Datum& scalar, size_t len) const { + NUdf::TUnboxedValuePod CopyAsArray(TComputationContext& ctx, const arrow::Datum& scalar, TType* type, bool unwrap, size_t len) const { Y_VERIFY(scalar.is_scalar()); - const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(Type.get()); - MKQL_ENSURE(fixedType, "Only fixed width types are currently supported"); - size_t dataSize = (size_t)arrow::BitUtil::BytesForBits(fixedType->bit_width() * len); - - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(dataSize, &ctx.ArrowMemoryPool).ValueOrDie(); - std::shared_ptr<arrow::Buffer> bitmap; - if (scalar.scalar()->is_valid) { - switch (Slot) { - case NUdf::EDataSlot::Int8: - FillArray(scalar.scalar_as<arrow::Int8Scalar>().value, (int8_t*)data->mutable_data(), len); break; - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - FillArray(scalar.scalar_as<arrow::UInt8Scalar>().value, (uint8_t*)data->mutable_data(), len); break; - case NUdf::EDataSlot::Int16: - FillArray(scalar.scalar_as<arrow::Int16Scalar>().value, (int16_t*)data->mutable_data(), len); break; - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - FillArray(scalar.scalar_as<arrow::UInt16Scalar>().value, (uint16_t*)data->mutable_data(), len); break; - case NUdf::EDataSlot::Int32: - FillArray(scalar.scalar_as<arrow::Int32Scalar>().value, (int32_t*)data->mutable_data(), len); break; - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - FillArray(scalar.scalar_as<arrow::UInt32Scalar>().value, (uint32_t*)data->mutable_data(), len); break; - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - FillArray(scalar.scalar_as<arrow::Int64Scalar>().value, (int64_t*)data->mutable_data(), len); break; - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - FillArray(scalar.scalar_as<arrow::UInt64Scalar>().value, (uint64_t*)data->mutable_data(), len); break; - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } - } else { - bitmap = arrow::AllocateEmptyBitmap(len, &ctx.ArrowMemoryPool).ValueOrDie(); - std::memset(data->mutable_data(), 0, dataSize); + auto reader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), type); + auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, len, &ctx.Builder->GetPgBuilder()); + auto item = reader->GetScalarItem(*scalar.scalar()); + if (unwrap) { + item = item.GetOptionalValue(); } - - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(Type, len, { bitmap, data })); + builder->Add(item, len); + return ctx.HolderFactory.CreateArrowBlock(builder->Build(true)); } - template<typename T> - static void FillArray(T value, T* dst, size_t len) { - while (len) { - *dst++ = value; - len--; - } - } - - NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr, const arrow::Datum& scalar) const { + NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const arrow::ArrayData& arr, const arrow::Datum& scalar) const { Y_VERIFY(scalar.is_scalar()); Y_VERIFY(scalar.scalar()->is_valid); - size_t offset = (size_t)arr->offset; - size_t len = (size_t)arr->length; - const ui8* arrMask = arr->GetValues<ui8>(0, 0); - Y_VERIFY(arrMask); - NUdf::TUnboxedValuePod result; - switch (Slot) { - case NUdf::EDataSlot::Int8: - result = Coalesce(ctx, arr->GetValues<int8_t>(1), arrMask, scalar.scalar_as<arrow::Int8Scalar>().value, offset, len); break; - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - result = Coalesce(ctx, arr->GetValues<uint8_t>(1), arrMask, scalar.scalar_as<arrow::UInt8Scalar>().value, offset, len); break; - case NUdf::EDataSlot::Int16: - result = Coalesce(ctx, arr->GetValues<int16_t>(1), arrMask, scalar.scalar_as<arrow::Int16Scalar>().value, offset, len); break; - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - result = Coalesce(ctx, arr->GetValues<uint16_t>(1), arrMask, scalar.scalar_as<arrow::UInt16Scalar>().value, offset, len); break; - case NUdf::EDataSlot::Int32: - result = Coalesce(ctx, arr->GetValues<int32_t>(1), arrMask, scalar.scalar_as<arrow::Int32Scalar>().value, offset, len); break; - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - result = Coalesce(ctx, arr->GetValues<uint32_t>(1), arrMask, scalar.scalar_as<arrow::UInt32Scalar>().value, offset, len); break; - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - result = Coalesce(ctx, arr->GetValues<int64_t>(1), arrMask, scalar.scalar_as<arrow::Int64Scalar>().value, offset, len); break; - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - result = Coalesce(ctx, arr->GetValues<uint64_t>(1), arrMask, scalar.scalar_as<arrow::UInt64Scalar>().value, offset, len); break; - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } - return result; - } + auto firstReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstType); + auto secondReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), SecondType); - NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1, const std::shared_ptr<arrow::ArrayData>& arr2) const { - Y_VERIFY(arr1->offset == arr2->offset); - Y_VERIFY(arr1->length == arr2->length); + auto secondItem = secondReader->GetScalarItem(*scalar.scalar()); - size_t offset = (size_t)arr1->offset; - size_t len = (size_t)arr1->length; - const ui8* arr1Mask = arr1->GetValues<ui8>(0, 0); - Y_ENSURE(arr1Mask); - Y_ENSURE(arr2->buffers.size() == 2); - std::shared_ptr<arrow::Buffer> arr2Mask = arr2->buffers[0]; - NUdf::TUnboxedValuePod result; - switch (Slot) { - case NUdf::EDataSlot::Int8: - result = Coalesce(ctx, arr1->GetValues<int8_t>(1), arr1Mask, arr2->GetValues<int8_t>(1), arr2Mask, offset, len); break; - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - result = Coalesce(ctx, arr1->GetValues<uint8_t>(1), arr1Mask, arr2->GetValues<uint8_t>(1), arr2Mask, offset, len); break; - case NUdf::EDataSlot::Int16: - result = Coalesce(ctx, arr1->GetValues<int16_t>(1), arr1Mask, arr2->GetValues<int16_t>(1), arr2Mask, offset, len); break; - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - result = Coalesce(ctx, arr1->GetValues<uint16_t>(1), arr1Mask, arr2->GetValues<uint16_t>(1), arr2Mask, offset, len); break; - case NUdf::EDataSlot::Int32: - result = Coalesce(ctx, arr1->GetValues<int32_t>(1), arr1Mask, arr2->GetValues<int32_t>(1), arr2Mask, offset, len); break; - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - result = Coalesce(ctx, arr1->GetValues<uint32_t>(1), arr1Mask, arr2->GetValues<uint32_t>(1), arr2Mask, offset, len); break; - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - result = Coalesce(ctx, arr1->GetValues<int64_t>(1), arr1Mask, arr2->GetValues<int64_t>(1), arr2Mask, offset, len); break; - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - result = Coalesce(ctx, arr1->GetValues<uint64_t>(1), arr1Mask, arr2->GetValues<uint64_t>(1), arr2Mask, offset, len); break; - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } - return result; - } - - template<typename T> - NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const T* first, const ui8* firstMask, T second, size_t offset, size_t len) const { - const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(Type.get()); - MKQL_ENSURE(fixedType, "Only fixed width types are currently supported"); - size_t size = (size_t)arrow::BitUtil::BytesForBits(fixedType->bit_width()); - Y_VERIFY(size > 0); - - std::shared_ptr<arrow::Buffer> result = arrow::AllocateBuffer(len * size, &ctx.ArrowMemoryPool).ValueOrDie(); + const size_t len = arr.length; + auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondType, ctx.ArrowMemoryPool, len, &ctx.Builder->GetPgBuilder()); - T* output = (T*)result->mutable_data(); for (size_t i = 0; i < len; ++i) { - T m1 = T(((firstMask[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1); - - *output++ = (*first & m1) | ((~m1) & second); - first++; - offset++; + auto result = firstReader->GetItem(arr, i); + if (!result) { + result = secondItem; + } else if (UnwrapFirst) { + result = result.GetOptionalValue(); + } + builder->Add(result); } - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(Type, len, { std::shared_ptr<arrow::Buffer>(), result })); + return ctx.HolderFactory.CreateArrowBlock(builder->Build(true)); } - template<typename T> - NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const T* first, const ui8* firstMask, - const T* second, const std::shared_ptr<arrow::Buffer>& secondMask, size_t offset, size_t len) const - { - const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(Type.get()); - MKQL_ENSURE(fixedType, "Only fixed width types are currently supported"); - size_t size = (size_t)arrow::BitUtil::BytesForBits(fixedType->bit_width()); - Y_VERIFY(size > 0); - - std::shared_ptr<arrow::Buffer> result = arrow::AllocateBuffer(len * size, &ctx.ArrowMemoryPool).ValueOrDie(); - T* output = (T*)result->mutable_data(); - std::shared_ptr<arrow::Buffer> resultMask; - if (secondMask) { - // resultMask = m1 | m2; - // result = (v1 & m1) | (v2 & m2 & ~m1) - resultMask = arrow::internal::BitmapOr(&ctx.ArrowMemoryPool, firstMask, offset, secondMask->data(), offset, len, 0).ValueOrDie(); - const ui8* sm = secondMask->data(); - for (size_t i = 0; i < len; ++i) { - T m1 = T(((firstMask[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1); - T m2 = T(((sm[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1); + NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const arrow::ArrayData& arr1, const arrow::ArrayData& arr2) const { + Y_VERIFY(arr1.length == arr2.length); + const size_t len = (size_t)arr1.length; + auto firstReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstType); + auto secondReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), SecondType); + auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondType, ctx.ArrowMemoryPool, len, &ctx.Builder->GetPgBuilder()); - *output++ = (*first & m1) | ((~m1) & *second & m2); - first++; - second++; - offset++; - } - } else { - for (size_t i = 0; i < len; ++i) { - T m1 = T(((firstMask[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1); - *output++ = (*first & m1) | ((~m1) & *second); - first++; - second++; - offset++; + for (size_t i = 0; i < len; ++i) { + auto result = firstReader->GetItem(arr1, i); + if (!result) { + result = secondReader->GetItem(arr2, i); + } else if (UnwrapFirst) { + result = result.GetOptionalValue(); } + builder->Add(result); } - - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(Type, len, { resultMask, result })); + return ctx.HolderFactory.CreateArrowBlock(builder->Build(true)); } void RegisterDependencies() const final { @@ -246,8 +130,9 @@ private: IComputationNode* const First; IComputationNode* const Second; - std::shared_ptr<arrow::DataType> Type; - const NUdf::EDataSlot Slot; + TType* const FirstType; + TType* const SecondType; + const bool UnwrapFirst; }; } // namespace @@ -261,18 +146,21 @@ IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeF auto firstType = AS_TYPE(TBlockType, first.GetStaticType()); auto secondType = AS_TYPE(TBlockType, second.GetStaticType()); - bool firstOptional; - auto firstItemType = UnpackOptionalData(firstType->GetItemType(), firstOptional); - - bool secondOptional; - auto secondItemType = UnpackOptionalData(secondType->GetItemType(), secondOptional); + auto firstItemType = firstType->GetItemType(); + auto secondItemType = secondType->GetItemType(); + MKQL_ENSURE(firstItemType->IsOptional() || firstItemType->IsPg(), "Expecting Optional or Pg type as first argument"); - MKQL_ENSURE(firstOptional, "BlockCoalesce with non-optional first argument"); - MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Mismatch types"); + bool needUnwrapFirst = false; + if (!firstItemType->IsSameType(*secondItemType)) { + needUnwrapFirst = true; + bool firstOptional; + firstItemType = UnpackOptional(firstItemType, firstOptional); + MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Uncompatible arguemnt types"); + } auto firstCompute = LocateNode(ctx.NodeLocator, callable, 0); auto secondCompute = LocateNode(ctx.NodeLocator, callable, 1); - return new TCoalesceBlockWrapper(ctx.Mutables, firstCompute, secondCompute, *secondItemType->GetDataSlot()); + return new TCoalesceBlockWrapper(ctx.Mutables, firstCompute, secondCompute, firstType->GetItemType(), secondType->GetItemType(), needUnwrapFirst); } } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 9bb8cc0cea..4d48dec16b 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1536,14 +1536,16 @@ TRuntimeNode TProgramBuilder::BlockCoalesce(TRuntimeNode first, TRuntimeNode sec auto firstType = AS_TYPE(TBlockType, first.GetStaticType()); auto secondType = AS_TYPE(TBlockType, second.GetStaticType()); - bool firstOptional; - auto firstItemType = UnpackOptionalData(firstType->GetItemType(), firstOptional); + auto firstItemType = firstType->GetItemType(); + auto secondItemType = secondType->GetItemType(); - bool secondOptional; - auto secondItemType = UnpackOptionalData(secondType->GetItemType(), secondOptional); + MKQL_ENSURE(firstItemType->IsOptional() || firstItemType->IsPg(), "Expecting Optional or Pg type as first argument"); - MKQL_ENSURE(firstOptional, "BlockCoalesce with non-optional first argument"); - MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Argument should have same base types"); + if (!firstItemType->IsSameType(*secondItemType)) { + bool firstOptional; + firstItemType = UnpackOptional(firstItemType, firstOptional); + MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Uncompatible arguemnt types"); + } auto outputType = NewBlockType(secondType->GetItemType(), GetResultShape({firstType, secondType})); diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index 3178569aa9..49865bcf5f 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -27,6 +27,7 @@ public: virtual size_t MaxLength() const = 0; virtual void Add(NUdf::TUnboxedValuePod value) = 0; virtual void Add(TBlockItem value) = 0; + virtual void Add(TBlockItem value, size_t count) = 0; virtual void Add(TInputBuffer& input) = 0; virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0; virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) = 0; @@ -95,26 +96,31 @@ public: } void Add(NUdf::TUnboxedValuePod value) final { - Y_VERIFY(CurrLen < MaxLen); + Y_VERIFY_DEBUG(CurrLen < MaxLen); DoAdd(value); CurrLen++; } void Add(TBlockItem value) final { - Y_VERIFY(CurrLen < MaxLen); + Y_VERIFY_DEBUG(CurrLen < MaxLen); DoAdd(value); CurrLen++; } + void Add(TBlockItem value, size_t count) final { + Y_VERIFY_DEBUG(CurrLen + count <= MaxLen); + DoAdd(value, count); + CurrLen += count; + } void Add(TInputBuffer& input) final { - Y_VERIFY(CurrLen < MaxLen); + Y_VERIFY_DEBUG(CurrLen < MaxLen); DoAdd(input); CurrLen++; } void AddDefault() { - Y_VERIFY(CurrLen < MaxLen); + Y_VERIFY_DEBUG(CurrLen < MaxLen); DoAddDefault(); CurrLen++; } @@ -223,6 +229,11 @@ public: protected: virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0; virtual void DoAdd(TBlockItem value) = 0; + virtual void DoAdd(TBlockItem value, size_t count) { + for (size_t i = 0; i < count; ++i) { + DoAdd(value); + } + } virtual void DoAdd(TInputBuffer& input) = 0; virtual void DoAddDefault() = 0; virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0; @@ -351,7 +362,20 @@ public: DataPtr[GetCurrLen()] = value.As<T>(); } - void DoAdd(TInputBuffer& input) final { + void DoAdd(TBlockItem value, size_t count) final { + if constexpr (Nullable) { + if (!value) { + std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 0); + std::fill(DataPtr + GetCurrLen(), DataPtr + GetCurrLen() + count, T{}); + return; + } + std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 1); + } + + std::fill(DataPtr + GetCurrLen(), DataPtr + GetCurrLen() + count, value.As<T>()); + } + + void DoAdd(TInputBuffer &input) final { if constexpr (Nullable) { if (!input.PopChar()) { return DoAdd(TBlockItem{}); diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp index be385c43ed..41bdb4cb07 100644 --- a/ydb/library/yql/public/udf/arrow/util.cpp +++ b/ydb/library/yql/public/udf/arrow/util.cpp @@ -141,6 +141,18 @@ std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, return first; } +std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, bool isNestedOptional) { + Y_ENSURE(data.GetNullCount() == 0); + if (isNestedOptional) { + Y_ENSURE(data.buffers.size() == 1); + Y_ENSURE(data.child_data.size() == 1); + return data.child_data.front(); + } + auto result = data.Copy(); + result->buffers.front().reset(); + return result; +} + void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func) { Y_ENSURE(datum.is_arraylike(), "Expected array"); if (datum.is_array()) { diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h index ae3e103c83..ba7fd7de2e 100644 --- a/ydb/library/yql/public/udf/arrow/util.h +++ b/ydb/library/yql/public/udf/arrow/util.h @@ -29,6 +29,9 @@ std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayDa /// \brief Chops first len items of `data` as new ArrayData object std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len); +/// \brief Unwrap array (decrease optional level) +std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, bool isNestedOptional); + void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func); arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks); |