aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes/mkql_block_exists.cpp
blob: 7df219329e1af5948980d69fbde75b60899e5532 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include "mkql_exists.h"
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/minikql/computation/mkql_block_impl.h>
#include <yql/essentials/minikql/mkql_node_cast.h>

namespace NKikimr {
namespace NMiniKQL {

namespace {

// Arrow kernel body for the block-level `Exists` function: for every row of the
// single input array it emits a boolean (stored as uint8) that is true exactly
// when that row is non-null.
class TBlockExistsExec {
public:
    // Computes the `Exists` result for one batch.
    //   ctx   - kernel context; only its memory pool is used for the output.
    //   batch - must hold exactly one argument, and it must be an array (scalar
    //           inputs are rejected with "Expected array").
    //   res   - receives the result array, same length as the input.
    // Always returns Status::OK(); failures surface through MKQL_ENSURE instead.
    arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
        const auto& argument = batch.values[0];
        MKQL_ENSURE(argument.is_array(), "Expected array");

        const auto& data = *argument.array();
        auto* const pool = ctx->memory_pool();
        const auto rows = data.length;
        const auto nulls = data.GetNullCount();

        // Every row is null (this also covers an empty input): nothing exists.
        if (nulls == rows) {
            *res = MakeFalseArray(pool, rows);
            return arrow::Status::OK();
        }

        // No row is null: everything exists.
        if (nulls == 0) {
            *res = MakeTrueArray(pool, rows);
            return arrow::Status::OK();
        }

        // Mixed case: buffers[0] is the Arrow validity bitmap (1 = non-null),
        // which is exactly the answer; it starts at bit `offset` of that buffer.
        *res = MakeBitmapArray(pool, rows, data.offset, data.buffers[0]->data());
        return arrow::Status::OK();
    }
};

std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockExistsKernel(const TVector<TType*>& argTypes, TType* resultType) {
    std::shared_ptr<arrow::DataType> returnArrowType;
    MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type");
    // Ensure the result Arrow type (i.e. boolean) is Arrow UInt8Type.
    Y_DEBUG_ABORT_UNLESS(returnArrowType == arrow::uint8());
    auto exec = std::make_shared<TBlockExistsExec>();
    auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
        [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
        return exec->Exec(ctx, batch, res);
    });
    kernel->null_handling = arrow::compute::NullHandling::OUTPUT_NOT_NULL;
    return kernel;
}

} // namespace

// Factory for the block `Exists` callable. It expects exactly one input, wires
// that input's computation node and static type into a TBlockFuncNode, and
// evaluates it with the kernel produced by MakeBlockExistsKernel.
IComputationNode* WrapBlockExists(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");

    // Resolve the argument's computation node first, then read its static type.
    IComputationNode* const argNode = LocateNode(ctx.NodeLocator, callable, 0);
    TType* const argType = callable.GetInput(0).GetStaticType();

    TComputationNodePtrVector argsNodes{argNode};
    TVector<TType*> argsTypes{argType};

    TType* const returnType = callable.GetType()->GetReturnType();
    auto kernel = MakeBlockExistsKernel(argsTypes, returnType);

    // The node receives both the kernel reference and the owning shared_ptr,
    // so the kernel outlives this factory call.
    return new TBlockFuncNode(ctx.Mutables, "Exists", std::move(argsNodes), argsTypes, *kernel, kernel);
}

} // namespace NMiniKQL
} // namespace NKikimr