path: root/contrib/clickhouse/src/AggregateFunctions/UniqExactSet.h
#pragma once

#include <exception>
#include <Common/CurrentThread.h>
#include <Common/HashTable/HashSet.h>
#include <Common/ThreadPool.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>


namespace DB
{

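/// Hash-set wrapper used for exact unique counting: it starts as a single-level hash set and can be
/// converted to a two-level hash set so that merges can be parallelized bucket by bucket.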
template <typename SingleLevelSet, typename TwoLevelSet>
class UniqExactSet
{
    static_assert(std::is_same_v<typename SingleLevelSet::value_type, typename TwoLevelSet::value_type>);

public:
    using value_type = typename SingleLevelSet::value_type;

    template <typename Arg, bool use_single_level_hash_table = true>
    auto ALWAYS_INLINE insert(Arg && arg)
    {
        if constexpr (use_single_level_hash_table)
            asSingleLevel().insert(std::forward<Arg>(arg));
        else
            asTwoLevel().insert(std::forward<Arg>(arg));
    }

    /// During merge, if one of lhs and rhs is a two-level set and the other is a single-level set, the single-level set has to be converted via convertToTwoLevel().
    /// That conversion is not parallelized and becomes very expensive when the number of threads is large.
    /// This method converts the single-level sets to two-level sets in parallel when the hash sets are not all single-level or not all two-level.
    static void parallelizeMergePrepare(const std::vector<UniqExactSet *> & data_vec, ThreadPool & thread_pool)
    {
        unsigned long single_level_set_num = 0;
        unsigned long all_single_hash_size = 0;

        for (auto ele : data_vec)
        {
            if (ele->isSingleLevel())
                ++single_level_set_num;
        }

        if (single_level_set_num == data_vec.size())
        {
            for (auto ele : data_vec)
                all_single_hash_size += ele->size();
        }

        /// If the hash tables are a mix of single-level and two-level, or they are all single-level with an average size above 6000,
        /// they can be converted to two-level hash tables in parallel and then merged together. Please refer to the following PRs for more details:
        /// https://github.com/ClickHouse/ClickHouse/pull/50748
        /// https://github.com/ClickHouse/ClickHouse/pull/52973
        if ((single_level_set_num > 0 && single_level_set_num < data_vec.size()) || ((all_single_hash_size / data_vec.size()) > 6000))
        {
            try
            {
                auto data_vec_atomic_index = std::make_shared<std::atomic_uint32_t>(0);
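                /// Worker threads claim the next set to convert via this shared atomic index.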
                auto thread_func = [data_vec, data_vec_atomic_index, thread_group = CurrentThread::getGroup()]()
                {
                    SCOPE_EXIT_SAFE(
                        if (thread_group)
                            CurrentThread::detachFromGroupIfNotDetached();
                    );
                    if (thread_group)
                        CurrentThread::attachToGroupIfDetached(thread_group);

                    setThreadName("UniqExaConvert");

                    while (true)
                    {
                        const auto i = data_vec_atomic_index->fetch_add(1);
                        if (i >= data_vec.size())
                            return;
                        if (data_vec[i]->isSingleLevel())
                            data_vec[i]->convertToTwoLevel();
                    }
                };
                for (size_t i = 0; i < std::min<size_t>(thread_pool.getMaxThreads(), single_level_set_num); ++i)
                    thread_pool.scheduleOrThrowOnError(thread_func);

                thread_pool.wait();
            }
            catch (...)
            {
                thread_pool.wait();
                throw;
            }
        }
    }

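    /// Merge `other` into this set. If either side is two-level, both are merged as two-level sets,
    /// bucket by bucket (in parallel when a thread pool is provided).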
    auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr)
    {
        if (isSingleLevel() && other.isTwoLevel())
            convertToTwoLevel();

        if (isSingleLevel())
        {
            asSingleLevel().merge(other.asSingleLevel());
        }
        else
        {
            auto & lhs = asTwoLevel();
            const auto rhs_ptr = other.getTwoLevelSet();
            const auto & rhs = *rhs_ptr;
            if (!thread_pool)
            {
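                /// No thread pool: merge the buckets sequentially.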
                for (size_t i = 0; i < rhs.NUM_BUCKETS; ++i)
                    lhs.impls[i].merge(rhs.impls[i]);
            }
            else
            {
                try
                {
                    auto next_bucket_to_merge = std::make_shared<std::atomic_uint32_t>(0);

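                    /// Each worker thread claims the next bucket via the shared atomic counter and merges it independently.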
                    auto thread_func = [&lhs, &rhs, next_bucket_to_merge, thread_group = CurrentThread::getGroup()]()
                    {
                        SCOPE_EXIT_SAFE(
                            if (thread_group)
                                CurrentThread::detachFromGroupIfNotDetached();
                        );
                        if (thread_group)
                            CurrentThread::attachToGroupIfDetached(thread_group);
                        setThreadName("UniqExactMerger");

                        while (true)
                        {
                            const auto bucket = next_bucket_to_merge->fetch_add(1);
                            if (bucket >= rhs.NUM_BUCKETS)
                                return;
                            lhs.impls[bucket].merge(rhs.impls[bucket]);
                        }
                    };

                    for (size_t i = 0; i < std::min<size_t>(thread_pool->getMaxThreads(), rhs.NUM_BUCKETS); ++i)
                        thread_pool->scheduleOrThrowOnError(thread_func);
                    thread_pool->wait();
                }
                catch (...)
                {
                    thread_pool->wait();
                    throw;
                }
            }
        }
    }

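    /// States are always serialized in the single-level format (see write()), so reading fills the single-level set.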
    void read(ReadBuffer & in) { asSingleLevel().read(in); }

    void write(WriteBuffer & out) const
    {
        if (isSingleLevel())
            asSingleLevel().write(out);
        else
            /// We have to preserve compatibility with the old implementation that used only single level hash sets.
            asTwoLevel().writeAsSingleLevel(out);
    }

    size_t size() const { return isSingleLevel() ? asSingleLevel().size() : asTwoLevel().size(); }

    /// Used to convert the set to two-level before merging (we cannot simply call convertToTwoLevel() on the right-hand-side set because it is declared const).
    std::shared_ptr<TwoLevelSet> getTwoLevelSet() const
    {
        return two_level_set ? two_level_set : std::make_shared<TwoLevelSet>(asSingleLevel());
    }

    void convertToTwoLevel()
    {
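        /// Copy the contents into a two-level set, then release the single-level storage.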
        two_level_set = getTwoLevelSet();
        single_level_set.clear();
    }

    bool isSingleLevel() const { return !two_level_set; }
    bool isTwoLevel() const { return !!two_level_set; }

private:
    SingleLevelSet & asSingleLevel() { return single_level_set; }
    const SingleLevelSet & asSingleLevel() const { return single_level_set; }

    TwoLevelSet & asTwoLevel() { return *two_level_set; }
    const TwoLevelSet & asTwoLevel() const { return *two_level_set; }

    SingleLevelSet single_level_set;
    std::shared_ptr<TwoLevelSet> two_level_set;
};
}