aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Disks/IO/AsynchronousBoundedReadBuffer.h
blob: 7d2bf23243a3c9b7868081262bce1d85e540800f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#pragma once

#include <chrono>
#include <utility>
#include <IO/AsynchronousReader.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include "clickhouse_config.h"

namespace Poco { class Logger; }

namespace DB
{

struct AsyncReadCounters;
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
class ReadBufferFromRemoteFSGather;

class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
{
public:
    using Impl = ReadBufferFromFileBase;
    using ImplPtr = std::unique_ptr<Impl>;

    explicit AsynchronousBoundedReadBuffer(
        ImplPtr impl_,
        IAsynchronousReader & reader_,
        const ReadSettings & settings_,
        AsyncReadCountersPtr async_read_counters_ = nullptr,
        FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);

    ~AsynchronousBoundedReadBuffer() override;

    String getFileName() const override { return impl->getFileName(); }

    size_t getFileSize() override { return impl->getFileSize(); }

    String getInfoForLog() override { return impl->getInfoForLog(); }

    off_t seek(off_t offset_, int whence) override;

    void prefetch(Priority priority) override;

    void setReadUntilPosition(size_t position) override; /// [..., position).

    void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }

    size_t getFileOffsetOfBufferEnd() const override  { return file_offset_of_buffer_end; }

    off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }

private:
    const ImplPtr impl;
    const ReadSettings read_settings;
    IAsynchronousReader & reader;

    size_t file_offset_of_buffer_end = 0;
    std::optional<size_t> read_until_position;
    /// If nonzero then working_buffer is empty.
    /// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
    size_t bytes_to_ignore = 0;

    Memory<> prefetch_buffer;
    std::future<IAsynchronousReader::Result> prefetch_future;

    const std::string query_id;
    const std::string current_reader_id;

    Poco::Logger * log;

    AsyncReadCountersPtr async_read_counters;
    FilesystemReadPrefetchesLogPtr prefetches_log;

    struct LastPrefetchInfo
    {
        std::chrono::system_clock::time_point submit_time;
        Priority priority;
    };
    LastPrefetchInfo last_prefetch_info;

    bool nextImpl() override;

    void finalize();

    bool hasPendingDataToRead();

    void appendToPrefetchLog(
        FilesystemPrefetchState state,
        int64_t size,
        const std::unique_ptr<Stopwatch> & execution_watch);

    std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);

    void resetPrefetch(FilesystemPrefetchState state);

};

}