aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/WindowView/WindowViewSource.h
blob: 74720bf8074febe613b1e85c4c24cafbf24b1f1b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#pragma once

#include <Storages/WindowView/StorageWindowView.h>
#include <Processors/ISource.h>


namespace DB
{

class WindowViewSource : public ISource
{
public:
    WindowViewSource(
        std::shared_ptr<StorageWindowView> storage_,
        const bool is_events_,
        String window_view_timezone_,
        const bool has_limit_,
        const UInt64 limit_,
        const UInt64 heartbeat_interval_sec_)
        : ISource(
            is_events_ ? Block(
                {ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
                       : storage_->getOutputHeader())
        , storage(storage_)
        , is_events(is_events_)
        , window_view_timezone(window_view_timezone_)
        , has_limit(has_limit_)
        , limit(limit_)
        , heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
    {
        if (is_events)
            header.insert(
                ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
        else
            header = storage->getOutputHeader();
    }

    String getName() const override { return "WindowViewSource"; }

    void addBlock(Block block_, UInt32 watermark)
    {
        std::lock_guard lock(blocks_mutex);
        blocks_with_watermark.push_back({std::move(block_), watermark});
    }

protected:
    Block getHeader() const { return header; }

    Chunk generate() override
    {
        Block block;
        UInt32 watermark;
        std::tie(block, watermark) = generateImpl();
        if (!block)
            return Chunk();
        if (is_events)
        {
            return Chunk(
                {DataTypeDateTime(window_view_timezone).createColumnConst(block.rows(), watermark)->convertToFullColumnIfConst()},
                block.rows());
        }
        else
        {
            return Chunk(block.getColumns(), block.rows());
        }
    }

    std::pair<Block, UInt32> generateImpl()
    {
        if (has_limit && num_updates == static_cast<Int64>(limit))
            return {Block(), 0};

        if (isCancelled() || storage->shutdown_called)
            return {Block(), 0};

        std::unique_lock lock(blocks_mutex);
        if (blocks_with_watermark.empty())
        {
            if (!end_of_blocks)
            {
                end_of_blocks = true;
                num_updates += 1;
                return {getHeader(), 0};
            }

            while ((Poco::Timestamp().epochMicroseconds() - last_heartbeat_timestamp_usec) < heartbeat_interval_usec)
            {
                bool signaled = std::cv_status::no_timeout == storage->fire_condition.wait_for(lock, std::chrono::microseconds(1000));
                if (signaled)
                    break;
                if (isCancelled() || storage->shutdown_called)
                    return {Block(), 0};
            }
        }

        if (!blocks_with_watermark.empty())
        {
            end_of_blocks = false;
            auto res = blocks_with_watermark.front();
            blocks_with_watermark.pop_front();
            return res;
        }
        else
        {
            last_heartbeat_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
            return {getHeader(), 0};
        }
    }

private:
    std::shared_ptr<StorageWindowView> storage;

    std::list<std::pair<Block, UInt32>> blocks_with_watermark;

    Block header;
    const bool is_events;
    String window_view_timezone;
    const bool has_limit;
    const UInt64 limit;
    Int64 num_updates = -1;
    bool end_of_blocks = false;
    std::mutex blocks_mutex;
    UInt64 heartbeat_interval_usec;
    UInt64 last_heartbeat_timestamp_usec = 0;
};
}