1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
#pragma once
#include <Storages/WindowView/StorageWindowView.h>
#include <Processors/ISource.h>
namespace DB
{
class WindowViewSource : public ISource
{
public:
WindowViewSource(
std::shared_ptr<StorageWindowView> storage_,
const bool is_events_,
String window_view_timezone_,
const bool has_limit_,
const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
: ISource(
is_events_ ? Block(
{ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
: storage_->getOutputHeader())
, storage(storage_)
, is_events(is_events_)
, window_view_timezone(window_view_timezone_)
, has_limit(has_limit_)
, limit(limit_)
, heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000)
{
if (is_events)
header.insert(
ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
else
header = storage->getOutputHeader();
}
String getName() const override { return "WindowViewSource"; }
void addBlock(Block block_, UInt32 watermark)
{
std::lock_guard lock(blocks_mutex);
blocks_with_watermark.push_back({std::move(block_), watermark});
}
protected:
Block getHeader() const { return header; }
Chunk generate() override
{
Block block;
UInt32 watermark;
std::tie(block, watermark) = generateImpl();
if (!block)
return Chunk();
if (is_events)
{
return Chunk(
{DataTypeDateTime(window_view_timezone).createColumnConst(block.rows(), watermark)->convertToFullColumnIfConst()},
block.rows());
}
else
{
return Chunk(block.getColumns(), block.rows());
}
}
std::pair<Block, UInt32> generateImpl()
{
if (has_limit && num_updates == static_cast<Int64>(limit))
return {Block(), 0};
if (isCancelled() || storage->shutdown_called)
return {Block(), 0};
std::unique_lock lock(blocks_mutex);
if (blocks_with_watermark.empty())
{
if (!end_of_blocks)
{
end_of_blocks = true;
num_updates += 1;
return {getHeader(), 0};
}
while ((Poco::Timestamp().epochMicroseconds() - last_heartbeat_timestamp_usec) < heartbeat_interval_usec)
{
bool signaled = std::cv_status::no_timeout == storage->fire_condition.wait_for(lock, std::chrono::microseconds(1000));
if (signaled)
break;
if (isCancelled() || storage->shutdown_called)
return {Block(), 0};
}
}
if (!blocks_with_watermark.empty())
{
end_of_blocks = false;
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
else
{
last_heartbeat_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
return {getHeader(), 0};
}
}
private:
std::shared_ptr<StorageWindowView> storage;
std::list<std::pair<Block, UInt32>> blocks_with_watermark;
Block header;
const bool is_events;
String window_view_timezone;
const bool has_limit;
const UInt64 limit;
Int64 num_updates = -1;
bool end_of_blocks = false;
std::mutex blocks_mutex;
UInt64 heartbeat_interval_usec;
UInt64 last_heartbeat_timestamp_usec = 0;
};
}
|