1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
#pragma once
#include <condition_variable>
#include <functional>
#include <optional>
#include <utility>
#include <base/types.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/RangesInDataPart.h>
namespace DB
{
/// Reading mode used when coordinating a parallel read.
///
/// The numeric values are part of the wire format: this enum is serialized
/// and sent over the network. Never reorder the enumerators and never insert
/// a new one in the middle.
enum class CoordinationMode : uint8_t
{
    Default = 0,

    /// Reading in order.
    WithOrder = 1,

    /// NOTE(review): presumably reading in reverse order - confirm against the readers.
    ReverseOrder = 2,

    /// Alias of the highest enumerator currently defined.
    MAX = ReverseOrder,
};
/// Represents a segment [begin; end].
struct PartBlockRange
{
    Int64 begin;
    Int64 end;

    /// Two ranges compare equal only when both of their bounds match.
    bool operator==(const PartBlockRange & rhs) const
    {
        if (begin != rhs.begin)
            return false;
        return end == rhs.end;
    }
};
/// A message that remote replicas send during parallel reading
/// to tell the initiator that they need more marks to read.
struct ParallelReadRequest
{
    CoordinationMode mode;
    size_t replica_num;
    size_t min_number_of_marks;

    /// Extension for the ordered modes (WithOrder or ReverseOrder).
    /// Contains only data part names without mark ranges.
    RangesInDataPartsDescription description;

    /// There is intentionally no default constructor:
    /// every field has to be provided at construction time.
    ParallelReadRequest(
        CoordinationMode mode_,
        size_t replica_num_,
        size_t min_number_of_marks_,
        RangesInDataPartsDescription description_)
        : mode(mode_)
        , replica_num(replica_num_)
        , min_number_of_marks(min_number_of_marks_)
        , description(std::move(description_))
    {
    }

    /// Wire format: write this request to `out` / build a request from `in`.
    void serialize(WriteBuffer & out) const;
    static ParallelReadRequest deserialize(ReadBuffer & in);

    /// Returns a textual description of the request.
    String describe() const;

    /// Declared here, defined elsewhere. `other` is taken by non-const reference.
    /// NOTE(review): presumably folds `other` into this request - confirm in the implementation.
    void merge(ParallelReadRequest & other);
};
/// The initiator's answer to remote replicas during parallel reading:
/// it tells them what to read. It also carries information about whether
/// more marks are available to read (i.e. whether this is the last packet).
struct ParallelReadResponse
{
    /// False unless set otherwise.
    /// NOTE(review): presumably this is the "last packet" flag - confirm against the coordinator.
    bool finish = false;

    RangesInDataPartsDescription description;

    /// Wire format: write this response to `out` / fill this response from `in`.
    void serialize(WriteBuffer & out) const;
    void deserialize(ReadBuffer & in);

    /// Returns a textual description of the response.
    String describe() const;
};
/// The set of parts (their names) along with the ranges to read, which remote
/// replicas send back to the initiator during parallel reading.
/// Additionally contains an identifier (replica_num) plus
/// the reading algorithm chosen (Default, WithOrder or ReverseOrder).
struct InitialAllRangesAnnouncement
{
    /// No default constructor, you must initialize all fields at once.
    ///
    /// `description_` is a by-value sink parameter, so it is moved into the
    /// member instead of being copied a second time.
    InitialAllRangesAnnouncement(
        CoordinationMode mode_,
        RangesInDataPartsDescription description_,
        size_t replica_num_)
        : mode(mode_)
        , description(std::move(description_))
        , replica_num(replica_num_)
    {}

    CoordinationMode mode;
    RangesInDataPartsDescription description;
    size_t replica_num;

    /// Writes this announcement to `out`.
    void serialize(WriteBuffer & out) const;

    /// Returns a textual description of the announcement.
    /// NOTE(review): non-const, unlike describe() on the request/response structs.
    /// Making it const requires changing the out-of-line definition as well.
    String describe();

    /// Builds an announcement from `in`.
    static InitialAllRangesAnnouncement deserialize(ReadBuffer & in);
};
/// Callback that receives an InitialAllRangesAnnouncement by value and returns nothing.
/// NOTE(review): presumably invoked by a replica to announce its ranges to the initiator - confirm against callers.
using MergeTreeAllRangesCallback = std::function<void(InitialAllRangesAnnouncement)>;
/// Callback that receives a ParallelReadRequest by value and returns an optional ParallelReadResponse.
/// NOTE(review): the meaning of an empty optional is not visible in this header - confirm against callers.
using MergeTreeReadTaskCallback = std::function<std::optional<ParallelReadResponse>(ParallelReadRequest)>;
}
|