aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-05-02 21:16:46 +0300
committeraneporada <aneporada@ydb.tech>2023-05-02 21:16:46 +0300
commitf58a59b3ab6a6bdbd3feae3007b14426b2bdd7ad (patch)
tree8fcaa37851c1fb7f8ba4db368a7ba38f686e4797
parent71b550e662169207c2fcc129078ec815c819052b (diff)
downloadydb-f58a59b3ab6a6bdbd3feae3007b14426b2bdd7ad.tar.gz
Support BlockCoalesce for all arrow types
initial
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.cpp24
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp266
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp14
-rw-r--r--ydb/library/yql/public/udf/arrow/block_builder.h34
-rw-r--r--ydb/library/yql/public/udf/arrow/util.cpp12
-rw-r--r--ydb/library/yql/public/udf/arrow/util.h3
6 files changed, 139 insertions, 214 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index dc0739e50e..21a11d2b9f 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -100,6 +100,7 @@ IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& inpu
}
IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
if (!EnsureArgsCount(*input, 2U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -114,29 +115,24 @@ IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TE
bool firstIsScalar;
auto firstItemType = GetBlockItemType(*first->GetTypeAnn(), firstIsScalar);
- bool firstIsOptional = firstItemType->GetKind() == ETypeAnnotationKind::Optional;
- firstItemType = RemoveOptionalType(firstItemType);
+ if (firstItemType->GetKind() != ETypeAnnotationKind::Optional && firstItemType->GetKind() != ETypeAnnotationKind::Pg) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(first->Pos()), TStringBuilder() <<
+ "Expecting Optional or Pg type as first argument, but got: " << *firstItemType));
+ return IGraphTransformer::TStatus::Error;
+ }
bool secondIsScalar;
auto secondItemType = GetBlockItemType(*second->GetTypeAnn(), secondIsScalar);
- bool secondIsOptional = secondItemType->GetKind() == ETypeAnnotationKind::Optional;
- secondItemType = RemoveOptionalType(secondItemType);
- if (!IsSameAnnotation(*firstItemType, *secondItemType)) {
+ if (!IsSameAnnotation(*firstItemType, *secondItemType) &&
+ !IsSameAnnotation(*RemoveOptionalType(firstItemType), *secondItemType))
+ {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() <<
- "Mismatch item types: first is " << *firstItemType << ", second is " << *secondItemType));
+ "Uncompatible coalesce types: first is " << *firstItemType << ", second is " << *secondItemType));
return IGraphTransformer::TStatus::Error;
}
- if (!firstIsOptional) {
- output = input->HeadPtr();
- return IGraphTransformer::TStatus::Repeat;
- }
-
auto outputItemType = secondItemType;
- if (secondIsOptional) {
- outputItemType = ctx.Expr.MakeType<TOptionalExprType>(outputItemType);
- }
if (firstIsScalar && secondIsScalar) {
input->SetTypeAnn(ctx.Expr.MakeType<TScalarExprType>(outputItemType));
} else {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
index c18400e15a..7210f11396 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
@@ -5,6 +5,9 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/public/udf/arrow/block_builder.h>
+#include <ydb/library/yql/public/udf/arrow/block_reader.h>
+#include <ydb/library/yql/public/udf/arrow/util.h>
#include <arrow/util/bitmap_ops.h>
@@ -16,13 +19,14 @@ namespace {
class TCoalesceBlockWrapper : public TMutableComputationNode<TCoalesceBlockWrapper> {
public:
- TCoalesceBlockWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, NUdf::EDataSlot slot)
+ TCoalesceBlockWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TType* firstType, TType* secondType, bool unwrapFirst)
: TMutableComputationNode(mutables)
, First(first)
, Second(second)
- , Slot(slot)
+ , FirstType(firstType)
+ , SecondType(secondType)
+ , UnwrapFirst(unwrapFirst)
{
- MKQL_ENSURE(ConvertArrowType(slot, Type), "Unsupported type of data");
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
@@ -33,210 +37,90 @@ public:
const auto& secondDatum = TArrowBlock::From(second).GetDatum();
if (firstDatum.is_scalar() && secondDatum.is_scalar()) {
- return (firstDatum.null_count() == 0) ? first.Release() : second.Release();
+ if (!firstDatum.scalar()->is_valid) {
+ return second.Release();
+ } else if (!UnwrapFirst) {
+ return first.Release();
+ }
+ auto reader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstType);
+ auto builder = NYql::NUdf::MakeScalarBuilder(TTypeInfoHelper(), SecondType);
+ auto firstItem = reader->GetScalarItem(*firstDatum.scalar());
+ return ctx.HolderFactory.CreateArrowBlock(builder->Build(firstItem.GetOptionalValue()));
}
size_t len = firstDatum.is_scalar() ? (size_t)secondDatum.length() : (size_t)firstDatum.length();
if (firstDatum.null_count() == firstDatum.length()) {
- return secondDatum.is_scalar() ? CopyAsArray(ctx, secondDatum, len) : second.Release();
- } else if (firstDatum.null_count() == 0) {
- return firstDatum.is_scalar() ? CopyAsArray(ctx, firstDatum, len) : first.Release();
+ return secondDatum.is_scalar() ? CopyAsArray(ctx, secondDatum, SecondType, false, len) : second.Release();
+ } else if (firstDatum.null_count() == 0 || secondDatum.null_count() == secondDatum.length()) {
+ if (firstDatum.is_scalar()) {
+ return CopyAsArray(ctx, firstDatum, FirstType, UnwrapFirst, len);
+ } else if (!UnwrapFirst) {
+ return first.Release();
+ }
+ bool isNestedOptional = static_cast<TOptionalType*>(FirstType)->GetItemType()->IsOptional();
+ return ctx.HolderFactory.CreateArrowBlock(NYql::NUdf::Unwrap(*firstDatum.array(), isNestedOptional));
}
Y_VERIFY(firstDatum.is_array());
- if (secondDatum.null_count() == secondDatum.length()) {
- return first.Release();
- }
-
return secondDatum.is_scalar() ?
- Coalesce(ctx, firstDatum.array(), secondDatum) :
- Coalesce(ctx, firstDatum.array(), secondDatum.array());
+ Coalesce(ctx, *firstDatum.array(), secondDatum) :
+ Coalesce(ctx, *firstDatum.array(), *secondDatum.array());
}
private:
- NUdf::TUnboxedValuePod CopyAsArray(TComputationContext& ctx, const arrow::Datum& scalar, size_t len) const {
+ NUdf::TUnboxedValuePod CopyAsArray(TComputationContext& ctx, const arrow::Datum& scalar, TType* type, bool unwrap, size_t len) const {
Y_VERIFY(scalar.is_scalar());
- const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(Type.get());
- MKQL_ENSURE(fixedType, "Only fixed width types are currently supported");
- size_t dataSize = (size_t)arrow::BitUtil::BytesForBits(fixedType->bit_width() * len);
-
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(dataSize, &ctx.ArrowMemoryPool).ValueOrDie();
- std::shared_ptr<arrow::Buffer> bitmap;
- if (scalar.scalar()->is_valid) {
- switch (Slot) {
- case NUdf::EDataSlot::Int8:
- FillArray(scalar.scalar_as<arrow::Int8Scalar>().value, (int8_t*)data->mutable_data(), len); break;
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- FillArray(scalar.scalar_as<arrow::UInt8Scalar>().value, (uint8_t*)data->mutable_data(), len); break;
- case NUdf::EDataSlot::Int16:
- FillArray(scalar.scalar_as<arrow::Int16Scalar>().value, (int16_t*)data->mutable_data(), len); break;
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- FillArray(scalar.scalar_as<arrow::UInt16Scalar>().value, (uint16_t*)data->mutable_data(), len); break;
- case NUdf::EDataSlot::Int32:
- FillArray(scalar.scalar_as<arrow::Int32Scalar>().value, (int32_t*)data->mutable_data(), len); break;
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- FillArray(scalar.scalar_as<arrow::UInt32Scalar>().value, (uint32_t*)data->mutable_data(), len); break;
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- FillArray(scalar.scalar_as<arrow::Int64Scalar>().value, (int64_t*)data->mutable_data(), len); break;
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- FillArray(scalar.scalar_as<arrow::UInt64Scalar>().value, (uint64_t*)data->mutable_data(), len); break;
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
- } else {
- bitmap = arrow::AllocateEmptyBitmap(len, &ctx.ArrowMemoryPool).ValueOrDie();
- std::memset(data->mutable_data(), 0, dataSize);
+ auto reader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), type);
+ auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, len, &ctx.Builder->GetPgBuilder());
+ auto item = reader->GetScalarItem(*scalar.scalar());
+ if (unwrap) {
+ item = item.GetOptionalValue();
}
-
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(Type, len, { bitmap, data }));
+ builder->Add(item, len);
+ return ctx.HolderFactory.CreateArrowBlock(builder->Build(true));
}
- template<typename T>
- static void FillArray(T value, T* dst, size_t len) {
- while (len) {
- *dst++ = value;
- len--;
- }
- }
-
- NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr, const arrow::Datum& scalar) const {
+ NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const arrow::ArrayData& arr, const arrow::Datum& scalar) const {
Y_VERIFY(scalar.is_scalar());
Y_VERIFY(scalar.scalar()->is_valid);
- size_t offset = (size_t)arr->offset;
- size_t len = (size_t)arr->length;
- const ui8* arrMask = arr->GetValues<ui8>(0, 0);
- Y_VERIFY(arrMask);
- NUdf::TUnboxedValuePod result;
- switch (Slot) {
- case NUdf::EDataSlot::Int8:
- result = Coalesce(ctx, arr->GetValues<int8_t>(1), arrMask, scalar.scalar_as<arrow::Int8Scalar>().value, offset, len); break;
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- result = Coalesce(ctx, arr->GetValues<uint8_t>(1), arrMask, scalar.scalar_as<arrow::UInt8Scalar>().value, offset, len); break;
- case NUdf::EDataSlot::Int16:
- result = Coalesce(ctx, arr->GetValues<int16_t>(1), arrMask, scalar.scalar_as<arrow::Int16Scalar>().value, offset, len); break;
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- result = Coalesce(ctx, arr->GetValues<uint16_t>(1), arrMask, scalar.scalar_as<arrow::UInt16Scalar>().value, offset, len); break;
- case NUdf::EDataSlot::Int32:
- result = Coalesce(ctx, arr->GetValues<int32_t>(1), arrMask, scalar.scalar_as<arrow::Int32Scalar>().value, offset, len); break;
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- result = Coalesce(ctx, arr->GetValues<uint32_t>(1), arrMask, scalar.scalar_as<arrow::UInt32Scalar>().value, offset, len); break;
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- result = Coalesce(ctx, arr->GetValues<int64_t>(1), arrMask, scalar.scalar_as<arrow::Int64Scalar>().value, offset, len); break;
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- result = Coalesce(ctx, arr->GetValues<uint64_t>(1), arrMask, scalar.scalar_as<arrow::UInt64Scalar>().value, offset, len); break;
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
- return result;
- }
+ auto firstReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstType);
+ auto secondReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), SecondType);
- NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1, const std::shared_ptr<arrow::ArrayData>& arr2) const {
- Y_VERIFY(arr1->offset == arr2->offset);
- Y_VERIFY(arr1->length == arr2->length);
+ auto secondItem = secondReader->GetScalarItem(*scalar.scalar());
- size_t offset = (size_t)arr1->offset;
- size_t len = (size_t)arr1->length;
- const ui8* arr1Mask = arr1->GetValues<ui8>(0, 0);
- Y_ENSURE(arr1Mask);
- Y_ENSURE(arr2->buffers.size() == 2);
- std::shared_ptr<arrow::Buffer> arr2Mask = arr2->buffers[0];
- NUdf::TUnboxedValuePod result;
- switch (Slot) {
- case NUdf::EDataSlot::Int8:
- result = Coalesce(ctx, arr1->GetValues<int8_t>(1), arr1Mask, arr2->GetValues<int8_t>(1), arr2Mask, offset, len); break;
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- result = Coalesce(ctx, arr1->GetValues<uint8_t>(1), arr1Mask, arr2->GetValues<uint8_t>(1), arr2Mask, offset, len); break;
- case NUdf::EDataSlot::Int16:
- result = Coalesce(ctx, arr1->GetValues<int16_t>(1), arr1Mask, arr2->GetValues<int16_t>(1), arr2Mask, offset, len); break;
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- result = Coalesce(ctx, arr1->GetValues<uint16_t>(1), arr1Mask, arr2->GetValues<uint16_t>(1), arr2Mask, offset, len); break;
- case NUdf::EDataSlot::Int32:
- result = Coalesce(ctx, arr1->GetValues<int32_t>(1), arr1Mask, arr2->GetValues<int32_t>(1), arr2Mask, offset, len); break;
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- result = Coalesce(ctx, arr1->GetValues<uint32_t>(1), arr1Mask, arr2->GetValues<uint32_t>(1), arr2Mask, offset, len); break;
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- result = Coalesce(ctx, arr1->GetValues<int64_t>(1), arr1Mask, arr2->GetValues<int64_t>(1), arr2Mask, offset, len); break;
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- result = Coalesce(ctx, arr1->GetValues<uint64_t>(1), arr1Mask, arr2->GetValues<uint64_t>(1), arr2Mask, offset, len); break;
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
- return result;
- }
-
- template<typename T>
- NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const T* first, const ui8* firstMask, T second, size_t offset, size_t len) const {
- const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(Type.get());
- MKQL_ENSURE(fixedType, "Only fixed width types are currently supported");
- size_t size = (size_t)arrow::BitUtil::BytesForBits(fixedType->bit_width());
- Y_VERIFY(size > 0);
-
- std::shared_ptr<arrow::Buffer> result = arrow::AllocateBuffer(len * size, &ctx.ArrowMemoryPool).ValueOrDie();
+ const size_t len = arr.length;
+ auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondType, ctx.ArrowMemoryPool, len, &ctx.Builder->GetPgBuilder());
- T* output = (T*)result->mutable_data();
for (size_t i = 0; i < len; ++i) {
- T m1 = T(((firstMask[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1);
-
- *output++ = (*first & m1) | ((~m1) & second);
- first++;
- offset++;
+ auto result = firstReader->GetItem(arr, i);
+ if (!result) {
+ result = secondItem;
+ } else if (UnwrapFirst) {
+ result = result.GetOptionalValue();
+ }
+ builder->Add(result);
}
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(Type, len, { std::shared_ptr<arrow::Buffer>(), result }));
+ return ctx.HolderFactory.CreateArrowBlock(builder->Build(true));
}
- template<typename T>
- NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const T* first, const ui8* firstMask,
- const T* second, const std::shared_ptr<arrow::Buffer>& secondMask, size_t offset, size_t len) const
- {
- const auto* fixedType = dynamic_cast<const arrow::FixedWidthType*>(Type.get());
- MKQL_ENSURE(fixedType, "Only fixed width types are currently supported");
- size_t size = (size_t)arrow::BitUtil::BytesForBits(fixedType->bit_width());
- Y_VERIFY(size > 0);
-
- std::shared_ptr<arrow::Buffer> result = arrow::AllocateBuffer(len * size, &ctx.ArrowMemoryPool).ValueOrDie();
- T* output = (T*)result->mutable_data();
- std::shared_ptr<arrow::Buffer> resultMask;
- if (secondMask) {
- // resultMask = m1 | m2;
- // result = (v1 & m1) | (v2 & m2 & ~m1)
- resultMask = arrow::internal::BitmapOr(&ctx.ArrowMemoryPool, firstMask, offset, secondMask->data(), offset, len, 0).ValueOrDie();
- const ui8* sm = secondMask->data();
- for (size_t i = 0; i < len; ++i) {
- T m1 = T(((firstMask[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1);
- T m2 = T(((sm[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1);
+ NUdf::TUnboxedValuePod Coalesce(TComputationContext& ctx, const arrow::ArrayData& arr1, const arrow::ArrayData& arr2) const {
+ Y_VERIFY(arr1.length == arr2.length);
+ const size_t len = (size_t)arr1.length;
+ auto firstReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstType);
+ auto secondReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), SecondType);
+ auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondType, ctx.ArrowMemoryPool, len, &ctx.Builder->GetPgBuilder());
- *output++ = (*first & m1) | ((~m1) & *second & m2);
- first++;
- second++;
- offset++;
- }
- } else {
- for (size_t i = 0; i < len; ++i) {
- T m1 = T(((firstMask[offset >> 3] >> (offset & 0x07)) & 1) ^ 1) - T(1);
- *output++ = (*first & m1) | ((~m1) & *second);
- first++;
- second++;
- offset++;
+ for (size_t i = 0; i < len; ++i) {
+ auto result = firstReader->GetItem(arr1, i);
+ if (!result) {
+ result = secondReader->GetItem(arr2, i);
+ } else if (UnwrapFirst) {
+ result = result.GetOptionalValue();
}
+ builder->Add(result);
}
-
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(Type, len, { resultMask, result }));
+ return ctx.HolderFactory.CreateArrowBlock(builder->Build(true));
}
void RegisterDependencies() const final {
@@ -246,8 +130,9 @@ private:
IComputationNode* const First;
IComputationNode* const Second;
- std::shared_ptr<arrow::DataType> Type;
- const NUdf::EDataSlot Slot;
+ TType* const FirstType;
+ TType* const SecondType;
+ const bool UnwrapFirst;
};
} // namespace
@@ -261,18 +146,21 @@ IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeF
auto firstType = AS_TYPE(TBlockType, first.GetStaticType());
auto secondType = AS_TYPE(TBlockType, second.GetStaticType());
- bool firstOptional;
- auto firstItemType = UnpackOptionalData(firstType->GetItemType(), firstOptional);
-
- bool secondOptional;
- auto secondItemType = UnpackOptionalData(secondType->GetItemType(), secondOptional);
+ auto firstItemType = firstType->GetItemType();
+ auto secondItemType = secondType->GetItemType();
+ MKQL_ENSURE(firstItemType->IsOptional() || firstItemType->IsPg(), "Expecting Optional or Pg type as first argument");
- MKQL_ENSURE(firstOptional, "BlockCoalesce with non-optional first argument");
- MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Mismatch types");
+ bool needUnwrapFirst = false;
+ if (!firstItemType->IsSameType(*secondItemType)) {
+ needUnwrapFirst = true;
+ bool firstOptional;
+ firstItemType = UnpackOptional(firstItemType, firstOptional);
+ MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Uncompatible arguemnt types");
+ }
auto firstCompute = LocateNode(ctx.NodeLocator, callable, 0);
auto secondCompute = LocateNode(ctx.NodeLocator, callable, 1);
- return new TCoalesceBlockWrapper(ctx.Mutables, firstCompute, secondCompute, *secondItemType->GetDataSlot());
+ return new TCoalesceBlockWrapper(ctx.Mutables, firstCompute, secondCompute, firstType->GetItemType(), secondType->GetItemType(), needUnwrapFirst);
}
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 9bb8cc0cea..4d48dec16b 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -1536,14 +1536,16 @@ TRuntimeNode TProgramBuilder::BlockCoalesce(TRuntimeNode first, TRuntimeNode sec
auto firstType = AS_TYPE(TBlockType, first.GetStaticType());
auto secondType = AS_TYPE(TBlockType, second.GetStaticType());
- bool firstOptional;
- auto firstItemType = UnpackOptionalData(firstType->GetItemType(), firstOptional);
+ auto firstItemType = firstType->GetItemType();
+ auto secondItemType = secondType->GetItemType();
- bool secondOptional;
- auto secondItemType = UnpackOptionalData(secondType->GetItemType(), secondOptional);
+ MKQL_ENSURE(firstItemType->IsOptional() || firstItemType->IsPg(), "Expecting Optional or Pg type as first argument");
- MKQL_ENSURE(firstOptional, "BlockCoalesce with non-optional first argument");
- MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Argument should have same base types");
+ if (!firstItemType->IsSameType(*secondItemType)) {
+ bool firstOptional;
+ firstItemType = UnpackOptional(firstItemType, firstOptional);
+ MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Uncompatible arguemnt types");
+ }
auto outputType = NewBlockType(secondType->GetItemType(), GetResultShape({firstType, secondType}));
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index 3178569aa9..49865bcf5f 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -27,6 +27,7 @@ public:
virtual size_t MaxLength() const = 0;
virtual void Add(NUdf::TUnboxedValuePod value) = 0;
virtual void Add(TBlockItem value) = 0;
+ virtual void Add(TBlockItem value, size_t count) = 0;
virtual void Add(TInputBuffer& input) = 0;
virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0;
virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) = 0;
@@ -95,26 +96,31 @@ public:
}
void Add(NUdf::TUnboxedValuePod value) final {
- Y_VERIFY(CurrLen < MaxLen);
+ Y_VERIFY_DEBUG(CurrLen < MaxLen);
DoAdd(value);
CurrLen++;
}
void Add(TBlockItem value) final {
- Y_VERIFY(CurrLen < MaxLen);
+ Y_VERIFY_DEBUG(CurrLen < MaxLen);
DoAdd(value);
CurrLen++;
}
+ void Add(TBlockItem value, size_t count) final {
+ Y_VERIFY_DEBUG(CurrLen + count <= MaxLen);
+ DoAdd(value, count);
+ CurrLen += count;
+ }
void Add(TInputBuffer& input) final {
- Y_VERIFY(CurrLen < MaxLen);
+ Y_VERIFY_DEBUG(CurrLen < MaxLen);
DoAdd(input);
CurrLen++;
}
void AddDefault() {
- Y_VERIFY(CurrLen < MaxLen);
+ Y_VERIFY_DEBUG(CurrLen < MaxLen);
DoAddDefault();
CurrLen++;
}
@@ -223,6 +229,11 @@ public:
protected:
virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0;
virtual void DoAdd(TBlockItem value) = 0;
+ virtual void DoAdd(TBlockItem value, size_t count) {
+ for (size_t i = 0; i < count; ++i) {
+ DoAdd(value);
+ }
+ }
virtual void DoAdd(TInputBuffer& input) = 0;
virtual void DoAddDefault() = 0;
virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
@@ -351,7 +362,20 @@ public:
DataPtr[GetCurrLen()] = value.As<T>();
}
- void DoAdd(TInputBuffer& input) final {
+ void DoAdd(TBlockItem value, size_t count) final {
+ if constexpr (Nullable) {
+ if (!value) {
+ std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 0);
+ std::fill(DataPtr + GetCurrLen(), DataPtr + GetCurrLen() + count, T{});
+ return;
+ }
+ std::fill(NullPtr + GetCurrLen(), NullPtr + GetCurrLen() + count, 1);
+ }
+
+ std::fill(DataPtr + GetCurrLen(), DataPtr + GetCurrLen() + count, value.As<T>());
+ }
+
+ void DoAdd(TInputBuffer &input) final {
if constexpr (Nullable) {
if (!input.PopChar()) {
return DoAdd(TBlockItem{});
diff --git a/ydb/library/yql/public/udf/arrow/util.cpp b/ydb/library/yql/public/udf/arrow/util.cpp
index be385c43ed..41bdb4cb07 100644
--- a/ydb/library/yql/public/udf/arrow/util.cpp
+++ b/ydb/library/yql/public/udf/arrow/util.cpp
@@ -141,6 +141,18 @@ std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data,
return first;
}
+std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, bool isNestedOptional) {
+ Y_ENSURE(data.GetNullCount() == 0);
+ if (isNestedOptional) {
+ Y_ENSURE(data.buffers.size() == 1);
+ Y_ENSURE(data.child_data.size() == 1);
+ return data.child_data.front();
+ }
+ auto result = data.Copy();
+ result->buffers.front().reset();
+ return result;
+}
+
void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func) {
Y_ENSURE(datum.is_arraylike(), "Expected array");
if (datum.is_array()) {
diff --git a/ydb/library/yql/public/udf/arrow/util.h b/ydb/library/yql/public/udf/arrow/util.h
index ae3e103c83..ba7fd7de2e 100644
--- a/ydb/library/yql/public/udf/arrow/util.h
+++ b/ydb/library/yql/public/udf/arrow/util.h
@@ -29,6 +29,9 @@ std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayDa
/// \brief Chops first len items of `data` as new ArrayData object
std::shared_ptr<arrow::ArrayData> Chop(std::shared_ptr<arrow::ArrayData>& data, size_t len);
+/// \brief Unwrap array (decrease optional level)
+std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, bool isNestedOptional);
+
void ForEachArrayData(const arrow::Datum& datum, const std::function<void(const std::shared_ptr<arrow::ArrayData>&)>& func);
arrow::Datum MakeArray(const TVector<std::shared_ptr<arrow::ArrayData>>& chunks);