summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src
diff options
context:
space:
mode:
authorAlexSm <[email protected]>2023-12-22 17:10:22 +0100
committerGitHub <[email protected]>2023-12-22 17:10:22 +0100
commit148f920350c60c0ca2d89b637a5aea9093eee450 (patch)
tree6314b1433dac833398c333731e83f0ad77e81a0b /contrib/clickhouse/src
parent7116d46ae7c0259b5f9d489de263f8701e432b1c (diff)
Library import 2 (#639)
Diffstat (limited to 'contrib/clickhouse/src')
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h47
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsStorage.h74
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp130
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h16
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp12
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp268
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h (renamed from contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h)19
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp266
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp433
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp190
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h69
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp435
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h (renamed from contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h)21
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp44
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h12
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp44
-rw-r--r--contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.h12
-rw-r--r--contrib/clickhouse/src/Interpreters/Context.cpp36
-rw-r--r--contrib/clickhouse/src/Interpreters/Context.h7
-rw-r--r--contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp4
-rw-r--r--contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp4
-rw-r--r--contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp4
-rw-r--r--contrib/clickhouse/src/ya.make4
23 files changed, 1186 insertions, 965 deletions
diff --git a/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h b/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h
deleted file mode 100644
index 4c7850951b5..00000000000
--- a/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#pragma once
-
-#include <base/types.h>
-
-
-namespace DB
-{
-class IAST;
-struct Settings;
-enum class UserDefinedSQLObjectType;
-
-/// Interface for a loader of user-defined SQL objects.
-/// Implementations: UserDefinedSQLLoaderFromDisk, UserDefinedSQLLoaderFromZooKeeper
-class IUserDefinedSQLObjectsLoader
-{
-public:
- virtual ~IUserDefinedSQLObjectsLoader() = default;
-
- /// Whether this loader can replicate SQL objects to another node.
- virtual bool isReplicated() const { return false; }
- virtual String getReplicationID() const { return ""; }
-
- /// Loads all objects. Can be called once - if objects are already loaded the function does nothing.
- virtual void loadObjects() = 0;
-
- /// Stops watching.
- virtual void stopWatching() {}
-
- /// Immediately reloads all objects, throws an exception if failed.
- virtual void reloadObjects() = 0;
-
- /// Immediately reloads a specified object only.
- virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0;
-
- /// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction).
- virtual bool storeObject(
- UserDefinedSQLObjectType object_type,
- const String & object_name,
- const IAST & create_object_query,
- bool throw_if_exists,
- bool replace_if_exists,
- const Settings & settings) = 0;
-
- /// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction).
- virtual bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) = 0;
-};
-}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsStorage.h b/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsStorage.h
new file mode 100644
index 00000000000..345ff8c5954
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsStorage.h
@@ -0,0 +1,74 @@
+#pragma once
+
+#include <base/types.h>
+
+#include <Interpreters/Context_fwd.h>
+
+#include <Parsers/IAST_fwd.h>
+
+
+namespace DB
+{
+class IAST;
+struct Settings;
+enum class UserDefinedSQLObjectType;
+
+/// Interface for a storage of user-defined SQL objects.
+/// Implementations: UserDefinedSQLObjectsDiskStorage, UserDefinedSQLObjectsZooKeeperStorage
+class IUserDefinedSQLObjectsStorage
+{
+public:
+ virtual ~IUserDefinedSQLObjectsStorage() = default;
+
+ /// Whether this loader can replicate SQL objects to another node.
+ virtual bool isReplicated() const { return false; }
+ virtual String getReplicationID() const { return ""; }
+
+ /// Loads all objects. Can be called once - if objects are already loaded the function does nothing.
+ virtual void loadObjects() = 0;
+
+ /// Get object by name. If no object stored with object_name throws exception.
+ virtual ASTPtr get(const String & object_name) const = 0;
+
+ /// Get object by name. If no object stored with object_name return nullptr.
+ virtual ASTPtr tryGet(const String & object_name) const = 0;
+
+ /// Check if object with object_name is stored.
+ virtual bool has(const String & object_name) const = 0;
+
+ /// Get all user defined object names.
+ virtual std::vector<String> getAllObjectNames() const = 0;
+
+ /// Get all user defined objects.
+ virtual std::vector<std::pair<String, ASTPtr>> getAllObjects() const = 0;
+
+ /// Check whether any UDFs have been stored.
+ virtual bool empty() const = 0;
+
+ /// Stops watching.
+ virtual void stopWatching() {}
+
+ /// Immediately reloads all objects, throws an exception if failed.
+ virtual void reloadObjects() = 0;
+
+ /// Immediately reloads a specified object only.
+ virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0;
+
+ /// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction).
+ virtual bool storeObject(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ ASTPtr create_object_query,
+ bool throw_if_exists,
+ bool replace_if_exists,
+ const Settings & settings) = 0;
+
+ /// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction).
+ virtual bool removeObject(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists) = 0;
+};
+}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp
index 622854b3508..1cee137ba10 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp
@@ -3,7 +3,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Backups/RestorerFromBackup.h>
#include <Functions/FunctionFactory.h>
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsBackup.h>
@@ -128,20 +128,17 @@ bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & c
checkCanBeRegistered(context, function_name, *create_function_query);
create_function_query = normalizeCreateFunctionQuery(*create_function_query);
- std::lock_guard lock{mutex};
- auto it = function_name_to_create_query_map.find(function_name);
- if (it != function_name_to_create_query_map.end())
- {
- if (throw_if_exists)
- throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", function_name);
- else if (!replace_if_exists)
- return false;
- }
-
try
{
- auto & loader = context->getUserDefinedSQLObjectsLoader();
- bool stored = loader.storeObject(UserDefinedSQLObjectType::Function, function_name, *create_function_query, throw_if_exists, replace_if_exists, context->getSettingsRef());
+ auto & loader = context->getUserDefinedSQLObjectsStorage();
+ bool stored = loader.storeObject(
+ context,
+ UserDefinedSQLObjectType::Function,
+ function_name,
+ create_function_query,
+ throw_if_exists,
+ replace_if_exists,
+ context->getSettingsRef());
if (!stored)
return false;
}
@@ -151,7 +148,6 @@ bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & c
throw;
}
- function_name_to_create_query_map[function_name] = create_function_query;
return true;
}
@@ -159,20 +155,14 @@ bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr &
{
checkCanBeUnregistered(context, function_name);
- std::lock_guard lock(mutex);
- auto it = function_name_to_create_query_map.find(function_name);
- if (it == function_name_to_create_query_map.end())
- {
- if (throw_if_not_exists)
- throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", function_name);
- else
- return false;
- }
-
try
{
- auto & loader = context->getUserDefinedSQLObjectsLoader();
- bool removed = loader.removeObject(UserDefinedSQLObjectType::Function, function_name, throw_if_not_exists);
+ auto & storage = context->getUserDefinedSQLObjectsStorage();
+ bool removed = storage.removeObject(
+ context,
+ UserDefinedSQLObjectType::Function,
+ function_name,
+ throw_if_not_exists);
if (!removed)
return false;
}
@@ -182,61 +172,41 @@ bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr &
throw;
}
- function_name_to_create_query_map.erase(function_name);
return true;
}
ASTPtr UserDefinedSQLFunctionFactory::get(const String & function_name) const
{
- std::lock_guard lock(mutex);
-
- auto it = function_name_to_create_query_map.find(function_name);
- if (it == function_name_to_create_query_map.end())
- throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
- "The function name '{}' is not registered",
- function_name);
-
- return it->second;
+ return global_context->getUserDefinedSQLObjectsStorage().get(function_name);
}
ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name) const
{
- std::lock_guard lock(mutex);
-
- auto it = function_name_to_create_query_map.find(function_name);
- if (it == function_name_to_create_query_map.end())
- return nullptr;
-
- return it->second;
+ return global_context->getUserDefinedSQLObjectsStorage().tryGet(function_name);
}
bool UserDefinedSQLFunctionFactory::has(const String & function_name) const
{
- return tryGet(function_name) != nullptr;
+ return global_context->getUserDefinedSQLObjectsStorage().has(function_name);
}
std::vector<std::string> UserDefinedSQLFunctionFactory::getAllRegisteredNames() const
{
- std::vector<std::string> registered_names;
-
- std::lock_guard lock(mutex);
- registered_names.reserve(function_name_to_create_query_map.size());
-
- for (const auto & [name, _] : function_name_to_create_query_map)
- registered_names.emplace_back(name);
-
- return registered_names;
+ return global_context->getUserDefinedSQLObjectsStorage().getAllObjectNames();
}
bool UserDefinedSQLFunctionFactory::empty() const
{
- std::lock_guard lock(mutex);
- return function_name_to_create_query_map.empty();
+ return global_context->getUserDefinedSQLObjectsStorage().empty();
}
void UserDefinedSQLFunctionFactory::backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup) const
{
- backupUserDefinedSQLObjects(backup_entries_collector, data_path_in_backup, UserDefinedSQLObjectType::Function, getAllFunctions());
+ backupUserDefinedSQLObjects(
+ backup_entries_collector,
+ data_path_in_backup,
+ UserDefinedSQLObjectType::Function,
+ global_context->getUserDefinedSQLObjectsStorage().getAllObjects());
}
void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const String & data_path_in_backup)
@@ -250,52 +220,4 @@ void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const
registerFunction(context, function_name, create_function_query, throw_if_exists, replace_if_exists);
}
-void UserDefinedSQLFunctionFactory::setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions)
-{
- std::unordered_map<String, ASTPtr> normalized_functions;
- for (const auto & [function_name, create_query] : new_functions)
- normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query);
-
- std::lock_guard lock(mutex);
- function_name_to_create_query_map = std::move(normalized_functions);
-}
-
-std::vector<std::pair<String, ASTPtr>> UserDefinedSQLFunctionFactory::getAllFunctions() const
-{
- std::lock_guard lock{mutex};
- std::vector<std::pair<String, ASTPtr>> all_functions;
- all_functions.reserve(function_name_to_create_query_map.size());
- std::copy(function_name_to_create_query_map.begin(), function_name_to_create_query_map.end(), std::back_inserter(all_functions));
- return all_functions;
-}
-
-void UserDefinedSQLFunctionFactory::setFunction(const String & function_name, const IAST & create_function_query)
-{
- std::lock_guard lock(mutex);
- function_name_to_create_query_map[function_name] = normalizeCreateFunctionQuery(create_function_query);
-}
-
-void UserDefinedSQLFunctionFactory::removeFunction(const String & function_name)
-{
- std::lock_guard lock(mutex);
- function_name_to_create_query_map.erase(function_name);
-}
-
-void UserDefinedSQLFunctionFactory::removeAllFunctionsExcept(const Strings & function_names_to_keep)
-{
- boost::container::flat_set<std::string_view> names_set_to_keep{function_names_to_keep.begin(), function_names_to_keep.end()};
- std::lock_guard lock(mutex);
- for (auto it = function_name_to_create_query_map.begin(); it != function_name_to_create_query_map.end();)
- {
- auto current = it++;
- if (!names_set_to_keep.contains(current->first))
- function_name_to_create_query_map.erase(current);
- }
-}
-
-std::unique_lock<std::recursive_mutex> UserDefinedSQLFunctionFactory::getLock() const
-{
- return std::unique_lock{mutex};
-}
-
}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h
index 45196759d3b..489feeae6f5 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h
@@ -6,7 +6,7 @@
#include <Common/NamePrompter.h>
#include <Parsers/ASTCreateFunctionQuery.h>
-#include <Interpreters/Context_fwd.h>
+#include <Interpreters/Context.h>
namespace DB
@@ -48,23 +48,11 @@ public:
void restore(RestorerFromBackup & restorer, const String & data_path_in_backup);
private:
- friend class UserDefinedSQLObjectsLoaderFromDisk;
- friend class UserDefinedSQLObjectsLoaderFromZooKeeper;
-
/// Checks that a specified function can be registered, throws an exception if not.
static void checkCanBeRegistered(const ContextPtr & context, const String & function_name, const IAST & create_function_query);
static void checkCanBeUnregistered(const ContextPtr & context, const String & function_name);
- /// The following functions must be called only by the loader.
- void setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions);
- std::vector<std::pair<String, ASTPtr>> getAllFunctions() const;
- void setFunction(const String & function_name, const IAST & create_function_query);
- void removeFunction(const String & function_name);
- void removeAllFunctionsExcept(const Strings & function_names_to_keep);
- std::unique_lock<std::recursive_mutex> getLock() const;
-
- std::unordered_map<String, ASTPtr> function_name_to_create_query_map;
- mutable std::recursive_mutex mutex;
+ ContextPtr global_context = Context::getGlobalContextInstance();
};
}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp
index 6920e8ce2c2..3ec5393fa6f 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp
@@ -6,7 +6,7 @@
#include <Backups/IBackupCoordination.h>
#include <Backups/IRestoreCoordination.h>
#include <Backups/RestorerFromBackup.h>
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Interpreters/Context.h>
#include <Parsers/ParserCreateFunctionQuery.h>
@@ -37,9 +37,9 @@ void backupUserDefinedSQLObjects(
escapeForFileName(object_name) + ".sql", std::make_shared<BackupEntryFromMemory>(queryToString(create_object_query)));
auto context = backup_entries_collector.getContext();
- const auto & loader = context->getUserDefinedSQLObjectsLoader();
+ const auto & storage = context->getUserDefinedSQLObjectsStorage();
- if (!loader.isReplicated())
+ if (!storage.isReplicated())
{
fs::path data_path_in_backup_fs{data_path_in_backup};
for (const auto & [file_name, entry] : backup_entries)
@@ -47,7 +47,7 @@ void backupUserDefinedSQLObjects(
return;
}
- String replication_id = loader.getReplicationID();
+ String replication_id = storage.getReplicationID();
auto backup_coordination = backup_entries_collector.getBackupCoordination();
backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, data_path_in_backup);
@@ -80,9 +80,9 @@ std::vector<std::pair<String, ASTPtr>>
restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_path_in_backup, UserDefinedSQLObjectType object_type)
{
auto context = restorer.getContext();
- const auto & loader = context->getUserDefinedSQLObjectsLoader();
+ const auto & storage = context->getUserDefinedSQLObjectsStorage();
- if (loader.isReplicated() && !restorer.getRestoreCoordination()->acquireReplicatedSQLObjects(loader.getReplicationID(), object_type))
+ if (storage.isReplicated() && !restorer.getRestoreCoordination()->acquireReplicatedSQLObjects(storage.getReplicationID(), object_type))
return {}; /// Other replica is already restoring user-defined SQL objects.
auto backup = restorer.getBackup();
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp
new file mode 100644
index 00000000000..271c464e79a
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp
@@ -0,0 +1,268 @@
+#include "Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h"
+
+#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h"
+#include "Functions/UserDefined/UserDefinedSQLObjectType.h"
+
+#include <Common/StringUtils/StringUtils.h>
+#include <Common/atomicRename.h>
+#include <Common/escapeForFileName.h>
+#include <Common/logger_useful.h>
+#include <Common/quoteString.h>
+
+#include <IO/ReadBufferFromFile.h>
+#include <IO/ReadHelpers.h>
+#include <IO/WriteBufferFromFile.h>
+#include <IO/WriteHelpers.h>
+
+#include <Interpreters/Context.h>
+
+#include <Parsers/parseQuery.h>
+#include <Parsers/formatAST.h>
+#include <Parsers/ParserCreateFunctionQuery.h>
+
+#include <Poco/DirectoryIterator.h>
+#include <Poco/Logger.h>
+
+#include <filesystem>
+
+namespace fs = std::filesystem;
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int DIRECTORY_DOESNT_EXIST;
+ extern const int FUNCTION_ALREADY_EXISTS;
+ extern const int UNKNOWN_FUNCTION;
+}
+
+
+namespace
+{
+ /// Converts a path to an absolute path and append it with a separator.
+ String makeDirectoryPathCanonical(const String & directory_path)
+ {
+ auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path);
+ if (canonical_directory_path.has_filename())
+ canonical_directory_path += std::filesystem::path::preferred_separator;
+ return canonical_directory_path;
+ }
+}
+
+UserDefinedSQLObjectsDiskStorage::UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_)
+ : global_context(global_context_)
+ , dir_path{makeDirectoryPathCanonical(dir_path_)}
+ , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromDisk")}
+{
+ createDirectory();
+}
+
+
+ASTPtr UserDefinedSQLObjectsDiskStorage::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name)
+{
+ return tryLoadObject(object_type, object_name, getFilePath(object_type, object_name), /* check_file_exists= */ true);
+}
+
+
+ASTPtr UserDefinedSQLObjectsDiskStorage::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists)
+{
+ LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name), path);
+
+ try
+ {
+ if (check_file_exists && !fs::exists(path))
+ return nullptr;
+
+ /// There is .sql file with user defined object creation statement.
+ ReadBufferFromFile in(path);
+
+ String object_create_query;
+ readStringUntilEOF(object_create_query, in);
+
+ switch (object_type)
+ {
+ case UserDefinedSQLObjectType::Function:
+ {
+ ParserCreateFunctionQuery parser;
+ ASTPtr ast = parseQuery(
+ parser,
+ object_create_query.data(),
+ object_create_query.data() + object_create_query.size(),
+ "",
+ 0,
+ global_context->getSettingsRef().max_parser_depth);
+ return ast;
+ }
+ }
+ }
+ catch (...)
+ {
+ tryLogCurrentException(log, fmt::format("while loading user defined SQL object {} from path {}", backQuote(object_name), path));
+ return nullptr; /// Failed to load this sql object, will ignore it
+ }
+}
+
+
+void UserDefinedSQLObjectsDiskStorage::loadObjects()
+{
+ if (!objects_loaded)
+ loadObjectsImpl();
+}
+
+
+void UserDefinedSQLObjectsDiskStorage::reloadObjects()
+{
+ loadObjectsImpl();
+}
+
+
+void UserDefinedSQLObjectsDiskStorage::loadObjectsImpl()
+{
+ LOG_INFO(log, "Loading user defined objects from {}", dir_path);
+ createDirectory();
+
+ std::vector<std::pair<String, ASTPtr>> function_names_and_queries;
+
+ Poco::DirectoryIterator dir_end;
+ for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
+ {
+ if (it->isDirectory())
+ continue;
+
+ const String & file_name = it.name();
+ if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql"))
+ continue;
+
+ size_t prefix_length = strlen("function_");
+ size_t suffix_length = strlen(".sql");
+ String function_name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length));
+
+ if (function_name.empty())
+ continue;
+
+ ASTPtr ast = tryLoadObject(UserDefinedSQLObjectType::Function, function_name, dir_path + it.name(), /* check_file_exists= */ false);
+ if (ast)
+ function_names_and_queries.emplace_back(function_name, ast);
+ }
+
+ setAllObjects(function_names_and_queries);
+ objects_loaded = true;
+
+ LOG_DEBUG(log, "User defined objects loaded");
+}
+
+
+void UserDefinedSQLObjectsDiskStorage::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
+{
+ createDirectory();
+ auto ast = tryLoadObject(object_type, object_name);
+ if (ast)
+ setObject(object_name, *ast);
+ else
+ removeObject(object_name);
+}
+
+
+void UserDefinedSQLObjectsDiskStorage::createDirectory()
+{
+ std::error_code create_dir_error_code;
+ fs::create_directories(dir_path, create_dir_error_code);
+ if (!fs::exists(dir_path) || !fs::is_directory(dir_path) || create_dir_error_code)
+ throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST, "Couldn't create directory {} reason: '{}'",
+ dir_path, create_dir_error_code.message());
+}
+
+
+bool UserDefinedSQLObjectsDiskStorage::storeObjectImpl(
+ const ContextPtr & /*current_context*/,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ ASTPtr create_object_query,
+ bool throw_if_exists,
+ bool replace_if_exists,
+ const Settings & settings)
+{
+ String file_path = getFilePath(object_type, object_name);
+ LOG_DEBUG(log, "Storing user-defined object {} to file {}", backQuote(object_name), file_path);
+
+ if (fs::exists(file_path))
+ {
+ if (throw_if_exists)
+ throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name);
+ else if (!replace_if_exists)
+ return false;
+ }
+
+ WriteBufferFromOwnString create_statement_buf;
+ formatAST(*create_object_query, create_statement_buf, false);
+ writeChar('\n', create_statement_buf);
+ String create_statement = create_statement_buf.str();
+
+ String temp_file_path = file_path + ".tmp";
+
+ try
+ {
+ WriteBufferFromFile out(temp_file_path, create_statement.size());
+ writeString(create_statement, out);
+ out.next();
+ if (settings.fsync_metadata)
+ out.sync();
+ out.close();
+
+ if (replace_if_exists)
+ fs::rename(temp_file_path, file_path);
+ else
+ renameNoReplace(temp_file_path, file_path);
+ }
+ catch (...)
+ {
+ fs::remove(temp_file_path);
+ throw;
+ }
+
+ LOG_TRACE(log, "Object {} stored", backQuote(object_name));
+ return true;
+}
+
+
+bool UserDefinedSQLObjectsDiskStorage::removeObjectImpl(
+ const ContextPtr & /*current_context*/,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists)
+{
+ String file_path = getFilePath(object_type, object_name);
+ LOG_DEBUG(log, "Removing user defined object {} stored in file {}", backQuote(object_name), file_path);
+
+ bool existed = fs::remove(file_path);
+
+ if (!existed)
+ {
+ if (throw_if_not_exists)
+ throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", object_name);
+ else
+ return false;
+ }
+
+ LOG_TRACE(log, "Object {} removed", backQuote(object_name));
+ return true;
+}
+
+
+String UserDefinedSQLObjectsDiskStorage::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const
+{
+ String file_path;
+ switch (object_type)
+ {
+ case UserDefinedSQLObjectType::Function:
+ {
+ file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql";
+ break;
+ }
+ }
+ return file_path;
+}
+
+}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h
index 7b0bb291f42..f0986dbda72 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h
@@ -1,6 +1,6 @@
#pragma once
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
@@ -9,10 +9,10 @@ namespace DB
{
/// Loads user-defined sql objects from a specified folder.
-class UserDefinedSQLObjectsLoaderFromDisk : public IUserDefinedSQLObjectsLoader
+class UserDefinedSQLObjectsDiskStorage : public UserDefinedSQLObjectsStorageBase
{
public:
- UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_);
+ UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_);
void loadObjects() override;
@@ -20,17 +20,22 @@ public:
void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override;
- bool storeObject(
+private:
+ bool storeObjectImpl(
+ const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
- const IAST & create_object_query,
+ ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
- bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override;
+ bool removeObjectImpl(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists) override;
-private:
void createDirectory();
void loadObjectsImpl();
ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name);
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp
index d67c48f166d..e69de29bb2d 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp
@@ -1,266 +0,0 @@
-#include "Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h"
-
-#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h"
-#include "Functions/UserDefined/UserDefinedSQLObjectType.h"
-
-#include <Common/StringUtils/StringUtils.h>
-#include <Common/atomicRename.h>
-#include <Common/escapeForFileName.h>
-#include <Common/logger_useful.h>
-#include <Common/quoteString.h>
-
-#include <IO/ReadBufferFromFile.h>
-#include <IO/ReadHelpers.h>
-#include <IO/WriteBufferFromFile.h>
-#include <IO/WriteHelpers.h>
-
-#include <Interpreters/Context.h>
-
-#include <Parsers/parseQuery.h>
-#include <Parsers/formatAST.h>
-#include <Parsers/ParserCreateFunctionQuery.h>
-
-#include <Poco/DirectoryIterator.h>
-#include <Poco/Logger.h>
-
-#include <filesystem>
-
-namespace fs = std::filesystem;
-
-
-namespace DB
-{
-
-namespace ErrorCodes
-{
- extern const int DIRECTORY_DOESNT_EXIST;
- extern const int FUNCTION_ALREADY_EXISTS;
- extern const int UNKNOWN_FUNCTION;
-}
-
-
-namespace
-{
- /// Converts a path to an absolute path and append it with a separator.
- String makeDirectoryPathCanonical(const String & directory_path)
- {
- auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path);
- if (canonical_directory_path.has_filename())
- canonical_directory_path += std::filesystem::path::preferred_separator;
- return canonical_directory_path;
- }
-}
-
-UserDefinedSQLObjectsLoaderFromDisk::UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_)
- : global_context(global_context_)
- , dir_path{makeDirectoryPathCanonical(dir_path_)}
- , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromDisk")}
-{
- createDirectory();
-}
-
-
-ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name)
-{
- return tryLoadObject(object_type, object_name, getFilePath(object_type, object_name), /* check_file_exists= */ true);
-}
-
-
-ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists)
-{
- LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name), path);
-
- try
- {
- if (check_file_exists && !fs::exists(path))
- return nullptr;
-
- /// There is .sql file with user defined object creation statement.
- ReadBufferFromFile in(path);
-
- String object_create_query;
- readStringUntilEOF(object_create_query, in);
-
- switch (object_type)
- {
- case UserDefinedSQLObjectType::Function:
- {
- ParserCreateFunctionQuery parser;
- ASTPtr ast = parseQuery(
- parser,
- object_create_query.data(),
- object_create_query.data() + object_create_query.size(),
- "",
- 0,
- global_context->getSettingsRef().max_parser_depth);
- UserDefinedSQLFunctionFactory::checkCanBeRegistered(global_context, object_name, *ast);
- return ast;
- }
- }
- }
- catch (...)
- {
- tryLogCurrentException(log, fmt::format("while loading user defined SQL object {} from path {}", backQuote(object_name), path));
- return nullptr; /// Failed to load this sql object, will ignore it
- }
-}
-
-
-void UserDefinedSQLObjectsLoaderFromDisk::loadObjects()
-{
- if (!objects_loaded)
- loadObjectsImpl();
-}
-
-
-void UserDefinedSQLObjectsLoaderFromDisk::reloadObjects()
-{
- loadObjectsImpl();
-}
-
-
-void UserDefinedSQLObjectsLoaderFromDisk::loadObjectsImpl()
-{
- LOG_INFO(log, "Loading user defined objects from {}", dir_path);
- createDirectory();
-
- std::vector<std::pair<String, ASTPtr>> function_names_and_queries;
-
- Poco::DirectoryIterator dir_end;
- for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
- {
- if (it->isDirectory())
- continue;
-
- const String & file_name = it.name();
- if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql"))
- continue;
-
- size_t prefix_length = strlen("function_");
- size_t suffix_length = strlen(".sql");
- String function_name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length));
-
- if (function_name.empty())
- continue;
-
- ASTPtr ast = tryLoadObject(UserDefinedSQLObjectType::Function, function_name, dir_path + it.name(), /* check_file_exists= */ false);
- if (ast)
- function_names_and_queries.emplace_back(function_name, ast);
- }
-
- UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_queries);
- objects_loaded = true;
-
- LOG_DEBUG(log, "User defined objects loaded");
-}
-
-
-void UserDefinedSQLObjectsLoaderFromDisk::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
-{
- createDirectory();
- auto ast = tryLoadObject(object_type, object_name);
- auto & factory = UserDefinedSQLFunctionFactory::instance();
- if (ast)
- factory.setFunction(object_name, *ast);
- else
- factory.removeFunction(object_name);
-}
-
-
-void UserDefinedSQLObjectsLoaderFromDisk::createDirectory()
-{
- std::error_code create_dir_error_code;
- fs::create_directories(dir_path, create_dir_error_code);
- if (!fs::exists(dir_path) || !fs::is_directory(dir_path) || create_dir_error_code)
- throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST, "Couldn't create directory {} reason: '{}'",
- dir_path, create_dir_error_code.message());
-}
-
-
-bool UserDefinedSQLObjectsLoaderFromDisk::storeObject(
- UserDefinedSQLObjectType object_type,
- const String & object_name,
- const IAST & create_object_query,
- bool throw_if_exists,
- bool replace_if_exists,
- const Settings & settings)
-{
- String file_path = getFilePath(object_type, object_name);
- LOG_DEBUG(log, "Storing user-defined object {} to file {}", backQuote(object_name), file_path);
-
- if (fs::exists(file_path))
- {
- if (throw_if_exists)
- throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name);
- else if (!replace_if_exists)
- return false;
- }
-
- WriteBufferFromOwnString create_statement_buf;
- formatAST(create_object_query, create_statement_buf, false);
- writeChar('\n', create_statement_buf);
- String create_statement = create_statement_buf.str();
-
- String temp_file_path = file_path + ".tmp";
-
- try
- {
- WriteBufferFromFile out(temp_file_path, create_statement.size());
- writeString(create_statement, out);
- out.next();
- if (settings.fsync_metadata)
- out.sync();
- out.close();
-
- if (replace_if_exists)
- fs::rename(temp_file_path, file_path);
- else
- renameNoReplace(temp_file_path, file_path);
- }
- catch (...)
- {
- fs::remove(temp_file_path);
- throw;
- }
-
- LOG_TRACE(log, "Object {} stored", backQuote(object_name));
- return true;
-}
-
-
-bool UserDefinedSQLObjectsLoaderFromDisk::removeObject(
- UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists)
-{
- String file_path = getFilePath(object_type, object_name);
- LOG_DEBUG(log, "Removing user defined object {} stored in file {}", backQuote(object_name), file_path);
-
- bool existed = fs::remove(file_path);
-
- if (!existed)
- {
- if (throw_if_not_exists)
- throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", object_name);
- else
- return false;
- }
-
- LOG_TRACE(log, "Object {} removed", backQuote(object_name));
- return true;
-}
-
-
-String UserDefinedSQLObjectsLoaderFromDisk::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const
-{
- String file_path;
- switch (object_type)
- {
- case UserDefinedSQLObjectType::Function:
- {
- file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql";
- break;
- }
- }
- return file_path;
-}
-
-}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp
index 29aff666da5..e69de29bb2d 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp
@@ -1,433 +0,0 @@
-#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h>
-
-#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
-#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
-#include <Interpreters/Context.h>
-#include <Parsers/ParserCreateFunctionQuery.h>
-#include <Parsers/formatAST.h>
-#include <Parsers/parseQuery.h>
-#include <base/sleep.h>
-#include <Common/Exception.h>
-#include <Common/ZooKeeper/KeeperException.h>
-#include <Common/escapeForFileName.h>
-#include <Common/logger_useful.h>
-#include <Common/quoteString.h>
-#include <Common/scope_guard_safe.h>
-#include <Common/setThreadName.h>
-
-
-namespace DB
-{
-
-namespace ErrorCodes
-{
- extern const int FUNCTION_ALREADY_EXISTS;
- extern const int UNKNOWN_FUNCTION;
- extern const int BAD_ARGUMENTS;
-}
-
-namespace
-{
- std::string_view getNodePrefix(UserDefinedSQLObjectType object_type)
- {
- switch (object_type)
- {
- case UserDefinedSQLObjectType::Function:
- return "function_";
- }
- UNREACHABLE();
- }
-
- constexpr std::string_view sql_extension = ".sql";
-
- String getNodePath(const String & root_path, UserDefinedSQLObjectType object_type, const String & object_name)
- {
- return root_path + "/" + String{getNodePrefix(object_type)} + escapeForFileName(object_name) + String{sql_extension};
- }
-}
-
-
-UserDefinedSQLObjectsLoaderFromZooKeeper::UserDefinedSQLObjectsLoaderFromZooKeeper(
- const ContextPtr & global_context_, const String & zookeeper_path_)
- : global_context{global_context_}
- , zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }}
- , zookeeper_path{zookeeper_path_}
- , watch_queue{std::make_shared<ConcurrentBoundedQueue<std::pair<UserDefinedSQLObjectType, String>>>(std::numeric_limits<size_t>::max())}
- , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromZooKeeper")}
-{
- if (zookeeper_path.empty())
- throw Exception(ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path must be non-empty");
-
- if (zookeeper_path.back() == '/')
- zookeeper_path.resize(zookeeper_path.size() - 1);
-
- /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
- if (zookeeper_path.front() != '/')
- zookeeper_path = "/" + zookeeper_path;
-}
-
-UserDefinedSQLObjectsLoaderFromZooKeeper::~UserDefinedSQLObjectsLoaderFromZooKeeper()
-{
- SCOPE_EXIT_SAFE(stopWatchingThread());
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::startWatchingThread()
-{
- if (!watching_flag.exchange(true))
- {
- watching_thread = ThreadFromGlobalPool(&UserDefinedSQLObjectsLoaderFromZooKeeper::processWatchQueue, this);
- }
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::stopWatchingThread()
-{
- if (watching_flag.exchange(false))
- {
- watch_queue->finish();
- if (watching_thread.joinable())
- watching_thread.join();
- }
-}
-
-zkutil::ZooKeeperPtr UserDefinedSQLObjectsLoaderFromZooKeeper::getZooKeeper()
-{
- auto [zookeeper, session_status] = zookeeper_getter.getZooKeeper();
-
- if (session_status == zkutil::ZooKeeperCachingGetter::SessionStatus::New)
- {
- /// It's possible that we connected to different [Zoo]Keeper instance
- /// so we may read a bit stale state.
- zookeeper->sync(zookeeper_path);
-
- createRootNodes(zookeeper);
- refreshAllObjects(zookeeper);
- }
-
- return zookeeper;
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::initZooKeeperIfNeeded()
-{
- getZooKeeper();
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::resetAfterError()
-{
- zookeeper_getter.resetCache();
-}
-
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::loadObjects()
-{
- /// loadObjects() is called at start from Server::main(), so it's better not to stop here on no connection to ZooKeeper or any other error.
- /// However the watching thread must be started anyway in case the connection will be established later.
- if (!objects_loaded)
- {
- try
- {
- reloadObjects();
- }
- catch (...)
- {
- tryLogCurrentException(log, "Failed to load user-defined objects");
- }
- }
- startWatchingThread();
-}
-
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::processWatchQueue()
-{
- LOG_DEBUG(log, "Started watching thread");
- setThreadName("UserDefObjWatch");
-
- while (watching_flag)
- {
- try
- {
- UserDefinedSQLObjectTypeAndName watched_object;
-
- /// Re-initialize ZooKeeper session if expired and refresh objects
- initZooKeeperIfNeeded();
-
- if (!watch_queue->tryPop(watched_object, /* timeout_ms: */ 10000))
- continue;
-
- auto zookeeper = getZooKeeper();
- const auto & [object_type, object_name] = watched_object;
-
- if (object_name.empty())
- syncObjects(zookeeper, object_type);
- else
- refreshObject(zookeeper, object_type, object_name);
- }
- catch (...)
- {
- tryLogCurrentException(log, "Will try to restart watching thread after error");
- resetAfterError();
- sleepForSeconds(5);
- }
- }
-
- LOG_DEBUG(log, "Stopped watching thread");
-}
-
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::stopWatching()
-{
- stopWatchingThread();
-}
-
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::reloadObjects()
-{
- auto zookeeper = getZooKeeper();
- refreshAllObjects(zookeeper);
- startWatchingThread();
-}
-
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
-{
- auto zookeeper = getZooKeeper();
- refreshObject(zookeeper, object_type, object_name);
-}
-
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper)
-{
- zookeeper->createAncestors(zookeeper_path);
- zookeeper->createIfNotExists(zookeeper_path, "");
-}
-
-bool UserDefinedSQLObjectsLoaderFromZooKeeper::storeObject(
- UserDefinedSQLObjectType object_type,
- const String & object_name,
- const IAST & create_object_query,
- bool throw_if_exists,
- bool replace_if_exists,
- const Settings &)
-{
- String path = getNodePath(zookeeper_path, object_type, object_name);
- LOG_DEBUG(log, "Storing user-defined object {} at zk path {}", backQuote(object_name), path);
-
- WriteBufferFromOwnString create_statement_buf;
- formatAST(create_object_query, create_statement_buf, false);
- writeChar('\n', create_statement_buf);
- String create_statement = create_statement_buf.str();
-
- auto zookeeper = getZooKeeper();
-
- size_t num_attempts = 10;
- while (true)
- {
- auto code = zookeeper->tryCreate(path, create_statement, zkutil::CreateMode::Persistent);
- if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
- throw zkutil::KeeperException::fromPath(code, path);
-
- if (code == Coordination::Error::ZNODEEXISTS)
- {
- if (throw_if_exists)
- throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name);
- else if (!replace_if_exists)
- return false;
-
- code = zookeeper->trySet(path, create_statement);
- if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE))
- throw zkutil::KeeperException::fromPath(code, path);
- }
-
- if (code == Coordination::Error::ZOK)
- break;
-
- if (!--num_attempts)
- throw zkutil::KeeperException::fromPath(code, path);
- }
- LOG_DEBUG(log, "Object {} stored", backQuote(object_name));
-
- /// Refresh object and set watch for it. Because it can be replaced by another node after creation.
- refreshObject(zookeeper, object_type, object_name);
-
- return true;
-}
-
-
-bool UserDefinedSQLObjectsLoaderFromZooKeeper::removeObject(
- UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists)
-{
- String path = getNodePath(zookeeper_path, object_type, object_name);
- LOG_DEBUG(log, "Removing user-defined object {} at zk path {}", backQuote(object_name), path);
-
- auto zookeeper = getZooKeeper();
-
- auto code = zookeeper->tryRemove(path);
- if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE))
- throw zkutil::KeeperException::fromPath(code, path);
-
- if (code == Coordination::Error::ZNONODE)
- {
- if (throw_if_not_exists)
- throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined object '{}' doesn't exist", object_name);
- else
- return false;
- }
-
- LOG_DEBUG(log, "Object {} removed", backQuote(object_name));
- return true;
-}
-
-bool UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectDataAndSetWatch(
- const zkutil::ZooKeeperPtr & zookeeper,
- String & data,
- const String & path,
- UserDefinedSQLObjectType object_type,
- const String & object_name)
-{
- const auto object_watcher = [my_watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response)
- {
- if (response.type == Coordination::Event::CHANGED)
- {
- [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, object_name);
- /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
- }
- /// Event::DELETED is processed as child event by getChildren watch
- };
-
- Coordination::Stat entity_stat;
- String object_create_query;
- return zookeeper->tryGetWatch(path, data, &entity_stat, object_watcher);
-}
-
-ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::parseObjectData(const String & object_data, UserDefinedSQLObjectType object_type)
-{
- switch (object_type)
- {
- case UserDefinedSQLObjectType::Function: {
- ParserCreateFunctionQuery parser;
- ASTPtr ast = parseQuery(
- parser,
- object_data.data(),
- object_data.data() + object_data.size(),
- "",
- 0,
- global_context->getSettingsRef().max_parser_depth);
- return ast;
- }
- }
- UNREACHABLE();
-}
-
-ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::tryLoadObject(
- const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name)
-{
- String path = getNodePath(zookeeper_path, object_type, object_name);
- LOG_DEBUG(log, "Loading user defined object {} from zk path {}", backQuote(object_name), path);
-
- try
- {
- String object_data;
- bool exists = getObjectDataAndSetWatch(zookeeper, object_data, path, object_type, object_name);
-
- if (!exists)
- {
- LOG_INFO(log, "User-defined object '{}' can't be loaded from path {}, because it doesn't exist", backQuote(object_name), path);
- return nullptr;
- }
-
- return parseObjectData(object_data, object_type);
- }
- catch (...)
- {
- tryLogCurrentException(log, fmt::format("while loading user defined SQL object {}", backQuote(object_name)));
- return nullptr; /// Failed to load this sql object, will ignore it
- }
-}
-
-Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch(
- const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
-{
- auto object_list_watcher = [my_watch_queue = watch_queue, object_type](const Coordination::WatchResponse &)
- {
- [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, "");
- /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
- };
-
- Coordination::Stat stat;
- const auto node_names = zookeeper->getChildrenWatch(zookeeper_path, &stat, object_list_watcher);
- const auto prefix = getNodePrefix(object_type);
-
- Strings object_names;
- object_names.reserve(node_names.size());
- for (const auto & node_name : node_names)
- {
- if (node_name.starts_with(prefix) && node_name.ends_with(sql_extension))
- {
- String object_name = unescapeForFileName(node_name.substr(prefix.length(), node_name.length() - prefix.length() - sql_extension.length()));
- if (!object_name.empty())
- object_names.push_back(std::move(object_name));
- }
- }
-
- return object_names;
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshAllObjects(const zkutil::ZooKeeperPtr & zookeeper)
-{
- /// It doesn't make sense to keep the old watch events because we will reread everything in this function.
- watch_queue->clear();
-
- refreshObjects(zookeeper, UserDefinedSQLObjectType::Function);
- objects_loaded = true;
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
-{
- LOG_DEBUG(log, "Refreshing all user-defined {} objects", object_type);
- Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type);
-
- /// Read & parse all SQL objects from ZooKeeper
- std::vector<std::pair<String, ASTPtr>> function_names_and_asts;
- for (const auto & function_name : object_names)
- {
- if (auto ast = tryLoadObject(zookeeper, UserDefinedSQLObjectType::Function, function_name))
- function_names_and_asts.emplace_back(function_name, ast);
- }
-
- UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_asts);
-
- LOG_DEBUG(log, "All user-defined {} objects refreshed", object_type);
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
-{
- LOG_DEBUG(log, "Syncing user-defined {} objects", object_type);
- Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type);
-
- auto & factory = UserDefinedSQLFunctionFactory::instance();
- auto lock = factory.getLock();
-
- /// Remove stale objects
- factory.removeAllFunctionsExcept(object_names);
- /// Read & parse only new SQL objects from ZooKeeper
- for (const auto & function_name : object_names)
- {
- if (!UserDefinedSQLFunctionFactory::instance().has(function_name))
- refreshObject(zookeeper, UserDefinedSQLObjectType::Function, function_name);
- }
-
- LOG_DEBUG(log, "User-defined {} objects synced", object_type);
-}
-
-void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshObject(
- const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name)
-{
- auto ast = tryLoadObject(zookeeper, object_type, object_name);
- auto & factory = UserDefinedSQLFunctionFactory::instance();
-
- if (ast)
- factory.setFunction(object_name, *ast);
- else
- factory.removeFunction(object_name);
-}
-
-}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp
new file mode 100644
index 00000000000..8d7a18d93bc
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp
@@ -0,0 +1,190 @@
+#include "Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h"
+
+#include <boost/container/flat_set.hpp>
+
+#include <Interpreters/FunctionNameNormalizer.h>
+#include <Parsers/ASTCreateFunctionQuery.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int FUNCTION_ALREADY_EXISTS;
+ extern const int UNKNOWN_FUNCTION;
+}
+
+namespace
+{
+
+ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query)
+{
+ auto ptr = create_function_query.clone();
+ auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr);
+ res.if_not_exists = false;
+ res.or_replace = false;
+ FunctionNameNormalizer().visit(res.function_core.get());
+ return ptr;
+}
+
+}
+
+ASTPtr UserDefinedSQLObjectsStorageBase::get(const String & object_name) const
+{
+ std::lock_guard lock(mutex);
+
+ auto it = object_name_to_create_object_map.find(object_name);
+ if (it == object_name_to_create_object_map.end())
+ throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
+ "The object name '{}' is not saved",
+ object_name);
+
+ return it->second;
+}
+
+ASTPtr UserDefinedSQLObjectsStorageBase::tryGet(const std::string & object_name) const
+{
+ std::lock_guard lock(mutex);
+
+ auto it = object_name_to_create_object_map.find(object_name);
+ if (it == object_name_to_create_object_map.end())
+ return nullptr;
+
+ return it->second;
+}
+
+bool UserDefinedSQLObjectsStorageBase::has(const String & object_name) const
+{
+ return tryGet(object_name) != nullptr;
+}
+
+std::vector<std::string> UserDefinedSQLObjectsStorageBase::getAllObjectNames() const
+{
+ std::vector<std::string> object_names;
+
+ std::lock_guard lock(mutex);
+ object_names.reserve(object_name_to_create_object_map.size());
+
+ for (const auto & [name, _] : object_name_to_create_object_map)
+ object_names.emplace_back(name);
+
+ return object_names;
+}
+
+bool UserDefinedSQLObjectsStorageBase::empty() const
+{
+ std::lock_guard lock(mutex);
+ return object_name_to_create_object_map.empty();
+}
+
+bool UserDefinedSQLObjectsStorageBase::storeObject(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ ASTPtr create_object_query,
+ bool throw_if_exists,
+ bool replace_if_exists,
+ const Settings & settings)
+{
+ std::lock_guard lock{mutex};
+ auto it = object_name_to_create_object_map.find(object_name);
+ if (it != object_name_to_create_object_map.end())
+ {
+ if (throw_if_exists)
+ throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined object '{}' already exists", object_name);
+ else if (!replace_if_exists)
+ return false;
+ }
+
+ bool stored = storeObjectImpl(
+ current_context,
+ object_type,
+ object_name,
+ create_object_query,
+ throw_if_exists,
+ replace_if_exists,
+ settings);
+
+ if (stored)
+ object_name_to_create_object_map[object_name] = create_object_query;
+
+ return stored;
+}
+
+bool UserDefinedSQLObjectsStorageBase::removeObject(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists)
+{
+ std::lock_guard lock(mutex);
+ auto it = object_name_to_create_object_map.find(object_name);
+ if (it == object_name_to_create_object_map.end())
+ {
+ if (throw_if_not_exists)
+ throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined object '{}' doesn't exist", object_name);
+ else
+ return false;
+ }
+
+ bool removed = removeObjectImpl(
+ current_context,
+ object_type,
+ object_name,
+ throw_if_not_exists);
+
+ if (removed)
+ object_name_to_create_object_map.erase(object_name);
+
+ return removed;
+}
+
+std::unique_lock<std::recursive_mutex> UserDefinedSQLObjectsStorageBase::getLock() const
+{
+ return std::unique_lock{mutex};
+}
+
+void UserDefinedSQLObjectsStorageBase::setAllObjects(const std::vector<std::pair<String, ASTPtr>> & new_objects)
+{
+ std::unordered_map<String, ASTPtr> normalized_functions;
+ for (const auto & [function_name, create_query] : new_objects)
+ normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query);
+
+ std::lock_guard lock(mutex);
+ object_name_to_create_object_map = std::move(normalized_functions);
+}
+
+std::vector<std::pair<String, ASTPtr>> UserDefinedSQLObjectsStorageBase::getAllObjects() const
+{
+ std::lock_guard lock{mutex};
+ std::vector<std::pair<String, ASTPtr>> all_objects;
+ all_objects.reserve(object_name_to_create_object_map.size());
+ std::copy(object_name_to_create_object_map.begin(), object_name_to_create_object_map.end(), std::back_inserter(all_objects));
+ return all_objects;
+}
+
+void UserDefinedSQLObjectsStorageBase::setObject(const String & object_name, const IAST & create_object_query)
+{
+ std::lock_guard lock(mutex);
+ object_name_to_create_object_map[object_name] = normalizeCreateFunctionQuery(create_object_query);
+}
+
+void UserDefinedSQLObjectsStorageBase::removeObject(const String & object_name)
+{
+ std::lock_guard lock(mutex);
+ object_name_to_create_object_map.erase(object_name);
+}
+
+void UserDefinedSQLObjectsStorageBase::removeAllObjectsExcept(const Strings & object_names_to_keep)
+{
+ boost::container::flat_set<std::string_view> names_set_to_keep{object_names_to_keep.begin(), object_names_to_keep.end()};
+ std::lock_guard lock(mutex);
+ for (auto it = object_name_to_create_object_map.begin(); it != object_name_to_create_object_map.end();)
+ {
+ auto current = it++;
+ if (!names_set_to_keep.contains(current->first))
+ object_name_to_create_object_map.erase(current);
+ }
+}
+
+}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h
new file mode 100644
index 00000000000..cab63a3bfcf
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h
@@ -0,0 +1,69 @@
+#pragma once
+
+#include <unordered_map>
+#include <mutex>
+
+#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
+
+#include <Parsers/IAST.h>
+
+namespace DB
+{
+
+class UserDefinedSQLObjectsStorageBase : public IUserDefinedSQLObjectsStorage
+{
+public:
+ ASTPtr get(const String & object_name) const override;
+
+ ASTPtr tryGet(const String & object_name) const override;
+
+ bool has(const String & object_name) const override;
+
+ std::vector<String> getAllObjectNames() const override;
+
+ std::vector<std::pair<String, ASTPtr>> getAllObjects() const override;
+
+ bool empty() const override;
+
+ bool storeObject(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ ASTPtr create_object_query,
+ bool throw_if_exists,
+ bool replace_if_exists,
+ const Settings & settings) override;
+
+ bool removeObject(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists) override;
+
+protected:
+ virtual bool storeObjectImpl(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ ASTPtr create_object_query,
+ bool throw_if_exists,
+ bool replace_if_exists,
+ const Settings & settings) = 0;
+
+ virtual bool removeObjectImpl(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists) = 0;
+
+ std::unique_lock<std::recursive_mutex> getLock() const;
+ void setAllObjects(const std::vector<std::pair<String, ASTPtr>> & new_objects);
+ void setObject(const String & object_name, const IAST & create_object_query);
+ void removeObject(const String & object_name);
+ void removeAllObjectsExcept(const Strings & object_names_to_keep);
+
+ std::unordered_map<String, ASTPtr> object_name_to_create_object_map;
+ mutable std::recursive_mutex mutex;
+};
+
+}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp
new file mode 100644
index 00000000000..6e5a5338437
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp
@@ -0,0 +1,435 @@
+#include <Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h>
+
+#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
+#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
+#include <Interpreters/Context.h>
+#include <Parsers/ParserCreateFunctionQuery.h>
+#include <Parsers/formatAST.h>
+#include <Parsers/parseQuery.h>
+#include <base/sleep.h>
+#include <Common/Exception.h>
+#include <Common/ZooKeeper/KeeperException.h>
+#include <Common/escapeForFileName.h>
+#include <Common/logger_useful.h>
+#include <Common/quoteString.h>
+#include <Common/scope_guard_safe.h>
+#include <Common/setThreadName.h>
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int FUNCTION_ALREADY_EXISTS;
+ extern const int UNKNOWN_FUNCTION;
+ extern const int BAD_ARGUMENTS;
+}
+
+namespace
+{
+ std::string_view getNodePrefix(UserDefinedSQLObjectType object_type)
+ {
+ switch (object_type)
+ {
+ case UserDefinedSQLObjectType::Function:
+ return "function_";
+ }
+ UNREACHABLE();
+ }
+
+ constexpr std::string_view sql_extension = ".sql";
+
+ String getNodePath(const String & root_path, UserDefinedSQLObjectType object_type, const String & object_name)
+ {
+ return root_path + "/" + String{getNodePrefix(object_type)} + escapeForFileName(object_name) + String{sql_extension};
+ }
+}
+
+
+UserDefinedSQLObjectsZooKeeperStorage::UserDefinedSQLObjectsZooKeeperStorage(
+ const ContextPtr & global_context_, const String & zookeeper_path_)
+ : global_context{global_context_}
+ , zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }}
+ , zookeeper_path{zookeeper_path_}
+ , watch_queue{std::make_shared<ConcurrentBoundedQueue<std::pair<UserDefinedSQLObjectType, String>>>(std::numeric_limits<size_t>::max())}
+ , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromZooKeeper")}
+{
+ if (zookeeper_path.empty())
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path must be non-empty");
+
+ if (zookeeper_path.back() == '/')
+ zookeeper_path.resize(zookeeper_path.size() - 1);
+
+ /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
+ if (zookeeper_path.front() != '/')
+ zookeeper_path = "/" + zookeeper_path;
+}
+
+UserDefinedSQLObjectsZooKeeperStorage::~UserDefinedSQLObjectsZooKeeperStorage()
+{
+ SCOPE_EXIT_SAFE(stopWatchingThread());
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::startWatchingThread()
+{
+ if (!watching_flag.exchange(true))
+ {
+ watching_thread = ThreadFromGlobalPool(&UserDefinedSQLObjectsZooKeeperStorage::processWatchQueue, this);
+ }
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::stopWatchingThread()
+{
+ if (watching_flag.exchange(false))
+ {
+ watch_queue->finish();
+ if (watching_thread.joinable())
+ watching_thread.join();
+ }
+}
+
+zkutil::ZooKeeperPtr UserDefinedSQLObjectsZooKeeperStorage::getZooKeeper()
+{
+ auto [zookeeper, session_status] = zookeeper_getter.getZooKeeper();
+
+ if (session_status == zkutil::ZooKeeperCachingGetter::SessionStatus::New)
+ {
+ /// It's possible that we connected to different [Zoo]Keeper instance
+ /// so we may read a bit stale state.
+ zookeeper->sync(zookeeper_path);
+
+ createRootNodes(zookeeper);
+ refreshAllObjects(zookeeper);
+ }
+
+ return zookeeper;
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::initZooKeeperIfNeeded()
+{
+ getZooKeeper();
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::resetAfterError()
+{
+ zookeeper_getter.resetCache();
+}
+
+
+void UserDefinedSQLObjectsZooKeeperStorage::loadObjects()
+{
+ /// loadObjects() is called at start from Server::main(), so it's better not to stop here on no connection to ZooKeeper or any other error.
+ /// However the watching thread must be started anyway in case the connection will be established later.
+ if (!objects_loaded)
+ {
+ try
+ {
+ reloadObjects();
+ }
+ catch (...)
+ {
+ tryLogCurrentException(log, "Failed to load user-defined objects");
+ }
+ }
+ startWatchingThread();
+}
+
+
+void UserDefinedSQLObjectsZooKeeperStorage::processWatchQueue()
+{
+ LOG_DEBUG(log, "Started watching thread");
+ setThreadName("UserDefObjWatch");
+
+ while (watching_flag)
+ {
+ try
+ {
+ UserDefinedSQLObjectTypeAndName watched_object;
+
+ /// Re-initialize ZooKeeper session if expired and refresh objects
+ initZooKeeperIfNeeded();
+
+ if (!watch_queue->tryPop(watched_object, /* timeout_ms: */ 10000))
+ continue;
+
+ auto zookeeper = getZooKeeper();
+ const auto & [object_type, object_name] = watched_object;
+
+ if (object_name.empty())
+ syncObjects(zookeeper, object_type);
+ else
+ refreshObject(zookeeper, object_type, object_name);
+ }
+ catch (...)
+ {
+ tryLogCurrentException(log, "Will try to restart watching thread after error");
+ resetAfterError();
+ sleepForSeconds(5);
+ }
+ }
+
+ LOG_DEBUG(log, "Stopped watching thread");
+}
+
+
+void UserDefinedSQLObjectsZooKeeperStorage::stopWatching()
+{
+ stopWatchingThread();
+}
+
+
+void UserDefinedSQLObjectsZooKeeperStorage::reloadObjects()
+{
+ auto zookeeper = getZooKeeper();
+ refreshAllObjects(zookeeper);
+ startWatchingThread();
+}
+
+
+void UserDefinedSQLObjectsZooKeeperStorage::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
+{
+ auto zookeeper = getZooKeeper();
+ refreshObject(zookeeper, object_type, object_name);
+}
+
+
+void UserDefinedSQLObjectsZooKeeperStorage::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper)
+{
+ zookeeper->createAncestors(zookeeper_path);
+ zookeeper->createIfNotExists(zookeeper_path, "");
+}
+
+bool UserDefinedSQLObjectsZooKeeperStorage::storeObjectImpl(
+ const ContextPtr & /*current_context*/,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ ASTPtr create_object_query,
+ bool throw_if_exists,
+ bool replace_if_exists,
+ const Settings &)
+{
+ String path = getNodePath(zookeeper_path, object_type, object_name);
+ LOG_DEBUG(log, "Storing user-defined object {} at zk path {}", backQuote(object_name), path);
+
+ WriteBufferFromOwnString create_statement_buf;
+ formatAST(*create_object_query, create_statement_buf, false);
+ writeChar('\n', create_statement_buf);
+ String create_statement = create_statement_buf.str();
+
+ auto zookeeper = getZooKeeper();
+
+ size_t num_attempts = 10;
+ while (true)
+ {
+ auto code = zookeeper->tryCreate(path, create_statement, zkutil::CreateMode::Persistent);
+ if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
+ throw zkutil::KeeperException::fromPath(code, path);
+
+ if (code == Coordination::Error::ZNODEEXISTS)
+ {
+ if (throw_if_exists)
+ throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name);
+ else if (!replace_if_exists)
+ return false;
+
+ code = zookeeper->trySet(path, create_statement);
+ if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE))
+ throw zkutil::KeeperException::fromPath(code, path);
+ }
+
+ if (code == Coordination::Error::ZOK)
+ break;
+
+ if (!--num_attempts)
+ throw zkutil::KeeperException::fromPath(code, path);
+ }
+ LOG_DEBUG(log, "Object {} stored", backQuote(object_name));
+
+ /// Refresh object and set watch for it. Because it can be replaced by another node after creation.
+ refreshObject(zookeeper, object_type, object_name);
+
+ return true;
+}
+
+
+bool UserDefinedSQLObjectsZooKeeperStorage::removeObjectImpl(
+ const ContextPtr & /*current_context*/,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists)
+{
+ String path = getNodePath(zookeeper_path, object_type, object_name);
+ LOG_DEBUG(log, "Removing user-defined object {} at zk path {}", backQuote(object_name), path);
+
+ auto zookeeper = getZooKeeper();
+
+ auto code = zookeeper->tryRemove(path);
+ if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE))
+ throw zkutil::KeeperException::fromPath(code, path);
+
+ if (code == Coordination::Error::ZNONODE)
+ {
+ if (throw_if_not_exists)
+ throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined object '{}' doesn't exist", object_name);
+ else
+ return false;
+ }
+
+ LOG_DEBUG(log, "Object {} removed", backQuote(object_name));
+ return true;
+}
+
+bool UserDefinedSQLObjectsZooKeeperStorage::getObjectDataAndSetWatch(
+ const zkutil::ZooKeeperPtr & zookeeper,
+ String & data,
+ const String & path,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name)
+{
+ const auto object_watcher = [my_watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response)
+ {
+ if (response.type == Coordination::Event::CHANGED)
+ {
+ [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, object_name);
+ /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
+ }
+ /// Event::DELETED is processed as child event by getChildren watch
+ };
+
+ Coordination::Stat entity_stat;
+ String object_create_query;
+ return zookeeper->tryGetWatch(path, data, &entity_stat, object_watcher);
+}
+
+ASTPtr UserDefinedSQLObjectsZooKeeperStorage::parseObjectData(const String & object_data, UserDefinedSQLObjectType object_type)
+{
+ switch (object_type)
+ {
+ case UserDefinedSQLObjectType::Function: {
+ ParserCreateFunctionQuery parser;
+ ASTPtr ast = parseQuery(
+ parser,
+ object_data.data(),
+ object_data.data() + object_data.size(),
+ "",
+ 0,
+ global_context->getSettingsRef().max_parser_depth);
+ return ast;
+ }
+ }
+ UNREACHABLE();
+}
+
+ASTPtr UserDefinedSQLObjectsZooKeeperStorage::tryLoadObject(
+ const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name)
+{
+ String path = getNodePath(zookeeper_path, object_type, object_name);
+ LOG_DEBUG(log, "Loading user defined object {} from zk path {}", backQuote(object_name), path);
+
+ try
+ {
+ String object_data;
+ bool exists = getObjectDataAndSetWatch(zookeeper, object_data, path, object_type, object_name);
+
+ if (!exists)
+ {
+ LOG_INFO(log, "User-defined object '{}' can't be loaded from path {}, because it doesn't exist", backQuote(object_name), path);
+ return nullptr;
+ }
+
+ return parseObjectData(object_data, object_type);
+ }
+ catch (...)
+ {
+ tryLogCurrentException(log, fmt::format("while loading user defined SQL object {}", backQuote(object_name)));
+ return nullptr; /// Failed to load this sql object, will ignore it
+ }
+}
+
+Strings UserDefinedSQLObjectsZooKeeperStorage::getObjectNamesAndSetWatch(
+ const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
+{
+ auto object_list_watcher = [my_watch_queue = watch_queue, object_type](const Coordination::WatchResponse &)
+ {
+ [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, "");
+ /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
+ };
+
+ Coordination::Stat stat;
+ const auto node_names = zookeeper->getChildrenWatch(zookeeper_path, &stat, object_list_watcher);
+ const auto prefix = getNodePrefix(object_type);
+
+ Strings object_names;
+ object_names.reserve(node_names.size());
+ for (const auto & node_name : node_names)
+ {
+ if (node_name.starts_with(prefix) && node_name.ends_with(sql_extension))
+ {
+ String object_name = unescapeForFileName(node_name.substr(prefix.length(), node_name.length() - prefix.length() - sql_extension.length()));
+ if (!object_name.empty())
+ object_names.push_back(std::move(object_name));
+ }
+ }
+
+ return object_names;
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::refreshAllObjects(const zkutil::ZooKeeperPtr & zookeeper)
+{
+ /// It doesn't make sense to keep the old watch events because we will reread everything in this function.
+ watch_queue->clear();
+
+ refreshObjects(zookeeper, UserDefinedSQLObjectType::Function);
+ objects_loaded = true;
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
+{
+ LOG_DEBUG(log, "Refreshing all user-defined {} objects", object_type);
+ Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type);
+
+ /// Read & parse all SQL objects from ZooKeeper
+ std::vector<std::pair<String, ASTPtr>> function_names_and_asts;
+ for (const auto & function_name : object_names)
+ {
+ if (auto ast = tryLoadObject(zookeeper, UserDefinedSQLObjectType::Function, function_name))
+ function_names_and_asts.emplace_back(function_name, ast);
+ }
+
+ setAllObjects(function_names_and_asts);
+
+ LOG_DEBUG(log, "All user-defined {} objects refreshed", object_type);
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
+{
+ LOG_DEBUG(log, "Syncing user-defined {} objects", object_type);
+ Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type);
+
+ getLock();
+
+ /// Remove stale objects
+ removeAllObjectsExcept(object_names);
+ /// Read & parse only new SQL objects from ZooKeeper
+ for (const auto & function_name : object_names)
+ {
+ if (!UserDefinedSQLFunctionFactory::instance().has(function_name))
+ refreshObject(zookeeper, UserDefinedSQLObjectType::Function, function_name);
+ }
+
+ LOG_DEBUG(log, "User-defined {} objects synced", object_type);
+}
+
+void UserDefinedSQLObjectsZooKeeperStorage::refreshObject(
+ const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name)
+{
+ auto ast = tryLoadObject(zookeeper, object_type, object_name);
+
+ if (ast)
+ setObject(object_name, *ast);
+ else
+ removeObject(object_name);
+}
+
+}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h
index 38e061fd4d9..9f41763c59c 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h
+++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h
@@ -1,6 +1,6 @@
#pragma once
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ConcurrentBoundedQueue.h>
@@ -12,11 +12,11 @@ namespace DB
{
/// Loads user-defined sql objects from ZooKeeper.
-class UserDefinedSQLObjectsLoaderFromZooKeeper : public IUserDefinedSQLObjectsLoader
+class UserDefinedSQLObjectsZooKeeperStorage : public UserDefinedSQLObjectsStorageBase
{
public:
- UserDefinedSQLObjectsLoaderFromZooKeeper(const ContextPtr & global_context_, const String & zookeeper_path_);
- ~UserDefinedSQLObjectsLoaderFromZooKeeper() override;
+ UserDefinedSQLObjectsZooKeeperStorage(const ContextPtr & global_context_, const String & zookeeper_path_);
+ ~UserDefinedSQLObjectsZooKeeperStorage() override;
bool isReplicated() const override { return true; }
String getReplicationID() const override { return zookeeper_path; }
@@ -26,16 +26,21 @@ public:
void reloadObjects() override;
void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override;
- bool storeObject(
+private:
+ bool storeObjectImpl(
+ const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
- const IAST & create_object_query,
+ ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
- bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override;
+ bool removeObjectImpl(
+ const ContextPtr & current_context,
+ UserDefinedSQLObjectType object_type,
+ const String & object_name,
+ bool throw_if_not_exists) override;
-private:
void processWatchQueue();
zkutil::ZooKeeperPtr getZooKeeper();
diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp
index b7ebc7abf14..e69de29bb2d 100644
--- a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp
+++ b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp
@@ -1,44 +0,0 @@
-#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h>
-#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h>
-#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h>
-#include <Interpreters/Context.h>
-#include <Poco/Util/AbstractConfiguration.h>
-#include <filesystem>
-
-namespace fs = std::filesystem;
-
-
-namespace DB
-{
-
-
-namespace ErrorCodes
-{
- extern const int INVALID_CONFIG_PARAMETER;
-}
-
-std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context)
-{
- const String zookeeper_path_key = "user_defined_zookeeper_path";
- const String disk_path_key = "user_defined_path";
-
- const auto & config = global_context->getConfigRef();
- if (config.has(zookeeper_path_key))
- {
- if (config.has(disk_path_key))
- {
- throw Exception(
- ErrorCodes::INVALID_CONFIG_PARAMETER,
- "'{}' and '{}' must not be both specified in the config",
- zookeeper_path_key,
- disk_path_key);
- }
- return std::make_unique<UserDefinedSQLObjectsLoaderFromZooKeeper>(global_context, config.getString(zookeeper_path_key));
- }
-
- String default_path = fs::path{global_context->getPath()} / "user_defined/";
- String path = config.getString(disk_path_key, default_path);
- return std::make_unique<UserDefinedSQLObjectsLoaderFromDisk>(global_context, path);
-}
-
-}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h
deleted file mode 100644
index b3a4623dba3..00000000000
--- a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h
+++ /dev/null
@@ -1,12 +0,0 @@
-#pragma once
-
-#include <Interpreters/Context_fwd.h>
-
-
-namespace DB
-{
-class IUserDefinedSQLObjectsLoader;
-
-std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context);
-
-}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp
new file mode 100644
index 00000000000..f8847024508
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp
@@ -0,0 +1,44 @@
+#include <Functions/UserDefined/createUserDefinedSQLObjectsStorage.h>
+#include <Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h>
+#include <Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h>
+#include <Interpreters/Context.h>
+#include <Poco/Util/AbstractConfiguration.h>
+#include <filesystem>
+
+namespace fs = std::filesystem;
+
+
+namespace DB
+{
+
+
+namespace ErrorCodes
+{
+ extern const int INVALID_CONFIG_PARAMETER;
+}
+
+std::unique_ptr<IUserDefinedSQLObjectsStorage> createUserDefinedSQLObjectsStorage(const ContextMutablePtr & global_context)
+{
+ const String zookeeper_path_key = "user_defined_zookeeper_path";
+ const String disk_path_key = "user_defined_path";
+
+ const auto & config = global_context->getConfigRef();
+ if (config.has(zookeeper_path_key))
+ {
+ if (config.has(disk_path_key))
+ {
+ throw Exception(
+ ErrorCodes::INVALID_CONFIG_PARAMETER,
+ "'{}' and '{}' must not be both specified in the config",
+ zookeeper_path_key,
+ disk_path_key);
+ }
+ return std::make_unique<UserDefinedSQLObjectsZooKeeperStorage>(global_context, config.getString(zookeeper_path_key));
+ }
+
+ String default_path = fs::path{global_context->getPath()} / "user_defined/";
+ String path = config.getString(disk_path_key, default_path);
+ return std::make_unique<UserDefinedSQLObjectsDiskStorage>(global_context, path);
+}
+
+}
diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.h b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.h
new file mode 100644
index 00000000000..01659372dec
--- /dev/null
+++ b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <Interpreters/Context_fwd.h>
+
+
+namespace DB
+{
+class IUserDefinedSQLObjectsStorage;
+
+std::unique_ptr<IUserDefinedSQLObjectsStorage> createUserDefinedSQLObjectsStorage(const ContextMutablePtr & global_context);
+
+}
diff --git a/contrib/clickhouse/src/Interpreters/Context.cpp b/contrib/clickhouse/src/Interpreters/Context.cpp
index 63f59975749..93fe12431ff 100644
--- a/contrib/clickhouse/src/Interpreters/Context.cpp
+++ b/contrib/clickhouse/src/Interpreters/Context.cpp
@@ -58,8 +58,8 @@
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h>
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
-#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
+#include <Functions/UserDefined/createUserDefinedSQLObjectsStorage.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/Cluster.h>
@@ -238,7 +238,7 @@ struct ContextSharedPart : boost::noncopyable
ExternalLoaderXMLConfigRepository * user_defined_executable_functions_config_repository = nullptr;
scope_guard user_defined_executable_functions_xmls;
- mutable std::unique_ptr<IUserDefinedSQLObjectsLoader> user_defined_sql_objects_loader;
+ mutable std::unique_ptr<IUserDefinedSQLObjectsStorage> user_defined_sql_objects_storage;
#if USE_NLP
mutable std::optional<SynonymsExtensions> synonyms_extensions;
@@ -489,7 +489,7 @@ struct ContextSharedPart : boost::noncopyable
SHUTDOWN(log, "dictionaries loader", external_dictionaries_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "UDFs loader", external_user_defined_executable_functions_loader, enablePeriodicUpdates(false));
- SHUTDOWN(log, "another UDFs loader", user_defined_sql_objects_loader, stopWatching());
+ SHUTDOWN(log, "another UDFs storage", user_defined_sql_objects_storage, stopWatching());
LOG_TRACE(log, "Shutting down named sessions");
Session::shutdownNamedSessions();
@@ -516,7 +516,7 @@ struct ContextSharedPart : boost::noncopyable
std::unique_ptr<EmbeddedDictionaries> delete_embedded_dictionaries;
std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;
std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader;
- std::unique_ptr<IUserDefinedSQLObjectsLoader> delete_user_defined_sql_objects_loader;
+ std::unique_ptr<IUserDefinedSQLObjectsStorage> delete_user_defined_sql_objects_storage;
std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool;
@@ -575,7 +575,7 @@ struct ContextSharedPart : boost::noncopyable
delete_embedded_dictionaries = std::move(embedded_dictionaries);
delete_external_dictionaries_loader = std::move(external_dictionaries_loader);
delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader);
- delete_user_defined_sql_objects_loader = std::move(user_defined_sql_objects_loader);
+ delete_user_defined_sql_objects_storage = std::move(user_defined_sql_objects_storage);
delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
@@ -602,7 +602,7 @@ struct ContextSharedPart : boost::noncopyable
delete_embedded_dictionaries.reset();
delete_external_dictionaries_loader.reset();
delete_external_user_defined_executable_functions_loader.reset();
- delete_user_defined_sql_objects_loader.reset();
+ delete_user_defined_sql_objects_storage.reset();
delete_ddl_worker.reset();
delete_buffer_flush_schedule_pool.reset();
delete_schedule_pool.reset();
@@ -2188,20 +2188,26 @@ void Context::loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::Abstr
shared->user_defined_executable_functions_xmls = external_user_defined_executable_functions_loader.addConfigRepository(std::move(repository));
}
-const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() const
+const IUserDefinedSQLObjectsStorage & Context::getUserDefinedSQLObjectsStorage() const
{
auto lock = getLock();
- if (!shared->user_defined_sql_objects_loader)
- shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
- return *shared->user_defined_sql_objects_loader;
+ if (!shared->user_defined_sql_objects_storage)
+ shared->user_defined_sql_objects_storage = createUserDefinedSQLObjectsStorage(getGlobalContext());
+ return *shared->user_defined_sql_objects_storage;
}
-IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader()
+IUserDefinedSQLObjectsStorage & Context::getUserDefinedSQLObjectsStorage()
{
auto lock = getLock();
- if (!shared->user_defined_sql_objects_loader)
- shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
- return *shared->user_defined_sql_objects_loader;
+ if (!shared->user_defined_sql_objects_storage)
+ shared->user_defined_sql_objects_storage = createUserDefinedSQLObjectsStorage(getGlobalContext());
+ return *shared->user_defined_sql_objects_storage;
+}
+
+void Context::setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage)
+{
+ auto lock = getLock();
+ shared->user_defined_sql_objects_storage = std::move(storage);
}
#if USE_NLP
diff --git a/contrib/clickhouse/src/Interpreters/Context.h b/contrib/clickhouse/src/Interpreters/Context.h
index eba205a88c3..d9ce12d9ae0 100644
--- a/contrib/clickhouse/src/Interpreters/Context.h
+++ b/contrib/clickhouse/src/Interpreters/Context.h
@@ -65,7 +65,7 @@ enum class RowPolicyFilterType;
class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalUserDefinedExecutableFunctionsLoader;
-class IUserDefinedSQLObjectsLoader;
+class IUserDefinedSQLObjectsStorage;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
@@ -764,8 +764,9 @@ public:
const ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const;
ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader();
ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
- const IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader() const;
- IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader();
+ const IUserDefinedSQLObjectsStorage & getUserDefinedSQLObjectsStorage() const;
+ IUserDefinedSQLObjectsStorage & getUserDefinedSQLObjectsStorage();
+ void setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage);
void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config);
#if USE_NLP
diff --git a/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp b/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp
index 3e87f4fe440..b155476fd79 100644
--- a/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp
+++ b/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp
@@ -1,7 +1,7 @@
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Access/ContextAccess.h>
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
@@ -32,7 +32,7 @@ BlockIO InterpreterCreateFunctionQuery::execute()
if (!create_function_query.cluster.empty())
{
- if (current_context->getUserDefinedSQLObjectsLoader().isReplicated())
+ if (current_context->getUserDefinedSQLObjectsStorage().isReplicated())
throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");
DDLQueryOnClusterParams params;
diff --git a/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp b/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp
index af60d9c5df7..c2cd24044da 100644
--- a/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp
+++ b/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp
@@ -1,7 +1,7 @@
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Access/ContextAccess.h>
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
@@ -32,7 +32,7 @@ BlockIO InterpreterDropFunctionQuery::execute()
if (!drop_function_query.cluster.empty())
{
- if (current_context->getUserDefinedSQLObjectsLoader().isReplicated())
+ if (current_context->getUserDefinedSQLObjectsStorage().isReplicated())
throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");
DDLQueryOnClusterParams params;
diff --git a/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp b/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp
index da3930d62a6..f8df03ed830 100644
--- a/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp
+++ b/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp
@@ -3,7 +3,7 @@
#include <Access/AccessControl.h>
#include <Access/ReplicatedAccessStorage.h>
#include <Common/logger_useful.h>
-#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
+#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTDropFunctionQuery.h>
@@ -47,7 +47,7 @@ ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, c
if ((isUserDefinedFunctionQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_udf_queries
- && context->getUserDefinedSQLObjectsLoader().isReplicated())
+ && context->getUserDefinedSQLObjectsStorage().isReplicated())
|| (isAccessControlQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_access_entities_queries
&& context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE)))
diff --git a/contrib/clickhouse/src/ya.make b/contrib/clickhouse/src/ya.make
index 1f5158a3ca9..4b91f7a155b 100644
--- a/contrib/clickhouse/src/ya.make
+++ b/contrib/clickhouse/src/ya.make
@@ -770,9 +770,13 @@ SRCS(
Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp
Functions/UserDefined/UserDefinedSQLFunctionVisitor.cpp
Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp
+ Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp
Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp
Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp
+ Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp
+ Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp
Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp
+ Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp
Functions/divide/divide.cpp
GLOBAL Functions/CRC.cpp
GLOBAL Functions/CastOverloadResolver.cpp