1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
|
#pragma once
#include <Common/Allocator.h>
#include <Columns/IColumn.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context_fwd.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/ParallelReadBuffer.h>
#include <base/types.h>
#include <Core/NamesAndTypes.h>
#include <boost/noncopyable.hpp>
#include <functional>
#include <memory>
#include <unordered_map>
namespace DB
{
class Block;
struct Settings;
struct FormatFactorySettings;
class ReadBuffer;
class WriteBuffer;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
class IInputFormat;
class IOutputFormat;
class IRowOutputFormat;
struct RowInputFormatParams;
class ISchemaReader;
class IExternalSchemaReader;
using SchemaReaderPtr = std::shared_ptr<ISchemaReader>;
using ExternalSchemaReaderPtr = std::shared_ptr<IExternalSchemaReader>;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
using RowOutputFormatPtr = std::shared_ptr<IRowOutputFormat>;
template <typename Allocator>
struct Memory;
FormatSettings getFormatSettings(ContextPtr context);
template <typename T>
FormatSettings getFormatSettings(ContextPtr context, const T & settings);
/** Allows to create an IInputFormat or IOutputFormat by the name of the format.
* Note: format and compression are independent things.
*/
class FormatFactory final : private boost::noncopyable
{
public:
/** Fast reading data from buffer and save result to memory.
* Reads at least `min_bytes` and some more until the end of the chunk, depends on the format.
* If `max_rows` is non-zero the function also stops after reading the `max_rows` number of rows
* (even if the `min_bytes` boundary isn't reached yet).
* Used in ParallelParsingInputFormat.
*/
using FileSegmentationEngine = std::function<std::pair<bool, size_t>(
ReadBuffer & buf,
DB::Memory<Allocator<false>> & memory,
size_t min_bytes,
size_t max_rows)>;
private:
// On the input side, there are two kinds of formats:
// * InputCreator - formats parsed sequentially, e.g. CSV. Almost all formats are like this.
// FormatFactory uses ParallelReadBuffer to read in parallel, and ParallelParsingInputFormat
// to parse in parallel; the formats mostly don't need to worry about it.
// * RandomAccessInputCreator - column-oriented formats that require seeking back and forth in
// the file when reading. E.g. Parquet has metadata at the end of the file (needs to be read
// before we can parse any data), can skip columns by seeking in the file, and often reads
// many short byte ranges from the file. ParallelReadBuffer and ParallelParsingInputFormat
// are a poor fit. Instead, the format implementation is in charge of parallelizing both
// reading and parsing.
using InputCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
// Incompatible with FileSegmentationEngine.
using RandomAccessInputCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const FormatSettings & settings,
const ReadSettings& read_settings,
bool is_remote_fs,
size_t max_download_threads,
size_t max_parsing_threads)>;
using OutputCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
const FormatSettings & settings)>;
/// Some input formats can have non trivial readPrefix() and readSuffix(),
/// so in some cases there is no possibility to use parallel parsing.
/// The checker should return true if parallel parsing should be disabled.
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
/// Some formats can support append depending on settings.
/// The checker should return true if format support append.
using AppendSupportChecker = std::function<bool(const FormatSettings & settings)>;
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings)>;
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
/// Some formats can extract different schemas from the same source depending on
/// some settings. To process this case in schema cache we should add some additional
/// information to a cache key. This getter should return some string with information
/// about such settings. For example, for Protobuf format it's the path to the schema
/// and the name of the message.
using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;
/// Some formats can support reading subset of columns depending on settings.
/// The checker should return true if format support append.
using SubsetOfColumnsSupportChecker = std::function<bool(const FormatSettings & settings)>;
struct Creators
{
InputCreator input_creator;
RandomAccessInputCreator random_access_input_creator;
OutputCreator output_creator;
FileSegmentationEngine file_segmentation_engine;
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
bool prefers_large_blocks{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
SubsetOfColumnsSupportChecker subset_of_columns_support_checker;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
using FileExtensionFormats = std::unordered_map<String, String>;
public:
static FormatFactory & instance();
/// This has two tricks up its sleeve:
/// * Parallel reading.
/// To enable it, make sure `buf` is a SeekableReadBuffer implementing readBigAt().
/// * Parallel parsing.
/// `buf` must outlive the returned IInputFormat.
InputFormatPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt,
std::optional<size_t> max_parsing_threads = std::nullopt,
std::optional<size_t> max_download_threads = std::nullopt,
// affects things like buffer sizes and parallel reading
bool is_remote_fs = false,
// allows to do: buf -> parallel read -> decompression,
// because parallel read after decompression is not possible
CompressionMethod compression = CompressionMethod::None) const;
/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
OutputFormatPtr getOutputFormatParallelIfPossible(
const String & name,
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
OutputFormatPtr getOutputFormat(
const String & name,
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
const std::optional<FormatSettings> & _format_settings = std::nullopt) const;
String getContentType(
const String & name,
ContextPtr context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
SchemaReaderPtr getSchemaReader(
const String & name,
ReadBuffer & buf,
ContextPtr & context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
ExternalSchemaReaderPtr getExternalSchemaReader(
const String & name,
ContextPtr & context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker);
/// If format always doesn't support append, you can use this method instead of
/// registerAppendSupportChecker with append_support_checker that always returns true.
void markFormatHasNoAppendSupport(const String & name);
bool checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
/// Register file extension for format
void registerFileExtension(const String & extension, const String & format_name);
String getFormatFromFileName(String file_name, bool throw_if_not_found = false);
String getFormatFromFileDescriptor(int fd);
/// Register schema readers for format its name.
void registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator);
void registerExternalSchemaReader(const String & name, ExternalSchemaReaderCreator external_schema_reader_creator);
void markOutputFormatSupportsParallelFormatting(const String & name);
void markOutputFormatPrefersLargeBlocks(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);
void registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker);
bool checkIfFormatSupportsSubsetOfColumns(const String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_ = std::nullopt) const;
bool checkIfFormatHasSchemaReader(const String & name) const;
bool checkIfFormatHasExternalSchemaReader(const String & name) const;
bool checkIfFormatHasAnySchemaReader(const String & name) const;
bool checkIfOutputFormatPrefersLargeBlocks(const String & name) const;
bool checkParallelizeOutputAfterReading(const String & name, ContextPtr context) const;
void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter);
String getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
const FormatsDictionary & getAllFormats() const
{
return dict;
}
bool isInputFormat(const String & name) const;
bool isOutputFormat(const String & name) const;
/// Check that format with specified name exists and throw an exception otherwise.
void checkFormatName(const String & name) const;
private:
FormatsDictionary dict;
FileExtensionFormats file_extension_formats;
const Creators & getCreators(const String & name) const;
// Creates a ReadBuffer to give to an input format. Returns nullptr if we should use `buf` directly.
std::unique_ptr<ReadBuffer> wrapReadBufferIfNeeded(
ReadBuffer & buf,
CompressionMethod compression,
const Creators & creators,
const FormatSettings & format_settings,
const Settings & settings,
bool is_remote_fs,
size_t max_download_threads) const;
};
}
|