aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/AggregateFunctions/AggregateFunctionDeltaSum.h
blob: d64f949825a8891110e548357b017a54eccec794 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#pragma once

#include <type_traits>

#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>

#include <AggregateFunctions/IAggregateFunction.h>


namespace DB
{
// Forward declaration only; nothing in the visible part of this header needs the full definition.
struct Settings;

/// Per-group accumulator used by the deltaSum aggregate function.
/// A freshly constructed state is "empty": every numeric field is zero and `seen` is false.
template <typename T>
struct AggregationFunctionDeltaSumData
{
    /// Running total of the positive increases observed so far.
    T sum = T(0);

    /// Most recently observed value.
    T last = T(0);

    /// First value observed by this state.
    T first = T(0);

    /// Becomes true once at least one value has been observed; `first` and `last` carry
    /// no meaning while this is false.
    bool seen = false;
};

/// Aggregate function `deltaSum`: accumulates the positive differences between consecutive
/// values of a numeric column. A value that is not greater than its predecessor contributes
/// nothing to the total.
///
/// T is the element type of the argument column; the result has the same numeric type.
template <typename T>
class AggregationFunctionDeltaSum final
    : public IAggregateFunctionDataHelper<AggregationFunctionDeltaSumData<T>, AggregationFunctionDeltaSum<T>>
{
public:
    /// Constructs the function for the given argument types and parameters; the result type
    /// is taken from createResultType().
    AggregationFunctionDeltaSum(const DataTypes & arguments, const Array & params)
        : IAggregateFunctionDataHelper<AggregationFunctionDeltaSumData<T>, AggregationFunctionDeltaSum<T>>{arguments, params, createResultType()}
    {}

    AggregationFunctionDeltaSum()
        : IAggregateFunctionDataHelper<AggregationFunctionDeltaSumData<T>, AggregationFunctionDeltaSum<T>>{}
    {}

    String getName() const override { return "deltaSum"; }

    /// The result is a plain number of the same type T as the input.
    static DataTypePtr createResultType() { return std::make_shared<DataTypeNumber<T>>(); }

    bool allocatesMemoryInArena() const override { return false; }

    /// Feeds one row (row_num of columns[0]) into the state at `place`.
    ///
    /// If a previous value exists and the new value is strictly greater, the difference is
    /// added to the running sum. The new value always becomes `last`; the very first value is
    /// additionally remembered as `first`.
    ///
    /// NOTE(review): NO_SANITIZE_UNDEFINED is presumably here because the arithmetic on `sum`
    /// may overflow for signed T - confirm.
    void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const T value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
        auto & state = this->data(place);

        if (state.seen)
        {
            if (state.last < value)
                state.sum += (value - state.last);
        }
        else
        {
            state.first = value;
            state.seen = true;
        }

        state.last = value;
    }

    /// Merges the state `rhs` into the state at `place`, treating the rows behind `rhs` as
    /// following the rows behind `place` (states are assumed to arrive sorted by time).
    ///
    /// The outcome matches feeding the rows of both states through add() one after another:
    ///  - rhs empty:      `place` is left untouched;
    ///  - place empty:    `place` becomes a copy of rhs;
    ///  - both non-empty: the sums are added, the gap between place's last value and rhs's
    ///                    first value is added when it is positive, and `last` is taken
    ///                    from rhs.
    void NO_SANITIZE_UNDEFINED ALWAYS_INLINE merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        auto & lhs_state = this->data(place);
        const auto & rhs_state = this->data(rhs);

        // Nothing to take from an empty right-hand state.
        if (!rhs_state.seen)
            return;

        if (!lhs_state.seen)
        {
            // The lhs is an empty state and the rhs does have some state, so just take it.
            lhs_state.first = rhs_state.first;
            lhs_state.last = rhs_state.last;
            lhs_state.sum = rhs_state.sum;
            lhs_state.seen = rhs_state.seen;
            return;
        }

        if (lhs_state.last < rhs_state.first)
        {
            // The lhs ends below where the rhs starts, for example [0, 2] [4, 7]. Add the
            // delta sums and also the difference between the lhs last number and the rhs
            // first number (the 2 and the 4).
            lhs_state.sum += rhs_state.sum + (rhs_state.first - lhs_state.last);
        }
        else
        {
            // The rhs starts at or below the lhs last value, e.g. [4, 6] [1, 2] or
            // [1, 3] [3, 5] (or the two values are unordered). A drop is treated as a counter
            // reset, so the boundary itself contributes nothing - but the increases recorded
            // inside the rhs still count. Previously the equal-boundary case matched no
            // branch at all and the whole rhs state was silently discarded.
            lhs_state.sum += rhs_state.sum;
        }

        // In either case the combined sequence now ends where the rhs ends, exactly as add()
        // always moves `last` forward.
        lhs_state.last = rhs_state.last;
    }

    /// Writes the state as sum, first, last, seen (little-endian). deserialize() reads the
    /// fields back in this same order, so the two must be kept in sync.
    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
    {
        const auto & state = this->data(place);

        writeBinaryLittleEndian(state.sum, buf);
        writeBinaryLittleEndian(state.first, buf);
        writeBinaryLittleEndian(state.last, buf);
        writeBinaryLittleEndian(state.seen, buf);
    }

    /// Reads a state in the field order produced by serialize(): sum, first, last, seen.
    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
    {
        auto & state = this->data(place);

        readBinaryLittleEndian(state.sum, buf);
        readBinaryLittleEndian(state.first, buf);
        readBinaryLittleEndian(state.last, buf);
        readBinaryLittleEndian(state.seen, buf);
    }

    /// Appends the accumulated sum to the result column `to`.
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
    {
        assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).sum);
    }
};

}