diff options
author | vvvv <vvvv@ydb.tech> | 2023-08-09 01:23:01 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-08-09 02:13:40 +0300 |
commit | c4c5c6d2bbec0deeffe1b3d8e387a6039d05c1d7 (patch) | |
tree | ef163d96768850eca49947fab19c3e7071a5b552 | |
parent | cf6597e319da1c2b470bbc9a2f7c9103c8bb25ab (diff) | |
download | ydb-c4c5c6d2bbec0deeffe1b3d8e387a6039d05c1d7.tar.gz |
Kernels for logical ops
4 files changed, 222 insertions, 319 deletions
diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp index 8cab9844ddb..adf3a536d34 100644 --- a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp +++ b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp @@ -54,6 +54,27 @@ Y_UNIT_TEST_SUITE(TKernelRegistryTest) { }); } + Y_UNIT_TEST(TestAnd) { + TestOne([](auto& b,auto& ctx) { + auto blockBoolType = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Bool)); + return b.AddBinaryOp(TKernelRequestBuilder::EBinaryOp::And, blockBoolType, blockBoolType, blockBoolType); + }); + } + + Y_UNIT_TEST(TestOr) { + TestOne([](auto& b,auto& ctx) { + auto blockBoolType = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Bool)); + return b.AddBinaryOp(TKernelRequestBuilder::EBinaryOp::Or, blockBoolType, blockBoolType, blockBoolType); + }); + } + + Y_UNIT_TEST(TestXor) { + TestOne([](auto& b,auto& ctx) { + auto blockBoolType = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Bool)); + return b.AddBinaryOp(TKernelRequestBuilder::EBinaryOp::Xor, blockBoolType, blockBoolType, blockBoolType); + }); + } + Y_UNIT_TEST(TestAdd) { TestOne([](auto& b,auto& ctx) { auto blockInt32Type = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Int32)); diff --git a/ydb/library/yql/core/arrow_kernels/request/request.cpp b/ydb/library/yql/core/arrow_kernels/request/request.cpp index 0eb3a8df415..71700d47e09 100644 --- a/ydb/library/yql/core/arrow_kernels/request/request.cpp +++ b/ydb/library/yql/core/arrow_kernels/request/request.cpp @@ -39,15 +39,24 @@ ui32 TKernelRequestBuilder::AddBinaryOp(EBinaryOp op, const TTypeAnnotationNode* auto arg1 = MakeArg(arg1Type); auto arg2 = MakeArg(arg2Type); switch (op) { + case EBinaryOp::And: + Items_.emplace_back(Pb_.BlockAnd(arg1, arg2)); + break; + case EBinaryOp::Or: + Items_.emplace_back(Pb_.BlockOr(arg1, arg2)); + break; + case EBinaryOp::Xor: + Items_.emplace_back(Pb_.BlockXor(arg1, arg2)); + break; case EBinaryOp::Add: Items_.emplace_back(Pb_.BlockFunc("Add", returnType, { arg1, arg2 })); break; case EBinaryOp::Sub: Items_.emplace_back(Pb_.BlockFunc("Sub", returnType, { arg1, arg2 })); - break; + break; case EBinaryOp::Mul: Items_.emplace_back(Pb_.BlockFunc("Mul", returnType, { arg1, arg2 })); - break; + break; case EBinaryOp::Div: Items_.emplace_back(Pb_.BlockFunc("Div", returnType, { arg1, arg2 })); break; diff --git a/ydb/library/yql/core/arrow_kernels/request/request.h b/ydb/library/yql/core/arrow_kernels/request/request.h index 3aa4715a343..94a37bb1141 100644 --- a/ydb/library/yql/core/arrow_kernels/request/request.h +++ b/ydb/library/yql/core/arrow_kernels/request/request.h @@ -13,6 +13,9 @@ public: }; enum EBinaryOp { + And, + Or, + Xor, Add, Sub, Mul, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp index e2cbcc9e845..39954fb989c 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/minikql/computation/mkql_block_impl.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> @@ -24,7 +25,7 @@ using arrow::internal::Bitmap; std::shared_ptr<arrow::Buffer> CopyBitmap(arrow::MemoryPool* pool, const std::shared_ptr<arrow::Buffer>& bitmap, int64_t offset, int64_t len) { std::shared_ptr<arrow::Buffer> result = bitmap; if (bitmap && offset != 0) { - result = arrow::AllocateBitmap(len, pool).ValueOrDie(); + result = ARROW_RESULT(arrow::AllocateBitmap(len, pool)); arrow::internal::CopyBitmap(bitmap->data(), offset, len, result->mutable_data(), 0); } return result; @@ -33,11 +34,27 @@ std::shared_ptr<arrow::Buffer> CopyBitmap(arrow::MemoryPool* pool, const std::sh std::shared_ptr<arrow::Buffer> CopySparseBitmap(arrow::MemoryPool* pool, const std::shared_ptr<arrow::Buffer>& bitmap, int64_t offset, int64_t len) { std::shared_ptr<arrow::Buffer> result = bitmap; if (bitmap && offset != 0) { - result = arrow::AllocateBuffer(len, pool).ValueOrDie(); + result = ARROW_RESULT(arrow::AllocateBuffer(len, pool)); std::memcpy(result->mutable_data(), bitmap->data() + offset, len); } return result; +} + +arrow::Datum MakeFalseArray(arrow::MemoryPool* pool, int64_t len) { + std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(len, pool)); + std::memset(data->mutable_data(), 0, len); + return arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data }); +} +arrow::Datum MakeTrueArray(arrow::MemoryPool* pool, int64_t len) { + std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(len, pool)); + std::memset(data->mutable_data(), 1, len); + return arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data }); +} + +arrow::Datum MakeNullArray(arrow::MemoryPool* pool, int64_t len) { + std::shared_ptr<arrow::Array> arr = ARROW_RESULT(arrow::MakeArrayOfNull(arrow::uint8(), len, pool)); + return arr; } bool IsAllEqualsTo(const arrow::Datum& datum, bool value) { @@ -52,117 +69,90 @@ bool IsAllEqualsTo(const arrow::Datum& datum, bool value) { return popCnt == (value ? len : 0); } -class TBlockAndWrapper : public TMutableComputationNode<TBlockAndWrapper> { +class TAndBlockExec { public: - TBlockAndWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TBlockType::EShape shape) - : TMutableComputationNode(mutables) - , First(first) - , Second(second) - , ScalarResult(shape == TBlockType::EShape::Scalar) - { - } - - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto first = First->GetValue(ctx); - const auto& firstDatum = TArrowBlock::From(first).GetDatum(); + arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { + auto firstDatum = batch.values[0]; + auto secondDatum = batch.values[1]; + MKQL_ENSURE(!firstDatum.is_scalar() || !secondDatum.is_scalar(), "Expected at least one array"); if (IsAllEqualsTo(firstDatum, false)) { // false AND ... = false - if (ScalarResult == firstDatum.is_scalar()) { - return first.Release(); + if (firstDatum.is_array()) { + *res = firstDatum; + } else { + // need length + *res = MakeFalseArray(ctx->memory_pool(), secondDatum.length()); } - Y_VERIFY(firstDatum.is_scalar()); - // need length - auto second = Second->GetValue(ctx); - const auto& secondDatum = TArrowBlock::From(second).GetDatum(); - return MakeFalseArray(ctx, secondDatum.length()); - } - auto second = Second->GetValue(ctx); - const auto& secondDatum = TArrowBlock::From(second).GetDatum(); + return arrow::Status::OK(); + } if (IsAllEqualsTo(secondDatum, false)) { // ... AND false = false - if (ScalarResult == secondDatum.is_scalar()) { - return second.Release(); + if (secondDatum.is_array()) { + *res = secondDatum; + } else { + *res = MakeFalseArray(ctx->memory_pool(), firstDatum.length()); } - Y_VERIFY(secondDatum.is_scalar()); - return MakeFalseArray(ctx, firstDatum.length()); - } - if (firstDatum.is_scalar() && secondDatum.is_scalar()) { - bool first_true = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0); - bool first_false = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0); - - bool second_true = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0); - bool second_false = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0); - - auto result = std::make_shared<arrow::UInt8Scalar>(ui8(first_true && second_true)); - result->is_valid = first_false || second_false || (first_true && second_true); - - return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(result)); + return arrow::Status::OK(); } if (firstDatum.is_scalar()) { ui8 value = firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u; bool valid = firstDatum.scalar()->is_valid; - return CalcScalarArray(ctx, value, valid, secondDatum.array()); - } - - if (secondDatum.is_scalar()) { + *res = CalcScalarArray(ctx->memory_pool(), value, valid, secondDatum.array()); + } else if (secondDatum.is_scalar()) { ui8 value = secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u; bool valid = secondDatum.scalar()->is_valid; - return CalcScalarArray(ctx, value, valid, firstDatum.array()); + *res = CalcScalarArray(ctx->memory_pool(), value, valid, firstDatum.array()); + } else { + *res = CalcArrayArray(ctx->memory_pool(), firstDatum.array(), secondDatum.array()); } - return CalcArrayArray(ctx, firstDatum.array(), secondDatum.array()); + return arrow::Status::OK(); } private: - NUdf::TUnboxedValuePod MakeFalseArray(TComputationContext& ctx, int64_t len) const { - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(len, &ctx.ArrowMemoryPool).ValueOrDie(); - std::memset(data->mutable_data(), 0, len); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data })); - } - - NUdf::TUnboxedValuePod CalcScalarArray(TComputationContext& ctx, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const { + arrow::Datum CalcScalarArray(arrow::MemoryPool* pool, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const { bool first_true = valid && value; bool first_false = valid && !value; if (first_false) { - return MakeFalseArray(ctx, arr->length); + return MakeFalseArray(pool, arr->length); } if (first_true) { - return ctx.HolderFactory.CreateArrowBlock(arr); + return arr; } // scalar is null -> result is valid _only_ if arr[i] == false //bitmap = bitmap and not data[i] - std::shared_ptr<arrow::Buffer> bitmap = arrow::AllocateBitmap(arr->length, &ctx.ArrowMemoryPool).ValueOrDie(); + std::shared_ptr<arrow::Buffer> bitmap = ARROW_RESULT(arrow::AllocateBitmap(arr->length, pool)); CompressSparseBitmapNegate(bitmap->mutable_data(), arr->GetValues<ui8>(1), arr->length); if (arr->buffers[0]) { - bitmap = arrow::internal::BitmapAnd(&ctx.ArrowMemoryPool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0).ValueOrDie(); + bitmap = ARROW_RESULT(arrow::internal::BitmapAnd(pool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0)); } - std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(&ctx.ArrowMemoryPool, arr->buffers[1], arr->offset, arr->length); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data })); + std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(pool, arr->buffers[1], arr->offset, arr->length); + return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data }); } - NUdf::TUnboxedValuePod CalcArrayArray(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1, + arrow::Datum CalcArrayArray(arrow::MemoryPool* pool, const std::shared_ptr<arrow::ArrayData>& arr1, const std::shared_ptr<arrow::ArrayData>& arr2) const { - Y_VERIFY(arr1->offset == arr2->offset); Y_VERIFY(arr1->length == arr2->length); auto buf1 = arr1->buffers[0]; auto buf2 = arr2->buffers[0]; - const int64_t offset = arr1->offset; + const int64_t offset1 = arr1->offset; + const int64_t offset2 = arr2->offset; const int64_t length = arr1->length; std::shared_ptr<arrow::Buffer> bitmap; if (buf1 || buf2) { - bitmap = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie(); - auto first = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie(); - auto second = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie(); + bitmap = ARROW_RESULT(arrow::AllocateBitmap(length, pool)); + auto first = ARROW_RESULT(arrow::AllocateBitmap(length, pool)); + auto second = ARROW_RESULT(arrow::AllocateBitmap(length, pool)); CompressSparseBitmap(first->mutable_data(), arr1->GetValues<ui8>(1), length); CompressSparseBitmap(second->mutable_data(), arr2->GetValues<ui8>(1), length); @@ -175,8 +165,8 @@ private: //bitmap = first_false | second_false | (first_true & second_true); //bitmap = (b1 & ~v1) | (b2 & ~v2) | (b1 & v1 & b2 & v2) if (buf1 && buf2) { - Bitmap b1(buf1, offset, length); - Bitmap b2(buf2, offset, length); + Bitmap b1(buf1, offset1, length); + Bitmap b2(buf2, offset2, length); std::array<Bitmap, 4> in{b1, v1, b2, v2}; Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 4>& in, std::array<uint64_t, 1>* out) { @@ -187,7 +177,7 @@ private: out->at(0) = (b1 & ~v1) | (b2 & ~v2) | (b1 & v1 & b2 & v2); }); } else if (buf1) { - Bitmap b1(buf1, offset, length); + Bitmap b1(buf1, offset1, length); std::array<Bitmap, 3> in{b1, v1, v2}; Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) { @@ -197,7 +187,7 @@ private: out->at(0) = (b1 & ~v1) | (~v2) | (b1 & v1 & v2); }); } else { - Bitmap b2(buf2, offset, length); + Bitmap b2(buf2, offset2, length); std::array<Bitmap, 3> in{v1, b2, v2}; Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) { @@ -208,133 +198,97 @@ private: }); } } - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(length, &ctx.ArrowMemoryPool).ValueOrDie(); + std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(length, pool)); AndSparseBitmaps(data->mutable_data(), arr1->GetValues<ui8>(1), arr2->GetValues<ui8>(1), length); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr1->type, length, { bitmap, data })); + return arrow::ArrayData::Make(arr1->type, length, { bitmap, data }); } - - void RegisterDependencies() const final { - DependsOn(First); - DependsOn(Second); - } - - IComputationNode* const First; - IComputationNode* const Second; - const bool ScalarResult; }; -class TBlockOrWrapper : public TMutableComputationNode<TBlockOrWrapper> { +class TOrBlockExec { public: - TBlockOrWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TBlockType::EShape shape) - : TMutableComputationNode(mutables) - , First(first) - , Second(second) - , ScalarResult(shape == TBlockType::EShape::Scalar) - { - } - - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto first = First->GetValue(ctx); - const auto& firstDatum = TArrowBlock::From(first).GetDatum(); + arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { + auto firstDatum = batch.values[0]; + auto secondDatum = batch.values[1]; + MKQL_ENSURE(!firstDatum.is_scalar() || !secondDatum.is_scalar(), "Expected at least one array"); if (IsAllEqualsTo(firstDatum, true)) { // true OR ... = true - if (ScalarResult == firstDatum.is_scalar()) { - return first.Release(); + if (firstDatum.is_array()) { + *res = firstDatum; + } else { + // need length + *res = MakeTrueArray(ctx->memory_pool(), secondDatum.length()); } - Y_VERIFY(firstDatum.is_scalar()); - // need length - auto second = Second->GetValue(ctx); - const auto& secondDatum = TArrowBlock::From(second).GetDatum(); - return MakeTrueArray(ctx, secondDatum.length()); - } - auto second = Second->GetValue(ctx); - const auto& secondDatum = TArrowBlock::From(second).GetDatum(); + return arrow::Status::OK(); + } if (IsAllEqualsTo(secondDatum, true)) { // ... OR true = true - if (ScalarResult == secondDatum.is_scalar()) { - return second.Release(); + if (secondDatum.is_array()) { + *res = secondDatum; + } else { + *res = MakeTrueArray(ctx->memory_pool(), firstDatum.length()); } - Y_VERIFY(secondDatum.is_scalar()); - return MakeTrueArray(ctx, firstDatum.length()); - } - - if (firstDatum.is_scalar() && secondDatum.is_scalar()) { - bool first_true = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0); - bool first_false = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0); - - bool second_true = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0); - bool second_false = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0); - auto result = std::make_shared<arrow::UInt8Scalar>(ui8(first_true || second_true)); - result->is_valid = first_true || second_true || (first_false && second_false); - - return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(result)); + return arrow::Status::OK(); } if (firstDatum.is_scalar()) { ui8 value = firstDatum.scalar_as<arrow::UInt8Scalar>().value; bool valid = firstDatum.scalar()->is_valid; - return CalcScalarArray(ctx, value, valid, secondDatum.array()); - } - - if (secondDatum.is_scalar()) { + *res = CalcScalarArray(ctx->memory_pool(), value, valid, secondDatum.array()); + } else if (secondDatum.is_scalar()) { ui8 value = secondDatum.scalar_as<arrow::UInt8Scalar>().value; bool valid = secondDatum.scalar()->is_valid; - return CalcScalarArray(ctx, value, valid, firstDatum.array()); + *res = CalcScalarArray(ctx->memory_pool(), value, valid, firstDatum.array()); + } else { + *res = CalcArrayArray(ctx->memory_pool(), firstDatum.array(), secondDatum.array()); } - return CalcArrayArray(ctx, firstDatum.array(), secondDatum.array()); + return arrow::Status::OK(); } private: - NUdf::TUnboxedValuePod MakeTrueArray(TComputationContext& ctx, int64_t len) const { - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(len, &ctx.ArrowMemoryPool).ValueOrDie(); - std::memset(data->mutable_data(), 1, len); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data })); - } - - NUdf::TUnboxedValuePod CalcScalarArray(TComputationContext& ctx, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const { + arrow::Datum CalcScalarArray(arrow::MemoryPool* pool, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const { bool first_true = valid && value; bool first_false = valid && !value; if (first_true) { - return MakeTrueArray(ctx, arr->length); + return MakeTrueArray(pool, arr->length); } if (first_false) { - return ctx.HolderFactory.CreateArrowBlock(arr); + return arr; } // scalar is null -> result is valid _only_ if arr[i] == true //bitmap = bitmap and data[i] - std::shared_ptr<arrow::Buffer> bitmap = arrow::AllocateBitmap(arr->length, &ctx.ArrowMemoryPool).ValueOrDie(); + std::shared_ptr<arrow::Buffer> bitmap = ARROW_RESULT(arrow::AllocateBitmap(arr->length, pool)); CompressSparseBitmap(bitmap->mutable_data(), arr->GetValues<ui8>(1), arr->length); if (arr->buffers[0]) { - bitmap = arrow::internal::BitmapAnd(&ctx.ArrowMemoryPool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0).ValueOrDie(); + bitmap = ARROW_RESULT(arrow::internal::BitmapAnd(pool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0)); } - std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(&ctx.ArrowMemoryPool, arr->buffers[1], arr->offset, arr->length); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data })); + std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(pool, arr->buffers[1], arr->offset, arr->length); + return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data }); } - NUdf::TUnboxedValuePod CalcArrayArray(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1, + arrow::Datum CalcArrayArray(arrow::MemoryPool* pool, const std::shared_ptr<arrow::ArrayData>& arr1, const std::shared_ptr<arrow::ArrayData>& arr2) const { - Y_VERIFY(arr1->offset == arr2->offset); Y_VERIFY(arr1->length == arr2->length); auto buf1 = arr1->buffers[0]; auto buf2 = arr2->buffers[0]; - const int64_t offset = arr1->offset; + const int64_t offset1 = arr1->offset; + const int64_t offset2 = arr2->offset; const int64_t length = arr1->length; std::shared_ptr<arrow::Buffer> bitmap; if (buf1 || buf2) { - bitmap = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie(); - auto first = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie(); - auto second = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie(); + bitmap = ARROW_RESULT(arrow::AllocateBitmap(length, pool)); + auto first = ARROW_RESULT(arrow::AllocateBitmap(length, pool)); + auto second = ARROW_RESULT(arrow::AllocateBitmap(length, pool)); CompressSparseBitmap(first->mutable_data(), arr1->GetValues<ui8>(1), length); CompressSparseBitmap(second->mutable_data(), arr2->GetValues<ui8>(1), length); @@ -347,8 +301,8 @@ private: //bitmap = first_true | second_true | (first_false & second_false); //bitmap = (b1 & v1) | (b2 & v2) | (b1 & ~v1 & b2 & ~v2) if (buf1 && buf2) { - Bitmap b1(buf1, offset, length); - Bitmap b2(buf2, offset, length); + Bitmap b1(buf1, offset1, length); + Bitmap b2(buf2, offset2, length); std::array<Bitmap, 4> in{b1, v1, b2, v2}; Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 4>& in, std::array<uint64_t, 1>* out) { @@ -359,7 +313,7 @@ private: out->at(0) = (b1 & v1) | (b2 & v2) | (b1 & ~v1 & b2 & ~v2); }); } else if (buf1) { - Bitmap b1(buf1, offset, length); + Bitmap b1(buf1, offset1, length); std::array<Bitmap, 3> in{b1, v1, v2}; Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) { @@ -369,7 +323,7 @@ private: out->at(0) = (b1 & v1) | v2 | (b1 & ~v1 & ~v2); }); } else { - Bitmap b2(buf2, offset, length); + Bitmap b2(buf2, offset2, length); std::array<Bitmap, 3> in{v1, b2, v2}; Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) { @@ -380,205 +334,114 @@ private: }); } } - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(length, &ctx.ArrowMemoryPool).ValueOrDie(); + std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(length, pool)); OrSparseBitmaps(data->mutable_data(), arr1->GetValues<ui8>(1), arr2->GetValues<ui8>(1), length); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr1->type, length, { bitmap, data })); - } - - void RegisterDependencies() const final { - DependsOn(First); - DependsOn(Second); + return arrow::ArrayData::Make(arr1->type, length, { bitmap, data }); } - - IComputationNode* const First; - IComputationNode* const Second; - const bool ScalarResult; }; -class TBlockXorWrapper : public TMutableComputationNode<TBlockXorWrapper> { +class TXorBlockExec { public: - TBlockXorWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TBlockType::EShape shape) - : TMutableComputationNode(mutables) - , First(first) - , Second(second) - , ScalarResult(shape == TBlockType::EShape::Scalar) - { - } - - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto first = First->GetValue(ctx); - const auto& firstDatum = TArrowBlock::From(first).GetDatum(); - + arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { + auto firstDatum = batch.values[0]; + auto secondDatum = batch.values[1]; + MKQL_ENSURE(!firstDatum.is_scalar() || !secondDatum.is_scalar(), "Expected at least one array"); if (firstDatum.null_count() == firstDatum.length()) { - if (ScalarResult == firstDatum.is_scalar()) { - return first.Release(); + if (firstDatum.is_array()) { + *res = firstDatum; + } else { + *res = MakeNullArray(ctx->memory_pool(), secondDatum.length()); } - Y_VERIFY(firstDatum.is_scalar()); - // need length - auto second = Second->GetValue(ctx); - const auto& secondDatum = TArrowBlock::From(second).GetDatum(); - return MakeNullArray(ctx, secondDatum.length()); - } - auto second = Second->GetValue(ctx); - const auto& secondDatum = TArrowBlock::From(second).GetDatum(); + return arrow::Status::OK(); + } if (secondDatum.null_count() == secondDatum.length()) { - return (ScalarResult == secondDatum.is_scalar()) ? second.Release() : MakeNullArray(ctx, firstDatum.length()); - } + if (secondDatum.is_array()) { + *res = secondDatum; + } else { + *res = MakeNullArray(ctx->memory_pool(), firstDatum.length()); + } - if (firstDatum.is_scalar() && secondDatum.is_scalar()) { - Y_VERIFY(firstDatum.scalar()->is_valid); - Y_VERIFY(secondDatum.scalar()->is_valid); - ui8 result = firstDatum.scalar_as<arrow::UInt8Scalar>().value ^ secondDatum.scalar_as<arrow::UInt8Scalar>().value; - result &= 1u; - return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(result)); + return arrow::Status::OK(); } if (firstDatum.is_scalar()) { ui8 value = firstDatum.scalar_as<arrow::UInt8Scalar>().value; - return CalcScalarArray(ctx, value, secondDatum.array()); - } - - if (secondDatum.is_scalar()) { + *res = CalcScalarArray(ctx->memory_pool(), value, secondDatum.array()); + } else if (secondDatum.is_scalar()) { ui8 value = secondDatum.scalar_as<arrow::UInt8Scalar>().value; - return CalcScalarArray(ctx, value, firstDatum.array()); + *res = CalcScalarArray(ctx->memory_pool(), value, firstDatum.array()); + } else { + *res = CalcArrayArray(ctx->memory_pool(), firstDatum.array(), secondDatum.array()); } - return CalcArrayArray(ctx, firstDatum.array(), secondDatum.array()); + return arrow::Status::OK(); } private: - NUdf::TUnboxedValuePod MakeNullArray(TComputationContext& ctx, int64_t len) const { - std::shared_ptr<arrow::Array> arr = arrow::MakeArrayOfNull(arrow::uint8(), len, &ctx.ArrowMemoryPool).ValueOrDie(); - return ctx.HolderFactory.CreateArrowBlock(arr->data()); - } - - NUdf::TUnboxedValuePod CalcScalarArray(TComputationContext& ctx, ui8 value, const std::shared_ptr<arrow::ArrayData>& arr) const { - std::shared_ptr<arrow::Buffer> bitmap = CopyBitmap(&ctx.ArrowMemoryPool, arr->buffers[0], arr->offset, arr->length); - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(arr->length, &ctx.ArrowMemoryPool).ValueOrDie(); + arrow::Datum CalcScalarArray(arrow::MemoryPool* pool, ui8 value, const std::shared_ptr<arrow::ArrayData>& arr) const { + std::shared_ptr<arrow::Buffer> bitmap = CopyBitmap(pool, arr->buffers[0], arr->offset, arr->length); + std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(arr->length, pool)); XorSparseBitmapScalar(data->mutable_data(), value, arr->GetValues<ui8>(1), arr->length); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data })); + return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data }); } - NUdf::TUnboxedValuePod CalcArrayArray(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1, + arrow::Datum CalcArrayArray(arrow::MemoryPool* pool, const std::shared_ptr<arrow::ArrayData>& arr1, const std::shared_ptr<arrow::ArrayData>& arr2) const { - Y_VERIFY(arr1->offset == arr2->offset); Y_VERIFY(arr1->length == arr2->length); auto b1 = arr1->buffers[0]; auto b2 = arr2->buffers[0]; - const int64_t offset = arr1->offset; + const int64_t offset1 = arr1->offset; + const int64_t offset2 = arr2->offset; const int64_t length = arr1->length; std::shared_ptr<arrow::Buffer> bitmap; if (b1 && b2) { - bitmap = arrow::internal::BitmapAnd(&ctx.ArrowMemoryPool, b1->data(), offset, b2->data(), offset, length, 0).ValueOrDie(); + bitmap = ARROW_RESULT(arrow::internal::BitmapAnd(pool, b1->data(), offset1, b2->data(), offset2, length, 0)); } else { - bitmap = CopyBitmap(&ctx.ArrowMemoryPool, b1 ? b1 : b2, offset, length); + bitmap = CopyBitmap(pool, b1 ? b1 : b2, b1 ? offset1 : offset2, length); } - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(length, &ctx.ArrowMemoryPool).ValueOrDie(); + std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(length, pool)); XorSparseBitmaps(data->mutable_data(), arr1->GetValues<ui8>(1), arr2->GetValues<ui8>(1), length); - return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr1->type, length, { bitmap, data })); - } - - void RegisterDependencies() const final { - DependsOn(First); - DependsOn(Second); + return arrow::ArrayData::Make(arr1->type, length, { bitmap, data }); } - - IComputationNode* const First; - IComputationNode* const Second; - const bool ScalarResult; }; -class TBlockNotWrapper : public TMutableComputationNode<TBlockNotWrapper> { -friend class TArrowNode; -public: - class TArrowNode : public IArrowKernelComputationNode { - public: - TArrowNode(const TBlockNotWrapper* parent) - : Parent_(parent) - , ArgsValuesDescr_({arrow::uint8()}) - , Kernel_({arrow::uint8()}, arrow::uint8(), [parent](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { - *res = parent->CalculateImpl(MakeDatumProvider(batch.values[0]), *ctx->memory_pool()); - return arrow::Status::OK(); - }) - { - Kernel_.null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; - Kernel_.mem_allocation = arrow::compute::MemAllocation::NO_PREALLOCATE; - } - - TStringBuf GetKernelName() const final { - return "Not"; - } - - const arrow::compute::ScalarKernel& GetArrowKernel() const { - return Kernel_; - } - - const std::vector<arrow::ValueDescr>& GetArgsDesc() const { - return ArgsValuesDescr_; - } - - const IComputationNode* GetArgument(ui32 index) const { - switch (index) { - case 0: - return Parent_->Value; - default: - throw yexception() << "Bad argument index"; - } - } - - private: - const TBlockNotWrapper* Parent_; - const std::vector<arrow::ValueDescr> ArgsValuesDescr_; - arrow::compute::ScalarKernel Kernel_; - }; - +class TNotBlockExec { public: - TBlockNotWrapper(TComputationMutables& mutables, IComputationNode* value) - : TMutableComputationNode(mutables) - , Value(value) - { - } - - std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final { - Y_UNUSED(ctx); - return std::make_unique<TArrowNode>(this); - } - - arrow::Datum CalculateImpl(const TDatumProvider& valueProv, arrow::MemoryPool& memoryPool) const { - auto datum = valueProv(); - if (datum.null_count() == datum.length()) { - return datum; - } - - if (datum.is_scalar()) { - Y_VERIFY(datum.scalar()->is_valid); - ui8 negated = (~datum.scalar_as<arrow::UInt8Scalar>().value) & 1u; - return arrow::Datum(negated); + arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { + const auto& input = batch.values[0]; + MKQL_ENSURE(input.is_array(), "Expected array"); + const auto& arr = *input.array(); + if (arr.GetNullCount() == arr.length) { + *res = input; + } else { + auto bitmap = CopyBitmap(ctx->memory_pool(), arr.buffers[0], arr.offset, arr.length); + std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(arr.length, ctx->memory_pool()));; + NegateSparseBitmap(data->mutable_data(), arr.GetValues<ui8>(1), arr.length); + *res = arrow::ArrayData::Make(arr.type, arr.length, { bitmap, data }); } - auto arr = datum.array(); - std::shared_ptr<arrow::Buffer> bitmap = CopyBitmap(&memoryPool, arr->buffers[0], arr->offset, arr->length); - std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(arr->length, &memoryPool).ValueOrDie(); - NegateSparseBitmap(data->mutable_data(), arr->GetValues<ui8>(1), arr->length); - return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data }); - } - - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - return ctx.HolderFactory.CreateArrowBlock(CalculateImpl(MakeDatumProvider(Value, ctx), ctx.ArrowMemoryPool)); + return arrow::Status::OK(); } - -private: - void RegisterDependencies() const final { - DependsOn(Value); - } - - IComputationNode* const Value; }; +template <typename TExec> +std::shared_ptr<arrow::compute::ScalarKernel> MakeKernel(const TVector<TType*>& argTypes, TType* resultType) { + std::shared_ptr<arrow::DataType> returnArrowType; + MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type"); + auto exec = std::make_shared<TExec>(); + auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType), + [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + return exec->Exec(ctx, batch, res); + }); + + kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; + return kernel; +} + IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args"); @@ -591,33 +454,35 @@ IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, c MKQL_ENSURE(UnpackOptionalData(secondType->GetItemType(), isOpt2)->GetSchemeType() == NUdf::TDataType<bool>::Id, "Requires boolean args."); - TBlockType::EShape shape = GetResultShape({firstType, secondType}); - - auto first = LocateNode(ctx.NodeLocator, callable, 0); - auto second = LocateNode(ctx.NodeLocator, callable, 1); - - if (name == "and") { - return new TBlockAndWrapper(ctx.Mutables, first, second, shape); - } - if (name == "or") { - return new TBlockOrWrapper(ctx.Mutables, first, second, shape); + auto compute1 = LocateNode(ctx.NodeLocator, callable, 0); + auto compute2 = LocateNode(ctx.NodeLocator, callable, 1); + TVector<IComputationNode*> argsNodes = { compute1, compute2 }; + TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType(), callable.GetInput(1).GetStaticType() }; + + std::shared_ptr<arrow::compute::ScalarKernel> kernel; + if (name == "And") { + kernel = MakeKernel<TAndBlockExec>(argsTypes, callable.GetType()->GetReturnType()); + } else if (name == "Or") { + kernel = MakeKernel<TOrBlockExec>(argsTypes, callable.GetType()->GetReturnType()); + } else { + kernel = MakeKernel<TXorBlockExec>(argsTypes, callable.GetType()->GetReturnType()); } - return new TBlockXorWrapper(ctx.Mutables, first, second, shape); -} + return new TBlockFuncNode(ctx.Mutables, name, std::move(argsNodes), argsTypes, *kernel, kernel); +} } // namespace IComputationNode* WrapBlockAnd(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - return WrapBlockLogical("and", callable, ctx); + return WrapBlockLogical("And", callable, ctx); } IComputationNode* WrapBlockOr(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - return WrapBlockLogical("or", callable, ctx); + return WrapBlockLogical("Or", callable, ctx); } IComputationNode* WrapBlockXor(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - return WrapBlockLogical("xor", callable, ctx); + return WrapBlockLogical("Xor", callable, ctx); } IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactoryContext& ctx) { @@ -628,7 +493,12 @@ IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactor MKQL_ENSURE(UnpackOptionalData(dataType->GetItemType(), isOpt)->GetSchemeType() == NUdf::TDataType<bool>::Id, "Requires boolean args."); - return new TBlockNotWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0)); + auto compute = LocateNode(ctx.NodeLocator, callable, 0); + TVector<IComputationNode*> argsNodes = { compute }; + TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() }; + + auto kernel = MakeKernel<TNotBlockExec>(argsTypes, argsTypes[0]); + return new TBlockFuncNode(ctx.Mutables, "Not", std::move(argsNodes), argsTypes, *kernel, kernel); } |