aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Access/AccessChangesNotifier.cpp
blob: b27dda8214231c76f56ff5c41cb3185a4d912d57 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <Access/AccessChangesNotifier.h>
#include <boost/range/algorithm/copy.hpp>


namespace DB
{

AccessChangesNotifier::AccessChangesNotifier() : handlers(std::make_shared<Handlers>())
{
}

AccessChangesNotifier::~AccessChangesNotifier() = default;

void AccessChangesNotifier::onEntityAdded(const UUID & id, const AccessEntityPtr & new_entity)
{
    std::lock_guard lock{queue_mutex};
    Event event;
    event.id = id;
    event.entity = new_entity;
    event.type = new_entity->getType();
    queue.push(std::move(event));
}

void AccessChangesNotifier::onEntityUpdated(const UUID & id, const AccessEntityPtr & changed_entity)
{
    std::lock_guard lock{queue_mutex};
    Event event;
    event.id = id;
    event.entity = changed_entity;
    event.type = changed_entity->getType();
    queue.push(std::move(event));
}

void AccessChangesNotifier::onEntityRemoved(const UUID & id, AccessEntityType type)
{
    std::lock_guard lock{queue_mutex};
    Event event;
    event.id = id;
    event.type = type;
    queue.push(std::move(event));
}

scope_guard AccessChangesNotifier::subscribeForChanges(AccessEntityType type, const OnChangedHandler & handler)
{
    std::lock_guard lock{handlers->mutex};
    auto & list = handlers->by_type[static_cast<size_t>(type)];
    list.push_back(handler);
    auto handler_it = std::prev(list.end());

    return [my_handlers = handlers, type, handler_it]
    {
        std::lock_guard lock2{my_handlers->mutex};
        auto & list2 = my_handlers->by_type[static_cast<size_t>(type)];
        list2.erase(handler_it);
    };
}

scope_guard AccessChangesNotifier::subscribeForChanges(const UUID & id, const OnChangedHandler & handler)
{
    std::lock_guard lock{handlers->mutex};
    auto it = handlers->by_id.emplace(id, std::list<OnChangedHandler>{}).first;
    auto & list = it->second;
    list.push_back(handler);
    auto handler_it = std::prev(list.end());

    return [my_handlers = handlers, it, handler_it]
    {
        std::lock_guard lock2{my_handlers->mutex};
        auto & list2 = it->second;
        list2.erase(handler_it);
        if (list2.empty())
            my_handlers->by_id.erase(it);
    };
}


scope_guard AccessChangesNotifier::subscribeForChanges(const std::vector<UUID> & ids, const OnChangedHandler & handler)
{
    scope_guard subscriptions;
    for (const auto & id : ids)
        subscriptions.join(subscribeForChanges(id, handler));
    return subscriptions;
}

void AccessChangesNotifier::sendNotifications()
{
    /// Only one thread can send notification at any time.
    std::lock_guard sending_notifications_lock{sending_notifications};

    std::unique_lock queue_lock{queue_mutex};
    while (!queue.empty())
    {
        auto event = std::move(queue.front());
        queue.pop();
        queue_lock.unlock();

        std::vector<OnChangedHandler> current_handlers;
        {
            std::lock_guard handlers_lock{handlers->mutex};
            boost::range::copy(handlers->by_type[static_cast<size_t>(event.type)], std::back_inserter(current_handlers));
            auto it = handlers->by_id.find(event.id);
            if (it != handlers->by_id.end())
                boost::range::copy(it->second, std::back_inserter(current_handlers));
        }

        for (const auto & handler : current_handlers)
        {
            try
            {
                handler(event.id, event.entity);
            }
            catch (...)
            {
                tryLogCurrentException(__PRETTY_FUNCTION__);
            }
        }

        queue_lock.lock();
    }
}

}