aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Access/ReplicatedAccessStorage.h
blob: cddb20860f78f23be924eafbd34dc5155abf23e1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#pragma once

#include <atomic>

#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ConcurrentBoundedQueue.h>

#include <Access/MemoryAccessStorage.h>


namespace DB
{
/// Forward declaration is sufficient: this header only accepts and stores
/// AccessChangesNotifier by reference, so its full definition is not needed here.
class AccessChangesNotifier;

/// Implementation of IAccessStorage which keeps all data in ZooKeeper.
///
/// State visible in this declaration:
///  - a cached ZooKeeper handle, guarded by `cached_zookeeper_mutex`;
///  - a MemoryAccessStorage member, guarded by `mutex`;
///  - a watching thread handle, an atomic `watching` flag and a queue of UUIDs.
/// All non-inline member functions are defined outside this header, so their
/// exact behaviour has to be confirmed in the corresponding source file.
/// NOTE(review): TSA_GUARDED_BY / TSA_REQUIRES are macros defined elsewhere;
/// presumably wrappers for Clang thread-safety attributes - confirm.
class ReplicatedAccessStorage : public IAccessStorage
{
public:
    /// Storage type name returned by getStorageType().
    static constexpr char STORAGE_TYPE[] = "replicated";

    /// Constructor and destructor are defined out of line.
    /// NOTE(review): presumably `zookeeper_path`, `get_zookeeper`, `changes_notifier_` and
    /// `allow_backup` initialise the members `zookeeper_path`, `get_zookeeper`,
    /// `changes_notifier` and `backup_allowed` respectively, and `storage_name` goes to the
    /// IAccessStorage base - confirm against the definition.
    ReplicatedAccessStorage(const String & storage_name, const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper, AccessChangesNotifier & changes_notifier_, bool allow_backup);
    ~ReplicatedAccessStorage() override;

    /// Returns STORAGE_TYPE ("replicated").
    const char * getStorageType() const override { return STORAGE_TYPE; }

    /// Periodic reloading is mapped directly onto the watching thread:
    /// starting it calls startWatchingThread(), stopping it calls stopWatchingThread().
    void startPeriodicReloading() override { startWatchingThread(); }
    void stopPeriodicReloading() override { stopWatchingThread(); }
    /// Defined out of line. ReloadMode is declared elsewhere (presumably by IAccessStorage).
    void reload(ReloadMode reload_mode) override;

    /// Defined out of line; reports whether an entity with the given id exists.
    bool exists(const UUID & id) const override;

    /// Returns the `backup_allowed` flag.
    bool isBackupAllowed() const override { return backup_allowed; }
    /// Backup / restore entry points; both are defined out of line.
    void backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, AccessEntityType type) const override;
    void restoreFromBackup(RestorerFromBackup & restorer) override;

private:
    /// NOTE(review): presumably the root path of this storage's nodes in ZooKeeper - confirm.
    String zookeeper_path;
    /// Callable of type zkutil::GetZooKeeper; presumably used to obtain a ZooKeeper session - confirm.
    const zkutil::GetZooKeeper get_zookeeper;

    /// Cached ZooKeeper handle; annotated as guarded by `cached_zookeeper_mutex`.
    zkutil::ZooKeeperPtr cached_zookeeper TSA_GUARDED_BY(cached_zookeeper_mutex);
    std::mutex cached_zookeeper_mutex;

    /// Atomic flag, initially false. Presumably true while the watching thread should keep running - confirm.
    std::atomic<bool> watching = false;
    /// Owning handle to the watching thread object (null when no thread object is held).
    std::unique_ptr<ThreadFromGlobalPool> watching_thread;
    /// Queue of entity UUIDs. Presumably filled by ZooKeeper watch callbacks and drained by
    /// the watching thread; held via shared_ptr, presumably so callbacks can keep it alive - confirm.
    std::shared_ptr<ConcurrentBoundedQueue<UUID>> watched_queue;

    /// Overrides of the IAccessStorage write hooks; defined out of line.
    bool insertImpl(const UUID & id, const AccessEntityPtr & new_entity, bool replace_if_exists, bool throw_if_exists) override;
    bool removeImpl(const UUID & id, bool throw_if_not_exists) override;
    bool updateImpl(const UUID & id, const UpdateFunc & update_func, bool throw_if_not_exists) override;

    /// Counterparts of the write hooks above that take an explicit ZooKeeper handle.
    /// NOTE(review): presumably these perform the actual ZooKeeper writes - confirm.
    bool insertZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, const AccessEntityPtr & entity, bool replace_if_exists, bool throw_if_exists);
    bool removeZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, bool throw_if_not_exists);
    bool updateZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, const UpdateFunc & update_func, bool throw_if_not_exists);

    /// ZooKeeper session helpers; defined out of line.
    void initZooKeeperWithRetries(size_t max_retries);
    void initZooKeeperIfNeeded();
    zkutil::ZooKeeperPtr getZooKeeper();
    /// Annotated as requiring `cached_zookeeper_mutex` to be held by the caller.
    zkutil::ZooKeeperPtr getZooKeeperNoLock() TSA_REQUIRES(cached_zookeeper_mutex);
    void createRootNodes(const zkutil::ZooKeeperPtr & zookeeper);

    /// Called by startPeriodicReloading() / stopPeriodicReloading() above.
    void startWatchingThread();
    void stopWatchingThread();

    /// NOTE(review): presumably the body of the watching thread and its error recovery - confirm.
    void runWatchingThread();
    void resetAfterError();

    /// Refresh helpers; defined out of line.
    bool refresh();
    void refreshEntities(const zkutil::ZooKeeperPtr & zookeeper, bool all);
    void refreshEntity(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id);
    /// Annotated as requiring `mutex` to be held by the caller.
    void refreshEntityNoLock(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) TSA_REQUIRES(mutex);

    AccessEntityPtr tryReadEntityFromZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) const;
    /// Both *NoLock helpers below are annotated as requiring `mutex` to be held by the caller.
    void setEntityNoLock(const UUID & id, const AccessEntityPtr & entity) TSA_REQUIRES(mutex);
    void removeEntityNoLock(const UUID & id) TSA_REQUIRES(mutex);

    /// Overrides of the IAccessStorage read hooks; defined out of line.
    std::optional<UUID> findImpl(AccessEntityType type, const String & name) const override;
    std::vector<UUID> findAllImpl(AccessEntityType type) const override;
    AccessEntityPtr readImpl(const UUID & id, bool throw_if_not_exists) const override;

    /// Declared `mutable`, which allows const member functions to lock it.
    mutable std::mutex mutex;
    /// Annotated as guarded by `mutex`. Presumably a local copy of the entities kept in ZooKeeper - confirm.
    MemoryAccessStorage memory_storage TSA_GUARDED_BY(mutex);
    /// Held by reference, so the notifier must outlive this object.
    AccessChangesNotifier & changes_notifier;
    /// Value returned by isBackupAllowed(); defaults to false.
    const bool backup_allowed = false;
};
}