aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Disks/IO/CachedOnDiskReadBufferFromFile.h
blob: bcca380b878010e28f0654b8e7e850d8165690a7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#pragma once

#include <Interpreters/Cache/FileCache.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Cache/FileSegment.h>


namespace CurrentMetrics
{
extern const Metric FilesystemCacheReadBuffers;
}

namespace DB
{

class CachedOnDiskReadBufferFromFile : public ReadBufferFromFileBase
{
public:
    using ImplementationBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>()>;

    CachedOnDiskReadBufferFromFile(
        const String & source_file_path_,
        const FileCache::Key & cache_key_,
        FileCachePtr cache_,
        ImplementationBufferCreator implementation_buffer_creator_,
        const ReadSettings & settings_,
        const String & query_id_,
        size_t file_size_,
        bool allow_seeks_after_first_read_,
        bool use_external_buffer_,
        std::optional<size_t> read_until_position_,
        std::shared_ptr<FilesystemCacheLog> cache_log_);

    ~CachedOnDiskReadBufferFromFile() override;

    bool nextImpl() override;

    off_t seek(off_t off, int whence) override;

    off_t getPosition() override;

    size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }

    String getInfoForLog() override;

    void setReadUntilPosition(size_t position) override;

    void setReadUntilEnd() override;

    String getFileName() const override { return source_file_path; }

    enum class ReadType
    {
        CACHED,
        REMOTE_FS_READ_BYPASS_CACHE,
        REMOTE_FS_READ_AND_PUT_IN_CACHE,
    };

private:
    using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFileBase>;

    void initialize(size_t offset, size_t size);
    void assertCorrectness() const;

    /**
     * Return a list of file segments ordered in ascending order. This list represents
     * a full contiguous interval (without holes).
     */
    FileSegmentsHolderPtr getFileSegments(size_t offset, size_t size) const;

    ImplementationBufferPtr getImplementationBuffer(FileSegment & file_segment);

    ImplementationBufferPtr getReadBufferForFileSegment(FileSegment & file_segment);

    ImplementationBufferPtr getCacheReadBuffer(const FileSegment & file_segment);

    ImplementationBufferPtr getRemoteReadBuffer(FileSegment & file_segment, ReadType read_type_);

    bool updateImplementationBufferIfNeeded();

    void predownload(FileSegment & file_segment);

    bool nextImplStep();

    size_t getTotalSizeToRead();

    bool completeFileSegmentAndGetNext();

    void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type);

    bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);

    static bool canStartFromCache(size_t current_offset, const FileSegment & file_segment);

    Poco::Logger * log;
    FileCache::Key cache_key;
    String source_file_path;

    FileCachePtr cache;
    ReadSettings settings;

    size_t read_until_position;
    size_t file_offset_of_buffer_end = 0;
    size_t bytes_to_predownload = 0;

    ImplementationBufferCreator implementation_buffer_creator;

    /// Remote read buffer, which can only be owned by current buffer.
    ImplementationBufferPtr remote_file_reader;
    ImplementationBufferPtr cache_file_reader;

    FileSegmentsHolderPtr file_segments;

    ImplementationBufferPtr implementation_buffer;
    bool initialized = false;

    ReadType read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;

    static String toString(ReadType type)
    {
        switch (type)
        {
            case ReadType::CACHED:
                return "CACHED";
            case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
                return "REMOTE_FS_READ_BYPASS_CACHE";
            case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
                return "REMOTE_FS_READ_AND_PUT_IN_CACHE";
        }
        UNREACHABLE();
    }

    size_t first_offset = 0;
    String nextimpl_step_log_info;
    String last_caller_id;

    String query_id;
    String current_buffer_id;

    bool allow_seeks_after_first_read;
    [[maybe_unused]]bool use_external_buffer;
    CurrentMetrics::Increment metric_increment{CurrentMetrics::FilesystemCacheReadBuffers};
    ProfileEvents::Counters current_file_segment_counters;

    FileCache::QueryContextHolderPtr query_context_holder;

    std::shared_ptr<FilesystemCacheLog> cache_log;
};

}