aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/FileLog/ReadBufferFromFileLog.h
blob: 5991fe29b70e8fe7387b8413c682c316bbfebe42 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#pragma once

#include <Core/BackgroundSchedulePool.h>
#include <IO/ReadBuffer.h>
#include <Storages/FileLog/StorageFileLog.h>

#include <fstream>
#include <mutex>

namespace DB
{
/// A ReadBuffer used together with StorageFileLog.
///
/// The buffer keeps a batch of polled records (see `Records`); every record
/// carries its payload, the name of the file it came from and the offset of
/// the row start inside that file. Only declarations live in this header:
/// the constructor, poll(), pollBatch(), readNewRecords() and nextImpl() are
/// defined elsewhere, so their exact behaviour is not documented here.
class ReadBufferFromFileLog : public ReadBuffer
{
public:
    /// Parameters are, judging by their names, stored into the members
    /// `storage`, `batch_size`, `poll_timeout`, `context`, `stream_number`
    /// and `max_streams_number` respectively.
    /// NOTE(review): the definition is outside this header - confirm there.
    ReadBufferFromFileLog(
        StorageFileLog & storage_,
        size_t max_batch_size,
        size_t poll_timeout_,
        ContextPtr context_,
        size_t stream_number_,
        size_t max_streams_number_);

    ~ReadBufferFromFileLog() override = default;

    /// Returns the stored poll timeout (the unit is not visible in this header).
    auto pollTimeout() const { return poll_timeout; }

    /// True while `current` has not reached the end of `records`.
    bool hasMorePolledRecords() const { return current != records.end(); }

    /// Defined elsewhere. Presumably fetches a new batch of records and
    /// updates `buffer_status` - TODO confirm against the implementation.
    bool poll();

    /// True when the buffer status is NO_RECORD_RETURNED.
    /// Declared const: it only reads `buffer_status`, so it must be callable
    /// through const references like the other accessors of this class.
    bool noRecords() const { return buffer_status == BufferStatus::NO_RECORD_RETURNED; }

    /// Both accessors return by value (plain `auto` never deduces a reference).
    auto getFileName() const { return current_file; }
    auto getOffset() const { return current_offset; }

private:
    /// State of the buffer; starts as INIT (see `buffer_status` below).
    enum class BufferStatus
    {
        INIT,
        NO_RECORD_RETURNED,
        POLLED_OK,
    };

    BufferStatus buffer_status = BufferStatus::INIT;

    /// Not initialized here; expected to be set by the constructor.
    Poco::Logger * log;

    StorageFileLog & storage;

    /// Not referenced in this header; purpose is defined by the implementation.
    bool stream_out = false;

    size_t batch_size;
    size_t poll_timeout;

    ContextPtr context;

    size_t stream_number;
    size_t max_streams_number;

    /// Not referenced in this header; purpose is defined by the implementation.
    bool allowed = true;

    using RecordData = std::string;
    struct Record
    {
        RecordData data;
        std::string file_name;
        /// Offset is the start of a row, which is needed for virtual columns.
        UInt64 offset;
    };
    using Records = std::vector<Record>;

    /// Polled records and the read position inside them.
    Records records;
    Records::const_iterator current;

    /// Values reported by getFileName() / getOffset().
    String current_file;
    UInt64 current_offset = 0;

    /// Alias is not used anywhere else in this header.
    using TaskThread = BackgroundSchedulePool::TaskHolder;

    /// Defined elsewhere. Presumably returns up to `batch_size_` records - TODO confirm.
    Records pollBatch(size_t batch_size_);

    /// Defined elsewhere. Presumably fills `new_records` with up to
    /// `batch_size_` records - TODO confirm.
    void readNewRecords(Records & new_records, size_t batch_size_);

    /// ReadBuffer hook; defined elsewhere.
    bool nextImpl() override;
};
}