summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Functions/PerformanceAdaptors.h
blob: ef2c788bf43840da76cd655ede1853b1b10cc9a6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
#pragma once

#include <Functions/IFunction.h>

#include <Common/TargetSpecific.h>
#include <Common/Stopwatch.h>
#include <Interpreters/Context.h>

#include <cmath>
#include <mutex>
#include <random>

/* This file contains the helper class ImplementationSelector. It makes it easier to combine
 * several implementations of IFunction/IExecutableFunctionImpl.
 */

namespace DB
{

namespace ErrorCodes
{
    /// Thrown by ImplementationSelector::selectAndExecute when no implementation has been registered.
    /// This is only a declaration; the definition is not in this file.
    extern const int NO_SUITABLE_FUNCTION_IMPLEMENTATION;
}

namespace detail
{
    /* Collects timing statistics for several variants (implementations) of the same function and
     * chooses the variant to run next.
     *
     * Variants are addressed by index, in the order they were added with emplace_back().
     * select() and complete() serialize their work through an internal mutex; emplace_back(), size()
     * and empty() do not take it, so all variants have to be added before the object is used
     * from several threads.
     */
    class PerformanceStatistics
    {
    public:
        /* Chooses the variant to run: every variant draws a sample and the smallest sample wins
         * (on equal samples the variant with the lower index is kept).
         *
         * considerable - whether the upcoming run is going to be reported via complete(). If true, the
         * chosen variant is marked as running, so every select(true) has to be paired with exactly
         * one complete() for the returned index, otherwise the running counter stays elevated.
         *
         * Returns the index of the chosen variant. Must not be called while empty().
         */
        size_t select(bool considerable)
        {
            /// We don't need to choose/measure anything if there's only one variant.
            if (size() == 1)
                return 0;

            std::lock_guard guard(lock);

            size_t best = 0;
            double best_sample = data[0].sample(rng);

            for (size_t i = 1; i < data.size(); ++i)
            {
                double sample = data[i].sample(rng);
                if (sample < best_sample)
                {
                    best_sample = sample;
                    best = i;
                }
            }

            if (considerable)
                data[best].run();

            return best;
        }

        /* Reports that a run of variant `id`, previously announced by select(true), has finished.
         *
         * seconds - elapsed time of the run.
         * bytes - amount of processed data; the time is normalized by it. If it is not positive,
         * the run is marked as finished but contributes nothing to the statistics.
         *
         * Does nothing if there is only one variant (select() does not track anything in that case).
         */
        void complete(size_t id, double seconds, double bytes)
        {
            if (size() == 1)
                return;

            std::lock_guard guard(lock);
            data[id].complete(seconds, bytes);
        }

        /// Number of registered variants.
        size_t size() const
        {
            return data.size();
        }

        bool empty() const
        {
            return size() == 0;
        }

        /// Adds one more variant with empty statistics. Not synchronized with select()/complete().
        void emplace_back() /// NOLINT
        {
            data.emplace_back();
        }

    private:
        /// Statistics of a single variant.
        struct Element
        {
            int completed_count = 0; /// Number of runs counted by complete().
            int running_count = 0;   /// Runs announced by run() and not yet finished by complete().
            double sum = 0;          /// Sum of seconds / bytes over the runs that are taken into account.

            /// Number of completed runs that are taken into account; not positive during the warm-up.
            int adjustedCount() const
            {
                return completed_count - NUM_INVOCATIONS_TO_THROW_OFF;
            }

            /// Only meaningful when adjustedCount() > 0.
            double mean() const
            {
                return sum / adjustedCount();
            }

            /// For better convergence, we don't use proper estimate of stddev.
            /// We want to eventually separate between two algorithms even in case
            ///  when there is no statistical significant difference between them.
            double sigma() const
            {
                return mean() / std::sqrt(static_cast<double>(adjustedCount()));
            }

            void run()
            {
                ++running_count;
            }

            void complete(double seconds, double bytes)
            {
                --running_count;

                /// A run without a positive amount of data has no defined cost: seconds / bytes would be
                /// inf or NaN and would stay in `sum` forever, breaking every later sample().
                /// The negated comparison also rejects NaN.
                if (!(bytes > 0))
                    return;

                ++completed_count;

                if (adjustedCount() > 0)
                    sum += seconds / bytes;
            }

            double sample(pcg64 & stat_rng) const
            {
                /// If there is a variant with not enough statistics, always choose it.
                /// And in that case prefer variant with less number of invocations.

                if (adjustedCount() < 2)
                    return adjustedCount() - 1 + running_count;
                return std::normal_distribution<>(mean(), sigma())(stat_rng);
            }
        };

        std::vector<Element> data;
        std::mutex lock;
        /// It's Ok that generator is not seeded.
        pcg64 rng;
        /// Cold invocations may be affected by additional memory latencies. Don't take first invocations into account.
        static constexpr int NUM_INVOCATIONS_TO_THROW_OFF = 2;
    };

    /// Detection helpers, declared only and meant for unevaluated contexts.
    /// The template overload is viable only if the expression `T::getImplementationTag()` is well-formed;
    /// otherwise overload resolution falls back to the variadic one.
    template <typename T, typename = decltype(T::getImplementationTag())>
    auto hasImplementationTagTest(const T &) -> std::true_type;
    auto hasImplementationTagTest(...) -> std::false_type;

    /// True if `T::getImplementationTag()` can be called without an object, false otherwise.
    template <typename T>
    constexpr bool has_implementation_tag
        = std::is_same_v<decltype(hasImplementationTagTest(std::declval<T>())), std::true_type>;

