aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Interpreters/AsynchronousInsertQueue.h
blob: 577752af45a03f4072affcb23f6fda387d7b14bd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#pragma once

#include <Core/Settings.h>
#include <Parsers/IAST_fwd.h>
#include <Poco/Logger.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTrackerSwitcher.h>
#include <Common/ThreadPool.h>

#include <future>

namespace DB
{

/// A queue that stores data for insert queries and periodically flushes it to tables.
/// The data is grouped by the table, format and settings of the insert query.
class AsynchronousInsertQueue : public WithContext
{
public:
    using Milliseconds = std::chrono::milliseconds;

    AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_, bool flush_on_shutdown_);
    ~AsynchronousInsertQueue();

    /// What push() hands back to its caller.
    struct PushResult
    {
        enum Status
        {
            OK,
            TOO_MUCH_DATA,
        };

        Status status;

        /// Waiting on this future blocks until the query has been flushed.
        std::future<void> future;

        /// Only relevant when there was too much data:
        /// a read buffer holding the data extracted from the query.
        std::unique_ptr<ReadBuffer> insert_data_buffer;
    };

    /// Forcibly flushes everything that is currently in the queue.
    void flushAll();

    /// Submits an insert query to the queue; the outcome is described by PushResult.
    PushResult push(ASTPtr query, ContextPtr query_context);

    /// Accessor for the pool_size member.
    size_t getPoolSize() const { return pool_size; }

private:

    /// Describes an insert query; stored as Container::key.
    struct InsertQuery
    {
        ASTPtr query;
        String query_str;
        Settings settings;
        UInt128 hash;

        InsertQuery(const ASTPtr & query_, const Settings & settings_);
        InsertQuery(const InsertQuery & other);
        InsertQuery & operator=(const InsertQuery & other);
        bool operator==(const InsertQuery & other) const;

    private:
        UInt128 calculateHash() const;
    };

    /// The entries accumulated for one batch together with their total size.
    struct InsertData
    {
        /// A single queued insert: its bytes plus bookkeeping that is fixed at construction.
        struct Entry
        {
            String bytes;
            const String query_id;
            const String async_dedup_token;
            MemoryTracker * const user_memory_tracker;
            const std::chrono::system_clock::time_point create_time;

            Entry(String && bytes_, String && query_id_, const String & async_dedup_token, MemoryTracker * user_memory_tracker_);

            /// Defined out of line; NOTE(review): presumably completes `promise`
            /// (with the exception, if one is given) and sets `finished` - confirm in the .cpp.
            void finish(std::exception_ptr exception_ = nullptr);

            /// Future tied to this entry's promise.
            std::future<void> getFuture() { return promise.get_future(); }

            bool isFinished() const { return finished; }

        private:
            std::promise<void> promise;
            std::atomic<bool> finished{false};
        };

        using EntryPtr = std::shared_ptr<Entry>;

        std::list<EntryPtr> entries;
        size_t size_in_bytes = 0;

        ~InsertData()
        {
            // Every entry has to be released in the context of the user who ran the async insert.
            // Different entries of the list may belong to different users, hence the current
            // thread's MemoryTracker parent is switched anew for each element. The switcher is
            // created before the element is dropped and goes out of scope only afterwards.
            while (!entries.empty())
            {
                MemoryTrackerSwitcher switcher(entries.front()->user_memory_tracker);
                entries.pop_front();
            }
        }
    };

    using InsertDataPtr = std::unique_ptr<InsertData>;

    /// A batch: the describing query and the data collected for it.
    struct Container
    {
        InsertQuery key;
        InsertDataPtr data;
    };

    /// Ordered container of batches.
    /// Each batch is keyed by the timestamp of the first insert that went into it.
    /// This tells for how long a batch has been active, so that it can be dumped by timer.
    using Queue = std::map<std::chrono::steady_clock::time_point, Container>;
    using QueueIterator = Queue::iterator;
    using QueueIteratorByKey = std::unordered_map<UInt128, QueueIterator>;

    /// One shard of the queue with its own mutex and condition variable.
    struct QueueShard
    {
        mutable std::mutex mutex;
        mutable std::condition_variable are_tasks_available;

        Queue queue;
        QueueIteratorByKey iterators;
    };

    const size_t pool_size;
    const bool flush_on_shutdown;

    std::vector<QueueShard> queue_shards;

    /// How and when the queue gets dumped:
    ///  - async_insert_busy_timeout_ms:
    ///   when the queue has been active for too long while many rapid inserts keep arriving, the data is dumped,
    ///   so that it does not keep growing for a long time and users can select new data in a deterministic manner.
    ///
    /// While incoming INSERT queries are processed, it can additionally be checked whether the buffered data
    /// has reached its maximum size (async_insert_max_data_size setting). In that case the data is dumped as well.

    std::atomic<bool> shutdown{false};
    std::atomic<bool> flush_stopped{false};

    /// Keeps forced flushes of the queue from running concurrently.
    mutable std::mutex flush_mutex;

    /// The data is dumped exclusively inside this pool.
    ThreadPool pool;

    /// Relies on async_insert_busy_timeout_ms and processBatchDeadlines()
    std::vector<ThreadFromGlobalPool> dump_by_first_update_threads;

    Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");

    void processBatchDeadlines(size_t shard_num);
    void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);

    static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);

    template <typename E>
    static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);

public:
    /// Locks the mutex of shard number `shard_num` and returns a pair:
    /// a reference to that shard's queue and the lock object that owns the mutex.
    /// The queue reference is const because this method is const.
    /// `shard_num` is used as a plain index into queue_shards, without a range check.
    auto getQueueLocked(size_t shard_num) const
    {
        const QueueShard & target = queue_shards[shard_num];
        std::unique_lock guard(target.mutex);
        return std::make_pair(std::ref(target.queue), std::move(guard));
    }
};

}