aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_key_payload_value_lru_cache.h
blob: e8f8eb7d1598937e921a37fc85cf4edd0653e507 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#pragma once

#include <yql/essentials/public/udf/udf_value.h>
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <util/generic/string.h>
#include <chrono>
#include <iterator>
#include <list>
#include <optional>
#include <unordered_map>
#include <utility>

namespace NKikimr::NMiniKQL {

//Design notes:
//  Commodity LRUCache with HashMap and DoubleLinkedList. Key(that is TUnboxedValue) is stored in both HashMap and List
//  Lazy(postponed) TTL implementation. An entry is not deleted immediately when its TTL is expired, the deletion is postponed until:
//   - when it is accessed via Get()
//   - when it is not accessed for a long period of time and becomes the LRU. Then it is garbage collected in Tick()
//  Not thread safe
//  Never requests system time, expects monotonically increased time points in methods argument
class TUnboxedKeyValueLruCacheWithTtl {
    struct TEntry {
        TEntry(NUdf::TUnboxedValue key, NUdf::TUnboxedValue value, std::chrono::time_point<std::chrono::steady_clock> expiration)
            : Key(std::move(key))
            , Value(std::move(value))
            , Expiration(std::move(expiration))
        {}
        NUdf::TUnboxedValue Key;
        NUdf::TUnboxedValue Value;
        std::chrono::time_point<std::chrono::steady_clock> Expiration;
    };
    using TUsageList = std::list<TEntry>;

public:
    TUnboxedKeyValueLruCacheWithTtl(size_t maxSize, const NKikimr::NMiniKQL::TType* keyType)
        : MaxSize(maxSize)
        , KeyTypeHelper(keyType)
        , Map(
            1000,
            KeyTypeHelper.GetValueHash(),
            KeyTypeHelper.GetValueEqual()
        )
    {
        Y_ABORT_UNLESS(MaxSize > 0);
    }
    TUnboxedKeyValueLruCacheWithTtl(const TUnboxedKeyValueLruCacheWithTtl&) = delete; //to prevent unintentional copy of a large object

    void Update(NUdf::TUnboxedValue&& key, NUdf::TUnboxedValue&& value, std::chrono::time_point<std::chrono::steady_clock>&& expiration) {
        if (auto it = Map.find(key); it != Map.end()) {
            Touch(it->second);
            auto& entry = *it->second;
            entry.Value = std::move(value);
            entry.Expiration = std::move(expiration);
        } else {
            if (Map.size() == MaxSize) {
                RemoveLeastRecentlyUsedEntry();
            }
            UsageList.emplace_back(key, std::move(value), std::move(expiration));
            Map.emplace_hint(it, std::move(key), --UsageList.end());
        }
    }

    std::optional<NUdf::TUnboxedValue> Get(const NUdf::TUnboxedValue key, const std::chrono::time_point<std::chrono::steady_clock>& now) {
        if (auto it = Map.find(key); it != Map.end()) {
            if (now < it->second->Expiration) {
                Touch(it->second);
                return it->second->Value;
            } else {
                UsageList.erase(it->second);
                Map.erase(it);
                return std::nullopt;
            }
        }
        return std::nullopt;
    }

    // Perform garbage collection, single step, O(1) time.
    // Must be called periodically
    bool Tick(const std::chrono::time_point<std::chrono::steady_clock>& now) {
        if (UsageList.empty()) {
            return false;
        }
        if (now < UsageList.front().Expiration) {
            return false;
        }
        RemoveLeastRecentlyUsedEntry();
        return true;
    }

    // Perform garbage collection, O(1) amortized, but O(n) one-time
    void Prune(const std::chrono::time_point<std::chrono::steady_clock>& now) {
        while (Tick(now)) {
        }
    }

    size_t Size() const {
        Y_ABORT_UNLESS(Map.size() == UsageList.size());
        return Map.size();
    }
private:
    struct TKeyTypeHelpers {
            TKeyTypes KeyTypes;
            bool IsTuple;
            NUdf::IHash::TPtr Hash;
            NUdf::IEquate::TPtr Equate;
    };

    void Touch(TUsageList::iterator it) {
        UsageList.splice(UsageList.end(), UsageList, it); //move accessed element to the end of Usage list
    }
    void RemoveLeastRecentlyUsedEntry() {
        Map.erase(UsageList.front().Key);
        UsageList.pop_front();
    }
private:
    const size_t MaxSize;
    TUsageList UsageList;
    const TKeyTypeContanerHelper<true, true, false> KeyTypeHelper;
    std::unordered_map<
        NUdf::TUnboxedValue, 
        TUsageList::iterator,
        TValueHasher,
        TValueEqual,
        NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, TUsageList::iterator>>
    > Map;

};

} //namespace NKikimr::NMiniKQL