aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-08-09 01:23:01 +0300
committervvvv <vvvv@ydb.tech>2023-08-09 02:13:40 +0300
commitc4c5c6d2bbec0deeffe1b3d8e387a6039d05c1d7 (patch)
treeef163d96768850eca49947fab19c3e7071a5b552
parentcf6597e319da1c2b470bbc9a2f7c9103c8bb25ab (diff)
downloadydb-c4c5c6d2bbec0deeffe1b3d8e387a6039d05c1d7.tar.gz
Kernels for logical ops
-rw-r--r--ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp21
-rw-r--r--ydb/library/yql/core/arrow_kernels/request/request.cpp13
-rw-r--r--ydb/library/yql/core/arrow_kernels/request/request.h3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp504
4 files changed, 222 insertions, 319 deletions
diff --git a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp
index 8cab9844ddb..adf3a536d34 100644
--- a/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp
+++ b/ydb/library/yql/core/arrow_kernels/registry/ut/registry_ut.cpp
@@ -54,6 +54,27 @@ Y_UNIT_TEST_SUITE(TKernelRegistryTest) {
});
}
+ Y_UNIT_TEST(TestAnd) {
+ TestOne([](auto& b,auto& ctx) {
+ auto blockBoolType = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Bool));
+ return b.AddBinaryOp(TKernelRequestBuilder::EBinaryOp::And, blockBoolType, blockBoolType, blockBoolType);
+ });
+ }
+
+ Y_UNIT_TEST(TestOr) {
+ TestOne([](auto& b,auto& ctx) {
+ auto blockBoolType = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Bool));
+ return b.AddBinaryOp(TKernelRequestBuilder::EBinaryOp::Or, blockBoolType, blockBoolType, blockBoolType);
+ });
+ }
+
+ Y_UNIT_TEST(TestXor) {
+ TestOne([](auto& b,auto& ctx) {
+ auto blockBoolType = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Bool));
+ return b.AddBinaryOp(TKernelRequestBuilder::EBinaryOp::Xor, blockBoolType, blockBoolType, blockBoolType);
+ });
+ }
+
Y_UNIT_TEST(TestAdd) {
TestOne([](auto& b,auto& ctx) {
auto blockInt32Type = ctx.template MakeType<TBlockExprType>(ctx.template MakeType<TDataExprType>(EDataSlot::Int32));
diff --git a/ydb/library/yql/core/arrow_kernels/request/request.cpp b/ydb/library/yql/core/arrow_kernels/request/request.cpp
index 0eb3a8df415..71700d47e09 100644
--- a/ydb/library/yql/core/arrow_kernels/request/request.cpp
+++ b/ydb/library/yql/core/arrow_kernels/request/request.cpp
@@ -39,15 +39,24 @@ ui32 TKernelRequestBuilder::AddBinaryOp(EBinaryOp op, const TTypeAnnotationNode*
auto arg1 = MakeArg(arg1Type);
auto arg2 = MakeArg(arg2Type);
switch (op) {
+ case EBinaryOp::And:
+ Items_.emplace_back(Pb_.BlockAnd(arg1, arg2));
+ break;
+ case EBinaryOp::Or:
+ Items_.emplace_back(Pb_.BlockOr(arg1, arg2));
+ break;
+ case EBinaryOp::Xor:
+ Items_.emplace_back(Pb_.BlockXor(arg1, arg2));
+ break;
case EBinaryOp::Add:
Items_.emplace_back(Pb_.BlockFunc("Add", returnType, { arg1, arg2 }));
break;
case EBinaryOp::Sub:
Items_.emplace_back(Pb_.BlockFunc("Sub", returnType, { arg1, arg2 }));
- break;
+ break;
case EBinaryOp::Mul:
Items_.emplace_back(Pb_.BlockFunc("Mul", returnType, { arg1, arg2 }));
- break;
+ break;
case EBinaryOp::Div:
Items_.emplace_back(Pb_.BlockFunc("Div", returnType, { arg1, arg2 }));
break;
diff --git a/ydb/library/yql/core/arrow_kernels/request/request.h b/ydb/library/yql/core/arrow_kernels/request/request.h
index 3aa4715a343..94a37bb1141 100644
--- a/ydb/library/yql/core/arrow_kernels/request/request.h
+++ b/ydb/library/yql/core/arrow_kernels/request/request.h
@@ -13,6 +13,9 @@ public:
};
enum EBinaryOp {
+ And,
+ Or,
+ Xor,
Add,
Sub,
Mul,
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
index e2cbcc9e845..39954fb989c 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
+#include <ydb/library/yql/minikql/computation/mkql_block_impl.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
@@ -24,7 +25,7 @@ using arrow::internal::Bitmap;
std::shared_ptr<arrow::Buffer> CopyBitmap(arrow::MemoryPool* pool, const std::shared_ptr<arrow::Buffer>& bitmap, int64_t offset, int64_t len) {
std::shared_ptr<arrow::Buffer> result = bitmap;
if (bitmap && offset != 0) {
- result = arrow::AllocateBitmap(len, pool).ValueOrDie();
+ result = ARROW_RESULT(arrow::AllocateBitmap(len, pool));
arrow::internal::CopyBitmap(bitmap->data(), offset, len, result->mutable_data(), 0);
}
return result;
@@ -33,11 +34,27 @@ std::shared_ptr<arrow::Buffer> CopyBitmap(arrow::MemoryPool* pool, const std::sh
std::shared_ptr<arrow::Buffer> CopySparseBitmap(arrow::MemoryPool* pool, const std::shared_ptr<arrow::Buffer>& bitmap, int64_t offset, int64_t len) {
std::shared_ptr<arrow::Buffer> result = bitmap;
if (bitmap && offset != 0) {
- result = arrow::AllocateBuffer(len, pool).ValueOrDie();
+ result = ARROW_RESULT(arrow::AllocateBuffer(len, pool));
std::memcpy(result->mutable_data(), bitmap->data() + offset, len);
}
return result;
+}
+
+arrow::Datum MakeFalseArray(arrow::MemoryPool* pool, int64_t len) {
+ std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(len, pool));
+ std::memset(data->mutable_data(), 0, len);
+ return arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data });
+}
+arrow::Datum MakeTrueArray(arrow::MemoryPool* pool, int64_t len) {
+ std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(len, pool));
+ std::memset(data->mutable_data(), 1, len);
+ return arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data });
+}
+
+arrow::Datum MakeNullArray(arrow::MemoryPool* pool, int64_t len) {
+ std::shared_ptr<arrow::Array> arr = ARROW_RESULT(arrow::MakeArrayOfNull(arrow::uint8(), len, pool));
+ return arr;
}
bool IsAllEqualsTo(const arrow::Datum& datum, bool value) {
@@ -52,117 +69,90 @@ bool IsAllEqualsTo(const arrow::Datum& datum, bool value) {
return popCnt == (value ? len : 0);
}
-class TBlockAndWrapper : public TMutableComputationNode<TBlockAndWrapper> {
+class TAndBlockExec {
public:
- TBlockAndWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TBlockType::EShape shape)
- : TMutableComputationNode(mutables)
- , First(first)
- , Second(second)
- , ScalarResult(shape == TBlockType::EShape::Scalar)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto first = First->GetValue(ctx);
- const auto& firstDatum = TArrowBlock::From(first).GetDatum();
+ arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
+ auto firstDatum = batch.values[0];
+ auto secondDatum = batch.values[1];
+ MKQL_ENSURE(!firstDatum.is_scalar() || !secondDatum.is_scalar(), "Expected at least one array");
if (IsAllEqualsTo(firstDatum, false)) {
// false AND ... = false
- if (ScalarResult == firstDatum.is_scalar()) {
- return first.Release();
+ if (firstDatum.is_array()) {
+ *res = firstDatum;
+ } else {
+ // need length
+ *res = MakeFalseArray(ctx->memory_pool(), secondDatum.length());
}
- Y_VERIFY(firstDatum.is_scalar());
- // need length
- auto second = Second->GetValue(ctx);
- const auto& secondDatum = TArrowBlock::From(second).GetDatum();
- return MakeFalseArray(ctx, secondDatum.length());
- }
- auto second = Second->GetValue(ctx);
- const auto& secondDatum = TArrowBlock::From(second).GetDatum();
+ return arrow::Status::OK();
+ }
if (IsAllEqualsTo(secondDatum, false)) {
// ... AND false = false
- if (ScalarResult == secondDatum.is_scalar()) {
- return second.Release();
+ if (secondDatum.is_array()) {
+ *res = secondDatum;
+ } else {
+ *res = MakeFalseArray(ctx->memory_pool(), firstDatum.length());
}
- Y_VERIFY(secondDatum.is_scalar());
- return MakeFalseArray(ctx, firstDatum.length());
- }
- if (firstDatum.is_scalar() && secondDatum.is_scalar()) {
- bool first_true = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0);
- bool first_false = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0);
-
- bool second_true = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0);
- bool second_false = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0);
-
- auto result = std::make_shared<arrow::UInt8Scalar>(ui8(first_true && second_true));
- result->is_valid = first_false || second_false || (first_true && second_true);
-
- return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(result));
+ return arrow::Status::OK();
}
if (firstDatum.is_scalar()) {
ui8 value = firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u;
bool valid = firstDatum.scalar()->is_valid;
- return CalcScalarArray(ctx, value, valid, secondDatum.array());
- }
-
- if (secondDatum.is_scalar()) {
+ *res = CalcScalarArray(ctx->memory_pool(), value, valid, secondDatum.array());
+ } else if (secondDatum.is_scalar()) {
ui8 value = secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u;
bool valid = secondDatum.scalar()->is_valid;
- return CalcScalarArray(ctx, value, valid, firstDatum.array());
+ *res = CalcScalarArray(ctx->memory_pool(), value, valid, firstDatum.array());
+ } else {
+ *res = CalcArrayArray(ctx->memory_pool(), firstDatum.array(), secondDatum.array());
}
- return CalcArrayArray(ctx, firstDatum.array(), secondDatum.array());
+ return arrow::Status::OK();
}
private:
- NUdf::TUnboxedValuePod MakeFalseArray(TComputationContext& ctx, int64_t len) const {
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(len, &ctx.ArrowMemoryPool).ValueOrDie();
- std::memset(data->mutable_data(), 0, len);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data }));
- }
-
- NUdf::TUnboxedValuePod CalcScalarArray(TComputationContext& ctx, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const {
+ arrow::Datum CalcScalarArray(arrow::MemoryPool* pool, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const {
bool first_true = valid && value;
bool first_false = valid && !value;
if (first_false) {
- return MakeFalseArray(ctx, arr->length);
+ return MakeFalseArray(pool, arr->length);
}
if (first_true) {
- return ctx.HolderFactory.CreateArrowBlock(arr);
+ return arr;
}
// scalar is null -> result is valid _only_ if arr[i] == false
//bitmap = bitmap and not data[i]
- std::shared_ptr<arrow::Buffer> bitmap = arrow::AllocateBitmap(arr->length, &ctx.ArrowMemoryPool).ValueOrDie();
+ std::shared_ptr<arrow::Buffer> bitmap = ARROW_RESULT(arrow::AllocateBitmap(arr->length, pool));
CompressSparseBitmapNegate(bitmap->mutable_data(), arr->GetValues<ui8>(1), arr->length);
if (arr->buffers[0]) {
- bitmap = arrow::internal::BitmapAnd(&ctx.ArrowMemoryPool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0).ValueOrDie();
+ bitmap = ARROW_RESULT(arrow::internal::BitmapAnd(pool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0));
}
- std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(&ctx.ArrowMemoryPool, arr->buffers[1], arr->offset, arr->length);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data }));
+ std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(pool, arr->buffers[1], arr->offset, arr->length);
+ return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data });
}
- NUdf::TUnboxedValuePod CalcArrayArray(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1,
+ arrow::Datum CalcArrayArray(arrow::MemoryPool* pool, const std::shared_ptr<arrow::ArrayData>& arr1,
const std::shared_ptr<arrow::ArrayData>& arr2) const
{
- Y_VERIFY(arr1->offset == arr2->offset);
Y_VERIFY(arr1->length == arr2->length);
auto buf1 = arr1->buffers[0];
auto buf2 = arr2->buffers[0];
- const int64_t offset = arr1->offset;
+ const int64_t offset1 = arr1->offset;
+ const int64_t offset2 = arr2->offset;
const int64_t length = arr1->length;
std::shared_ptr<arrow::Buffer> bitmap;
if (buf1 || buf2) {
- bitmap = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie();
- auto first = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie();
- auto second = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie();
+ bitmap = ARROW_RESULT(arrow::AllocateBitmap(length, pool));
+ auto first = ARROW_RESULT(arrow::AllocateBitmap(length, pool));
+ auto second = ARROW_RESULT(arrow::AllocateBitmap(length, pool));
CompressSparseBitmap(first->mutable_data(), arr1->GetValues<ui8>(1), length);
CompressSparseBitmap(second->mutable_data(), arr2->GetValues<ui8>(1), length);
@@ -175,8 +165,8 @@ private:
//bitmap = first_false | second_false | (first_true & second_true);
//bitmap = (b1 & ~v1) | (b2 & ~v2) | (b1 & v1 & b2 & v2)
if (buf1 && buf2) {
- Bitmap b1(buf1, offset, length);
- Bitmap b2(buf2, offset, length);
+ Bitmap b1(buf1, offset1, length);
+ Bitmap b2(buf2, offset2, length);
std::array<Bitmap, 4> in{b1, v1, b2, v2};
Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 4>& in, std::array<uint64_t, 1>* out) {
@@ -187,7 +177,7 @@ private:
out->at(0) = (b1 & ~v1) | (b2 & ~v2) | (b1 & v1 & b2 & v2);
});
} else if (buf1) {
- Bitmap b1(buf1, offset, length);
+ Bitmap b1(buf1, offset1, length);
std::array<Bitmap, 3> in{b1, v1, v2};
Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) {
@@ -197,7 +187,7 @@ private:
out->at(0) = (b1 & ~v1) | (~v2) | (b1 & v1 & v2);
});
} else {
- Bitmap b2(buf2, offset, length);
+ Bitmap b2(buf2, offset2, length);
std::array<Bitmap, 3> in{v1, b2, v2};
Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) {
@@ -208,133 +198,97 @@ private:
});
}
}
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(length, &ctx.ArrowMemoryPool).ValueOrDie();
+ std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(length, pool));
AndSparseBitmaps(data->mutable_data(), arr1->GetValues<ui8>(1), arr2->GetValues<ui8>(1), length);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr1->type, length, { bitmap, data }));
+ return arrow::ArrayData::Make(arr1->type, length, { bitmap, data });
}
-
- void RegisterDependencies() const final {
- DependsOn(First);
- DependsOn(Second);
- }
-
- IComputationNode* const First;
- IComputationNode* const Second;
- const bool ScalarResult;
};
-class TBlockOrWrapper : public TMutableComputationNode<TBlockOrWrapper> {
+class TOrBlockExec {
public:
- TBlockOrWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TBlockType::EShape shape)
- : TMutableComputationNode(mutables)
- , First(first)
- , Second(second)
- , ScalarResult(shape == TBlockType::EShape::Scalar)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto first = First->GetValue(ctx);
- const auto& firstDatum = TArrowBlock::From(first).GetDatum();
+ arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
+ auto firstDatum = batch.values[0];
+ auto secondDatum = batch.values[1];
+ MKQL_ENSURE(!firstDatum.is_scalar() || !secondDatum.is_scalar(), "Expected at least one array");
if (IsAllEqualsTo(firstDatum, true)) {
// true OR ... = true
- if (ScalarResult == firstDatum.is_scalar()) {
- return first.Release();
+ if (firstDatum.is_array()) {
+ *res = firstDatum;
+ } else {
+ // need length
+ *res = MakeTrueArray(ctx->memory_pool(), secondDatum.length());
}
- Y_VERIFY(firstDatum.is_scalar());
- // need length
- auto second = Second->GetValue(ctx);
- const auto& secondDatum = TArrowBlock::From(second).GetDatum();
- return MakeTrueArray(ctx, secondDatum.length());
- }
- auto second = Second->GetValue(ctx);
- const auto& secondDatum = TArrowBlock::From(second).GetDatum();
+ return arrow::Status::OK();
+ }
if (IsAllEqualsTo(secondDatum, true)) {
// ... OR true = true
- if (ScalarResult == secondDatum.is_scalar()) {
- return second.Release();
+ if (secondDatum.is_array()) {
+ *res = secondDatum;
+ } else {
+ *res = MakeTrueArray(ctx->memory_pool(), firstDatum.length());
}
- Y_VERIFY(secondDatum.is_scalar());
- return MakeTrueArray(ctx, firstDatum.length());
- }
-
- if (firstDatum.is_scalar() && secondDatum.is_scalar()) {
- bool first_true = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0);
- bool first_false = firstDatum.scalar()->is_valid && (firstDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0);
-
- bool second_true = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u != 0);
- bool second_false = secondDatum.scalar()->is_valid && (secondDatum.scalar_as<arrow::UInt8Scalar>().value & 1u == 0);
- auto result = std::make_shared<arrow::UInt8Scalar>(ui8(first_true || second_true));
- result->is_valid = first_true || second_true || (first_false && second_false);
-
- return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(result));
+ return arrow::Status::OK();
}
if (firstDatum.is_scalar()) {
ui8 value = firstDatum.scalar_as<arrow::UInt8Scalar>().value;
bool valid = firstDatum.scalar()->is_valid;
- return CalcScalarArray(ctx, value, valid, secondDatum.array());
- }
-
- if (secondDatum.is_scalar()) {
+ *res = CalcScalarArray(ctx->memory_pool(), value, valid, secondDatum.array());
+ } else if (secondDatum.is_scalar()) {
ui8 value = secondDatum.scalar_as<arrow::UInt8Scalar>().value;
bool valid = secondDatum.scalar()->is_valid;
- return CalcScalarArray(ctx, value, valid, firstDatum.array());
+ *res = CalcScalarArray(ctx->memory_pool(), value, valid, firstDatum.array());
+ } else {
+ *res = CalcArrayArray(ctx->memory_pool(), firstDatum.array(), secondDatum.array());
}
- return CalcArrayArray(ctx, firstDatum.array(), secondDatum.array());
+ return arrow::Status::OK();
}
private:
- NUdf::TUnboxedValuePod MakeTrueArray(TComputationContext& ctx, int64_t len) const {
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(len, &ctx.ArrowMemoryPool).ValueOrDie();
- std::memset(data->mutable_data(), 1, len);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arrow::uint8(), len, { std::shared_ptr<arrow::Buffer>{}, data }));
- }
-
- NUdf::TUnboxedValuePod CalcScalarArray(TComputationContext& ctx, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const {
+ arrow::Datum CalcScalarArray(arrow::MemoryPool* pool, ui8 value, bool valid, const std::shared_ptr<arrow::ArrayData>& arr) const {
bool first_true = valid && value;
bool first_false = valid && !value;
if (first_true) {
- return MakeTrueArray(ctx, arr->length);
+ return MakeTrueArray(pool, arr->length);
}
if (first_false) {
- return ctx.HolderFactory.CreateArrowBlock(arr);
+ return arr;
}
// scalar is null -> result is valid _only_ if arr[i] == true
//bitmap = bitmap and data[i]
- std::shared_ptr<arrow::Buffer> bitmap = arrow::AllocateBitmap(arr->length, &ctx.ArrowMemoryPool).ValueOrDie();
+ std::shared_ptr<arrow::Buffer> bitmap = ARROW_RESULT(arrow::AllocateBitmap(arr->length, pool));
CompressSparseBitmap(bitmap->mutable_data(), arr->GetValues<ui8>(1), arr->length);
if (arr->buffers[0]) {
- bitmap = arrow::internal::BitmapAnd(&ctx.ArrowMemoryPool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0).ValueOrDie();
+ bitmap = ARROW_RESULT(arrow::internal::BitmapAnd(pool, arr->GetValues<ui8>(0, 0), arr->offset, bitmap->data(), 0, arr->length, 0));
}
- std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(&ctx.ArrowMemoryPool, arr->buffers[1], arr->offset, arr->length);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data }));
+ std::shared_ptr<arrow::Buffer> data = CopySparseBitmap(pool, arr->buffers[1], arr->offset, arr->length);
+ return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data });
}
- NUdf::TUnboxedValuePod CalcArrayArray(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1,
+ arrow::Datum CalcArrayArray(arrow::MemoryPool* pool, const std::shared_ptr<arrow::ArrayData>& arr1,
const std::shared_ptr<arrow::ArrayData>& arr2) const
{
- Y_VERIFY(arr1->offset == arr2->offset);
Y_VERIFY(arr1->length == arr2->length);
auto buf1 = arr1->buffers[0];
auto buf2 = arr2->buffers[0];
- const int64_t offset = arr1->offset;
+ const int64_t offset1 = arr1->offset;
+ const int64_t offset2 = arr2->offset;
const int64_t length = arr1->length;
std::shared_ptr<arrow::Buffer> bitmap;
if (buf1 || buf2) {
- bitmap = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie();
- auto first = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie();
- auto second = arrow::AllocateBitmap(length, &ctx.ArrowMemoryPool).ValueOrDie();
+ bitmap = ARROW_RESULT(arrow::AllocateBitmap(length, pool));
+ auto first = ARROW_RESULT(arrow::AllocateBitmap(length, pool));
+ auto second = ARROW_RESULT(arrow::AllocateBitmap(length, pool));
CompressSparseBitmap(first->mutable_data(), arr1->GetValues<ui8>(1), length);
CompressSparseBitmap(second->mutable_data(), arr2->GetValues<ui8>(1), length);
@@ -347,8 +301,8 @@ private:
//bitmap = first_true | second_true | (first_false & second_false);
//bitmap = (b1 & v1) | (b2 & v2) | (b1 & ~v1 & b2 & ~v2)
if (buf1 && buf2) {
- Bitmap b1(buf1, offset, length);
- Bitmap b2(buf2, offset, length);
+ Bitmap b1(buf1, offset1, length);
+ Bitmap b2(buf2, offset2, length);
std::array<Bitmap, 4> in{b1, v1, b2, v2};
Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 4>& in, std::array<uint64_t, 1>* out) {
@@ -359,7 +313,7 @@ private:
out->at(0) = (b1 & v1) | (b2 & v2) | (b1 & ~v1 & b2 & ~v2);
});
} else if (buf1) {
- Bitmap b1(buf1, offset, length);
+ Bitmap b1(buf1, offset1, length);
std::array<Bitmap, 3> in{b1, v1, v2};
Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) {
@@ -369,7 +323,7 @@ private:
out->at(0) = (b1 & v1) | v2 | (b1 & ~v1 & ~v2);
});
} else {
- Bitmap b2(buf2, offset, length);
+ Bitmap b2(buf2, offset2, length);
std::array<Bitmap, 3> in{v1, b2, v2};
Bitmap::VisitWordsAndWrite(in, &out, [](const std::array<uint64_t, 3>& in, std::array<uint64_t, 1>* out) {
@@ -380,205 +334,114 @@ private:
});
}
}
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(length, &ctx.ArrowMemoryPool).ValueOrDie();
+ std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(length, pool));
OrSparseBitmaps(data->mutable_data(), arr1->GetValues<ui8>(1), arr2->GetValues<ui8>(1), length);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr1->type, length, { bitmap, data }));
- }
-
- void RegisterDependencies() const final {
- DependsOn(First);
- DependsOn(Second);
+ return arrow::ArrayData::Make(arr1->type, length, { bitmap, data });
}
-
- IComputationNode* const First;
- IComputationNode* const Second;
- const bool ScalarResult;
};
-class TBlockXorWrapper : public TMutableComputationNode<TBlockXorWrapper> {
+class TXorBlockExec {
public:
- TBlockXorWrapper(TComputationMutables& mutables, IComputationNode* first, IComputationNode* second, TBlockType::EShape shape)
- : TMutableComputationNode(mutables)
- , First(first)
- , Second(second)
- , ScalarResult(shape == TBlockType::EShape::Scalar)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto first = First->GetValue(ctx);
- const auto& firstDatum = TArrowBlock::From(first).GetDatum();
-
+ arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
+ auto firstDatum = batch.values[0];
+ auto secondDatum = batch.values[1];
+ MKQL_ENSURE(!firstDatum.is_scalar() || !secondDatum.is_scalar(), "Expected at least one array");
if (firstDatum.null_count() == firstDatum.length()) {
- if (ScalarResult == firstDatum.is_scalar()) {
- return first.Release();
+ if (firstDatum.is_array()) {
+ *res = firstDatum;
+ } else {
+ *res = MakeNullArray(ctx->memory_pool(), secondDatum.length());
}
- Y_VERIFY(firstDatum.is_scalar());
- // need length
- auto second = Second->GetValue(ctx);
- const auto& secondDatum = TArrowBlock::From(second).GetDatum();
- return MakeNullArray(ctx, secondDatum.length());
- }
- auto second = Second->GetValue(ctx);
- const auto& secondDatum = TArrowBlock::From(second).GetDatum();
+ return arrow::Status::OK();
+ }
if (secondDatum.null_count() == secondDatum.length()) {
- return (ScalarResult == secondDatum.is_scalar()) ? second.Release() : MakeNullArray(ctx, firstDatum.length());
- }
+ if (secondDatum.is_array()) {
+ *res = secondDatum;
+ } else {
+ *res = MakeNullArray(ctx->memory_pool(), firstDatum.length());
+ }
- if (firstDatum.is_scalar() && secondDatum.is_scalar()) {
- Y_VERIFY(firstDatum.scalar()->is_valid);
- Y_VERIFY(secondDatum.scalar()->is_valid);
- ui8 result = firstDatum.scalar_as<arrow::UInt8Scalar>().value ^ secondDatum.scalar_as<arrow::UInt8Scalar>().value;
- result &= 1u;
- return ctx.HolderFactory.CreateArrowBlock(arrow::Datum(result));
+ return arrow::Status::OK();
}
if (firstDatum.is_scalar()) {
ui8 value = firstDatum.scalar_as<arrow::UInt8Scalar>().value;
- return CalcScalarArray(ctx, value, secondDatum.array());
- }
-
- if (secondDatum.is_scalar()) {
+ *res = CalcScalarArray(ctx->memory_pool(), value, secondDatum.array());
+ } else if (secondDatum.is_scalar()) {
ui8 value = secondDatum.scalar_as<arrow::UInt8Scalar>().value;
- return CalcScalarArray(ctx, value, firstDatum.array());
+ *res = CalcScalarArray(ctx->memory_pool(), value, firstDatum.array());
+ } else {
+ *res = CalcArrayArray(ctx->memory_pool(), firstDatum.array(), secondDatum.array());
}
- return CalcArrayArray(ctx, firstDatum.array(), secondDatum.array());
+ return arrow::Status::OK();
}
private:
- NUdf::TUnboxedValuePod MakeNullArray(TComputationContext& ctx, int64_t len) const {
- std::shared_ptr<arrow::Array> arr = arrow::MakeArrayOfNull(arrow::uint8(), len, &ctx.ArrowMemoryPool).ValueOrDie();
- return ctx.HolderFactory.CreateArrowBlock(arr->data());
- }
-
- NUdf::TUnboxedValuePod CalcScalarArray(TComputationContext& ctx, ui8 value, const std::shared_ptr<arrow::ArrayData>& arr) const {
- std::shared_ptr<arrow::Buffer> bitmap = CopyBitmap(&ctx.ArrowMemoryPool, arr->buffers[0], arr->offset, arr->length);
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(arr->length, &ctx.ArrowMemoryPool).ValueOrDie();
+ arrow::Datum CalcScalarArray(arrow::MemoryPool* pool, ui8 value, const std::shared_ptr<arrow::ArrayData>& arr) const {
+ std::shared_ptr<arrow::Buffer> bitmap = CopyBitmap(pool, arr->buffers[0], arr->offset, arr->length);
+ std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(arr->length, pool));
XorSparseBitmapScalar(data->mutable_data(), value, arr->GetValues<ui8>(1), arr->length);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data }));
+ return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data });
}
- NUdf::TUnboxedValuePod CalcArrayArray(TComputationContext& ctx, const std::shared_ptr<arrow::ArrayData>& arr1,
+ arrow::Datum CalcArrayArray(arrow::MemoryPool* pool, const std::shared_ptr<arrow::ArrayData>& arr1,
const std::shared_ptr<arrow::ArrayData>& arr2) const
{
- Y_VERIFY(arr1->offset == arr2->offset);
Y_VERIFY(arr1->length == arr2->length);
auto b1 = arr1->buffers[0];
auto b2 = arr2->buffers[0];
- const int64_t offset = arr1->offset;
+ const int64_t offset1 = arr1->offset;
+ const int64_t offset2 = arr2->offset;
const int64_t length = arr1->length;
std::shared_ptr<arrow::Buffer> bitmap;
if (b1 && b2) {
- bitmap = arrow::internal::BitmapAnd(&ctx.ArrowMemoryPool, b1->data(), offset, b2->data(), offset, length, 0).ValueOrDie();
+ bitmap = ARROW_RESULT(arrow::internal::BitmapAnd(pool, b1->data(), offset1, b2->data(), offset2, length, 0));
} else {
- bitmap = CopyBitmap(&ctx.ArrowMemoryPool, b1 ? b1 : b2, offset, length);
+ bitmap = CopyBitmap(pool, b1 ? b1 : b2, b1 ? offset1 : offset2, length);
}
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(length, &ctx.ArrowMemoryPool).ValueOrDie();
+ std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(length, pool));
XorSparseBitmaps(data->mutable_data(), arr1->GetValues<ui8>(1), arr2->GetValues<ui8>(1), length);
- return ctx.HolderFactory.CreateArrowBlock(arrow::ArrayData::Make(arr1->type, length, { bitmap, data }));
- }
-
- void RegisterDependencies() const final {
- DependsOn(First);
- DependsOn(Second);
+ return arrow::ArrayData::Make(arr1->type, length, { bitmap, data });
}
-
- IComputationNode* const First;
- IComputationNode* const Second;
- const bool ScalarResult;
};
-class TBlockNotWrapper : public TMutableComputationNode<TBlockNotWrapper> {
-friend class TArrowNode;
-public:
- class TArrowNode : public IArrowKernelComputationNode {
- public:
- TArrowNode(const TBlockNotWrapper* parent)
- : Parent_(parent)
- , ArgsValuesDescr_({arrow::uint8()})
- , Kernel_({arrow::uint8()}, arrow::uint8(), [parent](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
- *res = parent->CalculateImpl(MakeDatumProvider(batch.values[0]), *ctx->memory_pool());
- return arrow::Status::OK();
- })
- {
- Kernel_.null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
- Kernel_.mem_allocation = arrow::compute::MemAllocation::NO_PREALLOCATE;
- }
-
- TStringBuf GetKernelName() const final {
- return "Not";
- }
-
- const arrow::compute::ScalarKernel& GetArrowKernel() const {
- return Kernel_;
- }
-
- const std::vector<arrow::ValueDescr>& GetArgsDesc() const {
- return ArgsValuesDescr_;
- }
-
- const IComputationNode* GetArgument(ui32 index) const {
- switch (index) {
- case 0:
- return Parent_->Value;
- default:
- throw yexception() << "Bad argument index";
- }
- }
-
- private:
- const TBlockNotWrapper* Parent_;
- const std::vector<arrow::ValueDescr> ArgsValuesDescr_;
- arrow::compute::ScalarKernel Kernel_;
- };
-
+class TNotBlockExec {
public:
- TBlockNotWrapper(TComputationMutables& mutables, IComputationNode* value)
- : TMutableComputationNode(mutables)
- , Value(value)
- {
- }
-
- std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final {
- Y_UNUSED(ctx);
- return std::make_unique<TArrowNode>(this);
- }
-
- arrow::Datum CalculateImpl(const TDatumProvider& valueProv, arrow::MemoryPool& memoryPool) const {
- auto datum = valueProv();
- if (datum.null_count() == datum.length()) {
- return datum;
- }
-
- if (datum.is_scalar()) {
- Y_VERIFY(datum.scalar()->is_valid);
- ui8 negated = (~datum.scalar_as<arrow::UInt8Scalar>().value) & 1u;
- return arrow::Datum(negated);
+ arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
+ const auto& input = batch.values[0];
+ MKQL_ENSURE(input.is_array(), "Expected array");
+ const auto& arr = *input.array();
+ if (arr.GetNullCount() == arr.length) {
+ *res = input;
+ } else {
+ auto bitmap = CopyBitmap(ctx->memory_pool(), arr.buffers[0], arr.offset, arr.length);
+ std::shared_ptr<arrow::Buffer> data = ARROW_RESULT(arrow::AllocateBuffer(arr.length, ctx->memory_pool()));;
+ NegateSparseBitmap(data->mutable_data(), arr.GetValues<ui8>(1), arr.length);
+ *res = arrow::ArrayData::Make(arr.type, arr.length, { bitmap, data });
}
- auto arr = datum.array();
- std::shared_ptr<arrow::Buffer> bitmap = CopyBitmap(&memoryPool, arr->buffers[0], arr->offset, arr->length);
- std::shared_ptr<arrow::Buffer> data = arrow::AllocateBuffer(arr->length, &memoryPool).ValueOrDie();
- NegateSparseBitmap(data->mutable_data(), arr->GetValues<ui8>(1), arr->length);
- return arrow::ArrayData::Make(arr->type, arr->length, { bitmap, data });
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.CreateArrowBlock(CalculateImpl(MakeDatumProvider(Value, ctx), ctx.ArrowMemoryPool));
+ return arrow::Status::OK();
}
-
-private:
- void RegisterDependencies() const final {
- DependsOn(Value);
- }
-
- IComputationNode* const Value;
};
+template <typename TExec>
+std::shared_ptr<arrow::compute::ScalarKernel> MakeKernel(const TVector<TType*>& argTypes, TType* resultType) {
+ std::shared_ptr<arrow::DataType> returnArrowType;
+ MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type");
+ auto exec = std::make_shared<TExec>();
+ auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
+ [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
+ return exec->Exec(ctx, batch, res);
+ });
+
+ kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
+ return kernel;
+}
+
// Shared factory for the two-argument block logical nodes.
// Requires exactly 2 inputs and boolean (possibly optional) item types.
// `name` selects the kernel exec: "And" -> TAndBlockExec, "Or" -> TOrBlockExec,
// any other value -> TXorBlockExec. The kernel is then wrapped in a TBlockFuncNode.
IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
@@ -591,33 +454,35 @@ IComputationNode* WrapBlockLogical(std::string_view name, TCallable& callable, c
MKQL_ENSURE(UnpackOptionalData(secondType->GetItemType(), isOpt2)->GetSchemeType() == NUdf::TDataType<bool>::Id,
"Requires boolean args.");
- TBlockType::EShape shape = GetResultShape({firstType, secondType});
-
- auto first = LocateNode(ctx.NodeLocator, callable, 0);
- auto second = LocateNode(ctx.NodeLocator, callable, 1);
-
- if (name == "and") {
- return new TBlockAndWrapper(ctx.Mutables, first, second, shape);
- }
- if (name == "or") {
- return new TBlockOrWrapper(ctx.Mutables, first, second, shape);
+ auto compute1 = LocateNode(ctx.NodeLocator, callable, 0);
+ auto compute2 = LocateNode(ctx.NodeLocator, callable, 1);
+ TVector<IComputationNode*> argsNodes = { compute1, compute2 };
+ TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType(), callable.GetInput(1).GetStaticType() };
+
+ std::shared_ptr<arrow::compute::ScalarKernel> kernel;
+ if (name == "And") { // comparison is case-sensitive; callers pass the capitalized names
+ kernel = MakeKernel<TAndBlockExec>(argsTypes, callable.GetType()->GetReturnType());
+ } else if (name == "Or") {
+ kernel = MakeKernel<TOrBlockExec>(argsTypes, callable.GetType()->GetReturnType());
+ } else { // any name other than "And"/"Or" is treated as Xor
+ kernel = MakeKernel<TXorBlockExec>(argsTypes, callable.GetType()->GetReturnType());
}
- return new TBlockXorWrapper(ctx.Mutables, first, second, shape);
-}
+ return new TBlockFuncNode(ctx.Mutables, name, std::move(argsNodes), argsTypes, *kernel, kernel); // kernel is passed both dereferenced and as the shared_ptr; presumably the latter keeps it alive - confirm against TBlockFuncNode
+}
} // namespace
// Factory entry point for the block And node; delegates to WrapBlockLogical.
// The name literal is "And" because WrapBlockLogical compares against the capitalized form.
IComputationNode* WrapBlockAnd(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- return WrapBlockLogical("and", callable, ctx);
+ return WrapBlockLogical("And", callable, ctx);
}
// Factory entry point for the block Or node; delegates to WrapBlockLogical.
// The name literal is "Or" because WrapBlockLogical compares against the capitalized form.
IComputationNode* WrapBlockOr(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- return WrapBlockLogical("or", callable, ctx);
+ return WrapBlockLogical("Or", callable, ctx);
}
// Factory entry point for the block Xor node; delegates to WrapBlockLogical.
// WrapBlockLogical has no explicit "Xor" comparison: this name reaches its final else branch.
IComputationNode* WrapBlockXor(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- return WrapBlockLogical("xor", callable, ctx);
+ return WrapBlockLogical("Xor", callable, ctx);
}
// Factory entry point for the unary block Not node.
// Requires a boolean (possibly optional) item type, builds a TNotBlockExec kernel
// via MakeKernel and wraps it in a TBlockFuncNode named "Not".
IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
@@ -628,7 +493,12 @@ IComputationNode* WrapBlockNot(TCallable& callable, const TComputationNodeFactor
MKQL_ENSURE(UnpackOptionalData(dataType->GetItemType(), isOpt)->GetSchemeType() == NUdf::TDataType<bool>::Id,
"Requires boolean args.");
- return new TBlockNotWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0));
+ auto compute = LocateNode(ctx.NodeLocator, callable, 0);
+ TVector<IComputationNode*> argsNodes = { compute };
+ TVector<TType*> argsTypes = { callable.GetInput(0).GetStaticType() };
+
+ auto kernel = MakeKernel<TNotBlockExec>(argsTypes, argsTypes[0]); // NOTE(review): result type is the argument's own static type, whereas WrapBlockLogical uses callable.GetType()->GetReturnType() - confirm they always agree
+ return new TBlockFuncNode(ctx.Mutables, "Not", std::move(argsNodes), argsTypes, *kernel, kernel);
}