aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp
blob: 3b2eb96f2d4ea3c715e71adf67b16b4842f77432 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Interpreters/Context.h>
#include <IO/WriteSettings.h>

namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
}

/// Creates a stream that writes the columns described by `header_` into `data_part`.
///
/// The base class is initialized with the header's names and types and with reset_columns = true.
/// A writer is obtained from the part for those columns, for the indices in `indices_to_recalc`,
/// with `default_codec` and `index_granularity`; the primary key is not rewritten.
///
/// `index_granularity_info` may be null: in that case the adaptive-granularity flag is taken from the storage.
/// `offset_columns_` is handed to the on-disk writer as is.
///
/// Throws NOT_IMPLEMENTED if the writer returned by the part is not a MergeTreeDataPartWriterOnDisk.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
    const MergeTreeMutableDataPartPtr & data_part,
    const StorageMetadataPtr & metadata_snapshot_,
    const Block & header_,
    CompressionCodecPtr default_codec,
    const MergeTreeIndices & indices_to_recalc,
    WrittenOffsetColumns * offset_columns_,
    const MergeTreeIndexGranularity & index_granularity,
    const MergeTreeIndexGranularityInfo * index_granularity_info)
    : IMergedBlockOutputStream(data_part, metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true)
    , header(header_)
{
    const auto & storage = data_part->storage;
    const auto context = storage.getContext();

    const auto & query_settings = context->getSettings();
    const auto & merge_tree_settings = storage.getSettings();

    /// An explicitly passed granularity info takes precedence over what the storage reports.
    const bool use_adaptive_granularity = index_granularity_info
        ? index_granularity_info->mark_type.adaptive
        : storage.canUseAdaptiveGranularity();

    MergeTreeWriterSettings writer_settings(
        query_settings,
        context->getWriteSettings(),
        merge_tree_settings,
        use_adaptive_granularity,
        /* rewrite_primary_key = */ false);

    writer = data_part->getWriter(
        header.getNamesAndTypesList(),
        metadata_snapshot_,
        indices_to_recalc,
        default_codec,
        writer_settings,
        index_granularity);

    /// Offset columns can only be registered on an on-disk writer, so any other writer kind is rejected.
    auto * disk_writer = dynamic_cast<MergeTreeDataPartWriterOnDisk *>(writer.get());
    if (disk_writer == nullptr)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergedColumnOnlyOutputStream supports only parts stored on disk");

    disk_writer->setWrittenOffsetColumns(offset_columns_);
}

/// Writes one block through the part writer. Blocks without rows are ignored entirely.
void MergedColumnOnlyOutputStream::write(const Block & block)
{
    /// An empty block is neither written nor counted in the serialization infos.
    if (block.rows() == 0)
        return;

    writer->write(block, nullptr);

    /// Accumulate per-block serialization info; fillChecksums() applies it to the part.
    new_serialization_infos.add(block);
}

/// Finishes column serialization and refreshes the column metadata of `new_part`.
///
/// Steps:
///  - collects the checksums produced by the writer;
///  - adds one "<projection_name>.proj" entry for every projection part of `new_part`;
///  - replaces the part's serialization infos with the ones accumulated in write();
///  - for every file reported as removed by removeEmptyColumnsFromPart(), deletes it from the part
///    storage (if it exists) and erases its entry from `all_checksums`;
///  - stores the resulting columns and serialization infos in `new_part`.
///
/// Returns the checksums collected by this call (a separate object from `all_checksums`).
MergeTreeData::DataPart::Checksums
MergedColumnOnlyOutputStream::fillChecksums(
    MergeTreeData::MutableDataPartPtr & new_part,
    MergeTreeData::DataPart::Checksums & all_checksums)
{
    /// Finish columns serialization.
    MergeTreeData::DataPart::Checksums checksums;
    writer->fillChecksums(checksums);

    for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
        checksums.addFile(
            projection_name + ".proj",
            projection_part->checksums.getTotalSizeOnDisk(),
            projection_part->checksums.getTotalChecksumUInt128());

    /// Work on copies: removeEmptyColumnsFromPart() receives them together with `checksums`,
    /// and the results are written back to the part at the end.
    auto columns = new_part->getColumns();
    auto serialization_infos = new_part->getSerializationInfos();
    serialization_infos.replaceData(new_serialization_infos);

    auto removed_files = removeEmptyColumnsFromPart(new_part, columns, serialization_infos, checksums);

    for (const String & removed_file : removed_files)
    {
        new_part->getDataPartStorage().removeFileIfExists(removed_file);

        /// Erasing by key does nothing when the key is absent, so a preceding contains() lookup is redundant.
        all_checksums.files.erase(removed_file);
    }

    new_part->setColumns(columns, serialization_infos, metadata_snapshot->getMetadataVersion());
    return checksums;
}

/// Finishes the underlying part writer, forwarding `sync` unchanged.
/// NOTE(review): `sync` presumably asks the writer to sync written files to disk — confirm against the writer.
void MergedColumnOnlyOutputStream::finish(bool sync)
{
    writer->finish(sync);
}

}