// path: root/contrib/clickhouse/src/Storages/System/StorageSystemZeros.cpp
// blob: 6c2ddd8d3ddb707cf68beb78514694ab07257540
#include <Storages/System/StorageSystemZeros.h>

#include <Processors/ISource.h>
#include <QueryPipeline/Pipe.h>

#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>

namespace DB
{

namespace
{

/// Counter shared between the sources created by a single read() call.
struct ZerosState
{
    /// Number of rows already claimed by all sources together; starts at zero.
    std::atomic<UInt64> num_generated_rows{0};
};

/// Shared ownership handle: every source of a read() holds the same state.
using ZerosStatePtr = std::shared_ptr<ZerosState>;


/// Source which generates chunks of a single UInt8 column "zero" filled with zeros.
/// Uses state to share the number of generated rows between threads, so that all
/// sources sharing one state produce at most `limit` rows in total.
/// If state is nullptr, then limit is ignored and every call to generate() returns a full chunk.
class ZerosSource : public ISource
{
public:
    /// block_size - number of rows in every full-size chunk;
    /// limit_     - total number of rows to produce (only consulted when state_ is not nullptr);
    /// state_     - shared counter of generated rows, may be nullptr.
    ZerosSource(UInt64 block_size, UInt64 limit_, ZerosStatePtr state_)
            : ISource(createHeader()), limit(limit_), state(std::move(state_))
    {
        /// The full-size column is built once here and reused by generate().
        column = createColumn(block_size);
    }

    String getName() const override { return "Zeros"; }

protected:
    /// Returns the next chunk of zeros, or an empty chunk once the shared limit is reached.
    Chunk generate() override
    {
        auto column_ptr = column;
        size_t column_size = column_ptr->size();

        if (state)
        {
            /// Claim `column_size` rows; fetch_add returns the counter value before the addition.
            auto generated_rows = state->num_generated_rows.fetch_add(column_size, std::memory_order_acquire);

            if (generated_rows >= limit)
                return {};

            /// generated_rows < limit here, so the subtraction cannot underflow
            /// (whereas `generated_rows + column_size` could wrap around for a huge limit).
            const UInt64 rows_left = limit - generated_rows;

            if (column_size > rows_left)
            {
                /// Last chunk: shrink it so that exactly `limit` rows are produced in total.
                column_size = rows_left;
                column_ptr = createColumn(column_size);
            }
        }

        /// Report the chunk that is actually returned. Using the full-size `column` here
        /// would over-report rows and bytes whenever the last chunk was truncated.
        progress(column_ptr->size(), column_ptr->byteSize());

        return { Columns {std::move(column_ptr)}, column_size };
    }

private:
    UInt64 limit;
    ZerosStatePtr state;
    /// Pre-built column of `block_size` zeros.
    ColumnPtr column;

    /// Header block: one empty UInt8 column named "zero".
    static Block createHeader()
    {
        return { ColumnWithTypeAndName(ColumnUInt8::create(), std::make_shared<DataTypeUInt8>(), "zero") };
    }

    /// Creates a UInt8 column with `size` zero values.
    static ColumnPtr createColumn(size_t size)
    {
        auto column_ptr = ColumnUInt8::create();
        /// It is probably the fastest method to create zero column, cause resize_fill uses memset internally.
        column_ptr->getData().resize_fill(size);

        return column_ptr;
    }
};

}

/// Creates the storage and registers its only column: "zero" of type UInt8.
/// multithreaded_ and limit_ are stored as-is and consulted later by read().
StorageSystemZeros::StorageSystemZeros(const StorageID & table_id_, bool multithreaded_, std::optional<UInt64> limit_)
    : IStorage(table_id_)
    , multithreaded(multithreaded_)
    , limit(limit_)
{
    StorageInMemoryMetadata metadata;
    metadata.setColumns(ColumnsDescription({{"zero", std::make_shared<DataTypeUInt8>()}}));

    setInMemoryMetadata(metadata);
}

/// Builds a pipe of ZerosSource instances.
/// With a limit, all sources share one row counter so that together they produce at most
/// `limit` rows; a limit smaller than the block size shrinks the block and forces one stream.
/// Without a limit, the sources get no shared state and ignore the limit value.
Pipe StorageSystemZeros::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo &,
    ContextPtr /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    size_t num_streams)
{
    storage_snapshot->check(column_names);

    const bool has_limit = limit.has_value();

    /// The whole result fits into a single block: no point in running several streams.
    const bool single_block_is_enough = has_limit && *limit < max_block_size;

    if (single_block_is_enough)
        max_block_size = static_cast<size_t>(*limit);

    if (single_block_is_enough || !multithreaded)
        num_streams = 1;

    /// The shared counter exists only when there is a limit to enforce.
    ZerosStatePtr shared_state;
    if (has_limit)
        shared_state = std::make_shared<ZerosState>();

    const UInt64 rows_limit = has_limit ? *limit : 0;

    Pipe pipe;

    for (size_t stream = 0; stream != num_streams; ++stream)
    {
        auto zeros = std::make_shared<ZerosSource>(max_block_size, rows_limit, shared_state);

        /// The total is announced once, by the first source only.
        if (stream == 0 && has_limit)
            zeros->addTotalRowsApprox(*limit);

        pipe.addSource(std::move(zeros));
    }

    return pipe;
}

}