aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Coordination/KeeperLogStore.h
blob: beebd29a6ea3f5a87d8cd7e47733ae708badd0ba (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#pragma once
#error #include <libnuraft/log_store.hxx>
#include <map>
#include <mutex>
#include <Core/Types.h>
#include <Coordination/Changelog.h>
#include <Coordination/KeeperContext.h>
#include <base/defines.h>

namespace DB
{

/// Wrapper around Changelog class. Implements RAFT log storage.
/// Derives from nuraft::log_store and overrides its storage callbacks.
/// The wrapped Changelog is annotated as guarded by `changelog_lock`.
class KeeperLogStore : public nuraft::log_store
{
public:
    /// Construct the log store from log file settings and the keeper context.
    /// NOTE(review): both arguments are presumably forwarded to the underlying Changelog -- confirm in the .cpp.
    KeeperLogStore(LogFileSettings log_file_settings, KeeperContextPtr keeper_context);

    /// Read log storage from filesystem starting from last_commited_log_index
    /// NOTE(review): logs_to_keep presumably limits how many older entries are retained -- confirm in the .cpp.
    void init(uint64_t last_commited_log_index, uint64_t logs_to_keep);

    /// NOTE(review): per the nuraft::log_store contract this is presumably the index of
    /// the first entry currently held by the store -- confirm against the base class.
    uint64_t start_index() const override;

    /// NOTE(review): per the nuraft::log_store contract this is presumably the index the
    /// next appended entry will get -- confirm against the base class.
    uint64_t next_slot() const override;

    /// Return last entry from log
    nuraft::ptr<nuraft::log_entry> last_entry() const override;

    /// Append new entry to log
    uint64_t append(nuraft::ptr<nuraft::log_entry> & entry) override;

    /// Remove all entries starting from index and write entry into index position
    void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry) override;

    /// Return entries between [start, end)
    nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;

    /// Return entry at index
    nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;

    /// Term at the index
    uint64_t term_at(uint64_t index) override;

    /// Serialize entries in interval [index, index + cnt)
    nuraft::ptr<nuraft::buffer> pack(uint64_t index, int32_t cnt) override;

    /// Apply serialized entries starting from index
    void apply_pack(uint64_t index, nuraft::buffer & pack) override;

    /// Entries from last_log_index can be removed from memory and from disk
    /// NOTE(review): whether last_log_index itself is included in the removed range is not visible here -- confirm.
    bool compact(uint64_t last_log_index) override;

    /// Call fsync to the stored data
    bool flush() override;

    /// Stop background cleanup thread in changelog
    void shutdownChangelog();

    /// Flush logstore and call shutdown of background thread in changelog
    /// NOTE(review): the returned bool presumably reports whether the flush succeeded -- confirm in the .cpp.
    bool flushChangelogAndShutdown();

    /// Current log storage size
    /// NOTE(review): the unit (number of entries vs. bytes) is not visible in this header -- confirm.
    uint64_t size() const;

    /// NOTE(review): per the nuraft::log_store contract this is presumably the index of
    /// the last entry known to be durably stored -- confirm against the base class.
    uint64_t last_durable_index() override;

    /// Flush batch of appended entries
    void end_of_append_batch(uint64_t start_index, uint64_t count) override;

    /// Get entry with latest config in logstore
    nuraft::ptr<nuraft::log_entry> getLatestConfigChange() const;

    /// Provide the raft server instance to the log store.
    /// NOTE(review): no member of this class stores it, so it is presumably handed over to the changelog -- confirm.
    void setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server);

private:
    /// Mutex named as the guard of `changelog` (see TSA_GUARDED_BY below).
    /// Declared mutable, so it can be locked from const member functions.
    mutable std::mutex changelog_lock;
    /// Logger pointer; it has no in-class initializer, so the constructor is expected to set it.
    Poco::Logger * log;
    /// The wrapped changelog; annotated as guarded by changelog_lock for thread safety analysis.
    Changelog changelog TSA_GUARDED_BY(changelog_lock);
};

}