1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
#pragma once
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Interpreters/InterserverIOHandler.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/IStorage_fwd.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/copyData.h>
#include <IO/ConnectionTimeouts.h>
#include <Common/Throttler.h>
/// Forward declaration only: lets this header name ZooKeeper handles
/// without including the ZooKeeper client header.
namespace zkutil
{
    class ZooKeeper;

    /// Shared-ownership handle to a ZooKeeper object.
    using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
namespace DB
{
class StorageReplicatedMergeTree;
class PooledReadWriteBufferFromHTTP;
namespace DataPartsExchange
{
/** Service for sending parts from the table *ReplicatedMergeTree.
*/
class Service final : public InterserverIOEndpoint
{
public:
    /// Creates the endpoint for the replicated table `data_`.
    /// NOTE(review): the constructor body is not in this header; presumably it
    /// initializes the `data` reference and `log` declared below.
    explicit Service(StorageReplicatedMergeTree & data_);

    /// The endpoint is not copyable.
    Service(const Service &) = delete;
    Service & operator=(const Service &) = delete;

    /// InterserverIOEndpoint override: returns the endpoint id for `node_id`.
    std::string getId(const std::string & node_id) const override;

    /// InterserverIOEndpoint override: handles one interserver request.
    /// Takes the request parameters (`params`) and body (`body`), and the
    /// output buffer (`out`) together with the HTTP response object (`response`).
    void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) override;

private:
    /// Looks up a data part by `name`.
    /// NOTE(review): behaviour when the part is missing is defined in the
    /// implementation file - confirm there.
    MergeTreeData::DataPartPtr findPart(const String & name);

    /// Sends an in-memory `part` into `out`; `send_projections` presumably
    /// controls whether the part's projections are sent as well.
    void sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_projections);

    /// Sends an on-disk `part` into `out` and returns a checksums object.
    /// `client_protocol_version`, `from_remote_disk` and `send_projections`
    /// select the transfer mode (exact semantics live in the implementation).
    MergeTreeData::DataPart::Checksums sendPartFromDisk(
        const MergeTreeData::DataPartPtr & part,
        WriteBuffer & out,
        int client_protocol_version,
        bool from_remote_disk,
        bool send_projections);

    /// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
    /// so Service will never access a dangling reference to the storage.
    StorageReplicatedMergeTree & data;

    /// Logger used by this endpoint.
    Poco::Logger * log;
};
/** Client for getting the parts from the table *MergeTree.
*/
class Fetcher final : private boost::noncopyable
{
public:
    /// Creates a fetcher for the replicated table `data_`.
    /// NOTE(review): the constructor body is not in this header; presumably it
    /// initializes the `data` reference and `log` declared below.
    explicit Fetcher(StorageReplicatedMergeTree & data_);

    /// Downloads a part to tmp_directory. If `to_detached` is set, downloads to the `detached` directory.
    ///
    /// Returns the downloaded mutable part together with a scope_guard.
    /// NOTE(review): what the guard releases is decided in the implementation - confirm there.
    ///
    /// `part_name`, `zookeeper_name` and `replica_path` identify what to fetch and from which replica;
    /// `host`, `port`, `timeouts`, `user`, `password` and `interserver_scheme` presumably describe
    /// the connection to that replica; `throttler` is forwarded to the download routines.
    /// Optional arguments: `tmp_prefix_` (default: empty string), `tagger_ptr` (default: nullptr),
    /// `try_zero_copy` (default: true), `dest_disk` (default: nullptr).
    std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> fetchSelectedPart(
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context,
        const String & part_name,
        const String & zookeeper_name,
        const String & replica_path,
        const String & host,
        int port,
        const ConnectionTimeouts & timeouts,
        const String & user,
        const String & password,
        const String & interserver_scheme,
        ThrottlerPtr throttler,
        bool to_detached = false,
        const String & tmp_prefix_ = "",
        std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr = nullptr,
        bool try_zero_copy = true,
        DiskPtr dest_disk = nullptr);

    /// Needed to be able to stop the data transfer.
    ActionBlocker blocker;

private:
    /// Factory for the buffer a downloaded file is written into.
    /// Takes a part storage, a String and a size_t.
    /// NOTE(review): the String/size_t are presumably the file name and its size - confirm.
    using OutputBufferGetter = std::function<std::unique_ptr<WriteBufferFromFileBase>(IDataPartStorage &, const String &, size_t)>;

    /// Downloads a base part or a projection part (per the name) from `in` into
    /// `data_part_storage`; `checksums` is passed by mutable reference.
    void downloadBaseOrProjectionPartToDisk(
        const String & replica_path,
        const MutableDataPartStoragePtr & data_part_storage,
        PooledReadWriteBufferFromHTTP & in,
        OutputBufferGetter output_buffer_getter,
        MergeTreeData::DataPart::Checksums & checksums,
        ThrottlerPtr throttler,
        bool sync) const;

    /// Downloads part `part_name` from `in` to `disk` and returns the resulting mutable part.
    /// `projections` is a count-like size_t. NOTE(review): presumably the number of
    /// projections to read from the stream - confirm.
    MergeTreeData::MutableDataPartPtr downloadPartToDisk(
        const String & part_name,
        const String & replica_path,
        bool to_detached,
        const String & tmp_prefix_,
        DiskPtr disk,
        bool to_remote_disk,
        PooledReadWriteBufferFromHTTP & in,
        OutputBufferGetter output_buffer_getter,
        size_t projections,
        ThrottlerPtr throttler,
        bool sync);

    /// Downloads part `part_name` from `in` as an in-memory part and returns it.
    /// `is_projection` presumably marks a call made for a projection of another part.
    MergeTreeData::MutableDataPartPtr downloadPartToMemory(
        MutableDataPartStoragePtr data_part_storage,
        const String & part_name,
        const MergeTreePartInfo & part_info,
        const UUID & part_uuid,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context,
        PooledReadWriteBufferFromHTTP & in,
        size_t projections,
        bool is_projection,
        ThrottlerPtr throttler);

    /// Variant of the disk download for remote metadata (per the name);
    /// `checksums` is passed by mutable reference.
    /// NOTE(review): presumably the zero-copy path enabled by `try_zero_copy` - confirm.
    MergeTreeData::MutableDataPartPtr downloadPartToDiskRemoteMeta(
        const String & part_name,
        const String & replica_path,
        bool to_detached,
        const String & tmp_prefix_,
        DiskPtr disk,
        PooledReadWriteBufferFromHTTP & in,
        size_t projections,
        MergeTreeData::DataPart::Checksums & checksums,
        ThrottlerPtr throttler);

    /// Table this fetcher downloads parts for.
    StorageReplicatedMergeTree & data;

    /// Logger used by this fetcher.
    Poco::Logger * log;
};
}
}
|