aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes/mkql_block_agg_factory.h
blob: 84f24390bd5181fcfcc4ed185af99026d791c59c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#pragma once

#include <yql/essentials/minikql/computation/mkql_computation_node.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>

namespace NKikimr {
namespace NMiniKQL {

class IAggColumnBuilder {
public:
    virtual ~IAggColumnBuilder() = default;

    virtual void Add(const void* state) = 0;

    virtual NUdf::TUnboxedValue Build() = 0;
};

class IBlockAggregatorBase {
public:
    virtual ~IBlockAggregatorBase() = default;

    const ui32 StateSize;

    explicit IBlockAggregatorBase(ui32 stateSize)
        : StateSize(stateSize)
    {}

    virtual void DestroyState(void* state) noexcept = 0;
};


class IBlockAggregatorCombineAll : public IBlockAggregatorBase {
public:
    virtual void InitState(void* state) = 0;

    virtual void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) = 0;

    virtual NUdf::TUnboxedValue FinishOne(const void* state) = 0;

    explicit IBlockAggregatorCombineAll(ui32 stateSize)
        : IBlockAggregatorBase(stateSize)
    {}
};

class IBlockAggregatorCombineKeys : public IBlockAggregatorBase {
public:
    virtual void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;

    virtual void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;

    virtual std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) = 0;

    explicit IBlockAggregatorCombineKeys(ui32 stateSize)
        : IBlockAggregatorBase(stateSize)
    {}
};

class IBlockAggregatorFinalizeKeys : public IBlockAggregatorBase {
public:
    virtual void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;

    virtual void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) = 0;

    virtual std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) = 0;

    explicit IBlockAggregatorFinalizeKeys(ui32 stateSize)
        : IBlockAggregatorBase(stateSize)
    {}
};

template <typename TBase>
class TBlockAggregatorBase : public TBase {
public:
    TBlockAggregatorBase(ui32 stateSize, std::optional<ui32> filterColumn, TComputationContext& ctx)
        : TBase(stateSize)
        , FilterColumn_(filterColumn)
        , Ctx_(ctx)
    {
    }

protected:
    const std::optional<ui32> FilterColumn_;
    TComputationContext& Ctx_;
};

template <typename T>
class IPreparedBlockAggregator {
public:
    virtual ~IPreparedBlockAggregator() = default;

    virtual std::unique_ptr<T> Make(TComputationContext& ctx) const = 0;

    const ui32 StateSize;

    explicit IPreparedBlockAggregator(ui32 stateSize)
        : StateSize(stateSize)
    {}
};

class IBlockAggregatorFactory {
public:
   virtual ~IBlockAggregatorFactory() = default;

   virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareCombineAll(
       TTupleType* tupleType,
       std::optional<ui32> filterColumn,
       const std::vector<ui32>& argsColumns,
       const TTypeEnvironment& env) const = 0;

   virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareCombineKeys(
       TTupleType* tupleType,
       const std::vector<ui32>& argsColumns,
       const TTypeEnvironment& env) const = 0;

   virtual std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys(
       TTupleType* tupleType,
       const std::vector<ui32>& argsColumns,
       const TTypeEnvironment& env,
       TType* returnType,
       ui32 hint) const = 0;
};

const IBlockAggregatorFactory& GetBlockAggregatorFactory(TStringBuf name);

struct TCombineAllTag {
    using TAggregator = IBlockAggregatorCombineAll;
    using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
    using TBase = TBlockAggregatorBase<TAggregator>;
};

struct TCombineKeysTag {
    using TAggregator = IBlockAggregatorCombineKeys;
    using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
    using TBase = TBlockAggregatorBase<TAggregator>;
};

struct TFinalizeKeysTag {
    using TAggregator = IBlockAggregatorFinalizeKeys;
    using TPreparedAggregator = IPreparedBlockAggregator<TAggregator>;
    using TBase = TBlockAggregatorBase<TAggregator>;
};

}
}