1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
#pragma once
#include "clickhouse_config.h"
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <atomic>
#include <memory>
#include <optional>
#include <string>

#include <arrow/io/interfaces.h>
/// Signature ("magic bytes") string literals, one per supported file format.
/// They stay plain string-literal macros so existing uses keep working unchanged.
#define ARROW_MAGIC_BYTES "ARROW1"
#define ORC_MAGIC_BYTES "ORC"
#define PARQUET_MAGIC_BYTES "PAR1"
namespace DB
{
/// Forward declarations. This header only ever names these types through references,
/// so their complete definitions are not required here.
struct FormatSettings;
class ReadBuffer;
class SeekableReadBuffer;
class WriteBuffer;
/// Presents a WriteBuffer through the arrow::io::OutputStream interface.
/// The WriteBuffer is held by reference, i.e. it is not owned by this object.
class ArrowBufferedOutputStream : public arrow::io::OutputStream
{
public:
    explicit ArrowBufferedOutputStream(WriteBuffer & out_);

    /// Writable part of the interface.
    arrow::Status Write(const void * data, int64_t length) override;

    /// FileInterface part of the interface.
    arrow::Status Close() override;
    arrow::Result<int64_t> Tell() const override;

    /// The stream reports itself as closed whenever `is_open` is false.
    bool closed() const override
    {
        return !is_open;
    }

private:
    WriteBuffer & out;

    /// NOTE(review): presumably the number of bytes written so far (the value Tell() reports) -
    /// confirm against the definitions, which are not in this header.
    int64_t total_length{0};

    /// Defaults to "not open" here; presumably the constructor switches it on - confirm in the definition.
    bool is_open{false};

    ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowBufferedOutputStream);
};
/// arrow::io::RandomAccessFile implementation constructed from a ReadBuffer.
/// It stores both a ReadBuffer reference and a SeekableReadBuffer reference.
/// NOTE(review): presumably both refer to the same underlying buffer - confirm in the constructor definition.
class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile
{
public:
    RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_, std::optional<off_t> file_size_, bool avoid_buffering_);

    /// Positioning and size.
    arrow::Status Seek(int64_t position) override;
    arrow::Result<int64_t> Tell() const override;
    arrow::Result<int64_t> GetSize() override;

    /// Reads, either into a caller-provided destination or into a newly returned arrow::Buffer.
    arrow::Result<int64_t> Read(int64_t nbytes, void * out) override;
    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;

    /// Asynchronous reading is overridden so that arrow's internal thread pool is never used.
    /// Our code does not read asynchronously, so this is implemented synchronously:
    /// it simply calls ReadAt and returns a future that already holds the value.
    arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(
        const arrow::io::IOContext &, int64_t position, int64_t nbytes) override;

    arrow::Status Close() override;

    /// The file reports itself as closed whenever `is_open` is false.
    bool closed() const override
    {
        return !is_open;
    }

private:
    ReadBuffer & in;
    SeekableReadBuffer & seekable_in;

    /// Optional size supplied at construction; may be empty.
    std::optional<off_t> file_size;

    /// Defaults to "not open" here; presumably the constructor switches it on - confirm in the definition.
    bool is_open{false};
    bool avoid_buffering{false};

    ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
};
/// arrow::io::RandomAccessFile implementation over a SeekableReadBuffer whose size is given up front.
/// Only the positional (ReadAt-style) operations are meant to be used; see the two groups below.
class RandomAccessFileFromRandomAccessReadBuffer : public arrow::io::RandomAccessFile
{
public:
    explicit RandomAccessFileFromRandomAccessReadBuffer(SeekableReadBuffer & in_, size_t file_size_);

    /// Thread-safe operations.
    arrow::Result<int64_t> GetSize() override;
    arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void * out) override;
    arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
    arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(
        const arrow::io::IOContext &, int64_t position, int64_t nbytes) override;

    /// Operations that are not thread safe and that arrow is not expected to call.
    /// They return a NotImplemented error.
    arrow::Status Seek(int64_t) override;
    arrow::Result<int64_t> Tell() const override;
    arrow::Result<int64_t> Read(int64_t, void *) override;
    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t) override;

    arrow::Status Close() override;

    /// The file reports itself as closed whenever `is_open` is false.
    bool closed() const override
    {
        return !is_open;
    }

private:
    SeekableReadBuffer & in;
    size_t file_size;

    /// Unlike the other adapters in this header, this one defaults to "open".
    bool is_open{true};

    ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromRandomAccessReadBuffer);
};
/// Presents a ReadBuffer through the arrow::io::InputStream interface.
/// The ReadBuffer is held by reference, i.e. it is not owned by this object.
class ArrowInputStreamFromReadBuffer : public arrow::io::InputStream
{
public:
    /// The parameter carries a trailing underscore, like the other constructors in this header,
    /// so that it does not share a name with the `in` data member.
    explicit ArrowInputStreamFromReadBuffer(ReadBuffer & in_);

    /// Reads, either into a caller-provided destination or into a newly returned arrow::Buffer.
    arrow::Result<int64_t> Read(int64_t nbytes, void * out) override;
    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;

    arrow::Status Abort() override;
    arrow::Result<int64_t> Tell() const override;
    arrow::Status Close() override;

    /// The stream reports itself as closed whenever `is_open` is false.
    bool closed() const override { return !is_open; }

private:
    ReadBuffer & in;

    /// Defaults to "not open" here; presumably the constructor switches it on - confirm in the definition.
    bool is_open = false;

    ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStreamFromReadBuffer);
};
/// Produces an arrow::io::RandomAccessFile for the data behind `in`.
///
/// `avoid_buffering`: when true, ReadBuffer::setReadUntilPosition() is used to keep buffering and
/// readahead to a minimum. With an HTTP or S3 ReadBuffer this means every RandomAccessFile read
/// call issues a new HTTP request. It is used in the parquet pre-buffered reading mode, where
/// arrow does its own buffering and coalescing of reads.
/// (ReadBuffer is not a good abstraction for this case, but it works.)
///
/// NOTE(review): the roles of `settings`, `is_cancelled`, `format_name` and `magic_bytes` are not
/// visible from this declaration - see the definition.
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
    ReadBuffer & in,
    const FormatSettings & settings,
    std::atomic<int> & is_cancelled,
    const std::string & format_name,
    const std::string & magic_bytes,
    bool avoid_buffering = false);
/// Loads the entire file into an in-memory buffer and returns a RandomAccessFile over it.
/// The returned RandomAccessFile owns that buffer.
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFileLoadIntoMemory(
    ReadBuffer & in,
    std::atomic<int> & is_cancelled,
    const std::string & format_name,
    const std::string & magic_bytes);
}
#endif
|