blob: beebd29a6ea3f5a87d8cd7e47733ae708badd0ba (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
#pragma once
#include <libnuraft/log_store.hxx>

#include <cstdint>
#include <map>
#include <mutex>
#include <vector>

#include <Core/Types.h>
#include <Coordination/Changelog.h>
#include <Coordination/KeeperContext.h>
#include <base/defines.h>
namespace DB
{
/// RAFT log storage built on top of the Changelog class.
/// Plugs into NuRaft by implementing the nuraft::log_store interface.
class KeeperLogStore : public nuraft::log_store
{
public:
    KeeperLogStore(LogFileSettings log_file_settings, KeeperContextPtr keeper_context);

    /// Load the log storage from the filesystem, beginning at last_commited_log_index.
    /// NOTE(review): logs_to_keep presumably limits how many older entries stay available — confirm in the implementation.
    void init(uint64_t last_commited_log_index, uint64_t logs_to_keep);

    /// ---- nuraft::log_store overrides ----

    /// NOTE(review): expected to follow the nuraft::log_store contract
    /// (first index held by the store / next index to be written) — confirm in the implementation.
    uint64_t start_index() const override;
    uint64_t next_slot() const override;

    /// Fetch the most recent entry of the log.
    nuraft::ptr<nuraft::log_entry> last_entry() const override;

    /// Add a new entry to the end of the log.
    uint64_t append(nuraft::ptr<nuraft::log_entry> & entry) override;

    /// Drop every entry starting at index, then store entry at position index.
    void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry) override;

    /// Fetch the entries in the half-open interval [start, end).
    nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;

    /// Fetch the single entry stored at index.
    nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;

    /// Term of the entry stored at index.
    uint64_t term_at(uint64_t index) override;

    /// Serialize the entries in the interval [index, index + cnt).
    nuraft::ptr<nuraft::buffer> pack(uint64_t index, int32_t cnt) override;

    /// Apply previously serialized entries, starting at index.
    void apply_pack(uint64_t index, nuraft::buffer & pack) override;

    /// Signal that entries from last_log_index can be removed from memory and from disk.
    bool compact(uint64_t last_log_index) override;

    /// Call fsync on the stored data.
    bool flush() override;

    /// NOTE(review): presumably the index of the last entry known to be persisted — confirm in the implementation.
    uint64_t last_durable_index() override;

    /// Flush a batch of appended entries.
    void end_of_append_batch(uint64_t start_index, uint64_t count) override;

    /// ---- Keeper-specific helpers ----

    /// Stop the background cleanup thread in the changelog.
    void shutdownChangelog();

    /// Flush the log store, then shut down the background thread in the changelog.
    bool flushChangelogAndShutdown();

    /// Current size of the log storage.
    uint64_t size() const;

    /// Fetch the entry holding the latest config in the log store.
    nuraft::ptr<nuraft::log_entry> getLatestConfigChange() const;

    /// NOTE(review): stores/forwards the raft server handle; the exact use is not visible in this header.
    void setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server);

private:
    /// Declared mutable so that const member functions are able to lock it.
    mutable std::mutex changelog_lock;
    Poco::Logger * log;
    /// Annotated for thread-safety analysis: accesses require changelog_lock.
    Changelog changelog TSA_GUARDED_BY(changelog_lock);
};
}
|