aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Coordination/SessionExpiryQueue.h
blob: 862ec35e2f6c5762152a1bad28eb4dc28ec06242 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#pragma once
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <chrono>

namespace DB
{

/// Simple class for checking expired sessions. Main idea -- to round sessions
/// timeouts and place all sessions into buckets rounded by their expired time.
/// So we will have not too many different buckets and can check expired
/// sessions quite fast.
/// So buckets looks like this:
/// [1630580418000] -> {1, 5, 6}
/// [1630580418500] -> {2, 3}
/// ...
/// When new session appear it's added to the existing bucket or create new bucket.
class SessionExpiryQueue
{
private:
    /// Session -> timeout ms
    std::unordered_map<int64_t, int64_t> session_to_expiration_time;

    /// Expire time -> session expire near this time
    std::map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;

    int64_t expiration_interval;

    static int64_t getNowMilliseconds()
    {
        using namespace std::chrono;
        return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    }

    /// Round time to the next expiration interval. The result used as a key for
    /// expiry_to_sessions map.
    int64_t roundToNextInterval(int64_t time) const
    {
        return (time / expiration_interval + 1) * expiration_interval;
    }

public:
    /// expiration_interval -- how often we will check new sessions and how small
    /// buckets we will have. In ZooKeeper normal session timeout is around 30 seconds
    /// and expiration_interval is about 500ms.
    explicit SessionExpiryQueue(int64_t expiration_interval_)
        : expiration_interval(expiration_interval_)
    {
    }

    /// Session was actually removed
    bool remove(int64_t session_id);

    /// Update session expiry time (must be called on heartbeats)
    void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms);

    /// Get all expired sessions
    std::vector<int64_t> getExpiredSessions() const;

    void clear();
};

}