aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/Resource/SemaphoreConstraint.h
blob: 237e63eaddb452b470df1a7230735a458b4105ae (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#pragma once

#include <IO/ISchedulerConstraint.h>
#include <IO/SchedulerRoot.h>

#include <mutex>
#include <limits>
#include <utility>

namespace DB
{

/*
 * Limited concurrency constraint.
 * Blocks if either number of concurrent in-flight requests exceeds `max_requests`, or their total cost exceeds `max_cost`
 */
class SemaphoreConstraint : public ISchedulerConstraint
{
    static constexpr Int64 default_max_requests = std::numeric_limits<Int64>::max();
    static constexpr Int64 default_max_cost = std::numeric_limits<Int64>::max();
public:
    SemaphoreConstraint(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
        : ISchedulerConstraint(event_queue_, config, config_prefix)
        , max_requests(config.getInt64(config_prefix + ".max_requests", default_max_requests))
        , max_cost(config.getInt64(config_prefix + ".max_cost", config.getInt64(config_prefix + ".max_bytes", default_max_cost)))
    {}

    bool equals(ISchedulerNode * other) override
    {
        if (auto * o = dynamic_cast<SemaphoreConstraint *>(other))
            return max_requests == o->max_requests && max_cost == o->max_cost;
        return false;
    }

    void attachChild(const std::shared_ptr<ISchedulerNode> & child_) override
    {
        // Take ownership
        child = child_;
        child->setParent(this);

        // Activate if required
        if (child->isActive())
            activateChild(child.get());
    }

    void removeChild(ISchedulerNode * child_) override
    {
        if (child.get() == child_)
        {
            child_active = false; // deactivate
            child->setParent(nullptr); // detach
            child.reset();
        }
    }

    ISchedulerNode * getChild(const String & child_name) override
    {
        if (child->basename == child_name)
            return child.get();
        else
            return nullptr;
    }

    std::pair<ResourceRequest *, bool> dequeueRequest() override
    {
        // Dequeue request from the child
        auto [request, child_now_active] = child->dequeueRequest();
        if (!request)
            return {nullptr, false};

        // Request has reference to the first (closest to leaf) `constraint`, which can have `parent_constraint`.
        // The former is initialized here dynamically and the latter is initialized once during hierarchy construction.
        if (!request->constraint)
            request->constraint = this;

        // Update state on request arrival
        std::unique_lock lock(mutex);
        requests++;
        cost += request->cost;
        child_active = child_now_active;

        return {request, active()};
    }

    void finishRequest(ResourceRequest * request) override
    {
        // Recursive traverse of parent flow controls in reverse order
        if (parent_constraint)
            parent_constraint->finishRequest(request);

        // Update state on request departure
        std::unique_lock lock(mutex);
        bool was_active = active();
        requests--;
        cost -= request->cost;

        // Schedule activation on transition from inactive state
        if (!was_active && active())
            scheduleActivation();
    }

    void activateChild(ISchedulerNode * child_) override
    {
        std::unique_lock lock(mutex);
        if (child_ == child.get())
            if (!std::exchange(child_active, true) && satisfied() && parent)
                parent->activateChild(this);
    }

    bool isActive() override
    {
        std::unique_lock lock(mutex);
        return active();
    }

private:
    bool satisfied() const
    {
        return requests < max_requests && cost < max_cost;
    }

    bool active() const
    {
        return satisfied() && child_active;
    }

private:
    std::mutex mutex;
    Int64 requests = 0;
    Int64 cost = 0;
    bool child_active = false;

    SchedulerNodePtr child;
    Int64 max_requests = default_max_requests;
    Int64 max_cost = default_max_cost;
};

}