1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
#pragma once
#include "mkql_computation_node_impl.h"
#include "mkql_computation_node_holders.h"
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/public/udf/arrow/block_item.h>
#include <arrow/array.h>
#include <arrow/scalar.h>
#include <arrow/datum.h>
#include <arrow/compute/kernel.h>

#include <deque>
#include <memory>
#include <vector>
// C-linkage helpers that read information out of an unboxed value; they are
// defined elsewhere (not in this header).
// NOTE(review): the descriptions below are inferred from the function names
// only — confirm against the definitions before relying on them.
extern "C" {
// Presumably returns the row count carried by a block-count value.
uint64_t GetBlockCount(const NYql::NUdf::TUnboxedValuePod value);
// Presumably returns the population count of a bitmap value.
uint64_t GetBitmapPopCountCount(const NYql::NUdf::TUnboxedValuePod value);
// Presumably returns the value of a scalar bitmap (single byte).
uint8_t GetBitmapScalarValue(const NYql::NUdf::TUnboxedValuePod value);
} // extern "C"
namespace NKikimr::NMiniKQL {
// ---- MiniKQL value -> Arrow datum -------------------------------------------
// Each function receives the MiniKQL type of the value and the Arrow memory
// pool to use; see the .cpp for the exact conversion rules.

// Converts one unboxed MiniKQL value of type `valueType` into an arrow::Datum.
arrow::Datum ConvertScalar(TType* valueType, const NUdf::TUnboxedValuePod& item, arrow::MemoryPool& memoryPool);
// Overload for a value that is already represented as a block item.
arrow::Datum ConvertScalar(TType* valueType, const NUdf::TBlockItem& item, arrow::MemoryPool& memoryPool);
// Produces an array datum of `length` elements from `scalar`
// (presumably by repeating it — confirm in the implementation).
arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t length, TType* valueType, arrow::MemoryPool& memoryPool);

// ---- MiniKQL types -> Arrow kernel signatures --------------------------------

// Arrow value descriptor for a single MiniKQL type.
arrow::ValueDescr ToValueDescr(TType* valueType);
// Element-wise version of the overload above.
std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& valueTypes);
// Arrow kernel input types for a list of argument types.
std::vector<arrow::compute::InputType> ConvertToInputTypes(const TVector<TType*>& argTypes);
// Arrow kernel output type for the given result type.
arrow::compute::OutputType ConvertToOutputType(TType* outputType);

// ---- Block helpers -----------------------------------------------------------

// Wraps `count` into an unboxed value using `holderFactory`.
NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const uint64_t count);
/// Mutable computation node that evaluates a block function by running an
/// Arrow scalar kernel over the values of its argument nodes.
///
/// `kernel` is stored by reference. `kernelHolder` optionally keeps shared
/// ownership of a kernel (presumably the same one `kernel` refers to, for
/// kernels created at runtime — confirm at the call sites). `functionOptions`
/// is not owned; it is forwarded to the kernel's `init` hook.
class TBlockFuncNode : public TMutableComputationNode<TBlockFuncNode> {
public:
    TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
        const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
        std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {},
        const arrow::compute::FunctionOptions* functionOptions = nullptr);

    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const;

private:
    /// Adapter that exposes this node's kernel, argument descriptors and
    /// argument nodes through IArrowKernelComputationNode. It keeps a
    /// non-owning pointer to the parent node.
    class TArrowNode : public IArrowKernelComputationNode {
    public:
        // explicit: a raw parent pointer must not silently convert into an adapter.
        explicit TArrowNode(const TBlockFuncNode* parent);

        TStringBuf GetKernelName() const final;
        const arrow::compute::ScalarKernel& GetArrowKernel() const final;
        const std::vector<arrow::ValueDescr>& GetArgsDesc() const final;
        const IComputationNode* GetArgument(ui32 index) const final;

    private:
        const TBlockFuncNode* Parent_;
    };
    friend class TArrowNode;

    /// Arrow execution state for the kernel: an ExecContext bound to the
    /// computation context's Arrow memory pool, a KernelContext on top of it,
    /// and the kernel-specific state produced by `kernel.init` (if any).
    ///
    /// Only the full constructor below is provided. An inherited
    /// TComputationValue constructor could never be used, because
    /// arrow::compute::KernelContext is not default-constructible.
    struct TState : public TComputationValue<TState> {
        TState(TMemoryUsageInfo* memInfo, const arrow::compute::FunctionOptions* options,
            const arrow::compute::ScalarKernel& kernel, const std::vector<arrow::ValueDescr>& argsValuesDescr,
            TComputationContext& ctx)
            : TComputationValue(memInfo)
            , ExecContext(&ctx.ArrowMemoryPool, nullptr, nullptr)
            , KernelContext(&ExecContext)
        {
            // Kernels with an init hook create their own state, which must be
            // attached to KernelContext so the kernel can find it when it runs.
            if (kernel.init) {
                State = ARROW_RESULT(kernel.init(&KernelContext, { &kernel, argsValuesDescr, options }));
                KernelContext.SetState(State.get());
            }
        }

        // KernelContext holds the address of this object's ExecContext (and
        // State), so a copied/moved instance would point into the source object.
        // Deleting copy also suppresses the implicit move.
        TState(const TState&) = delete;
        TState& operator=(const TState&) = delete;

        // Declaration order matters: ExecContext must be constructed before
        // KernelContext, which is initialized with its address.
        arrow::compute::ExecContext ExecContext;
        arrow::compute::KernelContext KernelContext;
        std::unique_ptr<arrow::compute::KernelState> State;
    };

    void RegisterDependencies() const final;
    TState& GetState(TComputationContext& ctx) const;
    std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final;

private:
    // NOTE(review): member order is the constructor's initialization order —
    // do not reorder without checking the definition in the .cpp.
    const ui32 StateIndex;                                             // presumably the TState slot in the mutables — confirm
    const TComputationNodePtrVector ArgsNodes;
    const std::vector<arrow::ValueDescr> ArgsValuesDescr;
    const arrow::compute::ScalarKernel& Kernel;                        // not owned
    const std::shared_ptr<arrow::compute::ScalarKernel> KernelHolder;  // optional shared ownership
    const arrow::compute::FunctionOptions* const Options;             // not owned; may be nullptr
    const bool ScalarOutput;                                           // presumably "result is a scalar" — confirm
    const TString Name;
};
// Computation-value state holding Arrow block data for `width` columns
// (the width is passed to the constructor; its use is defined in the .cpp).
// NOTE(review): the roles described below are inferred from member types and
// names — confirm against the implementation of ClearValues/FillArrays/Slice/Get.
// Member declaration order is kept as-is: it fixes layout and construction order.
struct TBlockState : public TComputationValue<TBlockState> {
using TBase = TComputationValue<TBlockState>;
// Row counter; starts at zero.
ui64 Count = 0;
// Non-owning pointer, initially null; presumably points into Values — confirm.
NUdf::TUnboxedValue* Pointer_ = nullptr;
// Unboxed values owned by the state.
TUnboxedValueVector Values;
// One queue of shared Arrow ArrayData chunks per entry (presumably per column).
std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
// One shared Arrow ArrayData per entry (presumably the current chunk per column).
std::vector<std::shared_ptr<arrow::ArrayData>> Arrays;
TBlockState(TMemoryUsageInfo* memInfo, size_t width);
void ClearValues();
void FillArrays();
// Returns a ui64; presumably the size of the slice taken — confirm in the .cpp.
ui64 Slice();
// Builds an unboxed value for entry `idx` using `sliceSize` and `holderFactory`.
NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const;
};
} //namespace NKikimr::NMiniKQL
|