blob: 7d2bf23243a3c9b7868081262bce1d85e540800f (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
#pragma once
#include <chrono>
#include <utility>
#include <IO/AsynchronousReader.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include "clickhouse_config.h"
namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
class ReadBufferFromRemoteFSGather;
class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
{
public:
using Impl = ReadBufferFromFileBase;
using ImplPtr = std::unique_ptr<Impl>;
explicit AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
AsyncReadCountersPtr async_read_counters_ = nullptr,
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);
~AsynchronousBoundedReadBuffer() override;
String getFileName() const override { return impl->getFileName(); }
size_t getFileSize() override { return impl->getFileSize(); }
String getInfoForLog() override { return impl->getInfoForLog(); }
off_t seek(off_t offset_, int whence) override;
void prefetch(Priority priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private:
const ImplPtr impl;
const ReadSettings read_settings;
IAsynchronousReader & reader;
size_t file_offset_of_buffer_end = 0;
std::optional<size_t> read_until_position;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
const std::string query_id;
const std::string current_reader_id;
Poco::Logger * log;
AsyncReadCountersPtr async_read_counters;
FilesystemReadPrefetchesLogPtr prefetches_log;
struct LastPrefetchInfo
{
std::chrono::system_clock::time_point submit_time;
Priority priority;
};
LastPrefetchInfo last_prefetch_info;
bool nextImpl() override;
void finalize();
bool hasPendingDataToRead();
void appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);
void resetPrefetch(FilesystemPrefetchState state);
};
}
|