aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/SharedThreadPools.cpp
blob: 6a0e953f0efe5851136ad148b9074c2e8f313a76 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#include <IO/SharedThreadPools.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <Core/Field.h>

/// Metric identifiers that are only declared here (extern); their definitions live elsewhere.
/// Each pair below is passed to the constructor of one of the shared pools defined at the end of this file.
namespace CurrentMetrics
{
    /// Passed to the pool returned by getIOThreadPool().
    extern const Metric IOThreads;
    extern const Metric IOThreadsActive;
    /// Passed to the pool returned by getBackupsIOThreadPool().
    extern const Metric BackupsIOThreads;
    extern const Metric BackupsIOThreadsActive;
    /// Passed to the pool returned by getActivePartsLoadingThreadPool().
    extern const Metric MergeTreePartsLoaderThreads;
    extern const Metric MergeTreePartsLoaderThreadsActive;
    /// Passed to the pool returned by getPartsCleaningThreadPool().
    extern const Metric MergeTreePartsCleanerThreads;
    extern const Metric MergeTreePartsCleanerThreadsActive;
    /// Passed to the pool returned by getOutdatedPartsLoadingThreadPool().
    extern const Metric MergeTreeOutdatedPartsLoaderThreads;
    extern const Metric MergeTreeOutdatedPartsLoaderThreadsActive;
}

namespace DB
{

namespace ErrorCodes
{
    /// Declared here, defined elsewhere. Used by StaticThreadPool for every exception it throws
    /// (double initialization, or use before initialization).
    extern const int LOGICAL_ERROR;
}


StaticThreadPool::StaticThreadPool(
    const String & name_,
    CurrentMetrics::Metric threads_metric_,
    CurrentMetrics::Metric threads_active_metric_)
    : name(name_)
    , threads_metric(threads_metric_)
    , threads_active_metric(threads_active_metric_)
{
}

/// Creates the underlying ThreadPool with the given limits.
/// May be called only once per StaticThreadPool: a second call throws LOGICAL_ERROR.
void StaticThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size)
{
    if (instance != nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "The {} is initialized twice", name);

    /// Both limits start out equal, so until setMaxTurboThreads() is called,
    /// switching "turbo mode" on or off does not change the number of threads.
    max_threads_normal = max_threads;
    max_threads_turbo = max_threads;

    instance = std::make_unique<ThreadPool>(
        threads_metric, threads_active_metric, max_threads, max_free_threads, queue_size, /* shutdown_on_exception= */ false);
}

void StaticThreadPool::reloadConfiguration(size_t max_threads, size_t max_free_threads, size_t queue_size)
{
    if (!instance)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "The {} is not initialized", name);

    instance->setMaxThreads(turbo_mode_enabled > 0 ? max_threads_turbo : max_threads);
    instance->setMaxFreeThreads(max_free_threads);
    instance->setQueueSize(queue_size);
}


/// Gives access to the underlying ThreadPool.
/// Throws LOGICAL_ERROR if initialize() has not created it yet.
ThreadPool & StaticThreadPool::get()
{
    if (instance)
        return *instance;

    throw Exception(ErrorCodes::LOGICAL_ERROR, "The {} is not initialized", name);
}

/// Registers one more user of turbo mode.
/// The counter is incremented on every call; only the transition 0 -> 1 actually
/// switches the pool to max_threads_turbo.
/// Throws LOGICAL_ERROR if the pool has not been initialized.
void StaticThreadPool::enableTurboMode()
{
    if (instance == nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "The {} is not initialized", name);

    std::lock_guard guard(mutex);

    const bool first_user = (++turbo_mode_enabled == 1);
    if (first_user)
        instance->setMaxThreads(max_threads_turbo);
}

/// Unregisters one user of turbo mode.
/// The counter is decremented unconditionally; the pool is switched back to
/// max_threads_normal only when the counter reaches exactly zero.
/// Throws LOGICAL_ERROR if the pool has not been initialized.
void StaticThreadPool::disableTurboMode()
{
    if (instance == nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "The {} is not initialized", name);

    std::lock_guard guard(mutex);

    const bool last_user_gone = (--turbo_mode_enabled == 0);
    if (last_user_gone)
        instance->setMaxThreads(max_threads_normal);
}

/// Changes the thread limit that is used while turbo mode is on.
/// The new value is always stored; it is pushed to the pool right away only if
/// turbo mode is currently enabled.
/// Throws LOGICAL_ERROR if the pool has not been initialized.
void StaticThreadPool::setMaxTurboThreads(size_t max_threads_turbo_)
{
    if (instance == nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "The {} is not initialized", name);

    std::lock_guard guard(mutex);

    max_threads_turbo = max_threads_turbo_;

    const bool turbo_active = turbo_mode_enabled > 0;
    if (turbo_active)
        instance->setMaxThreads(max_threads_turbo);
}

/// Returns the single StaticThreadPool named "IOThreadPool".
/// The object is a function-local static, so it is constructed on the first call;
/// its initialize() still has to be called before get() can be used.
StaticThreadPool & getIOThreadPool()
{
    static StaticThreadPool pool{
        "IOThreadPool",
        CurrentMetrics::IOThreads,
        CurrentMetrics::IOThreadsActive};
    return pool;
}

/// Returns the single StaticThreadPool named "BackupsIOThreadPool".
/// The object is a function-local static, so it is constructed on the first call;
/// its initialize() still has to be called before get() can be used.
StaticThreadPool & getBackupsIOThreadPool()
{
    static StaticThreadPool pool{
        "BackupsIOThreadPool",
        CurrentMetrics::BackupsIOThreads,
        CurrentMetrics::BackupsIOThreadsActive};
    return pool;
}

/// Returns the single StaticThreadPool named "MergeTreePartsLoaderThreadPool".
/// The object is a function-local static, so it is constructed on the first call;
/// its initialize() still has to be called before get() can be used.
StaticThreadPool & getActivePartsLoadingThreadPool()
{
    static StaticThreadPool pool{
        "MergeTreePartsLoaderThreadPool",
        CurrentMetrics::MergeTreePartsLoaderThreads,
        CurrentMetrics::MergeTreePartsLoaderThreadsActive};
    return pool;
}

/// Returns the single StaticThreadPool named "MergeTreePartsCleanerThreadPool".
/// The object is a function-local static, so it is constructed on the first call;
/// its initialize() still has to be called before get() can be used.
StaticThreadPool & getPartsCleaningThreadPool()
{
    static StaticThreadPool pool{
        "MergeTreePartsCleanerThreadPool",
        CurrentMetrics::MergeTreePartsCleanerThreads,
        CurrentMetrics::MergeTreePartsCleanerThreadsActive};
    return pool;
}

/// Returns the single StaticThreadPool named "MergeTreeOutdatedPartsLoaderThreadPool".
/// The object is a function-local static, so it is constructed on the first call;
/// its initialize() still has to be called before get() can be used.
StaticThreadPool & getOutdatedPartsLoadingThreadPool()
{
    static StaticThreadPool pool{
        "MergeTreeOutdatedPartsLoaderThreadPool",
        CurrentMetrics::MergeTreeOutdatedPartsLoaderThreads,
        CurrentMetrics::MergeTreeOutdatedPartsLoaderThreadsActive};
    return pool;
}

}