1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
#pragma once
#include <base/types.h>
#include <Common/ProfileEvents.h>
#include <IO/Progress.h>
#include <Storages/MergeTree/MergeList.h>
/// Profile event counters referenced by the merge progress callback in this header.
/// They are only declared here (extern); their definitions are not part of this file.
namespace ProfileEvents
{
/// Incremented with the watch time that elapsed between progress updates.
extern const Event MergesTimeMilliseconds;
/// Incremented with the bytes read reported by each progress update.
extern const Event MergedUncompressedBytes;
/// Incremented with the rows read reported during the first stage.
extern const Event MergedRows;
/// Incremented by one on each progress update of the first stage.
extern const Event Merge;
}
namespace DB
{
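/** Progress callback (see MergeProgressCallback below).
  * On every progress update it should refresh:
  * - the approximate progress of the merge;
  * - the number of rows read;
  * - various metrics (profile events);
  * - the time elapsed for the current merge.
  */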
/// Auxiliary struct that for each merge stage stores its current progress.
/// A stage is: the horizontal stage + a stage for each gathered column (if we are doing a
/// Vertical merge) or a mutation of a single part. During a single stage all rows are read.
struct MergeStageProgress
{
explicit MergeStageProgress(Float64 weight_)
: is_first(true) , weight(weight_) {}
MergeStageProgress(Float64 initial_progress_, Float64 weight_)
: initial_progress(initial_progress_), is_first(false), weight(weight_) {}
Float64 initial_progress = 0.0;
bool is_first;
Float64 weight;
UInt64 total_rows = 0;
UInt64 rows_read = 0;
};
class MergeProgressCallback
{
public:
MergeProgressCallback(
MergeListElement * merge_list_element_ptr_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
: merge_list_element_ptr(merge_list_element_ptr_)
, watch_prev_elapsed(watch_prev_elapsed_)
, stage(stage_)
{
updateWatch();
}
MergeListElement * merge_list_element_ptr;
UInt64 & watch_prev_elapsed;
MergeStageProgress & stage;
void updateWatch()
{
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed;
}
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
if (stage.is_first)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::Merge);
}
updateWatch();
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
if (stage.is_first)
merge_list_element_ptr->rows_read += value.read_rows;
stage.total_rows += value.total_rows_to_read;
stage.rows_read += value.read_rows;
if (stage.total_rows > 0)
{
merge_list_element_ptr->progress.store(
stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows,
std::memory_order_relaxed);
}
}
};
}
|