1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
#include <IO/Resource/StaticResourceManager.h>
#include <IO/SchedulerNodeFactory.h>
#include <IO/ResourceManagerFactory.h>
#include <IO/ISchedulerQueue.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <map>
#include <tuple>
#include <algorithm>
namespace DB
{
/// Error codes thrown by this translation unit; only declared here (extern), defined elsewhere.
namespace ErrorCodes
{
extern const int RESOURCE_ACCESS_DENIED; // thrown by Classifier::get() for a resource missing from the classifier
extern const int RESOURCE_NOT_FOUND; // thrown by the Classifier constructor for a missing or non-queue node path
extern const int INVALID_SCHEDULER_NODE; // thrown by the Resource constructor for an invalid node hierarchy
}
/// Builds the scheduler node hierarchy of one resource from configuration.
///
/// Every child key of `config_prefix` whose name starts with "node" describes one scheduler node.
/// Its `path` attribute is the absolute path of the node in the hierarchy ("/" is the root) and
/// its optional `type` element (default "fifo") selects the node implementation requested from
/// SchedulerNodeFactory. Keys that do not start with "node" are skipped.
///
/// Nodes may be listed in the configuration in any order: they are created in path order,
/// so a parent always exists by the time its children are attached.
///
/// @param name           resource name, used only in error messages
/// @param event_queue    passed through to SchedulerNodeFactory for every created node
/// @param config         configuration to read node descriptions from
/// @param config_prefix  configuration prefix of this resource
///
/// @throws Exception with INVALID_SCHEDULER_NODE if a node has an empty `path`, a `path` that does
///         not start with '/', a `path` already used by another node, a non-root `path` whose
///         parent is not defined, or if no root node "/" is defined.
StaticResourceManager::Resource::Resource(
    const String & name,
    EventQueue * event_queue,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix)
{
    Poco::Util::AbstractConfiguration::Keys keys;
    config.keys(config_prefix, keys);

    // Validate node paths and order nodes by path. A parent path is a proper prefix of its
    // children's paths and therefore compares less, so iterating the ordered map visits
    // parents before children regardless of the order of nodes in the configuration.
    std::map<String, String> path_to_key;
    for (const auto & key : keys)
    {
        if (!startsWith(key, "node"))
            continue;

        String path = config.getString(config_prefix + "." + key + "[@path]", "");
        if (path.empty())
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Attribute 'path' must be specified in all nodes for resource '{}'", name);
        if (path[0] != '/')
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "path must start with '/' for resource '{}'", name);
        if (!path_to_key.emplace(path, key).second)
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Duplicate path '{}' for resource '{}'", path, name);
    }

    // Create nodes and link them to their parents
    for (const auto & [path, key] : path_to_key)
    {
        const String node_prefix = config_prefix + "." + key;

        // Create node
        String type = config.getString(node_prefix + ".type", "fifo");
        SchedulerNodePtr node = SchedulerNodeFactory::instance().get(type, event_queue, config, node_prefix);
        node->basename = path.substr(1);

        // Take ownership (paths are already known to be unique)
        nodes.emplace(path, node);

        // Attach created node to parent (if not root)
        if (path != "/")
        {
            String parent_path = path.substr(0, path.rfind('/'));
            if (parent_path.empty())
                parent_path = "/"; // direct children of the root
            if (auto parent = nodes.find(parent_path); parent != nodes.end())
                parent->second->attachChild(node);
            else
                throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Parent doesn't exist for path '{}' for resource '{}'", path, name);
        }
    }

    if (nodes.find("/") == nodes.end())
        throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "undefined root node path '/' for resource '{}'", name);
}
/// Resolves a classifier description into links to scheduler queues of `manager`.
///
/// For every (resource name, node path) entry of `cfg`:
///  - if `manager` has no such resource, an empty ResourceLink is stored
///    (resource not configured - unlimited);
///  - otherwise the node at the given path must exist and be an ISchedulerQueue,
///    and a link pointing to that queue is stored.
///
/// @throws Exception with RESOURCE_NOT_FOUND if the path does not exist in a configured resource
///         or the node at that path is not a queue.
StaticResourceManager::Classifier::Classifier(const StaticResourceManager & manager, const ClassifierDescription & cfg)
{
    // Bind by reference: entries are only read, there is no need to copy both strings per iteration
    for (const auto & [resource_name, path] : cfg)
    {
        const auto resource_iter = manager.resources.find(resource_name);
        if (resource_iter == manager.resources.end())
        {
            resources.emplace(resource_name, ResourceLink{}); // resource not configured - unlimited
            continue;
        }

        const Resource & resource = resource_iter->second;
        const auto node_iter = resource.nodes.find(path);
        if (node_iter == resource.nodes.end())
            throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Path '{}' for resource '{}' does not exist", path, resource_name);

        // Only queue nodes can be linked to
        auto * queue = dynamic_cast<ISchedulerQueue *>(node_iter->second.get());
        if (!queue)
            throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unable to access non-queue node at path '{}' for resource '{}'", path, resource_name);

        resources.emplace(resource_name, ResourceLink{.queue = queue});
    }
}
/// Returns the link stored in this classifier for `resource_name`.
/// @throws Exception with RESOURCE_ACCESS_DENIED if the classifier has no entry for the resource.
ResourceLink StaticResourceManager::Classifier::get(const String & resource_name)
{
    const auto found = resources.find(resource_name);
    if (found == resources.end())
        throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "Access denied to resource '{}'", resource_name);
    return found->second;
}
/// Applies the configuration once: creates a Resource for every key under "resources",
/// attaches each resource's root node to the scheduler, loads the classifiers and starts the scheduler.
/// Subsequent calls are ignored once any resource exists.
void StaticResourceManager::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
    // Already initialized, configuration update is not supported
    if (!resources.empty())
        return;

    const String config_prefix = "resources";
    Poco::Util::AbstractConfiguration::Keys resource_names;
    config.keys(config_prefix, resource_names);

    // One resource per element under the <resources> tag
    for (const auto & resource_name : resource_names)
    {
        const String resource_prefix = config_prefix + "." + resource_name;
        auto emplaced = resources.emplace(
            std::piecewise_construct,
            std::forward_as_tuple(resource_name),
            std::forward_as_tuple(resource_name, scheduler.event_queue, config, resource_prefix));

        // Hook the root node of the resource into the scheduler
        auto & root = emplaced.first->second.nodes.find("/")->second;
        scheduler.attachChild(root);
    }

    // Classifiers are read from the same configuration
    classifiers = std::make_unique<ClassifiersConfig>(config);

    // Start the scheduler thread
    scheduler.start();
}
/// Creates a new Classifier from the description registered under `classifier_name`.
ClassifierPtr StaticResourceManager::acquire(const String & classifier_name)
{
    const auto & description = classifiers->get(classifier_name);
    return std::make_shared<Classifier>(*this, description);
}
/// Registers StaticResourceManager in `factory` under the name "static".
void registerStaticResourceManager(ResourceManagerFactory & factory)
{
factory.registerMethod<StaticResourceManager>("static");
}
}
|