aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp
blob: 2676adfac2d0aa59737fa52ade9031a056c78cd4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include "Storages/MergeTree/RangesInDataPart.h"
#include <Storages/MergeTree/IntersectionsIndexes.h>

namespace DB
{

/// Error codes referenced by this file. They are only declared here (`extern`);
/// the definitions are provided elsewhere.
namespace ErrorCodes
{
    /// Compared against the current exception code in the catch handler of
    /// MergeTreeInOrderSelectAlgorithm::getNewTaskImpl().
    extern const int MEMORY_LIMIT_EXCEEDED;
}

bool MergeTreeInOrderSelectAlgorithm::getNewTaskImpl()
try
{
    if (all_mark_ranges.empty())
        return false;

    if (!reader)
        initializeReaders();

    MarkRanges mark_ranges_for_task;

    if (!pool)
    {
        /// If we need to read few rows, set one range per task to reduce number of read data.
        if (has_limit_below_one_block)
        {
            mark_ranges_for_task = MarkRanges{};
            mark_ranges_for_task.emplace_front(std::move(all_mark_ranges.front()));
            all_mark_ranges.pop_front();
        }
        else
        {
            mark_ranges_for_task = std::move(all_mark_ranges);
            all_mark_ranges.clear();
        }
    }
    else
    {
        auto description = RangesInDataPartDescription{
            .info = data_part->info,
            /// We just ignore all the distribution done before
            /// Everything will be done on coordinator side
            .ranges = {},
        };

        mark_ranges_for_task = pool->getNewTask(description);

        if (mark_ranges_for_task.empty())
            return false;
    }

    auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr
        : getSizePredictor(data_part, task_columns, sample_block);

    task = std::make_unique<MergeTreeReadTask>(
        data_part,
        alter_conversions,
        mark_ranges_for_task,
        part_index_in_query,
        column_name_set,
        task_columns,
        std::move(size_predictor));

    return true;
}
catch (...)
{
    /// Suspicion of the broken part. A part is added to the queue for verification.
    if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
        storage.reportBrokenPart(data_part);
    throw;
}

}