aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/Resource/StaticResourceManager.cpp
blob: a79e8148f941ed340569877eb5ead156dc6343c5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include <IO/Resource/StaticResourceManager.h>

#include <IO/SchedulerNodeFactory.h>
#include <IO/ResourceManagerFactory.h>
#include <IO/ISchedulerQueue.h>

#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>

#include <map>
#include <tuple>
#include <algorithm>

namespace DB
{

/// Error codes thrown by this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int RESOURCE_ACCESS_DENIED;   // thrown by Classifier::get() for a resource missing from the classifier
    extern const int RESOURCE_NOT_FOUND;       // thrown by the Classifier constructor for a missing or non-queue node path
    extern const int INVALID_SCHEDULER_NODE;   // thrown by the Resource constructor for malformed node configuration
}

/// Builds the scheduler node hierarchy of a single resource from configuration.
///
/// Every child key of `config_prefix` whose name starts with "node" describes one scheduler node.
/// It must carry a `path` attribute starting with '/', and may carry a `type` element
/// (defaults to "fifo") that selects the node implementation in SchedulerNodeFactory.
///
/// Nodes are created in path order, so a parent is always created before its children
/// regardless of the order in which nodes are listed in the configuration.
///
/// @param name           Resource name, used only in error messages.
/// @param event_queue    Forwarded to the node factory for every created node.
/// @param config         Configuration to read node descriptions from.
/// @param config_prefix  Configuration path of this resource.
///
/// @throws Exception(INVALID_SCHEDULER_NODE) if a node has no path, a path does not start with '/',
///         two nodes share a path, a non-root node has no parent, or the root node "/" is missing.
StaticResourceManager::Resource::Resource(
    const String & name,
    EventQueue * event_queue,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix)
{
    Poco::Util::AbstractConfiguration::Keys keys;
    config.keys(config_prefix, keys);

    // Index node keys by their path. An ordered map iterates paths lexicographically, and a parent
    // path is a strict prefix of its children's paths, so parents always come before children.
    // (Sorting the configuration keys themselves would not achieve that: keys are not paths.)
    std::map<String, String> path_to_key;
    for (const auto & key : keys)
    {
        if (!startsWith(key, "node"))
            continue;

        // Validate path
        String path = config.getString(config_prefix + "." + key + "[@path]", "");
        if (path.empty())
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Attribute 'path' must be specified in all nodes for resource '{}'", name);
        if (path[0] != '/')
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "path must start with '/' for resource '{}'", name);

        if (auto [_, inserted] = path_to_key.emplace(path, key); !inserted)
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Duplicate path '{}' for resource '{}'", path, name);
    }

    // Create nodes and link them into a tree
    for (const auto & [path, key] : path_to_key)
    {
        const String node_prefix = config_prefix + "." + key;

        // Create node
        String type = config.getString(node_prefix + ".type", "fifo");
        SchedulerNodePtr node = SchedulerNodeFactory::instance().get(type, event_queue, config, node_prefix);

        // Basename is the last path component (empty string for the root "/").
        // Every path starts with '/', so rfind() always finds a slash here.
        // NOTE(review): presumably the node interface composes full paths from basenames - confirm.
        node->basename = path.substr(path.rfind('/') + 1);

        // Take ownership (paths are already known to be unique)
        nodes.emplace(path, node);

        if (path == "/")
            continue; // root has no parent within the resource

        // Attach created node to its parent
        String parent_path = path.substr(0, path.rfind('/'));
        if (parent_path.empty())
            parent_path = "/";
        if (auto parent = nodes.find(parent_path); parent != nodes.end())
            parent->second->attachChild(node);
        else
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Parent doesn't exist for path '{}' for resource '{}'", path, name);
    }

    if (nodes.find("/") == nodes.end())
        throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "undefined root node path '/' for resource '{}'", name);
}

/// Resolves every (resource name, node path) pair of a classifier description into a ResourceLink.
///
/// - A resource that the manager does not know gets an empty link.
/// - A known resource must have a node at the given path and that node must be an ISchedulerQueue;
///   otherwise Exception(RESOURCE_NOT_FOUND) is thrown.
StaticResourceManager::Classifier::Classifier(const StaticResourceManager & manager, const ClassifierDescription & cfg)
{
    for (const auto & [resource_name, path] : cfg)
    {
        const auto known_resource = manager.resources.find(resource_name);
        if (known_resource == manager.resources.end())
        {
            // resource not configured - unlimited
            resources.emplace(resource_name, ResourceLink{});
            continue;
        }

        const auto & resource_nodes = known_resource->second.nodes;
        const auto target = resource_nodes.find(path);
        if (target == resource_nodes.end())
            throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Path '{}' for resource '{}' does not exist", path, resource_name);

        // Only queue nodes can be linked to
        auto * queue = dynamic_cast<ISchedulerQueue *>(target->second.get());
        if (queue == nullptr)
            throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unable to access non-queue node at path '{}' for resource '{}'", path, resource_name);

        resources.emplace(resource_name, ResourceLink{.queue = queue});
    }
}

/// Returns the link stored for `resource_name` by the constructor.
/// @throws Exception(RESOURCE_ACCESS_DENIED) if this classifier has no entry for the resource.
ResourceLink StaticResourceManager::Classifier::get(const String & resource_name)
{
    const auto found = resources.find(resource_name);
    if (found == resources.end())
        throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "Access denied to resource '{}'", resource_name);
    return found->second;
}

/// One-time initialization from configuration: creates a Resource for each key under "resources",
/// attaches each resource's root node to the scheduler, builds the classifiers and starts the scheduler.
/// Calls made when resources already exist are ignored.
void StaticResourceManager::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
    // Already initialized, configuration update is not supported
    if (!resources.empty())
        return;

    const String config_prefix = "resources";
    Poco::Util::AbstractConfiguration::Keys resource_names;
    config.keys(config_prefix, resource_names);

    // Create resource for every element under <resources> tag
    for (const auto & resource_name : resource_names)
    {
        const String resource_prefix = config_prefix + "." + resource_name;
        auto emplaced = resources.emplace(
            std::piecewise_construct,
            std::forward_as_tuple(resource_name),
            std::forward_as_tuple(resource_name, scheduler.event_queue, config, resource_prefix));

        // Attach root of resource to scheduler (Resource constructor throws if there is no "/" node)
        auto & resource = emplaced.first->second;
        scheduler.attachChild(resource.nodes.find("/")->second);
    }

    // Initialize classifiers
    classifiers = std::make_unique<ClassifiersConfig>(config);

    // Run scheduler thread
    scheduler.start();
}

/// Creates a new Classifier for the classifier description registered under `classifier_name`.
/// Lookup of the description is delegated to `classifiers->get()`.
ClassifierPtr StaticResourceManager::acquire(const String & classifier_name)
{
    const auto & description = classifiers->get(classifier_name);
    return std::make_shared<Classifier>(*this, description);
}

/// Registers StaticResourceManager in the given factory under the name "static".
void registerStaticResourceManager(ResourceManagerFactory & factory)
{
    factory.registerMethod<StaticResourceManager>("static");
}

}