blob: 5991fe29b70e8fe7387b8413c682c316bbfebe42 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <IO/ReadBuffer.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <fstream>
#include <mutex>
namespace DB
{
/// ReadBuffer over the records of a StorageFileLog.
///
/// Records (row data plus the file name and start offset they came from) are
/// accumulated in `records`; `current` marks the position within that batch.
/// NOTE(review): the polling / per-stream file assignment logic lives in the
/// .cpp; presumably poll() fills `records` via pollBatch()/readNewRecords() and
/// nextImpl() advances `current` — confirm there.
class ReadBufferFromFileLog : public ReadBuffer
{
public:
    /// @param storage_            storage to read from; held by reference, so it must outlive this buffer.
    /// @param max_batch_size      stored as `batch_size`; presumably the maximum number of records per poll — TODO confirm.
    /// @param poll_timeout_       stored as-is and returned by pollTimeout(); units are not visible here — TODO confirm.
    /// @param context_            context stored for use by the implementation.
    /// @param stream_number_      index of this stream; presumably in [0, max_streams_number_) — confirm in .cpp.
    /// @param max_streams_number_ total number of streams reading the same storage.
    ReadBufferFromFileLog(
        StorageFileLog & storage_,
        size_t max_batch_size,
        size_t poll_timeout_,
        ContextPtr context_,
        size_t stream_number_,
        size_t max_streams_number_);

    ~ReadBufferFromFileLog() override = default;

    /// Poll timeout passed to the constructor.
    size_t pollTimeout() const { return poll_timeout; }

    /// True while `current` has not yet reached the end of the polled batch.
    bool hasMorePolledRecords() const { return current != records.end(); }

    /// Fetches new records into the buffer.
    /// NOTE(review): meaning of the returned bool is defined in the .cpp — presumably
    /// "records are available" — confirm before relying on it.
    bool poll();

    /// True when the last poll finished with status NO_RECORD_RETURNED.
    /// Marked const: it only reads `buffer_status` (consistent with the other accessors).
    bool noRecords() const { return buffer_status == BufferStatus::NO_RECORD_RETURNED; }

    /// Value of `current_file` (returned by copy); presumably the file of the record
    /// most recently handed out by nextImpl() — confirm in .cpp.
    String getFileName() const { return current_file; }

    /// Value of `current_offset`; presumably the start offset of the current row,
    /// used for virtual columns — confirm in .cpp.
    UInt64 getOffset() const { return current_offset; }

private:
    /// Outcome of polling; starts as INIT before any poll has happened.
    enum class BufferStatus
    {
        INIT,
        NO_RECORD_RETURNED,
        POLLED_OK,
    };

    BufferStatus buffer_status = BufferStatus::INIT;

    /// Defaulted to nullptr so the pointer is never indeterminate; the constructor is
    /// expected to set it.
    Poco::Logger * log = nullptr;

    /// Non-owning reference; the storage must outlive this buffer.
    StorageFileLog & storage;

    bool stream_out = false;

    size_t batch_size;
    size_t poll_timeout;

    ContextPtr context;

    size_t stream_number;
    size_t max_streams_number;

    bool allowed = true;

    using RecordData = std::string;

    /// One row read from a file, together with where it came from.
    struct Record
    {
        RecordData data;
        std::string file_name;
        /// Offset is the start of a row, which is needed for virtual columns.
        UInt64 offset;
    };

    using Records = std::vector<Record>;

    /// Current batch of records and the position of the next one to consume.
    Records records;
    Records::const_iterator current;

    /// File name / offset exposed through getFileName() / getOffset().
    String current_file;
    UInt64 current_offset = 0;

    using TaskThread = BackgroundSchedulePool::TaskHolder;

    /// Returns a batch of records; presumably at most `batch_size_` of them — confirm in .cpp.
    Records pollBatch(size_t batch_size_);

    /// Appends newly read records to `new_records`; presumably bounded by `batch_size_` — confirm in .cpp.
    void readNewRecords(Records & new_records, size_t batch_size_);

    /// ReadBuffer hook that supplies the next chunk of data to readers.
    bool nextImpl() override;
};
}
|