aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Dictionaries/CacheDictionaryUpdateQueue.h
blob: 8d0581d2052062526360eeaf7c42d27151729e88 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#pragma once

#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
#include <Common/PODArray.h>
#include <Common/HashTable/HashMap.h>
#include <Columns/IColumn.h>
#include <Dictionaries/ICacheDictionaryStorage.h>

/// Metrics incremented by every live CacheDictionaryUpdateUnit: one batch per unit, plus its number of keys to update.
namespace CurrentMetrics
{
    extern const Metric CacheDictionaryUpdateQueueKeys;
    extern const Metric CacheDictionaryUpdateQueueBatches;
}

namespace DB
{

/** Unit of work that is passed between CacheDictionaryUpdateQueue and its client during an update.

    The client creates the unit with the key columns to look up, the per-key state that was read from
    the cache storage (key_index_to_state), the fetch request, and the number of keys that need an update.
    The same constructor is used for both simple and complex dictionary keys.

    While the update runs, the update function is expected to fill
    requested_keys_to_fetched_columns_during_update_index (maps a key to a size_t index, presumably the
    row in fetched_columns_during_update) and fetched_columns_during_update.

    For complex keys KeyType is StringRef. The serialized key bytes it points to must be allocated in
    complex_keys_arena_holder so that they stay valid for as long as the unit is alive.

    The synchronization state (update_mutex, is_update_finished, is_done, current_exception) is private.
    It is managed by CacheDictionaryUpdateQueue, which is a friend. A unit owns a mutex and a condition
    variable, so it can be neither copied nor moved. It is handed around through a std::shared_ptr
    (CacheDictionaryUpdateUnitPtr).
*/
template <DictionaryKeyType dictionary_key_type>
class CacheDictionaryUpdateUnit
{
public:
    using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::Simple, UInt64, StringRef>;

    /// Constructor for an update request (used for both simple and complex keys).
    /// key_index_to_state_from_storage_ is copied into the unit.
    /// keys_to_update_size_ is also added to the CacheDictionaryUpdateQueueKeys metric for the unit's lifetime.
    explicit CacheDictionaryUpdateUnit(
        const Columns & key_columns_,
        const PaddedPODArray<KeyState> & key_index_to_state_from_storage_,
        const DictionaryStorageFetchRequest & request_,
        size_t keys_to_update_size_)
        : key_columns(key_columns_)
        , key_index_to_state(key_index_to_state_from_storage_.begin(), key_index_to_state_from_storage_.end())
        , request(request_)
        , keys_to_update_size(keys_to_update_size_)
        , alive_keys(CurrentMetrics::CacheDictionaryUpdateQueueKeys, keys_to_update_size)
    {}

    /// Empty unit with no keys to update (it still counts as one batch in CacheDictionaryUpdateQueueBatches).
    CacheDictionaryUpdateUnit()
        : keys_to_update_size(0)
        , alive_keys(CurrentMetrics::CacheDictionaryUpdateQueueKeys, 0)
    {}

    /// A unit owns synchronization primitives and metric increments; copying one would be meaningless.
    CacheDictionaryUpdateUnit(const CacheDictionaryUpdateUnit &) = delete;
    CacheDictionaryUpdateUnit & operator=(const CacheDictionaryUpdateUnit &) = delete;

    const Columns key_columns;
    const PaddedPODArray<KeyState> key_index_to_state;
    const DictionaryStorageFetchRequest request;
    const size_t keys_to_update_size;

    /// Filled by the update function during the update.
    HashMap<KeyType, size_t> requested_keys_to_fetched_columns_during_update_index;
    /// Filled by the update function during the update.
    MutableColumns fetched_columns_during_update;

    /// Complex keys are serialized into this arena, which keeps the StringRef keys above valid.
    DictionaryKeysArenaHolder<dictionary_key_type> complex_keys_arena_holder;

private:
    template <DictionaryKeyType>
    friend class CacheDictionaryUpdateQueue;

