aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MergeTreeDataWriter.h
blob: 2fb6b1f22d4330e03a08aa55fe3e234fd45756cf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#pragma once

#include <Core/Block.h>

#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>

#include <Columns/ColumnsNumber.h>

#include <Interpreters/sortBlock.h>

#include <Processors/Chunk.h>

#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>


namespace DB
{

/// A block of rows bundled with a partition value, plus two auxiliary vectors
/// (`offsets`, `tokens`) that are only filled by the four-argument constructor.
/// NOTE(review): `offsets`/`tokens` look related to async inserts (see the
/// `async_insert_info` argument of MergeTreeDataWriter::splitBlockIntoParts) — confirm in the .cpp.
struct BlockWithPartition
{
    Block block;
    Row partition;
    std::vector<size_t> offsets;
    std::vector<String> tokens;

    /// Takes ownership of `block_` and `partition_`; `offsets` and `tokens` are left empty.
    BlockWithPartition(Block && block_, Row && partition_)
        /// A named rvalue reference is an lvalue, so std::move is required here:
        /// without it `block` would be copy-constructed.
        : block(std::move(block_)), partition(std::move(partition_))
    {
    }

    /// Takes ownership of all four arguments.
    BlockWithPartition(Block && block_, Row && partition_, std::vector<size_t> && offsets_, std::vector<String> && tokens_)
        : block(std::move(block_))
        , partition(std::move(partition_))
        , offsets(std::move(offsets_))
        , tokens(std::move(tokens_))
    {
    }
};

/// A list of blocks, each paired with its partition value; this is the return type of
/// MergeTreeDataWriter::splitBlockIntoParts.
using BlocksWithPartition = std::vector<BlockWithPartition>;

/// Writer that turns blocks of data into new parts of a merge tree.
class MergeTreeDataWriter
{
public:
    /// Binds the writer to `data_` and obtains a logger named "<table log name> (Writer)".
    explicit MergeTreeDataWriter(MergeTreeData & data_)
        : data(data_)
        , log(&Poco::Logger::get(data.getLogName() + " (Writer)"))
    {
    }

    /// Splits `block` by partition into several blocks; each resulting block
    /// has to be written as its own part.
    /// The split is deterministic: passing the same block again yields the
    /// same blocks in the same order.
    static BlocksWithPartition splitBlockIntoParts(
        const Block & block,
        size_t max_parts,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context,
        AsyncInsertInfoPtr async_insert_info = nullptr);

    /// A temporary part whose data may not be fully written yet.
    /// Part of the writing can be asynchronous (for example, for blob storages),
    /// so finalize() must be called to wait until everything has been written.
    struct TemporaryPart
    {
        MergeTreeData::MutableDataPartPtr part;

        /// An output stream together with its finalizer.
        struct Stream
        {
            std::unique_ptr<MergedBlockOutputStream> stream;
            MergedBlockOutputStream::Finalizer finalizer;
        };

        std::vector<Stream> streams;

        scope_guard temporary_directory_lock;

        void cancel();
        void finalize();
    };

    /// Writes `block` as a temporary part. Every row of the block must belong
    /// to the same partition.
    /// The returned part has a unique name that starts with 'tmp_' and has not
    /// yet been added to MergeTreeData.
    TemporaryPart writeTempPart(
        BlockWithPartition & block,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context);

    /// Returns the merging mode taken from the merging params of the underlying table data.
    MergeTreeData::MergingParams::Mode getMergingMode() const { return data.merging_params.mode; }

    /// NOTE(review): judging by the name, a variant of writeTempPart that omits the
    /// 'tmp_' name prefix and takes an explicit block number — confirm in the .cpp.
    TemporaryPart writeTempPartWithoutPrefix(
        BlockWithPartition & block,
        const StorageMetadataPtr & metadata_snapshot,
        int64_t block_number,
        ContextPtr context);

    /// Projection part writer used on insertion.
    static TemporaryPart writeProjectionPart(
        const MergeTreeData & data,
        Poco::Logger * log,
        Block block,
        const ProjectionDescription & projection,
        IMergeTreeDataPart * parent_part);

    /// Projection part writer used by the MATERIALIZE PROJECTION mutation.
    static TemporaryPart writeTempProjectionPart(
        const MergeTreeData & data,
        Poco::Logger * log,
        Block block,
        const ProjectionDescription & projection,
        IMergeTreeDataPart * parent_part,
        size_t block_num);

    /// NOTE(review): presumably merges rows of `block` according to `merging_params`
    /// and may update `permutation` (passed as a reference to a pointer) — confirm in the .cpp.
    static Block mergeBlock(
        const Block & block,
        SortDescription sort_description,
        const Names & partition_key_columns,
        IColumn::Permutation *& permutation,
        const MergeTreeData::MergingParams & merging_params);

private:
    /// NOTE(review): presumably the shared implementation behind writeTempPart and
    /// writeTempPartWithoutPrefix, selected via `need_tmp_prefix` — confirm in the .cpp.
    TemporaryPart writeTempPartImpl(
        BlockWithPartition & block,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context,
        int64_t block_number,
        bool need_tmp_prefix);

    /// NOTE(review): presumably the shared implementation behind writeProjectionPart and
    /// writeTempProjectionPart, selected via `is_temp` — confirm in the .cpp.
    static TemporaryPart writeProjectionPartImpl(
        const String & part_name,
        bool is_temp,
        IMergeTreeDataPart * parent_part,
        const MergeTreeData & data,
        Poco::Logger * log,
        Block block,
        const ProjectionDescription & projection);

    /// Declaration order matters: `log` is initialised from `data` in the constructor.
    MergeTreeData & data;
    Poco::Logger * log;
};

}