aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/TTL/TTLDeleteAlgorithm.cpp
blob: f176df2d0030c82b908e2e1d953f073a71ba37dd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <Processors/TTL/TTLDeleteAlgorithm.h>

#include <vector>

namespace DB
{

/// Constructs the DELETE TTL algorithm and seeds `new_ttl_info` from the previous TTL info.
///
/// All arguments are forwarded unchanged to the ITTLAlgorithm base class.
TTLDeleteAlgorithm::TTLDeleteAlgorithm(
    const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
    : ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
{
    /// While the minimal TTL has not expired, execute() returns early and leaves blocks
    /// untouched, so the previous TTL info is carried over as is.
    const bool min_ttl_expired = isMinTTLExpired();
    if (!min_ttl_expired)
    {
        new_ttl_info = old_ttl_info;
    }

    /// Once the maximal TTL has expired, flag this TTL as finished.
    const bool max_ttl_expired = isMaxTTLExpired();
    if (max_ttl_expired)
    {
        new_ttl_info.ttl_finished = true;
    }
}

/// Removes expired rows from `block` in place.
///
/// A row is dropped when its TTL value is expired (per isTTLExpired()) and the optional
/// WHERE filter of the TTL rule passes for it; every other row is kept, in the original order.
/// For each kept row its TTL value is fed to new_ttl_info.update(); each dropped row
/// increments rows_removed.
///
/// The block is left untouched when it is empty (evaluates to false) or when
/// isMinTTLExpired() is false.
void TTLDeleteAlgorithm::execute(Block & block)
{
    if (!block || !isMinTTLExpired())
        return;

    auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
    auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);

    const size_t num_rows = block.rows();

    /// Whether a row survives depends only on the row itself, not on the column being copied,
    /// so the decision is made once per row here instead of once per (column, row) pair.
    std::vector<size_t> rows_to_keep;
    rows_to_keep.reserve(num_rows);

    for (size_t i = 0; i < num_rows; ++i)
    {
        UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);

        /// A null where_column means there is nothing to filter on: every row passes.
        const bool where_filter_passed = !where_column || where_column->getBool(i);

        if (!isTTLExpired(cur_ttl) || !where_filter_passed)
        {
            /// NOTE(review): assumes update() only tracks min/max, so calling it once per row
            /// (rather than once per column per row) yields the same info - confirm.
            new_ttl_info.update(cur_ttl);
            rows_to_keep.push_back(i);
        }
        else
            ++rows_removed;
    }

    MutableColumns result_columns;
    const auto & column_names = block.getNames();
    result_columns.reserve(column_names.size());

    /// Rebuild every column from the surviving rows only.
    for (const auto & column_name : column_names)
    {
        const IColumn * values_column = block.getByName(column_name).column.get();
        MutableColumnPtr result_column = values_column->cloneEmpty();
        result_column->reserve(rows_to_keep.size());

        for (const size_t row : rows_to_keep)
            result_column->insertFrom(*values_column, row);

        result_columns.emplace_back(std::move(result_column));
    }

    block = block.cloneWithColumns(std::move(result_columns));
}

/// Stores the accumulated `new_ttl_info` into the TTL infos of `data_part`.
///
/// A TTL rule with a WHERE expression is recorded in `rows_where_ttl` under the rule's
/// result column name; a rule without one is recorded as the table TTL. In both cases
/// the part-wide min/max TTL is then updated with the min/max of `new_ttl_info`.
void TTLDeleteAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
    auto & part_ttl_infos = data_part->ttl_infos;

    if (!description.where_expression)
    {
        part_ttl_infos.table_ttl = new_ttl_info;
    }
    else
    {
        part_ttl_infos.rows_where_ttl[description.result_column] = new_ttl_info;
    }

    part_ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}

}