    /* Builds the tag of an implementation. The tag is used to run a specific implementation
     * (for debug/testing purposes).
     * It always starts with toString(arch). If T has the optional static method
     * ::getImplementationTag(), "_" and the value returned by that method are appended.
     */
    template <typename T>
    String getImplementationTag(TargetArch arch)
    {
        String tag = toString(arch);

        if constexpr (has_implementation_tag<T>)
        {
            tag += "_";
            tag += T::getImplementationTag();
        }

        return tag;
    }
}

/* Class which is used to store implementations for the function and to select the best one to run
 * based on processor architecture and statistics from previous runs.
 *
 * FunctionInterface is typically IFunction or IExecutableFunctionImpl, but practically it can be
 * any interface that contains "execute" method (IFunction is an exception and is supported as well).
 *
 * Example of usage:
 *
 * class MyDefaultImpl : public IFunction {...};
 * DECLARE_AVX2_SPECIFIC_CODE(
 * class MyAVX2Impl : public IFunction {...};
 * )
 *
 * /// All methods but execute/executeImpl are usually not bottleneck, so just use them from
 * /// default implementation.
 * class MyFunction : public MyDefaultImpl
 * {
 *     MyFunction(ContextPtr context) : selector(context) {
 *         /// Register all implementations in constructor.
 *         /// There could be as many implementation for every target as you want.
 *         selector.registerImplementation<TargetArch::Default, MyDefaultImpl>();
 *     #if USE_MULTITARGET_CODE
 *         selector.registerImplementation<TargetArch::AVX2, TargetSpecific::AVX2::MyAVX2Impl>();
 *     #endif
 *     }
 *
 *     ColumnPtr executeImpl(...) override {
 *         return selector.selectAndExecute(...);
 *     }
 *
 *     static FunctionPtr create(ContextPtr context) {
 *         return std::make_shared<MyFunction>(context);
 *     }
 * private:
 *     ImplementationSelector<IFunction> selector;
 * };
 */
template <typename FunctionInterface>
class ImplementationSelector : WithContext
{
public:
    using ImplementationPtr = std::shared_ptr<FunctionInterface>;

    explicit ImplementationSelector(ContextPtr context_) : WithContext(context_) {}

    /* Picks one of the registered implementations using the statistics of previous runs, executes it
     * and returns its result.
     *
     * "executeImpl" is called on the implementation if FunctionInterface is IFunction, "execute" otherwise.
     * Only runs with more than ROWS_THRESHOLD_FOR_STATISTICS rows are timed and recorded.
     *
     * Throws NO_SUITABLE_FUNCTION_IMPLEMENTATION if nothing has been registered.
     */
    ColumnPtr selectAndExecute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const
    {
        if (implementations.empty())
            throw Exception(ErrorCodes::NO_SUITABLE_FUNCTION_IMPLEMENTATION,
                            "There are no available implementations for function " "TODO(dakovalkov): add name");

        /// Statistics shouldn't rely on small columns.
        const bool worth_measuring = input_rows_count > ROWS_THRESHOLD_FOR_STATISTICS;

        const size_t chosen = statistics.select(worth_measuring);

        Stopwatch timer;
        ColumnPtr result = executeWith(*implementations[chosen], arguments, result_type, input_rows_count);
        timer.stop();

        // TODO(dakovalkov): Calculate something more informative than rows count.
        if (worth_measuring)
            statistics.complete(chosen, timer.elapsedSeconds(), input_rows_count);

        return result;
    }

    /* Adds one more implementation of the function.
     *
     * Arch - instruction set the implementation needs. If the processor doesn't support it, nothing of
     * FunctionImpl is touched (neither the constructor nor static methods) and the call is a no-op.
     *
     * FunctionImpl - the implementation itself, should be inherited from template argument FunctionInterface.
     *
     * args - forwarded to the constructor of FunctionImpl.
     *
     * If the setting "function_implementation" is not empty, only the implementation whose tag
     * (see detail::getImplementationTag) is equal to the setting value gets registered.
     */
    template <TargetArch Arch, typename FunctionImpl, typename ...Args>
    void registerImplementation(Args &&... args)
    {
        if (!isArchSupported(Arch))
            return;

        // TODO(dakovalkov): make this option better.
        const auto & requested_tag = getContext()->getSettingsRef().function_implementation.value;
        const bool is_accepted = requested_tag.empty() || requested_tag == detail::getImplementationTag<FunctionImpl>(Arch);
        if (!is_accepted)
            return;

        implementations.emplace_back(std::make_shared<FunctionImpl>(std::forward<Args>(args)...));
        statistics.emplace_back();
    }

private:
    /// Runs with at most this many rows are executed without being measured.
    static constexpr size_t ROWS_THRESHOLD_FOR_STATISTICS = 1000;

    /// Calls the entry point that matches FunctionInterface on the given implementation.
    static ColumnPtr executeWith(
        FunctionInterface & implementation,
        const ColumnsWithTypeAndName & arguments,
        const DataTypePtr & result_type,
        size_t input_rows_count)
    {
        if constexpr (std::is_same_v<FunctionInterface, IFunction>)
            return implementation.executeImpl(arguments, result_type, input_rows_count);
        else
            return implementation.execute(arguments, result_type, input_rows_count);
    }

    /// implementations[i] corresponds to the i-th element of statistics.
    std::vector<ImplementationPtr> implementations;
    mutable detail::PerformanceStatistics statistics; /// It is protected by internal mutex.
};

}