    mutable std::mutex update_mutex;
    mutable std::condition_variable is_update_finished;

    bool is_done{false};
    std::exception_ptr current_exception{nullptr}; /// NOLINT

    /// While an UpdateUnit is alive, it is accounted in the update queue metrics (one batch + its keys).
    CurrentMetrics::Increment alive_batch{CurrentMetrics::CacheDictionaryUpdateQueueBatches};
    CurrentMetrics::Increment alive_keys;
};

template <DictionaryKeyType dictionary_key_type>
using CacheDictionaryUpdateUnitPtr = std::shared_ptr<CacheDictionaryUpdateUnit<dictionary_key_type>>;

extern template class CacheDictionaryUpdateUnit<DictionaryKeyType::Simple>;
extern template class CacheDictionaryUpdateUnit<DictionaryKeyType::Complex>;

/// Immutable settings of CacheDictionaryUpdateQueue, fixed when the queue is constructed.
struct CacheDictionaryUpdateQueueConfiguration
{
    const size_t max_update_queue_size;                  ///< Capacity of the update queue.
    const size_t max_threads_for_updates;                ///< Number of threads in the update thread pool.
    const size_t update_queue_push_timeout_milliseconds; ///< How long a push into a full update queue may wait, in milliseconds.
    const size_t query_wait_timeout_milliseconds;        ///< How long a synchronous wait for an update unit may take, in milliseconds.
};

/** Provides asynchronous (push and return) and synchronous (push and wait) updates for CacheDictionary.

    The queue does not know how to update anything by itself. CacheDictionary supplies an UpdateFunction
    that performs the actual update for a given update unit.
*/
template <DictionaryKeyType dictionary_key_type>
class CacheDictionaryUpdateQueue
{
public:
    /// Update callback supplied by the queue client in the constructor; it must perform the update described by the unit.
    using UpdateFunction = std::function<void(CacheDictionaryUpdateUnitPtr<dictionary_key_type>)>;

    CacheDictionaryUpdateQueue(String dictionary_name_for_logs_, CacheDictionaryUpdateQueueConfiguration configuration_, UpdateFunction && update_func_);
    ~CacheDictionaryUpdateQueue();

    /// Returns the configuration that was passed to the constructor.
    const CacheDictionaryUpdateQueueConfiguration & getConfiguration() const
    {
        return configuration;
    }

    /// Whether the underlying update queue has been finished.
    bool isFinished() const
    {
        return update_queue.isFinished();
    }

    /// Stops the update queue and blocks until it has stopped.
    void stopAndWait();

    /** Pushes update_unit_ptr into the update queue.

        Throws if the queue is already finished. Also throws if the queue is full and the push does not
        succeed within update_queue_push_timeout_milliseconds from the configuration.
    */
    void tryPushToUpdateQueueOrThrow(CacheDictionaryUpdateUnitPtr<dictionary_key_type> & update_unit_ptr);

    /** Blocks until the update of update_unit_ptr completes.

        Rethrows the exception raised by the update function, if there was one.
        Throws if the update does not finish within query_wait_timeout_milliseconds from the configuration,
        or if the queue is already finished.
    */
    void waitForCurrentUpdateFinish(CacheDictionaryUpdateUnitPtr<dictionary_key_type> & update_unit_ptr) const;

private:
    using UpdateQueue = ConcurrentBoundedQueue<CacheDictionaryUpdateUnitPtr<dictionary_key_type>>;

    void updateThreadFunction();

    String dictionary_name_for_logs;
    CacheDictionaryUpdateQueueConfiguration configuration;
    UpdateFunction update_func;
    UpdateQueue update_queue;
    /// Declared last, so it is destroyed first. NOTE(review): presumably the pool's threads use the members
    /// above, so keep this member last — confirm against the destructor implementation.
    ThreadPool update_pool;
};

extern template class CacheDictionaryUpdateQueue<DictionaryKeyType::Simple>;
extern template class CacheDictionaryUpdateQueue<DictionaryKeyType::Complex>;

}