1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
#pragma once
#include <IO/WriteBufferFromFileDecorator.h>
#include <IO/WriteSettings.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/FilesystemCacheLog.h>
namespace Poco
{
class Logger;
}
namespace DB
{
/**
* We want to write eventually some size, which is not known until the very end.
* Therefore we allocate file segments lazily. Each file segment is assigned capacity
* of max_file_segment_size, but reserved_size remains 0, until call to tryReserve().
* Once current file segment is full (reached max_file_segment_size), we allocate a
* new file segment. All allocated file segments resize in file segments holder.
* If at the end of all writes, the last file segment is not full, then it is resized.
*/
class FileSegmentRangeWriter
{
public:
FileSegmentRangeWriter(
FileCache * cache_, const FileSegment::Key & key_,
std::shared_ptr<FilesystemCacheLog> cache_log_, const String & query_id_, const String & source_path_);
/**
* Write a range of file segments. Allocate file segment of `max_file_segment_size` and write to
* it until it is full and then allocate next file segment.
*/
bool write(const char * data, size_t size, size_t offset, FileSegmentKind segment_kind);
void finalize();
~FileSegmentRangeWriter();
private:
FileSegment & allocateFileSegment(size_t offset, FileSegmentKind segment_kind);
void appendFilesystemCacheLog(const FileSegment & file_segment);
void completeFileSegment();
FileCache * cache;
FileSegment::Key key;
Poco::Logger * log;
std::shared_ptr<FilesystemCacheLog> cache_log;
String query_id;
String source_path;
FileSegmentsHolderPtr file_segments;
size_t expected_write_offset = 0;
bool finalized = false;
};
/**
* Write buffer for filesystem caching on write operations.
*/
class CachedOnDiskWriteBufferFromFile final : public WriteBufferFromFileDecorator
{
public:
CachedOnDiskWriteBufferFromFile(
std::unique_ptr<WriteBuffer> impl_,
FileCachePtr cache_,
const String & source_path_,
const FileCache::Key & key_,
const String & query_id_,
const WriteSettings & settings_);
void nextImpl() override;
void finalizeImpl() override;
private:
void cacheData(char * data, size_t size, bool throw_on_error);
Poco::Logger * log;
FileCachePtr cache;
String source_path;
FileCache::Key key;
size_t current_download_offset = 0;
const String query_id;
bool enable_cache_log;
bool throw_on_error_from_cache;
bool cache_in_error_state_or_disabled = false;
std::unique_ptr<FileSegmentRangeWriter> cache_writer;
};
}
|