aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_block_impl.h
blob: c94047aa049af4436c9772d537f9661ebdc2661d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#pragma once

#include "mkql_computation_node_impl.h"
#include "mkql_computation_node_holders.h"

#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/public/udf/arrow/block_item.h>

#include <arrow/array.h>
#include <arrow/scalar.h>
#include <arrow/datum.h>
#include <arrow/compute/kernel.h>

extern "C" uint64_t GetBlockCount(const NYql::NUdf::TUnboxedValuePod data);
extern "C" uint64_t GetBitmapPopCountCount(const NYql::NUdf::TUnboxedValuePod data);
extern "C" uint8_t GetBitmapScalarValue(const NYql::NUdf::TUnboxedValuePod data);

namespace NKikimr::NMiniKQL {

arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool);
arrow::Datum ConvertScalar(TType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool);
arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool);

arrow::ValueDescr ToValueDescr(TType* type);
std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types);

std::vector<arrow::compute::InputType> ConvertToInputTypes(const TVector<TType*>& argTypes);
arrow::compute::OutputType ConvertToOutputType(TType* output);

NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const uint64_t count);

class TBlockFuncNode : public TMutableComputationNode<TBlockFuncNode> {

public:
    TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
        const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
        std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {},
        const arrow::compute::FunctionOptions* functionOptions = nullptr);

    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const;
private:
    class TArrowNode : public IArrowKernelComputationNode {
    public:
        TArrowNode(const TBlockFuncNode* parent);
        TStringBuf GetKernelName() const final;
        const arrow::compute::ScalarKernel& GetArrowKernel() const final;
        const std::vector<arrow::ValueDescr>& GetArgsDesc() const final;
        const IComputationNode* GetArgument(ui32 index) const final;

    private:
        const TBlockFuncNode* Parent_;
    };
    friend class TArrowNode;

    struct TState : public TComputationValue<TState> {
        using TComputationValue::TComputationValue;

        TState(TMemoryUsageInfo* memInfo, const arrow::compute::FunctionOptions* options,
               const arrow::compute::ScalarKernel& kernel, const std::vector<arrow::ValueDescr>& argsValuesDescr,
               TComputationContext& ctx)
               : TComputationValue(memInfo)
               , ExecContext(&ctx.ArrowMemoryPool, nullptr, nullptr)
               , KernelContext(&ExecContext)
        {
            if (kernel.init) {
                State = ARROW_RESULT(kernel.init(&KernelContext, { &kernel, argsValuesDescr, options }));
                KernelContext.SetState(State.get());
            }
        }

        arrow::compute::ExecContext ExecContext;
        arrow::compute::KernelContext KernelContext;
        std::unique_ptr<arrow::compute::KernelState> State;
    };

    void RegisterDependencies() const final;
    TState& GetState(TComputationContext& ctx) const;

    std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final;

private:
    const ui32 StateIndex;
    const TComputationNodePtrVector ArgsNodes;
    const std::vector<arrow::ValueDescr> ArgsValuesDescr;
    const arrow::compute::ScalarKernel& Kernel;
    const std::shared_ptr<arrow::compute::ScalarKernel> KernelHolder;
    const arrow::compute::FunctionOptions* const Options;
    const bool ScalarOutput;
    const TString Name;
};

struct TBlockState : public TComputationValue<TBlockState> {
    using TBase = TComputationValue<TBlockState>;

    ui64 Count = 0;
    NUdf::TUnboxedValue* Pointer_ = nullptr;

    TUnboxedValueVector Values;
    std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
    std::vector<std::shared_ptr<arrow::ArrayData>> Arrays;

    TBlockState(TMemoryUsageInfo* memInfo, size_t width);

    void ClearValues();

    void FillArrays();

    ui64 Slice();

    NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const;
};
} //namespace NKikimr::NMiniKQL