aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/InsertBlockInfo.cpp
blob: ac900f8cf097e0a51c6a67e60e383ff53f8d2860 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#include <Storages/MergeTree/InsertBlockInfo.h>

namespace DB
{

/// Error codes referenced in this file. They are only declared here (`extern`);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Constructs the block info by taking over (moving from) the passed arguments.
///
/// log_                           - logger stored as-is and used for trace messages.
/// block_id_                      - one id per insert contained in the block.
/// block_                         - the block together with its partition data.
/// unmerged_block_with_partition_ - optional block; when it holds a value, later filtering
///                                  operates on it instead of `block_with_partition`.
///
/// After the members are initialized, the block id -> offset index map is built.
AsyncInsertBlockInfo::AsyncInsertBlockInfo(
    Poco::Logger * log_,
    std::vector<std::string> && block_id_,
    BlockWithPartition && block_,
    std::optional<BlockWithPartition> && unmerged_block_with_partition_)
    : log(log_)
    , block_id(std::move(block_id_))
    , block_with_partition(std::move(block_))
    , unmerged_block_with_partition(std::move(unmerged_block_with_partition_))
{
    /// Must run after `block_id` is initialized: the map is derived from it.
    initBlockIDMap();
}

/// Rebuilds `block_id_to_offset_idx` from scratch: for every id in `block_id`
/// it records, in ascending order, the positions at which that id occurs.
void AsyncInsertBlockInfo::initBlockIDMap()
{
    block_id_to_offset_idx.clear();

    size_t position = 0;
    for (const auto & id : block_id)
    {
        /// operator[] creates the entry on first sight of an id.
        block_id_to_offset_idx[id].push_back(position);
        ++position;
    }
}

/// this function check if the block contains duplicate inserts.
/// if so, we keep only one insert for every duplicate ones.
bool AsyncInsertBlockInfo::filterSelfDuplicate()
{
    std::vector<String> dup_block_ids;
    for (const auto & [hash_id, offset_indexes] : block_id_to_offset_idx)
    {
        /// It means more than one inserts have the same hash id, in this case, we should keep only one of them.
        if (offset_indexes.size() > 1)
            dup_block_ids.push_back(hash_id);
    }
    if (dup_block_ids.empty())
        return false;

    filterBlockDuplicate(dup_block_ids, true);
    return true;
}

/// Removes the conflicting inserts from the block so that the rest can be written again.
///
/// block_paths - paths whose last component (filename) is a block id present in
///               `block_id_to_offset_idx`; plain ids work as well.
/// self_dedup  - true when the ids were selected by `filterSelfDuplicate`. In that case the
///               first occurrence of every id is kept and only the remaining ones are removed.
///
/// The filtering is applied to `unmerged_block_with_partition` when it holds a value,
/// otherwise to `block_with_partition`. Offsets, `block_id` and the id -> offset index map
/// are updated to describe the surviving inserts only.
///
/// Throws LOGICAL_ERROR if a path refers to a block id that is not known.
void AsyncInsertBlockInfo::filterBlockDuplicate(const std::vector<String> & block_paths, bool self_dedup)
{
    auto * current_block_with_partition = unmerged_block_with_partition.has_value() ? &unmerged_block_with_partition.value() : &block_with_partition;

    /// Collect the indexes (into offsets / block_id) of all inserts that must be removed.
    std::vector<size_t> offset_idx;
    for (const auto & raw_path : block_paths)
    {
        std::filesystem::path p(raw_path);
        String conflict_block_id = p.filename();
        auto it = block_id_to_offset_idx.find(conflict_block_id);
        if (it == block_id_to_offset_idx.end())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown conflict path {}", conflict_block_id);
        /// if this filter is for self_dedup, that means the block paths is selected by `filterSelfDuplicate`, which is a self purge.
        /// in this case, we don't know if zk has this insert, then we should keep one insert, to avoid missing this insert.
        /// (`bool` converts to 0 or 1, so the first index is skipped exactly when self_dedup is true.)
        offset_idx.insert(std::end(offset_idx), std::begin(it->second) + self_dedup, std::end(it->second));
    }
    std::sort(offset_idx.begin(), offset_idx.end());
    /// The same block id may be listed more than once in `block_paths`. Repeated indexes would stall
    /// the cursor below on the repeated value and every later conflicting insert would be silently
    /// kept, so make the list strictly increasing.
    offset_idx.erase(std::unique(offset_idx.begin(), offset_idx.end()), offset_idx.end());

    auto & offsets = current_block_with_partition->offsets;
    size_t idx = 0;
    size_t remove_count = 0;
    auto it = offset_idx.begin();
    std::vector<size_t> new_offsets;
    std::vector<String> new_block_ids;

    /// construct filter: 1 = keep the row, 0 = drop the row. Everything is kept initially.
    size_t rows = current_block_with_partition->block.rows();
    auto filter_col = ColumnUInt8::create(rows, 1u);
    ColumnUInt8::Container & vec = filter_col->getData();
    UInt8 * pos = vec.data();
    for (const auto & offset : offsets)
    {
        if (it != offset_idx.end() && *it == idx)
        {
            /// Insert number idx covers the rows [previous offset, offset); mark them as removed.
            size_t start_pos = idx > 0 ? offsets[idx - 1] : 0;
            size_t end_pos = offset;
            remove_count += end_pos - start_pos;
            for (size_t row = start_pos; row < end_pos; ++row)
                pos[row] = 0;
            ++it;
        }
        else
        {
            /// A surviving insert: its end offset shifts left by the number of rows removed so far.
            new_offsets.push_back(offset - remove_count);
            new_block_ids.push_back(block_id[idx]);
        }
        ++idx;
    }

    LOG_TRACE(log, "New block IDs: {}, new offsets: {}, size: {}", toString(new_block_ids), toString(new_offsets), new_offsets.size());

    current_block_with_partition->offsets = std::move(new_offsets);
    block_id = std::move(new_block_ids);

    /// Apply the filter to every column, passing the number of surviving rows.
    auto cols = current_block_with_partition->block.getColumns();
    for (auto & col : cols)
    {
        col = col->filter(vec, rows - remove_count);
    }
    current_block_with_partition->block.setColumns(cols);

    LOG_TRACE(log, "New block rows {}", current_block_with_partition->block.rows());

    /// `block_id` has changed, so the id -> offset index map has to be rebuilt.
    initBlockIDMap();

    /// When the unmerged block was the one being filtered, copy its block into `block_with_partition`.
    if (unmerged_block_with_partition.has_value())
        block_with_partition.block = unmerged_block_with_partition->block;
}

/// Produces one block id per entry of `block.offsets`.
///
/// Entry i covers the rows [offsets[i - 1], offsets[i]) (starting from 0 for the first entry).
/// If `block.tokens[i]` is non-empty, the id is "<partition_id>_<token>".
/// Otherwise all values of the covered rows are fed, row by row and column by column,
/// into a SipHash, and the id is "<partition_id>_<first 64-bit item>_<second 64-bit item>"
/// of the resulting 128-bit hash.
std::vector<String> AsyncInsertBlockInfo::getHashesForBlocks(BlockWithPartition & block, String partition_id)
{
    std::vector<String> result;
    auto columns = block.block.getColumns();

    size_t range_begin = 0;
    for (size_t insert_no = 0; insert_no < block.offsets.size(); ++insert_no)
    {
        const size_t range_end = block.offsets[insert_no];
        std::string_view token = block.tokens[insert_no];

        if (!token.empty())
        {
            /// An explicit token takes precedence over hashing the data.
            result.push_back(partition_id + "_" + std::string(token));
        }
        else
        {
            SipHash hash;
            for (size_t row = range_begin; row < range_end; ++row)
                for (const auto & column : columns)
                    column->updateHashWithValue(row, hash);

            const auto digest = hash.get128();
            result.push_back(partition_id + "_" + DB::toString(digest.items[0]) + "_" + DB::toString(digest.items[1]));
        }

        /// The next insert starts where this one ended.
        range_begin = range_end;
    }

    return result;
}

}