aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yt/core/misc/async_expiring_cache.h
blob: 2031c6e31fb5d2a61558d019570b4d5a08ca5343 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#pragma once

#include "public.h"
#include "cache_config.h"

#include <type_traits>
#include <yt/yt/core/actions/future.h>

#include <yt/yt/core/logging/log.h>

#include <yt/yt/library/profiling/sensor.h>

#include <library/cpp/yt/threading/spin_lock.h>

#include <atomic>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

//! A cache of asynchronously computed values indexed by key.
/*!
 *  Derived classes provide the actual fetching logic by implementing the
 *  pure virtual two-argument #DoGet. Each entry tracks an access deadline and
 *  an update deadline (see #TEntry); the method definitions live in
 *  async_expiring_cache-inl.h, which is included at the bottom of this header.
 *
 *  NOTE(review): thread-safety guarantees are not visible from this header;
 *  the presence of #SpinLock_ suggests that #Map_ is guarded by it -- confirm
 *  against the -inl.h file.
 */
template <class TKey, class TValue>
class TAsyncExpiringCache
    : public virtual TRefCounted
{
public:
    using KeyType = TKey;
    using ValueType = TValue;

    //! Result of #GetExtended.
    struct TExtendedGetResult
    {
        //! Future for the requested value.
        TFuture<TValue> Future;
        //! NOTE(review): presumably |true| when this very call started a new
        //! fetch for the key -- confirm against the -inl.h file.
        bool RequestInitialized;
    };

    //! Constructs the cache from |config|; |logger| and |profiler| are optional
    //! and default-constructed when omitted.
    explicit TAsyncExpiringCache(
        TAsyncExpiringCacheConfigPtr config,
        NLogging::TLogger logger = {},
        NProfiling::TProfiler profiler = {});

    //! Returns a future for the value associated with |key|.
    TFuture<TValue> Get(const TKey& key);
    //! Same as #Get but returns a #TExtendedGetResult carrying an extra flag.
    TExtendedGetResult GetExtended(const TKey& key);
    //! Batch counterpart of #Get: the resulting vector holds a value or an error
    //! per key. NOTE(review): results are presumably in the order of |keys| -- confirm.
    TFuture<std::vector<TErrorOr<TValue>>> GetMany(const std::vector<TKey>& keys);

    //! Synchronous lookup returning an optional value-or-error for |key|.
    //! NOTE(review): presumably yields |std::nullopt| when no value is
    //! currently set for the key -- confirm.
    std::optional<TErrorOr<TValue>> Find(const TKey& key);
    //! Batch counterpart of #Find.
    std::vector<std::optional<TErrorOr<TValue>>> FindMany(const std::vector<TKey>& keys);

    //! InvalidateActive removes key from the cache, if its value is currently set.
    void InvalidateActive(const TKey& key);

    //! InvalidateValue removes key from the cache, if its value is equal to provided.
    template <class T>
    void InvalidateValue(const TKey& key, const T& value);

    //! ForceRefresh marks current value as outdated, forcing value update.
    template <class T>
    void ForceRefresh(const TKey& key, const T& value);

    //! Stores |valueOrError| for |key|.
    void Set(const TKey& key, TErrorOr<TValue> valueOrError);

    //! NOTE(review): presumably drops all entries -- confirm against the -inl.h file.
    void Clear();

    //! Replaces the configuration with |config|.
    void Reconfigure(TAsyncExpiringCacheConfigPtr config);

    //! Reason passed to the three-argument #DoGet overload.
    /*!
     *  This is an unscoped enum, so the enumerators are visible directly in
     *  the class scope.
     */
    enum EUpdateReason
    {
        InitialFetch,
        PeriodicUpdate,
        ForcedUpdate,
    };

protected:
    //! Returns the current configuration by value.
    TAsyncExpiringCacheConfigPtr GetConfig() const;

    //! Fetches the value for |key|; must be implemented by derived classes.
    virtual TFuture<TValue> DoGet(
        const TKey& key,
        bool isPeriodicUpdate) noexcept = 0;

    //! Extended fetch hook receiving a pointer to the previous value and the update reason.
    /*!
     *  Not pure: a default implementation is provided out of line.
     *  NOTE(review): the default presumably forwards to the two-argument
     *  overload, and |oldValue| is presumably null when there is no previous
     *  value -- confirm against the -inl.h file.
     */
    virtual TFuture<TValue> DoGet(
        const TKey& key,
        const TErrorOr<TValue>* oldValue,
        EUpdateReason reason) noexcept;

    //! Batch fetch hook; overridable, with a default implementation provided out of line.
    virtual TFuture<std::vector<TErrorOr<TValue>>> DoGetMany(
        const std::vector<TKey>& keys,
        bool isPeriodicUpdate) noexcept;

    //! Called under write lock.
    virtual void OnAdded(const TKey& key) noexcept;

    //! Called under write lock.
    virtual void OnRemoved(const TKey& key) noexcept;

    //! Overridable predicate on an error.
    //! NOTE(review): judging by the name, decides whether a failed fetch may be
    //! kept in the cache -- confirm against the -inl.h file.
    virtual bool CanCacheError(const TError& error) noexcept;

    //! Ping resets refresh timer period and behaves like successful entry update.
    void Ping(const TKey& key);

private:
    const NLogging::TLogger Logger_;

    //! Per-key cache record.
    struct TEntry
        : public TRefCounted
    {
        //! When this entry must be evicted with respect to access timeout.
        std::atomic<NProfiling::TCpuInstant> AccessDeadline;

        //! When this entry must be evicted with respect to update timeout.
        NProfiling::TCpuInstant UpdateDeadline;

        //! Some latest known value (possibly not yet set).
        TPromise<TValue> Promise;

        //! Uncancelable version of #Promise.
        TFuture<TValue> Future;

        //! Corresponds to a future probation request.
        NConcurrency::TDelayedExecutorCookie ProbationCookie;

        //! Constructs a fresh entry.
        explicit TEntry(NProfiling::TCpuInstant accessDeadline);

        //! Check that entry is expired with respect to either access or update.
        bool IsExpired(NProfiling::TCpuInstant now) const;
    };

    using TEntryPtr = TIntrusivePtr<TEntry>;

    YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_);
    THashMap<TKey, TEntryPtr> Map_;
    TAsyncExpiringCacheConfigPtr Config_;

    // Profiling sensors.
    NProfiling::TCounter HitCounter_;
    NProfiling::TCounter MissedCounter_;
    NProfiling::TGauge SizeCounter_;

    // The helpers below are defined in async_expiring_cache-inl.h; their exact
    // behavior cannot be established from this header alone.

    void SetResult(
        const TWeakPtr<TEntry>& entry,
        const TKey& key,
        const TErrorOr<TValue>& valueOrError,
        bool isPeriodicUpdate);

    void InvokeGetMany(
        const std::vector<TWeakPtr<TEntry>>& entries,
        const std::vector<TKey>& keys,
        std::optional<TDuration> periodicRefreshTime);

    void InvokeGet(
        const TEntryPtr& entry,
        const TKey& key);

    // The parameter is spelled |entry| (not |Entry|) to match the lowerCamel
    // parameter naming used by every other declaration in this class.
    bool TryEraseExpired(
        const TEntryPtr& entry,
        const TKey& key);

    void UpdateAll();

    void ScheduleEntryRefresh(
        const TEntryPtr& entry,
        const TKey& key,
        std::optional<TDuration> refreshTime);

    TPromise<TValue> GetPromise(const TEntryPtr& entry) noexcept;

    //! Returns the configuration by const reference (compare with #GetConfig,
    //! which returns a copy of the pointer).
    const TAsyncExpiringCacheConfigPtr& Config() const;
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT

// Pull in the out-of-line template definitions. The marker macro is defined
// only for the duration of the include.
// NOTE(review): presumably the -inl.h file checks this macro to reject direct
// inclusion -- confirm there.
#define EXPIRING_CACHE_INL_H_
#include "async_expiring_cache-inl.h"
#undef EXPIRING_CACHE_INL_H_