aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Disks/IO/ReadBufferFromRemoteFSGather.h
blob: e1756bd05e7225583ab7f8f1abdf93df044779dc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#pragma once

#include "clickhouse_config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <IO/AsynchronousReader.h>
#include <Disks/ObjectStorages/IObjectStorage.h>

namespace Poco { class Logger; }

namespace DB
{
class FilesystemCacheLog;

/**
 * A remote disk may need to store one ClickHouse file as several objects in remote storage.
 * This class acts as a proxy that presents those objects (`blobs_to_read`) to callers as a
 * single seekable file. It keeps at most one underlying per-object buffer (`current_buf`)
 * alive at a time.
 */
class ReadBufferFromRemoteFSGather final : public ReadBufferFromFileBase
{
/// Gives ReadIndirectBufferFromRemoteFS access to this class's private members.
friend class ReadIndirectBufferFromRemoteFS;

public:
    /// Factory that creates the underlying read buffer for one remote object, given the
    /// object's path and the position up to which that buffer should read.
    using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(const std::string & path, size_t read_until_position)>;

    /// @param read_buffer_creator_  factory for per-object buffers (taken by rvalue reference).
    /// @param blobs_to_read_        the remote objects that together form the logical file.
    /// @param settings_             read settings, stored as a const copy.
    /// @param cache_log_            filesystem cache log; shared ownership.
    /// @param use_external_buffer_  NOTE(review): presumably true when the caller supplies the
    ///                              memory to read into instead of this buffer owning it — confirm in the .cpp.
    ReadBufferFromRemoteFSGather(
        ReadBufferCreator && read_buffer_creator_,
        const StoredObjects & blobs_to_read_,
        const ReadSettings & settings_,
        std::shared_ptr<FilesystemCacheLog> cache_log_,
        bool use_external_buffer_);

    ~ReadBufferFromRemoteFSGather() override;

    /// Returns the remote path of the object currently being read, not a single path for the whole file.
    String getFileName() const override { return current_object.remote_path; }

    /// Delegates to the current underlying buffer; returns an empty string if none exists yet.
    String getInfoForLog() override { return current_buf ? current_buf->getInfoForLog() : ""; }

    /// Limits reading to `position` within the logical (concatenated) file. Defined in the .cpp.
    void setReadUntilPosition(size_t position) override;

    /// Sets the read limit to the total size of all objects, i.e. the end of the logical file.
    void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }

    /// Reads into caller-provided memory. Defined in the .cpp.
    /// NOTE(review): `offset` is presumably a position in the logical file and `ignore` a count of
    /// leading bytes to skip — confirm against the implementation.
    IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore) override;

    /// Total size of all objects in `blobs_to_read`, as computed by getTotalSize().
    /// It is recomputed on every call.
    size_t getFileSize() override { return getTotalSize(blobs_to_read); }

    /// Returns `file_offset_of_buffer_end` (logical-file offset tracked by this class).
    size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }

    /// Repositions within the logical file. Defined in the .cpp.
    off_t seek(off_t offset, int whence) override;

    /// Current logical position. It is the offset of the buffer end, minus the bytes still
    /// unread in the working buffer, plus `bytes_to_ignore`.
    /// The arithmetic is done in size_t and converted to off_t.
    /// NOTE(review): `bytes_to_ignore` presumably counts bytes still to be skipped after a seek — confirm.
    off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }

    /// Reports a seek as cheap only while no underlying buffer has been created.
    bool seekIsCheap() override { return !current_buf; }

private:
    /// Builds the underlying buffer for a single remote object. Defined in the .cpp.
    SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);

    bool nextImpl() override;

    /// NOTE(review): the roles of the helpers below are inferred from their names only —
    /// presumably set up the first buffer, read from the current one, advance to the next
    /// object, record uncached-read info in `cache_log`, and drop the current buffer. Confirm in the .cpp.
    void initialize();

    bool readImpl();

    bool moveToNextBuffer();

    void appendUncachedReadInfo();

    void reset();

    /// NOTE(review): non-static members are initialized in declaration order, not in the
    /// order of the constructor's initializer list. If the constructor derives some of these
    /// members from others (for example, from `settings` or `query_id`), reordering these
    /// declarations changes behaviour. Verify in the .cpp before moving anything.
    const ReadSettings settings;
    const StoredObjects blobs_to_read;
    const ReadBufferCreator read_buffer_creator;
    const std::shared_ptr<FilesystemCacheLog> cache_log;
    const String query_id;
    const bool use_external_buffer;
    const bool with_cache;

    /// Read limit set through setReadUntilPosition().
    /// NOTE(review): 0 presumably means "not set" — confirm.
    size_t read_until_position = 0;
    /// Logical-file offset just past the data in the working buffer (see getPosition()).
    size_t file_offset_of_buffer_end = 0;
    /// Added to the reported position in getPosition().
    size_t bytes_to_ignore = 0;

    /// Object currently being read; its remote_path is what getFileName() returns.
    StoredObject current_object;
    /// NOTE(review): presumably the index of `current_object` within `blobs_to_read` — confirm.
    size_t current_buf_idx = 0;
    /// Underlying buffer for `current_object`; null until one has been created.
    SeekableReadBufferPtr current_buf;

    /// Raw, non-owning-looking logger pointer.
    /// NOTE(review): presumably owned by Poco's logger registry — confirm.
    Poco::Logger * log;
};

/// Declared here; the definition is not part of this header.
/// NOTE(review): judging by its name and parameters, it presumably chooses the buffer size for
/// remote reads from `settings` and the total `file_size`. Confirm the exact policy against the definition.
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size);
}