aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp
blob: 9afa7a1e80d6918b65a521408ca09928d5803f90 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include <Storages/MergeTree/MergeTreeDataPartWriterInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Constructs a writer bound to the given in-memory part.
/// Every argument is forwarded to the IMergeTreeDataPartWriter base class;
/// the part pointer is additionally stored in part_in_memory.
MergeTreeDataPartWriterInMemory::MergeTreeDataPartWriterInMemory(
    const MutableDataPartInMemoryPtr & part_,
    const NamesAndTypesList & columns_list_,
    const StorageMetadataPtr & metadata_snapshot_,
    const MergeTreeWriterSettings & settings_)
    : IMergeTreeDataPartWriter(part_, columns_list_, metadata_snapshot_, settings_)
    , part_in_memory(part_)
{
}

/// Stores the given block as the single block of the in-memory part.
///
/// Only the columns listed in columns_list are kept, in that order. When a
/// permutation is supplied, the stored columns are reordered by it.
/// Afterwards one mark with the number of stored rows is appended to
/// index_granularity (plus an empty mark if with_final_mark is set), and, if
/// settings.rewrite_primary_key is set, the primary index is calculated.
///
/// Throws LOGICAL_ERROR if the part already holds a block.
void MergeTreeDataPartWriterInMemory::write(
    const Block & block, const IColumn::Permutation * permutation)
{
    if (part_in_memory->block)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "DataPartWriterInMemory supports only one write");

    Block pk_block;
    if (settings.rewrite_primary_key)
        pk_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), permutation);

    Block collected;
    for (const auto & name_and_type : columns_list)
    {
        const auto & column_name = name_and_type.name;

        /// Without a permutation the source column is taken as is.
        if (!permutation)
        {
            collected.insert(block.getByName(column_name));
            continue;
        }

        /// Reuse the column from pk_block when it is present there
        /// (presumably already permuted by getBlockAndPermute) instead of permuting it again.
        if (pk_block.has(column_name))
        {
            collected.insert(pk_block.getByName(column_name));
            continue;
        }

        auto reordered = block.getByName(column_name);
        reordered.column = reordered.column->permute(*permutation, 0);
        collected.insert(reordered);
    }

    index_granularity.appendMark(collected.rows());
    if (with_final_mark)
        index_granularity.appendMark(0);

    part_in_memory->block = std::move(collected);

    if (settings.rewrite_primary_key)
        calculateAndSerializePrimaryIndex(pk_block);
}

/// Fills index_columns from the given primary key block.
/// For every column of the block an empty clone is created and the first row
/// is inserted into it; when with_final_mark is set, the last row is inserted
/// as well. An empty block leaves index_columns untouched.
void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
{
    const size_t total_rows = primary_index_block.rows();
    if (total_rows == 0)
        return;

    const size_t key_columns_count = primary_index_block.columns();
    index_columns.resize(key_columns_count);

    for (size_t pos = 0; pos != key_columns_count; ++pos)
    {
        const auto & source = *primary_index_block.getByPosition(pos).column;
        auto & target = index_columns[pos];

        target = source.cloneEmpty();
        target->insertFrom(source, 0);

        /// The final mark gets the value of the last row.
        if (with_final_mark)
            target->insertFrom(source, total_rows - 1);
    }
}

/// Records the checksum of the part's block under the "data.bin" entry of `checksums`.
void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
{
    auto & stored_block = part_in_memory->block;

    /// Nothing was written: the block still has to be initialized,
    /// so give it one column entry (type and name only) per listed column.
    if (!stored_block)
    {
        for (const auto & name_and_type : columns_list)
            stored_block.insert(ColumnWithTypeAndName{name_and_type.type, name_and_type.name});
    }

    checksums.files["data.bin"] = part_in_memory->calculateBlockChecksum();
}

}