1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
#pragma once
#include <chrono>
#include <functional>
#include <unordered_map>
#include <base/types.h>
#include <Interpreters/IExternalLoadable.h>
#include <Interpreters/IExternalLoaderConfigRepository.h>
#include <base/scope_guard.h>
#include <Common/ExternalLoaderStatus.h>
#include <Core/Types.h>
namespace Poco { class Logger; }
namespace DB
{
/** Names of the configuration elements which describe external objects.
  *
  * The configuration is expected to be laid out as follows:
  *
  * <external_group>
  *     <external_config>
  *         <external_name>name</external_name>
  *         ....
  *     </external_config>
  * </external_group>
  *
  * All names are empty until they are set explicitly.
  */
struct ExternalLoaderConfigSettings
{
    /// Name of the element holding the configuration of a single object (`external_config` in the layout above).
    std::string external_config{};

    /// Name of the nested element holding the object's name (`external_name` in the layout above).
    std::string external_name{};

    /// NOTE(review): presumably the name of the nested element holding the object's database - confirm against the config reader.
    std::string external_database{};

    /// NOTE(review): presumably the name of the nested element holding the object's UUID - confirm against the config reader.
    std::string external_uuid{};
};
/** Interface for managing user-defined objects.
* Monitors the configuration file and automatically reloads objects in separate threads.
* The monitoring thread wakes up every 'check_period_sec' seconds and checks
* the modification time of the objects' configuration file. If that time is greater than
* 'config_last_modified', the objects are created from scratch using the configuration file,
* possibly overriding currently existing objects with the same name (previous versions of
* overridden objects will live as long as there are any users retaining them).
*
* Apart from checking the configuration file for modifications, each object
* has a lifetime of its own and may be updated if it supports updates.
* The time of the next update is calculated by choosing a random number
* uniformly distributed between lifetime.min_sec and lifetime.max_sec.
* If either of lifetime.min_sec and lifetime.max_sec is zero, such an object is never updated.
*
* Derived classes provide the actual object factory by implementing create().
*/
class ExternalLoader
{
public:
/// Shared pointer to an immutable loadable object.
using LoadablePtr = std::shared_ptr<const IExternalLoadable>;
/// A list of loadable objects.
using Loadables = std::vector<LoadablePtr>;
using Status = ExternalLoaderStatus;
/// Durations (e.g. timeouts and loading durations) are measured in milliseconds.
using Duration = std::chrono::milliseconds;
/// Time points are taken from the system (wall) clock.
using TimePoint = std::chrono::system_clock::time_point;
/// Configuration of a single object, as passed to createObject().
/// NOTE(review): the meaning of the individual fields is inferred from their names only - confirm against the config reader.
struct ObjectConfig
{
Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
String key_in_config;
String repository_name;
bool from_temp_repository = false;
String path;
};
/// The result of loading a single object, as returned by getLoadResult(), tryLoad(), load() and so on.
struct LoadResult
{
/// Status::NOT_EXIST unless it's set to something else.
Status status = Status::NOT_EXIST;
String name;
LoadablePtr object;
TimePoint loading_start_time;
TimePoint last_successful_update_time;
Duration loading_duration;
std::exception_ptr exception;
std::shared_ptr<const ObjectConfig> config;
};
/// A list of load results.
using LoadResults = std::vector<LoadResult>;
/// True if T is one of the types which can be returned for a single object (LoadResult or LoadablePtr).
template <typename T>
static constexpr bool is_scalar_load_result_type = std::is_same_v<T, LoadResult> || std::is_same_v<T, LoadablePtr>;
/// True if T is one of the types which can be returned for multiple objects (LoadResults or Loadables).
template <typename T>
static constexpr bool is_vector_load_result_type = std::is_same_v<T, LoadResults> || std::is_same_v<T, Loadables>;
/// NOTE(review): `type_name_` is presumably a human-readable name of the kind of objects being loaded - confirm in the implementation.
ExternalLoader(const String & type_name_, Poco::Logger * log);
virtual ~ExternalLoader();
/// Adds a repository which will be used to read configurations from.
/// NOTE(review): the returned scope_guard presumably removes the repository when destroyed - confirm in the implementation.
scope_guard addConfigRepository(std::unique_ptr<IExternalLoaderConfigRepository> config_repository) const;
/// Sets the names of the configuration elements which describe the objects.
void setConfigSettings(const ExternalLoaderConfigSettings & settings_);
/// Sets whether all the objects from the configuration should be always loaded (even those which are never used).
void enableAlwaysLoadEverything(bool enable);
/// Sets whether the objects should be loaded asynchronously, each loading in a new thread (from the thread pool).
void enableAsyncLoading(bool enable);
/// Enables or disables periodic updates.
void enablePeriodicUpdates(bool enable);
/// Returns the status of the object.
/// If the object has not been loaded yet then the function returns Status::NOT_LOADED.
/// If the specified name isn't found in the configuration then the function returns Status::NOT_EXIST.
Status getCurrentStatus(const String & name) const;
/// Returns the result of loading the object.
/// The function doesn't load anything, it just returns the current load result as is.
/// `ReturnType` must be either LoadResult or LoadablePtr.
template <typename ReturnType = LoadResult, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType getLoadResult(const String & name) const;
/// A predicate which receives the name of an object and tells whether the object should be processed.
using FilterByNameFunction = std::function<bool(const String &)>;
/// Returns all the load results as a vector.
/// The function doesn't load anything, it just returns the current load results as is.
/// `ReturnType` must be either LoadResults or Loadables.
/// The overload without arguments passes an empty filter to the overload taking a filter.
template <typename ReturnType = LoadResults, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType getLoadResults() const { return getLoadResults<ReturnType>(FilterByNameFunction{}); }
template <typename ReturnType = LoadResults, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType getLoadResults(const FilterByNameFunction & filter) const;
/// Returns all loaded objects as a vector.
/// The function doesn't load anything, it just returns the current load results as is.
Loadables getLoadedObjects() const;
Loadables getLoadedObjects(const FilterByNameFunction & filter) const;
/// Returns true if any object was loaded.
bool hasLoadedObjects() const;
/// Returns the number of loaded objects.
size_t getNumberOfLoadedObjects() const;
/// Returns true if there is no object.
/// NOTE(review): despite its name this function returns true when the number of objects is ZERO.
/// The behaviour is kept as is because callers may rely on it - confirm the intent before using or changing it.
bool hasObjects() const { return getNumberOfObjects() == 0; }
/// Returns number of objects.
size_t getNumberOfObjects() const;
/// Timeouts which can be passed to tryLoad(): a zero duration and the maximum representable duration.
static constexpr Duration NO_WAIT = Duration::zero();
static constexpr Duration WAIT = Duration::max();
/// Loads a specified object.
/// The function does nothing if it's already loaded.
/// The function doesn't throw an exception if it's failed to load.
template <typename ReturnType = LoadablePtr, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType tryLoad(const String & name, Duration timeout = WAIT) const;
/// Loads objects by filter.
/// The function does nothing for already loaded objects, it just returns them.
/// The function doesn't throw an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType tryLoad(const FilterByNameFunction & filter, Duration timeout = WAIT) const;
/// Loads all objects (calls tryLoad() with an empty filter).
/// The function does nothing for already loaded objects, it just returns them.
/// The function doesn't throw an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType tryLoadAll(Duration timeout = WAIT) const { return tryLoad<ReturnType>(FilterByNameFunction{}, timeout); }
/// Loads a specified object.
/// The function does nothing if it's already loaded.
/// The function throws an exception if it's failed to load.
template <typename ReturnType = LoadablePtr, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType load(const String & name) const;
/// Loads objects by filter.
/// The function does nothing for already loaded objects, it just returns them.
/// The function throws an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType load(const FilterByNameFunction & filter) const;
/// Loads all objects (calls load() with an empty filter). Not recommended to use.
/// The function does nothing for already loaded objects, it just returns them.
/// The function throws an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType loadAll() const { return load<ReturnType>(FilterByNameFunction{}); }
/// Loads or reloads a specified object.
/// The function reloads the object if it's already loaded.
/// The function throws an exception if it's failed to load or reload.
template <typename ReturnType = LoadablePtr, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType loadOrReload(const String & name) const;
/// Loads or reloads objects by filter.
/// The function reloads the objects which are already loaded.
/// The function throws an exception if it's failed to load or reload something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType loadOrReload(const FilterByNameFunction & filter) const;
/// Loads or reloads all objects (calls loadOrReload() with an empty filter). Not recommended to use.
/// The function throws an exception if it's failed to load or reload something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType loadOrReloadAll() const { return loadOrReload<ReturnType>(FilterByNameFunction{}); }
/// Reloads all the objects which were tried to load before (successfully or not).
/// The function throws an exception if it's failed to load or reload something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType reloadAllTriedToLoad() const;
/// Checks if an object with the specified name exists in the configuration.
bool has(const String & name) const;
/// Reloads all config repositories.
void reloadConfig() const;
/// Reloads only a specified config repository.
void reloadConfig(const String & repository_name) const;
/// Reloads only a specified path in a specified config repository.
void reloadConfig(const String & repository_name, const String & path) const;
protected:
/// Creates an object named `name` from the configuration found at `key_in_config` in `config`.
/// Must be implemented by derived classes.
virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config, const String & repository_name) const = 0;
private:
/// NOTE(review): presumably these throw if the object(s) are not loaded, and also on loading errors
/// when `check_no_errors` is set - confirm in the implementation.
void checkLoaded(const LoadResult & result, bool check_no_errors) const;
void checkLoaded(const LoadResults & results, bool check_no_errors) const;
/// Returns the names of the objects which were tried to load - presumably used by reloadAllTriedToLoad().
Strings getAllTriedToLoadNames() const;
/// NOTE(review): presumably creates a new version of an object, given its previous version (if any) - confirm in the implementation.
LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const;
/// The helper classes below are only declared here; they are held through unique_ptr
/// so that their definitions aren't needed in this header.
class LoadablesConfigReader;
std::unique_ptr<LoadablesConfigReader> config_files_reader;
class LoadingDispatcher;
std::unique_ptr<LoadingDispatcher> loading_dispatcher;
class PeriodicUpdater;
std::unique_ptr<PeriodicUpdater> periodic_updater;
const String type_name;
Poco::Logger * log;
};
}
|