aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Functions/sleep.h
blob: fba8293e5ffe7f8366444e011c066ae7a9f961fe (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#pragma once
#include <unistd.h>
#include <cmath>
#include <limits>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/ProfileEvents.h>
#include <Common/assert_cast.h>
#include <base/sleep.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>


/// Profile event counters updated by FunctionSleep::execute.
/// They are only declared here (extern); the definitions are not in this file.
namespace ProfileEvents
{
/// Incremented once per executed block by `sleep`, and by the block size by `sleepEachRow`.
extern const Event SleepFunctionCalls;
/// Incremented by the number of microseconds requested for each performed sleep.
extern const Event SleepFunctionMicroseconds;
}

namespace DB
{

/// Error codes thrown by FunctionSleep. They are only declared here (extern).
namespace ErrorCodes
{
    /// The argument type is neither a floating point nor a native unsigned integer type.
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    /// The requested sleep time exceeds the configured maximum.
    extern const int TOO_SLOW;
    /// The argument is not a constant column.
    extern const int ILLEGAL_COLUMN;
    /// The requested sleep time is negative or not finite.
    extern const int BAD_ARGUMENTS;
}

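/** sleep(seconds) - sleeps for the specified number of seconds once per processed block.
  * sleepEachRow(seconds) - sleeps for the specified number of seconds for every row of a block.
  */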

/// Selects which SQL function FunctionSleep implements and how the sleep time scales.
enum class FunctionSleepVariant
{
    /// Function `sleep`: the requested time is slept once per block.
    PerBlock,
    /// Function `sleepEachRow`: the requested time is multiplied by the number of rows in the block.
    PerRow
};

/** Implementation of the functions `sleep` (FunctionSleepVariant::PerBlock)
  * and `sleepEachRow` (FunctionSleepVariant::PerRow).
  *
  * The single argument must be a constant, finite, non-negative number of seconds
  * (a floating point or native unsigned integer type). The result is a full UInt8 column of zeros.
  *
  * Throws:
  *  - ILLEGAL_TYPE_OF_ARGUMENT if the argument type is not a float or a native unsigned integer;
  *  - ILLEGAL_COLUMN if the argument is not constant;
  *  - BAD_ARGUMENTS if the argument is negative, not finite, or too large to be expressed
  *    as a number of microseconds in UInt64;
  *  - TOO_SLOW if the requested time exceeds the configured maximum.
  */
template <FunctionSleepVariant variant>
class FunctionSleep : public IFunction
{
private:
    /// Upper bound for the sleep time of one block, in microseconds. Zero disables the limit.
    UInt64 max_microseconds;
public:
    static constexpr auto name = variant == FunctionSleepVariant::PerBlock ? "sleep" : "sleepEachRow";

    /// Creates the function with the limit taken from the `function_sleep_max_microseconds_per_block` setting.
    static FunctionPtr create(ContextPtr context)
    {
        return std::make_shared<FunctionSleep<variant>>(context->getSettingsRef().function_sleep_max_microseconds_per_block);
    }

    explicit FunctionSleep(UInt64 max_microseconds_) : max_microseconds(max_microseconds_)
    {
    }

    /// Get the name of the function.
    String getName() const override
    {
        return name;
    }

    /// Do not sleep during query analysis.
    bool isSuitableForConstantFolding() const override
    {
        return false;
    }

    size_t getNumberOfArguments() const override
    {
        return 1;
    }

    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }

    /// Accepts a floating point or native unsigned integer argument; the result type is always UInt8.
    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
    {
        WhichDataType which(arguments[0]);

        if (!which.isFloat()
            && !which.isNativeUInt())
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}, expected Float64",
                arguments[0]->getName(), getName());

        return std::make_shared<DataTypeUInt8>();
    }

    /// Validates the argument exactly like a real execution, but never sleeps.
    ColumnPtr executeImplDryRun(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
    {
        return execute(arguments, result_type, true);
    }

    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /*input_rows_count*/) const override
    {
        return execute(arguments, result_type, false);
    }

    /** Common implementation of executeImpl and executeImplDryRun.
      * Validates the argument, and unless `dry_run` is set or the block is empty, sleeps
      * and updates the SleepFunctionCalls / SleepFunctionMicroseconds profile events.
      * Returns a full (non-constant) column of zeros with as many rows as the argument column.
      */
    ColumnPtr execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, bool dry_run) const
    {
        const IColumn * col = arguments[0].column.get();

        if (!isColumnConst(*col))
            throw Exception(ErrorCodes::ILLEGAL_COLUMN, "The argument of function {} must be constant.", getName());

        const Float64 seconds = applyVisitor(FieldVisitorConvertToNumber<Float64>(), assert_cast<const ColumnConst &>(*col).getField());

        if (seconds < 0 || !std::isfinite(seconds))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot sleep infinite or negative amount of time (not implemented)");

        const size_t size = col->size();

        /// We do not sleep if the block is empty.
        if (size > 0)
        {
            /// When sleeping, the query cannot be cancelled. For ability to cancel query, we limit sleep time.
            if (max_microseconds && seconds * 1e6 > max_microseconds)
                throw Exception(ErrorCodes::TOO_SLOW, "The maximum sleep time is {} microseconds. Requested: {}", max_microseconds, seconds);

            if (!dry_run)
            {
                const UInt64 count = (variant == FunctionSleepVariant::PerBlock ? 1 : size);
                const Float64 requested_microseconds = seconds * count * 1e6;

                /// Converting a Float64 that does not fit into UInt64 is undefined behaviour.
                /// It is reachable when the limit is disabled (max_microseconds == 0), so reject such values explicitly.
                /// UInt64 max rounds up to 2^64 when converted to Float64, hence the `>=` comparison.
                if (requested_microseconds >= static_cast<Float64>(std::numeric_limits<UInt64>::max()))
                    throw Exception(ErrorCodes::BAD_ARGUMENTS,
                        "Cannot sleep for {} seconds per row in a block of size {}: the amount of time is too large",
                        seconds, size);

                const UInt64 microseconds = static_cast<UInt64>(requested_microseconds);

                if (max_microseconds && microseconds > max_microseconds)
                    throw Exception(ErrorCodes::TOO_SLOW,
                        "The maximum sleep time is {} microseconds. Requested: {} microseconds per block (of size {})",
                        max_microseconds, microseconds, size);

                sleepForMicroseconds(microseconds);
                ProfileEvents::increment(ProfileEvents::SleepFunctionCalls, count);
                ProfileEvents::increment(ProfileEvents::SleepFunctionMicroseconds, microseconds);
            }
        }

        /// convertToFullColumn is needed, because otherwise (constant expression case) the function will not get called on each block.
        return result_type->createColumnConst(size, 0u)->convertToFullColumnIfConst();
    }
};

}