1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
|
#pragma once
#include "clickhouse_config.h"
#if USE_AWS_S3
#include <Core/Types.h>
#include <Compression/CompressionInfo.h>
#include <Storages/IStorage.h>
#include <Storages/StorageS3Settings.h>
#include <Processors/ISource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Poco/URI.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/CompressionMethod.h>
#include <IO/SeekableReadBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/prepareReadingFromFormat.h>
namespace Aws::S3
{
class Client;
}
namespace DB
{
class PullingPipelineExecutor;
class NamedCollection;
class StorageS3Source : public ISource, WithContext
{
public:
struct KeyWithInfo
{
KeyWithInfo() = default;
KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_)
: key(std::move(key_)), info(std::move(info_))
{
}
String key;
std::optional<S3::ObjectInfo> info;
};
using KeysWithInfo = std::vector<KeyWithInfo>;
class IIterator
{
public:
virtual ~IIterator() = default;
virtual KeyWithInfo next() = 0;
/// Estimates how many streams we need to process all files.
/// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
/// Intended to be called before any next() calls, may underestimate otherwise
/// fixme: May underestimate if the glob has a strong filter, so there are few matches among the first 1000 ListObjects results.
virtual size_t estimatedKeysCount() = 0;
KeyWithInfo operator ()() { return next(); }
};
class DisclosedGlobIterator : public IIterator
{
public:
DisclosedGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
KeysWithInfo * read_keys_ = nullptr,
const S3Settings::RequestSettings & request_settings_ = {},
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfo next() override;
size_t estimatedKeysCount() override;
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
class KeysIterator : public IIterator
{
public:
explicit KeysIterator(
const S3::Client & client_,
const std::string & version_id_,
const std::vector<String> & keys_,
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfo next() override;
size_t estimatedKeysCount() override;
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
class ReadTaskIterator : public IIterator
{
public:
explicit ReadTaskIterator(const ReadTaskCallback & callback_, const size_t max_threads_count);
KeyWithInfo next() override;
size_t estimatedKeysCount() override;
private:
KeysWithInfo buffer;
std::atomic_size_t index = 0;
ReadTaskCallback callback;
};
StorageS3Source(
const ReadFromFormatInfo & info,
const String & format,
String name_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket,
const String & version_id,
const String & url_host_and_port,
std::shared_ptr<IIterator> file_iterator_,
size_t max_parsing_threads,
bool need_only_count_,
std::optional<SelectQueryInfo> query_info);
~StorageS3Source() override;
String getName() const override;
Chunk generate() override;
private:
friend class StorageS3QueueSource;
String name;
String bucket;
String version_id;
String url_host_and_port;
String format;
ColumnsDescription columns_desc;
NamesAndTypesList requested_columns;
UInt64 max_block_size;
S3Settings::RequestSettings request_settings;
String compression_hint;
std::shared_ptr<const S3::Client> client;
Block sample_block;
std::optional<FormatSettings> format_settings;
std::optional<SelectQueryInfo> query_info;
struct ReaderHolder
{
public:
ReaderHolder(
KeyWithInfo key_with_info_,
String bucket_,
std::unique_ptr<ReadBuffer> read_buf_,
std::shared_ptr<ISource> source_,
std::unique_ptr<QueryPipeline> pipeline_,
std::unique_ptr<PullingPipelineExecutor> reader_)
: key_with_info(std::move(key_with_info_))
, bucket(std::move(bucket_))
, read_buf(std::move(read_buf_))
, source(std::move(source_))
, pipeline(std::move(pipeline_))
, reader(std::move(reader_))
{
}
ReaderHolder() = default;
ReaderHolder(const ReaderHolder & other) = delete;
ReaderHolder & operator=(const ReaderHolder & other) = delete;
ReaderHolder(ReaderHolder && other) noexcept
{
*this = std::move(other);
}
ReaderHolder & operator=(ReaderHolder && other) noexcept
{
/// The order of destruction is important.
/// reader uses pipeline, pipeline uses read_buf.
reader = std::move(other.reader);
pipeline = std::move(other.pipeline);
source = std::move(other.source);
read_buf = std::move(other.read_buf);
key_with_info = std::move(other.key_with_info);
bucket = std::move(other.bucket);
return *this;
}
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
String getPath() const { return fs::path(bucket) / key_with_info.key; }
const String & getFile() const { return key_with_info.key; }
const KeyWithInfo & getKeyWithInfo() const { return key_with_info; }
const IInputFormat * getInputFormat() const { return dynamic_cast<const IInputFormat *>(source.get()); }
private:
KeyWithInfo key_with_info;
String bucket;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<ISource> source;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
};
ReaderHolder reader;
NamesAndTypesList requested_virtual_columns;
std::shared_ptr<IIterator> file_iterator;
size_t max_parsing_threads = 1;
bool need_only_count;
Poco::Logger * log = &Poco::Logger::get("StorageS3Source");
ThreadPool create_reader_pool;
ThreadPoolCallbackRunner<ReaderHolder> create_reader_scheduler;
std::future<ReaderHolder> reader_future;
size_t total_rows_in_file = 0;
/// Recreate ReadBuffer and Pipeline for each file.
ReaderHolder createReader();
std::future<ReaderHolder> createReaderAsync();
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key, size_t object_size);
std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);
void addNumRowsToCache(const String & key, size_t num_rows);
std::optional<size_t> tryGetNumRowsFromCache(const KeyWithInfo & key_with_info);
};
/**
* This class represents table engine for external S3 urls.
* It sends HTTP GET to server when select is called and
* HTTP PUT when insert is called.
*/
class StorageS3 : public IStorage
{
public:
struct Configuration : public StatelessTableEngineConfiguration
{
Configuration() = default;
String getPath() const { return url.key; }
bool update(ContextPtr context);
void connect(ContextPtr context);
bool withGlobs() const { return url.key.find_first_of("*?{") != std::string::npos; }
bool withWildcard() const
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos
|| keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
}
S3::URI url;
S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HTTPHeaderEntries headers_from_ast;
std::shared_ptr<const S3::Client> client;
std::shared_ptr<const S3::Client> client_with_long_timeout;
std::vector<String> keys;
};
StorageS3(
const Configuration & configuration_,
ContextPtr context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_,
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
String getName() const override
{
return name;
}
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;
void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override;
NamesAndTypesList getVirtuals() const override;
bool supportsPartitionBy() const override;
static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);
static ColumnsDescription getTableStructureFromData(
const StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
using KeysWithInfo = StorageS3Source::KeysWithInfo;
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const KeysWithInfo::const_iterator & begin,
const KeysWithInfo::const_iterator & end,
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
static void addColumnsToCache(
const KeysWithInfo & keys,
const Configuration & configuration,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
bool supportsTrivialCountOptimization() const override { return true; }
protected:
virtual Configuration updateConfigurationAndGetCopy(ContextPtr local_context);
virtual void updateConfiguration(ContextPtr local_context);
void useConfiguration(const Configuration & new_configuration);
const Configuration & getConfiguration();
private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
friend class StorageS3Queue;
Configuration configuration;
std::mutex configuration_update_mutex;
NamesAndTypesList virtual_columns;
String name;
const bool distributed_processing;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const Configuration & configuration,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback = {});
static ColumnsDescription getTableStructureFromDataImpl(
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
bool supportsSubcolumns() const override { return true; }
bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool prefersLargeBlocks() const override;
bool parallelizeOutputAfterReading(ContextPtr context) const override;
};
}
#endif
|