diff options
| author | AlexSm <[email protected]> | 2023-12-22 17:10:22 +0100 | 
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-22 17:10:22 +0100 | 
| commit | 148f920350c60c0ca2d89b637a5aea9093eee450 (patch) | |
| tree | 6314b1433dac833398c333731e83f0ad77e81a0b /contrib/clickhouse/src | |
| parent | 7116d46ae7c0259b5f9d489de263f8701e432b1c (diff) | |
Library import 2 (#639)
Diffstat (limited to 'contrib/clickhouse/src')
23 files changed, 1186 insertions, 965 deletions
diff --git a/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h b/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h deleted file mode 100644 index 4c7850951b5..00000000000 --- a/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include <base/types.h> - - -namespace DB -{ -class IAST; -struct Settings; -enum class UserDefinedSQLObjectType; - -/// Interface for a loader of user-defined SQL objects. -/// Implementations: UserDefinedSQLLoaderFromDisk, UserDefinedSQLLoaderFromZooKeeper -class IUserDefinedSQLObjectsLoader -{ -public: -    virtual ~IUserDefinedSQLObjectsLoader() = default; - -    /// Whether this loader can replicate SQL objects to another node. -    virtual bool isReplicated() const { return false; } -    virtual String getReplicationID() const { return ""; } - -    /// Loads all objects. Can be called once - if objects are already loaded the function does nothing. -    virtual void loadObjects() = 0; - -    /// Stops watching. -    virtual void stopWatching() {} - -    /// Immediately reloads all objects, throws an exception if failed. -    virtual void reloadObjects() = 0; - -    /// Immediately reloads a specified object only. -    virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0; - -    /// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction). -    virtual bool storeObject( -        UserDefinedSQLObjectType object_type, -        const String & object_name, -        const IAST & create_object_query, -        bool throw_if_exists, -        bool replace_if_exists, -        const Settings & settings) = 0; - -    /// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction). -    virtual bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) = 0; -}; -} diff --git a/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsStorage.h b/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsStorage.h new file mode 100644 index 00000000000..345ff8c5954 --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/IUserDefinedSQLObjectsStorage.h @@ -0,0 +1,74 @@ +#pragma once + +#include <base/types.h> + +#include <Interpreters/Context_fwd.h> + +#include <Parsers/IAST_fwd.h> + + +namespace DB +{ +class IAST; +struct Settings; +enum class UserDefinedSQLObjectType; + +/// Interface for a storage of user-defined SQL objects. +/// Implementations: UserDefinedSQLObjectsDiskStorage, UserDefinedSQLObjectsZooKeeperStorage +class IUserDefinedSQLObjectsStorage +{ +public: +    virtual ~IUserDefinedSQLObjectsStorage() = default; + +    /// Whether this loader can replicate SQL objects to another node. +    virtual bool isReplicated() const { return false; } +    virtual String getReplicationID() const { return ""; } + +    /// Loads all objects. Can be called once - if objects are already loaded the function does nothing. +    virtual void loadObjects() = 0; + +    /// Get object by name. If no object stored with object_name throws exception. +    virtual ASTPtr get(const String & object_name) const = 0; + +    /// Get object by name. If no object stored with object_name return nullptr. +    virtual ASTPtr tryGet(const String & object_name) const = 0; + +    /// Check if object with object_name is stored. +    virtual bool has(const String & object_name) const = 0; + +    /// Get all user defined object names. +    virtual std::vector<String> getAllObjectNames() const = 0; + +    /// Get all user defined objects. +    virtual std::vector<std::pair<String, ASTPtr>> getAllObjects() const = 0; + +    /// Check whether any UDFs have been stored. +    virtual bool empty() const = 0; + +    /// Stops watching. +    virtual void stopWatching() {} + +    /// Immediately reloads all objects, throws an exception if failed. +    virtual void reloadObjects() = 0; + +    /// Immediately reloads a specified object only. +    virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0; + +    /// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction). +    virtual bool storeObject( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        ASTPtr create_object_query, +        bool throw_if_exists, +        bool replace_if_exists, +        const Settings & settings) = 0; + +    /// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction). +    virtual bool removeObject( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        bool throw_if_not_exists) = 0; +}; +} diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp index 622854b3508..1cee137ba10 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp @@ -3,7 +3,7 @@  #include <AggregateFunctions/AggregateFunctionFactory.h>  #include <Backups/RestorerFromBackup.h>  #include <Functions/FunctionFactory.h> -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>  #include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>  #include <Functions/UserDefined/UserDefinedSQLObjectType.h>  #include <Functions/UserDefined/UserDefinedSQLObjectsBackup.h> @@ -128,20 +128,17 @@ bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & c      checkCanBeRegistered(context, function_name, *create_function_query);      create_function_query = normalizeCreateFunctionQuery(*create_function_query); -    std::lock_guard lock{mutex}; -    auto it = function_name_to_create_query_map.find(function_name); -    if (it != function_name_to_create_query_map.end()) -    { -        if (throw_if_exists) -            throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", function_name); -        else if (!replace_if_exists) -            return false; -    } -      try      { -        auto & loader = context->getUserDefinedSQLObjectsLoader(); -        bool stored = loader.storeObject(UserDefinedSQLObjectType::Function, function_name, *create_function_query, throw_if_exists, replace_if_exists, context->getSettingsRef()); +        auto & loader = context->getUserDefinedSQLObjectsStorage(); +        bool stored = loader.storeObject( +            context, +            UserDefinedSQLObjectType::Function, +            function_name, +            create_function_query, +            throw_if_exists, +            replace_if_exists, +            context->getSettingsRef());          if (!stored)              return false;      } @@ -151,7 +148,6 @@ bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & c          throw;      } -    function_name_to_create_query_map[function_name] = create_function_query;      return true;  } @@ -159,20 +155,14 @@ bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr &  {      checkCanBeUnregistered(context, function_name); -    std::lock_guard lock(mutex); -    auto it = function_name_to_create_query_map.find(function_name); -    if (it == function_name_to_create_query_map.end()) -    { -        if (throw_if_not_exists) -            throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", function_name); -        else -            return false; -    } -      try      { -        auto & loader = context->getUserDefinedSQLObjectsLoader(); -        bool removed = loader.removeObject(UserDefinedSQLObjectType::Function, function_name, throw_if_not_exists); +        auto & storage = context->getUserDefinedSQLObjectsStorage(); +        bool removed = storage.removeObject( +            context, +            UserDefinedSQLObjectType::Function, +            function_name, +            throw_if_not_exists);          if (!removed)              return false;      } @@ -182,61 +172,41 @@ bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr &          throw;      } -    function_name_to_create_query_map.erase(function_name);      return true;  }  ASTPtr UserDefinedSQLFunctionFactory::get(const String & function_name) const  { -    std::lock_guard lock(mutex); - -    auto it = function_name_to_create_query_map.find(function_name); -    if (it == function_name_to_create_query_map.end()) -        throw Exception(ErrorCodes::UNKNOWN_FUNCTION, -            "The function name '{}' is not registered", -            function_name); - -    return it->second; +    return global_context->getUserDefinedSQLObjectsStorage().get(function_name);  }  ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name) const  { -    std::lock_guard lock(mutex); - -    auto it = function_name_to_create_query_map.find(function_name); -    if (it == function_name_to_create_query_map.end()) -        return nullptr; - -    return it->second; +    return global_context->getUserDefinedSQLObjectsStorage().tryGet(function_name);  }  bool UserDefinedSQLFunctionFactory::has(const String & function_name) const  { -    return tryGet(function_name) != nullptr; +    return global_context->getUserDefinedSQLObjectsStorage().has(function_name);  }  std::vector<std::string> UserDefinedSQLFunctionFactory::getAllRegisteredNames() const  { -    std::vector<std::string> registered_names; - -    std::lock_guard lock(mutex); -    registered_names.reserve(function_name_to_create_query_map.size()); - -    for (const auto & [name, _] : function_name_to_create_query_map) -        registered_names.emplace_back(name); - -    return registered_names; +    return global_context->getUserDefinedSQLObjectsStorage().getAllObjectNames();  }  bool UserDefinedSQLFunctionFactory::empty() const  { -    std::lock_guard lock(mutex); -    return function_name_to_create_query_map.empty(); +    return global_context->getUserDefinedSQLObjectsStorage().empty();  }  void UserDefinedSQLFunctionFactory::backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup) const  { -    backupUserDefinedSQLObjects(backup_entries_collector, data_path_in_backup, UserDefinedSQLObjectType::Function, getAllFunctions()); +    backupUserDefinedSQLObjects( +        backup_entries_collector, +        data_path_in_backup, +        UserDefinedSQLObjectType::Function, +        global_context->getUserDefinedSQLObjectsStorage().getAllObjects());  }  void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const String & data_path_in_backup) @@ -250,52 +220,4 @@ void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const          registerFunction(context, function_name, create_function_query, throw_if_exists, replace_if_exists);  } -void UserDefinedSQLFunctionFactory::setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions) -{ -    std::unordered_map<String, ASTPtr> normalized_functions; -    for (const auto & [function_name, create_query] : new_functions) -        normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query); - -    std::lock_guard lock(mutex); -    function_name_to_create_query_map = std::move(normalized_functions); -} - -std::vector<std::pair<String, ASTPtr>> UserDefinedSQLFunctionFactory::getAllFunctions() const -{ -    std::lock_guard lock{mutex}; -    std::vector<std::pair<String, ASTPtr>> all_functions; -    all_functions.reserve(function_name_to_create_query_map.size()); -    std::copy(function_name_to_create_query_map.begin(), function_name_to_create_query_map.end(), std::back_inserter(all_functions)); -    return all_functions; -} - -void UserDefinedSQLFunctionFactory::setFunction(const String & function_name, const IAST & create_function_query) -{ -    std::lock_guard lock(mutex); -    function_name_to_create_query_map[function_name] = normalizeCreateFunctionQuery(create_function_query); -} - -void UserDefinedSQLFunctionFactory::removeFunction(const String & function_name) -{ -    std::lock_guard lock(mutex); -    function_name_to_create_query_map.erase(function_name); -} - -void UserDefinedSQLFunctionFactory::removeAllFunctionsExcept(const Strings & function_names_to_keep) -{ -    boost::container::flat_set<std::string_view> names_set_to_keep{function_names_to_keep.begin(), function_names_to_keep.end()}; -    std::lock_guard lock(mutex); -    for (auto it = function_name_to_create_query_map.begin(); it != function_name_to_create_query_map.end();) -    { -        auto current = it++; -        if (!names_set_to_keep.contains(current->first)) -            function_name_to_create_query_map.erase(current); -    } -} - -std::unique_lock<std::recursive_mutex> UserDefinedSQLFunctionFactory::getLock() const -{ -    return std::unique_lock{mutex}; -} -  } diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h index 45196759d3b..489feeae6f5 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h @@ -6,7 +6,7 @@  #include <Common/NamePrompter.h>  #include <Parsers/ASTCreateFunctionQuery.h> -#include <Interpreters/Context_fwd.h> +#include <Interpreters/Context.h>  namespace DB @@ -48,23 +48,11 @@ public:      void restore(RestorerFromBackup & restorer, const String & data_path_in_backup);  private: -    friend class UserDefinedSQLObjectsLoaderFromDisk; -    friend class UserDefinedSQLObjectsLoaderFromZooKeeper; -      /// Checks that a specified function can be registered, throws an exception if not.      static void checkCanBeRegistered(const ContextPtr & context, const String & function_name, const IAST & create_function_query);      static void checkCanBeUnregistered(const ContextPtr & context, const String & function_name); -    /// The following functions must be called only by the loader. -    void setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions); -    std::vector<std::pair<String, ASTPtr>> getAllFunctions() const; -    void setFunction(const String & function_name, const IAST & create_function_query); -    void removeFunction(const String & function_name); -    void removeAllFunctionsExcept(const Strings & function_names_to_keep); -    std::unique_lock<std::recursive_mutex> getLock() const; - -    std::unordered_map<String, ASTPtr> function_name_to_create_query_map; -    mutable std::recursive_mutex mutex; +    ContextPtr global_context = Context::getGlobalContextInstance();  };  } diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp index 6920e8ce2c2..3ec5393fa6f 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp @@ -6,7 +6,7 @@  #include <Backups/IBackupCoordination.h>  #include <Backups/IRestoreCoordination.h>  #include <Backups/RestorerFromBackup.h> -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>  #include <Functions/UserDefined/UserDefinedSQLObjectType.h>  #include <Interpreters/Context.h>  #include <Parsers/ParserCreateFunctionQuery.h> @@ -37,9 +37,9 @@ void backupUserDefinedSQLObjects(              escapeForFileName(object_name) + ".sql", std::make_shared<BackupEntryFromMemory>(queryToString(create_object_query)));      auto context = backup_entries_collector.getContext(); -    const auto & loader = context->getUserDefinedSQLObjectsLoader(); +    const auto & storage = context->getUserDefinedSQLObjectsStorage(); -    if (!loader.isReplicated()) +    if (!storage.isReplicated())      {          fs::path data_path_in_backup_fs{data_path_in_backup};          for (const auto & [file_name, entry] : backup_entries) @@ -47,7 +47,7 @@ void backupUserDefinedSQLObjects(          return;      } -    String replication_id = loader.getReplicationID(); +    String replication_id = storage.getReplicationID();      auto backup_coordination = backup_entries_collector.getBackupCoordination();      backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, data_path_in_backup); @@ -80,9 +80,9 @@ std::vector<std::pair<String, ASTPtr>>  restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_path_in_backup, UserDefinedSQLObjectType object_type)  {      auto context = restorer.getContext(); -    const auto & loader = context->getUserDefinedSQLObjectsLoader(); +    const auto & storage = context->getUserDefinedSQLObjectsStorage(); -    if (loader.isReplicated() && !restorer.getRestoreCoordination()->acquireReplicatedSQLObjects(loader.getReplicationID(), object_type)) +    if (storage.isReplicated() && !restorer.getRestoreCoordination()->acquireReplicatedSQLObjects(storage.getReplicationID(), object_type))          return {}; /// Other replica is already restoring user-defined SQL objects.      auto backup = restorer.getBackup(); diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp new file mode 100644 index 00000000000..271c464e79a --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp @@ -0,0 +1,268 @@ +#include "Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h" + +#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h" +#include "Functions/UserDefined/UserDefinedSQLObjectType.h" + +#include <Common/StringUtils/StringUtils.h> +#include <Common/atomicRename.h> +#include <Common/escapeForFileName.h> +#include <Common/logger_useful.h> +#include <Common/quoteString.h> + +#include <IO/ReadBufferFromFile.h> +#include <IO/ReadHelpers.h> +#include <IO/WriteBufferFromFile.h> +#include <IO/WriteHelpers.h> + +#include <Interpreters/Context.h> + +#include <Parsers/parseQuery.h> +#include <Parsers/formatAST.h> +#include <Parsers/ParserCreateFunctionQuery.h> + +#include <Poco/DirectoryIterator.h> +#include <Poco/Logger.h> + +#include <filesystem> + +namespace fs = std::filesystem; + + +namespace DB +{ + +namespace ErrorCodes +{ +    extern const int DIRECTORY_DOESNT_EXIST; +    extern const int FUNCTION_ALREADY_EXISTS; +    extern const int UNKNOWN_FUNCTION; +} + + +namespace +{ +    /// Converts a path to an absolute path and append it with a separator. +    String makeDirectoryPathCanonical(const String & directory_path) +    { +        auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path); +        if (canonical_directory_path.has_filename()) +            canonical_directory_path += std::filesystem::path::preferred_separator; +        return canonical_directory_path; +    } +} + +UserDefinedSQLObjectsDiskStorage::UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_) +    : global_context(global_context_) +    , dir_path{makeDirectoryPathCanonical(dir_path_)} +    , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromDisk")} +{ +    createDirectory(); +} + + +ASTPtr UserDefinedSQLObjectsDiskStorage::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name) +{ +    return tryLoadObject(object_type, object_name, getFilePath(object_type, object_name), /* check_file_exists= */ true); +} + + +ASTPtr UserDefinedSQLObjectsDiskStorage::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists) +{ +    LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name), path); + +    try +    { +        if (check_file_exists && !fs::exists(path)) +            return nullptr; + +        /// There is .sql file with user defined object creation statement. +        ReadBufferFromFile in(path); + +        String object_create_query; +        readStringUntilEOF(object_create_query, in); + +        switch (object_type) +        { +            case UserDefinedSQLObjectType::Function: +            { +                ParserCreateFunctionQuery parser; +                ASTPtr ast = parseQuery( +                    parser, +                    object_create_query.data(), +                    object_create_query.data() + object_create_query.size(), +                    "", +                    0, +                    global_context->getSettingsRef().max_parser_depth); +                return ast; +            } +        } +    } +    catch (...) +    { +        tryLogCurrentException(log, fmt::format("while loading user defined SQL object {} from path {}", backQuote(object_name), path)); +        return nullptr; /// Failed to load this sql object, will ignore it +    } +} + + +void UserDefinedSQLObjectsDiskStorage::loadObjects() +{ +    if (!objects_loaded) +        loadObjectsImpl(); +} + + +void UserDefinedSQLObjectsDiskStorage::reloadObjects() +{ +    loadObjectsImpl(); +} + + +void UserDefinedSQLObjectsDiskStorage::loadObjectsImpl() +{ +    LOG_INFO(log, "Loading user defined objects from {}", dir_path); +    createDirectory(); + +    std::vector<std::pair<String, ASTPtr>> function_names_and_queries; + +    Poco::DirectoryIterator dir_end; +    for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it) +    { +        if (it->isDirectory()) +            continue; + +        const String & file_name = it.name(); +        if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql")) +            continue; + +        size_t prefix_length = strlen("function_"); +        size_t suffix_length = strlen(".sql"); +        String function_name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length)); + +        if (function_name.empty()) +            continue; + +        ASTPtr ast = tryLoadObject(UserDefinedSQLObjectType::Function, function_name, dir_path + it.name(), /* check_file_exists= */ false); +        if (ast) +            function_names_and_queries.emplace_back(function_name, ast); +    } + +    setAllObjects(function_names_and_queries); +    objects_loaded = true; + +    LOG_DEBUG(log, "User defined objects loaded"); +} + + +void UserDefinedSQLObjectsDiskStorage::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) +{ +    createDirectory(); +    auto ast = tryLoadObject(object_type, object_name); +    if (ast) +        setObject(object_name, *ast); +    else +        removeObject(object_name); +} + + +void UserDefinedSQLObjectsDiskStorage::createDirectory() +{ +    std::error_code create_dir_error_code; +    fs::create_directories(dir_path, create_dir_error_code); +    if (!fs::exists(dir_path) || !fs::is_directory(dir_path) || create_dir_error_code) +        throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST, "Couldn't create directory {} reason: '{}'", +                        dir_path, create_dir_error_code.message()); +} + + +bool UserDefinedSQLObjectsDiskStorage::storeObjectImpl( +    const ContextPtr & /*current_context*/, +    UserDefinedSQLObjectType object_type, +    const String & object_name, +    ASTPtr create_object_query, +    bool throw_if_exists, +    bool replace_if_exists, +    const Settings & settings) +{ +    String file_path = getFilePath(object_type, object_name); +    LOG_DEBUG(log, "Storing user-defined object {} to file {}", backQuote(object_name), file_path); + +    if (fs::exists(file_path)) +    { +        if (throw_if_exists) +            throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name); +        else if (!replace_if_exists) +            return false; +    } + +    WriteBufferFromOwnString create_statement_buf; +    formatAST(*create_object_query, create_statement_buf, false); +    writeChar('\n', create_statement_buf); +    String create_statement = create_statement_buf.str(); + +    String temp_file_path = file_path + ".tmp"; + +    try +    { +        WriteBufferFromFile out(temp_file_path, create_statement.size()); +        writeString(create_statement, out); +        out.next(); +        if (settings.fsync_metadata) +            out.sync(); +        out.close(); + +        if (replace_if_exists) +            fs::rename(temp_file_path, file_path); +        else +            renameNoReplace(temp_file_path, file_path); +    } +    catch (...) +    { +        fs::remove(temp_file_path); +        throw; +    } + +    LOG_TRACE(log, "Object {} stored", backQuote(object_name)); +    return true; +} + + +bool UserDefinedSQLObjectsDiskStorage::removeObjectImpl( +    const ContextPtr & /*current_context*/, +    UserDefinedSQLObjectType object_type, +    const String & object_name, +    bool throw_if_not_exists) +{ +    String file_path = getFilePath(object_type, object_name); +    LOG_DEBUG(log, "Removing user defined object {} stored in file {}", backQuote(object_name), file_path); + +    bool existed = fs::remove(file_path); + +    if (!existed) +    { +        if (throw_if_not_exists) +            throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", object_name); +        else +            return false; +    } + +    LOG_TRACE(log, "Object {} removed", backQuote(object_name)); +    return true; +} + + +String UserDefinedSQLObjectsDiskStorage::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const +{ +    String file_path; +    switch (object_type) +    { +        case UserDefinedSQLObjectType::Function: +        { +            file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql"; +            break; +        } +    } +    return file_path; +} + +} diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h index 7b0bb291f42..f0986dbda72 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h @@ -1,6 +1,6 @@  #pragma once -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>  #include <Interpreters/Context_fwd.h>  #include <Parsers/IAST_fwd.h> @@ -9,10 +9,10 @@ namespace DB  {  /// Loads user-defined sql objects from a specified folder. -class UserDefinedSQLObjectsLoaderFromDisk : public IUserDefinedSQLObjectsLoader +class UserDefinedSQLObjectsDiskStorage : public UserDefinedSQLObjectsStorageBase  {  public: -    UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_); +    UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_);      void loadObjects() override; @@ -20,17 +20,22 @@ public:      void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override; -    bool storeObject( +private: +    bool storeObjectImpl( +        const ContextPtr & current_context,          UserDefinedSQLObjectType object_type,          const String & object_name, -        const IAST & create_object_query, +        ASTPtr create_object_query,          bool throw_if_exists,          bool replace_if_exists,          const Settings & settings) override; -    bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override; +    bool removeObjectImpl( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        bool throw_if_not_exists) override; -private:      void createDirectory();      void loadObjectsImpl();      ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name); diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp index d67c48f166d..e69de29bb2d 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp @@ -1,266 +0,0 @@ -#include "Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h" - -#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h" -#include "Functions/UserDefined/UserDefinedSQLObjectType.h" - -#include <Common/StringUtils/StringUtils.h> -#include <Common/atomicRename.h> -#include <Common/escapeForFileName.h> -#include <Common/logger_useful.h> -#include <Common/quoteString.h> - -#include <IO/ReadBufferFromFile.h> -#include <IO/ReadHelpers.h> -#include <IO/WriteBufferFromFile.h> -#include <IO/WriteHelpers.h> - -#include <Interpreters/Context.h> - -#include <Parsers/parseQuery.h> -#include <Parsers/formatAST.h> -#include <Parsers/ParserCreateFunctionQuery.h> - -#include <Poco/DirectoryIterator.h> -#include <Poco/Logger.h> - -#include <filesystem> - -namespace fs = std::filesystem; - - -namespace DB -{ - -namespace ErrorCodes -{ -    extern const int DIRECTORY_DOESNT_EXIST; -    extern const int FUNCTION_ALREADY_EXISTS; -    extern const int UNKNOWN_FUNCTION; -} - - -namespace -{ -    /// Converts a path to an absolute path and append it with a separator. -    String makeDirectoryPathCanonical(const String & directory_path) -    { -        auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path); -        if (canonical_directory_path.has_filename()) -            canonical_directory_path += std::filesystem::path::preferred_separator; -        return canonical_directory_path; -    } -} - -UserDefinedSQLObjectsLoaderFromDisk::UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_) -    : global_context(global_context_) -    , dir_path{makeDirectoryPathCanonical(dir_path_)} -    , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromDisk")} -{ -    createDirectory(); -} - - -ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name) -{ -    return tryLoadObject(object_type, object_name, getFilePath(object_type, object_name), /* check_file_exists= */ true); -} - - -ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists) -{ -    LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name), path); - -    try -    { -        if (check_file_exists && !fs::exists(path)) -            return nullptr; - -        /// There is .sql file with user defined object creation statement. -        ReadBufferFromFile in(path); - -        String object_create_query; -        readStringUntilEOF(object_create_query, in); - -        switch (object_type) -        { -            case UserDefinedSQLObjectType::Function: -            { -                ParserCreateFunctionQuery parser; -                ASTPtr ast = parseQuery( -                    parser, -                    object_create_query.data(), -                    object_create_query.data() + object_create_query.size(), -                    "", -                    0, -                    global_context->getSettingsRef().max_parser_depth); -                UserDefinedSQLFunctionFactory::checkCanBeRegistered(global_context, object_name, *ast); -                return ast; -            } -        } -    } -    catch (...) -    { -        tryLogCurrentException(log, fmt::format("while loading user defined SQL object {} from path {}", backQuote(object_name), path)); -        return nullptr; /// Failed to load this sql object, will ignore it -    } -} - - -void UserDefinedSQLObjectsLoaderFromDisk::loadObjects() -{ -    if (!objects_loaded) -        loadObjectsImpl(); -} - - -void UserDefinedSQLObjectsLoaderFromDisk::reloadObjects() -{ -    loadObjectsImpl(); -} - - -void UserDefinedSQLObjectsLoaderFromDisk::loadObjectsImpl() -{ -    LOG_INFO(log, "Loading user defined objects from {}", dir_path); -    createDirectory(); - -    std::vector<std::pair<String, ASTPtr>> function_names_and_queries; - -    Poco::DirectoryIterator dir_end; -    for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it) -    { -        if (it->isDirectory()) -            continue; - -        const String & file_name = it.name(); -        if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql")) -            continue; - -        size_t prefix_length = strlen("function_"); -        size_t suffix_length = strlen(".sql"); -        String function_name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length)); - -        if (function_name.empty()) -            continue; - -        ASTPtr ast = tryLoadObject(UserDefinedSQLObjectType::Function, function_name, dir_path + it.name(), /* check_file_exists= */ false); -        if (ast) -            function_names_and_queries.emplace_back(function_name, ast); -    } - -    UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_queries); -    objects_loaded = true; - -    LOG_DEBUG(log, "User defined objects loaded"); -} - - -void UserDefinedSQLObjectsLoaderFromDisk::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) -{ -    createDirectory(); -    auto ast = tryLoadObject(object_type, object_name); -    auto & factory = UserDefinedSQLFunctionFactory::instance(); -    if (ast) -        factory.setFunction(object_name, *ast); -    else -        factory.removeFunction(object_name); -} - - -void UserDefinedSQLObjectsLoaderFromDisk::createDirectory() -{ -    std::error_code create_dir_error_code; -    fs::create_directories(dir_path, create_dir_error_code); -    if (!fs::exists(dir_path) || !fs::is_directory(dir_path) || create_dir_error_code) -        throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST, "Couldn't create directory {} reason: '{}'", -                        dir_path, create_dir_error_code.message()); -} - - -bool UserDefinedSQLObjectsLoaderFromDisk::storeObject( -    UserDefinedSQLObjectType object_type, -    const String & object_name, -    const IAST & create_object_query, -    bool throw_if_exists, -    bool replace_if_exists, -    const Settings & settings) -{ -    String file_path = getFilePath(object_type, object_name); -    LOG_DEBUG(log, "Storing user-defined object {} to file {}", backQuote(object_name), file_path); - -    if (fs::exists(file_path)) -    { -        if (throw_if_exists) -            throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name); -        else if (!replace_if_exists) -            return false; -    } - -    WriteBufferFromOwnString create_statement_buf; -    formatAST(create_object_query, create_statement_buf, false); -    writeChar('\n', create_statement_buf); -    String create_statement = create_statement_buf.str(); - -    String temp_file_path = file_path + ".tmp"; - -    try -    { -        WriteBufferFromFile out(temp_file_path, create_statement.size()); -        writeString(create_statement, out); -        out.next(); -        if (settings.fsync_metadata) -            out.sync(); -        out.close(); - -        if (replace_if_exists) -            fs::rename(temp_file_path, file_path); -        else -            renameNoReplace(temp_file_path, file_path); -    } -    catch (...) -    { -        fs::remove(temp_file_path); -        throw; -    } - -    LOG_TRACE(log, "Object {} stored", backQuote(object_name)); -    return true; -} - - -bool UserDefinedSQLObjectsLoaderFromDisk::removeObject( -    UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) -{ -    String file_path = getFilePath(object_type, object_name); -    LOG_DEBUG(log, "Removing user defined object {} stored in file {}", backQuote(object_name), file_path); - -    bool existed = fs::remove(file_path); - -    if (!existed) -    { -        if (throw_if_not_exists) -            throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", object_name); -        else -            return false; -    } - -    LOG_TRACE(log, "Object {} removed", backQuote(object_name)); -    return true; -} - - -String UserDefinedSQLObjectsLoaderFromDisk::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const -{ -    String file_path; -    switch (object_type) -    { -        case UserDefinedSQLObjectType::Function: -        { -            file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql"; -            break; -        } -    } -    return file_path; -} - -} diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp index 29aff666da5..e69de29bb2d 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp @@ -1,433 +0,0 @@ -#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h> - -#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h> -#include <Functions/UserDefined/UserDefinedSQLObjectType.h> -#include <Interpreters/Context.h> -#include <Parsers/ParserCreateFunctionQuery.h> -#include <Parsers/formatAST.h> -#include <Parsers/parseQuery.h> -#include <base/sleep.h> -#include <Common/Exception.h> -#include <Common/ZooKeeper/KeeperException.h> -#include <Common/escapeForFileName.h> -#include <Common/logger_useful.h> -#include <Common/quoteString.h> -#include <Common/scope_guard_safe.h> -#include <Common/setThreadName.h> - - -namespace DB -{ - -namespace ErrorCodes -{ -    extern const int FUNCTION_ALREADY_EXISTS; -    extern const int UNKNOWN_FUNCTION; -    extern const int BAD_ARGUMENTS; -} - -namespace -{ -    std::string_view getNodePrefix(UserDefinedSQLObjectType object_type) -    { -        switch (object_type) -        { -            case UserDefinedSQLObjectType::Function: -                return "function_"; -        } -        UNREACHABLE(); -    } - -    constexpr std::string_view sql_extension = ".sql"; - -    String getNodePath(const String & root_path, UserDefinedSQLObjectType object_type, const String & object_name) -    { -        return root_path + "/" + String{getNodePrefix(object_type)} + escapeForFileName(object_name) + String{sql_extension}; -    } -} - - -UserDefinedSQLObjectsLoaderFromZooKeeper::UserDefinedSQLObjectsLoaderFromZooKeeper( -    const ContextPtr & global_context_, const String & zookeeper_path_) -    : global_context{global_context_} -    , zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }} -    , zookeeper_path{zookeeper_path_} -    , watch_queue{std::make_shared<ConcurrentBoundedQueue<std::pair<UserDefinedSQLObjectType, String>>>(std::numeric_limits<size_t>::max())} -    , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromZooKeeper")} -{ -    if (zookeeper_path.empty()) -        throw Exception(ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path must be non-empty"); - -    if (zookeeper_path.back() == '/') -        zookeeper_path.resize(zookeeper_path.size() - 1); - -    /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it. -    if (zookeeper_path.front() != '/') -        zookeeper_path = "/" + zookeeper_path; -} - -UserDefinedSQLObjectsLoaderFromZooKeeper::~UserDefinedSQLObjectsLoaderFromZooKeeper() -{ -    SCOPE_EXIT_SAFE(stopWatchingThread()); -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::startWatchingThread() -{ -    if (!watching_flag.exchange(true)) -    { -        watching_thread = ThreadFromGlobalPool(&UserDefinedSQLObjectsLoaderFromZooKeeper::processWatchQueue, this); -    } -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::stopWatchingThread() -{ -    if (watching_flag.exchange(false)) -    { -        watch_queue->finish(); -        if (watching_thread.joinable()) -            watching_thread.join(); -    } -} - -zkutil::ZooKeeperPtr UserDefinedSQLObjectsLoaderFromZooKeeper::getZooKeeper() -{ -    auto [zookeeper, session_status] = zookeeper_getter.getZooKeeper(); - -    if (session_status == zkutil::ZooKeeperCachingGetter::SessionStatus::New) -    { -        /// It's possible that we connected to different [Zoo]Keeper instance -        /// so we may read a bit stale state. -        zookeeper->sync(zookeeper_path); - -        createRootNodes(zookeeper); -        refreshAllObjects(zookeeper); -    } - -    return zookeeper; -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::initZooKeeperIfNeeded() -{ -    getZooKeeper(); -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::resetAfterError() -{ -    zookeeper_getter.resetCache(); -} - - -void UserDefinedSQLObjectsLoaderFromZooKeeper::loadObjects() -{ -    /// loadObjects() is called at start from Server::main(), so it's better not to stop here on no connection to ZooKeeper or any other error. -    /// However the watching thread must be started anyway in case the connection will be established later. -    if (!objects_loaded) -    { -        try -        { -            reloadObjects(); -        } -        catch (...) -        { -            tryLogCurrentException(log, "Failed to load user-defined objects"); -        } -    } -    startWatchingThread(); -} - - -void UserDefinedSQLObjectsLoaderFromZooKeeper::processWatchQueue() -{ -    LOG_DEBUG(log, "Started watching thread"); -    setThreadName("UserDefObjWatch"); - -    while (watching_flag) -    { -        try -        { -            UserDefinedSQLObjectTypeAndName watched_object; - -            /// Re-initialize ZooKeeper session if expired and refresh objects -            initZooKeeperIfNeeded(); - -            if (!watch_queue->tryPop(watched_object, /* timeout_ms: */ 10000)) -                continue; - -            auto zookeeper = getZooKeeper(); -            const auto & [object_type, object_name] = watched_object; - -            if (object_name.empty()) -                syncObjects(zookeeper, object_type); -            else -                refreshObject(zookeeper, object_type, object_name); -        } -        catch (...) -        { -            tryLogCurrentException(log, "Will try to restart watching thread after error"); -            resetAfterError(); -            sleepForSeconds(5); -        } -    } - -    LOG_DEBUG(log, "Stopped watching thread"); -} - - -void UserDefinedSQLObjectsLoaderFromZooKeeper::stopWatching() -{ -    stopWatchingThread(); -} - - -void UserDefinedSQLObjectsLoaderFromZooKeeper::reloadObjects() -{ -    auto zookeeper = getZooKeeper(); -    refreshAllObjects(zookeeper); -    startWatchingThread(); -} - - -void UserDefinedSQLObjectsLoaderFromZooKeeper::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) -{ -    auto zookeeper = getZooKeeper(); -    refreshObject(zookeeper, object_type, object_name); -} - - -void UserDefinedSQLObjectsLoaderFromZooKeeper::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper) -{ -    zookeeper->createAncestors(zookeeper_path); -    zookeeper->createIfNotExists(zookeeper_path, ""); -} - -bool UserDefinedSQLObjectsLoaderFromZooKeeper::storeObject( -    UserDefinedSQLObjectType object_type, -    const String & object_name, -    const IAST & create_object_query, -    bool throw_if_exists, -    bool replace_if_exists, -    const Settings &) -{ -    String path = getNodePath(zookeeper_path, object_type, object_name); -    LOG_DEBUG(log, "Storing user-defined object {} at zk path {}", backQuote(object_name), path); - -    WriteBufferFromOwnString create_statement_buf; -    formatAST(create_object_query, create_statement_buf, false); -    writeChar('\n', create_statement_buf); -    String create_statement = create_statement_buf.str(); - -    auto zookeeper = getZooKeeper(); - -    size_t num_attempts = 10; -    while (true) -    { -        auto code = zookeeper->tryCreate(path, create_statement, zkutil::CreateMode::Persistent); -        if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS)) -            throw zkutil::KeeperException::fromPath(code, path); - -        if (code == Coordination::Error::ZNODEEXISTS) -        { -            if (throw_if_exists) -                throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name); -            else if (!replace_if_exists) -                return false; - -            code = zookeeper->trySet(path, create_statement); -            if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE)) -                throw zkutil::KeeperException::fromPath(code, path); -        } - -        if (code == Coordination::Error::ZOK) -            break; - -        if (!--num_attempts) -            throw zkutil::KeeperException::fromPath(code, path); -    } -    LOG_DEBUG(log, "Object {} stored", backQuote(object_name)); - -    /// Refresh object and set watch for it. Because it can be replaced by another node after creation. -    refreshObject(zookeeper, object_type, object_name); - -    return true; -} - - -bool UserDefinedSQLObjectsLoaderFromZooKeeper::removeObject( -    UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) -{ -    String path = getNodePath(zookeeper_path, object_type, object_name); -    LOG_DEBUG(log, "Removing user-defined object {} at zk path {}", backQuote(object_name), path); - -    auto zookeeper = getZooKeeper(); - -    auto code = zookeeper->tryRemove(path); -    if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE)) -        throw zkutil::KeeperException::fromPath(code, path); - -    if (code == Coordination::Error::ZNONODE) -    { -        if (throw_if_not_exists) -            throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined object '{}' doesn't exist", object_name); -        else -            return false; -    } - -    LOG_DEBUG(log, "Object {} removed", backQuote(object_name)); -    return true; -} - -bool UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectDataAndSetWatch( -    const zkutil::ZooKeeperPtr & zookeeper, -    String & data, -    const String & path, -    UserDefinedSQLObjectType object_type, -    const String & object_name) -{ -    const auto object_watcher = [my_watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response) -    { -        if (response.type == Coordination::Event::CHANGED) -        { -            [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, object_name); -            /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called). -        } -        /// Event::DELETED is processed as child event by getChildren watch -    }; - -    Coordination::Stat entity_stat; -    String object_create_query; -    return zookeeper->tryGetWatch(path, data, &entity_stat, object_watcher); -} - -ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::parseObjectData(const String & object_data, UserDefinedSQLObjectType object_type) -{ -    switch (object_type) -    { -        case UserDefinedSQLObjectType::Function: { -            ParserCreateFunctionQuery parser; -            ASTPtr ast = parseQuery( -                parser, -                object_data.data(), -                object_data.data() + object_data.size(), -                "", -                0, -                global_context->getSettingsRef().max_parser_depth); -            return ast; -        } -    } -    UNREACHABLE(); -} - -ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::tryLoadObject( -    const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name) -{ -    String path = getNodePath(zookeeper_path, object_type, object_name); -    LOG_DEBUG(log, "Loading user defined object {} from zk path {}", backQuote(object_name), path); - -    try -    { -        String object_data; -        bool exists = getObjectDataAndSetWatch(zookeeper, object_data, path, object_type, object_name); - -        if (!exists) -        { -            LOG_INFO(log, "User-defined object '{}' can't be loaded from path {}, because it doesn't exist", backQuote(object_name), path); -            return nullptr; -        } - -        return parseObjectData(object_data, object_type); -    } -    catch (...) -    { -        tryLogCurrentException(log, fmt::format("while loading user defined SQL object {}", backQuote(object_name))); -        return nullptr; /// Failed to load this sql object, will ignore it -    } -} - -Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch( -    const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type) -{ -    auto object_list_watcher = [my_watch_queue = watch_queue, object_type](const Coordination::WatchResponse &) -    { -        [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, ""); -        /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called). -    }; - -    Coordination::Stat stat; -    const auto node_names = zookeeper->getChildrenWatch(zookeeper_path, &stat, object_list_watcher); -    const auto prefix = getNodePrefix(object_type); - -    Strings object_names; -    object_names.reserve(node_names.size()); -    for (const auto & node_name : node_names) -    { -        if (node_name.starts_with(prefix) && node_name.ends_with(sql_extension)) -        { -            String object_name = unescapeForFileName(node_name.substr(prefix.length(), node_name.length() - prefix.length() - sql_extension.length())); -            if (!object_name.empty()) -                object_names.push_back(std::move(object_name)); -        } -    } - -    return object_names; -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshAllObjects(const zkutil::ZooKeeperPtr & zookeeper) -{ -    /// It doesn't make sense to keep the old watch events because we will reread everything in this function. -    watch_queue->clear(); - -    refreshObjects(zookeeper, UserDefinedSQLObjectType::Function); -    objects_loaded = true; -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type) -{ -    LOG_DEBUG(log, "Refreshing all user-defined {} objects", object_type); -    Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type); - -    /// Read & parse all SQL objects from ZooKeeper -    std::vector<std::pair<String, ASTPtr>> function_names_and_asts; -    for (const auto & function_name : object_names) -    { -        if (auto ast = tryLoadObject(zookeeper, UserDefinedSQLObjectType::Function, function_name)) -            function_names_and_asts.emplace_back(function_name, ast); -    } - -    UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_asts); - -    LOG_DEBUG(log, "All user-defined {} objects refreshed", object_type); -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type) -{ -    LOG_DEBUG(log, "Syncing user-defined {} objects", object_type); -    Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type); - -    auto & factory = UserDefinedSQLFunctionFactory::instance(); -    auto lock = factory.getLock(); - -    /// Remove stale objects -    factory.removeAllFunctionsExcept(object_names); -    /// Read & parse only new SQL objects from ZooKeeper -    for (const auto & function_name : object_names) -    { -        if (!UserDefinedSQLFunctionFactory::instance().has(function_name)) -            refreshObject(zookeeper, UserDefinedSQLObjectType::Function, function_name); -    } - -    LOG_DEBUG(log, "User-defined {} objects synced", object_type); -} - -void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshObject( -    const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name) -{ -    auto ast = tryLoadObject(zookeeper, object_type, object_name); -    auto & factory = UserDefinedSQLFunctionFactory::instance(); - -    if (ast) -        factory.setFunction(object_name, *ast); -    else -        factory.removeFunction(object_name); -} - -} diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp new file mode 100644 index 00000000000..8d7a18d93bc --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp @@ -0,0 +1,190 @@ +#include "Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h" + +#include <boost/container/flat_set.hpp> + +#include <Interpreters/FunctionNameNormalizer.h> +#include <Parsers/ASTCreateFunctionQuery.h> + +namespace DB +{ + +namespace ErrorCodes +{ +    extern const int FUNCTION_ALREADY_EXISTS; +    extern const int UNKNOWN_FUNCTION; +} + +namespace +{ + +ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query) +{ +    auto ptr = create_function_query.clone(); +    auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr); +    res.if_not_exists = false; +    res.or_replace = false; +    FunctionNameNormalizer().visit(res.function_core.get()); +    return ptr; +} + +} + +ASTPtr UserDefinedSQLObjectsStorageBase::get(const String & object_name) const +{ +    std::lock_guard lock(mutex); + +    auto it = object_name_to_create_object_map.find(object_name); +    if (it == object_name_to_create_object_map.end()) +        throw Exception(ErrorCodes::UNKNOWN_FUNCTION, +            "The object name '{}' is not saved", +            object_name); + +    return it->second; +} + +ASTPtr UserDefinedSQLObjectsStorageBase::tryGet(const std::string & object_name) const +{ +    std::lock_guard lock(mutex); + +    auto it = object_name_to_create_object_map.find(object_name); +    if (it == object_name_to_create_object_map.end()) +        return nullptr; + +    return it->second; +} + +bool UserDefinedSQLObjectsStorageBase::has(const String & object_name) const +{ +    return tryGet(object_name) != nullptr; +} + +std::vector<std::string> UserDefinedSQLObjectsStorageBase::getAllObjectNames() const +{ +    std::vector<std::string> object_names; + +    std::lock_guard lock(mutex); +    object_names.reserve(object_name_to_create_object_map.size()); + +    for (const auto & [name, _] : object_name_to_create_object_map) +        object_names.emplace_back(name); + +    return object_names; +} + +bool UserDefinedSQLObjectsStorageBase::empty() const +{ +    std::lock_guard lock(mutex); +    return object_name_to_create_object_map.empty(); +} + +bool UserDefinedSQLObjectsStorageBase::storeObject( +    const ContextPtr & current_context, +    UserDefinedSQLObjectType object_type, +    const String & object_name, +    ASTPtr create_object_query, +    bool throw_if_exists, +    bool replace_if_exists, +    const Settings & settings) +{ +    std::lock_guard lock{mutex}; +    auto it = object_name_to_create_object_map.find(object_name); +    if (it != object_name_to_create_object_map.end()) +    { +        if (throw_if_exists) +            throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined object '{}' already exists", object_name); +        else if (!replace_if_exists) +            return false; +    } + +    bool stored = storeObjectImpl( +        current_context, +        object_type, +        object_name, +        create_object_query, +        throw_if_exists, +        replace_if_exists, +        settings); +     +    if (stored) +        object_name_to_create_object_map[object_name] = create_object_query; + +    return stored; +} + +bool UserDefinedSQLObjectsStorageBase::removeObject( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        bool throw_if_not_exists) +{ +    std::lock_guard lock(mutex); +    auto it = object_name_to_create_object_map.find(object_name); +    if (it == object_name_to_create_object_map.end()) +    { +        if (throw_if_not_exists) +            throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined object '{}' doesn't exist", object_name); +        else +            return false; +    } + +    bool removed = removeObjectImpl( +        current_context, +        object_type, +        object_name, +        throw_if_not_exists); + +    if (removed) +        object_name_to_create_object_map.erase(object_name); + +    return removed; +} + +std::unique_lock<std::recursive_mutex> UserDefinedSQLObjectsStorageBase::getLock() const +{ +    return std::unique_lock{mutex}; +} + +void UserDefinedSQLObjectsStorageBase::setAllObjects(const std::vector<std::pair<String, ASTPtr>> & new_objects) +{ +    std::unordered_map<String, ASTPtr> normalized_functions; +    for (const auto & [function_name, create_query] : new_objects) +        normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query); + +    std::lock_guard lock(mutex); +    object_name_to_create_object_map = std::move(normalized_functions); +} + +std::vector<std::pair<String, ASTPtr>> UserDefinedSQLObjectsStorageBase::getAllObjects() const +{ +    std::lock_guard lock{mutex}; +    std::vector<std::pair<String, ASTPtr>> all_objects; +    all_objects.reserve(object_name_to_create_object_map.size()); +    std::copy(object_name_to_create_object_map.begin(), object_name_to_create_object_map.end(), std::back_inserter(all_objects)); +    return all_objects; +} + +void UserDefinedSQLObjectsStorageBase::setObject(const String & object_name, const IAST & create_object_query) +{ +    std::lock_guard lock(mutex); +    object_name_to_create_object_map[object_name] = normalizeCreateFunctionQuery(create_object_query); +} + +void UserDefinedSQLObjectsStorageBase::removeObject(const String & object_name) +{ +    std::lock_guard lock(mutex); +    object_name_to_create_object_map.erase(object_name); +} + +void UserDefinedSQLObjectsStorageBase::removeAllObjectsExcept(const Strings & object_names_to_keep) +{ +    boost::container::flat_set<std::string_view> names_set_to_keep{object_names_to_keep.begin(), object_names_to_keep.end()}; +    std::lock_guard lock(mutex); +    for (auto it = object_name_to_create_object_map.begin(); it != object_name_to_create_object_map.end();) +    { +        auto current = it++; +        if (!names_set_to_keep.contains(current->first)) +            object_name_to_create_object_map.erase(current); +    } +} + +} diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h new file mode 100644 index 00000000000..cab63a3bfcf --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h @@ -0,0 +1,69 @@ +#pragma once + +#include <unordered_map> +#include <mutex> + +#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h> + +#include <Parsers/IAST.h> + +namespace DB +{ + +class UserDefinedSQLObjectsStorageBase : public IUserDefinedSQLObjectsStorage +{ +public: +    ASTPtr get(const String & object_name) const override; + +    ASTPtr tryGet(const String & object_name) const override; + +    bool has(const String & object_name) const override; + +    std::vector<String> getAllObjectNames() const override; + +    std::vector<std::pair<String, ASTPtr>> getAllObjects() const override; + +    bool empty() const override; + +    bool storeObject( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        ASTPtr create_object_query, +        bool throw_if_exists, +        bool replace_if_exists, +        const Settings & settings) override; + +    bool removeObject( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        bool throw_if_not_exists) override; + +protected: +    virtual bool storeObjectImpl( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        ASTPtr create_object_query, +        bool throw_if_exists, +        bool replace_if_exists, +        const Settings & settings) = 0; + +    virtual bool removeObjectImpl( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        bool throw_if_not_exists) = 0; + +    std::unique_lock<std::recursive_mutex> getLock() const; +    void setAllObjects(const std::vector<std::pair<String, ASTPtr>> & new_objects); +    void setObject(const String & object_name, const IAST & create_object_query); +    void removeObject(const String & object_name); +    void removeAllObjectsExcept(const Strings & object_names_to_keep); + +    std::unordered_map<String, ASTPtr> object_name_to_create_object_map; +    mutable std::recursive_mutex mutex; +}; + +} diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp new file mode 100644 index 00000000000..6e5a5338437 --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp @@ -0,0 +1,435 @@ +#include <Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h> + +#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h> +#include <Functions/UserDefined/UserDefinedSQLObjectType.h> +#include <Interpreters/Context.h> +#include <Parsers/ParserCreateFunctionQuery.h> +#include <Parsers/formatAST.h> +#include <Parsers/parseQuery.h> +#include <base/sleep.h> +#include <Common/Exception.h> +#include <Common/ZooKeeper/KeeperException.h> +#include <Common/escapeForFileName.h> +#include <Common/logger_useful.h> +#include <Common/quoteString.h> +#include <Common/scope_guard_safe.h> +#include <Common/setThreadName.h> + + +namespace DB +{ + +namespace ErrorCodes +{ +    extern const int FUNCTION_ALREADY_EXISTS; +    extern const int UNKNOWN_FUNCTION; +    extern const int BAD_ARGUMENTS; +} + +namespace +{ +    std::string_view getNodePrefix(UserDefinedSQLObjectType object_type) +    { +        switch (object_type) +        { +            case UserDefinedSQLObjectType::Function: +                return "function_"; +        } +        UNREACHABLE(); +    } + +    constexpr std::string_view sql_extension = ".sql"; + +    String getNodePath(const String & root_path, UserDefinedSQLObjectType object_type, const String & object_name) +    { +        return root_path + "/" + String{getNodePrefix(object_type)} + escapeForFileName(object_name) + String{sql_extension}; +    } +} + + +UserDefinedSQLObjectsZooKeeperStorage::UserDefinedSQLObjectsZooKeeperStorage( +    const ContextPtr & global_context_, const String & zookeeper_path_) +    : global_context{global_context_} +    , zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }} +    , zookeeper_path{zookeeper_path_} +    , watch_queue{std::make_shared<ConcurrentBoundedQueue<std::pair<UserDefinedSQLObjectType, String>>>(std::numeric_limits<size_t>::max())} +    , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromZooKeeper")} +{ +    if (zookeeper_path.empty()) +        throw Exception(ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path must be non-empty"); + +    if (zookeeper_path.back() == '/') +        zookeeper_path.resize(zookeeper_path.size() - 1); + +    /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it. +    if (zookeeper_path.front() != '/') +        zookeeper_path = "/" + zookeeper_path; +} + +UserDefinedSQLObjectsZooKeeperStorage::~UserDefinedSQLObjectsZooKeeperStorage() +{ +    SCOPE_EXIT_SAFE(stopWatchingThread()); +} + +void UserDefinedSQLObjectsZooKeeperStorage::startWatchingThread() +{ +    if (!watching_flag.exchange(true)) +    { +        watching_thread = ThreadFromGlobalPool(&UserDefinedSQLObjectsZooKeeperStorage::processWatchQueue, this); +    } +} + +void UserDefinedSQLObjectsZooKeeperStorage::stopWatchingThread() +{ +    if (watching_flag.exchange(false)) +    { +        watch_queue->finish(); +        if (watching_thread.joinable()) +            watching_thread.join(); +    } +} + +zkutil::ZooKeeperPtr UserDefinedSQLObjectsZooKeeperStorage::getZooKeeper() +{ +    auto [zookeeper, session_status] = zookeeper_getter.getZooKeeper(); + +    if (session_status == zkutil::ZooKeeperCachingGetter::SessionStatus::New) +    { +        /// It's possible that we connected to different [Zoo]Keeper instance +        /// so we may read a bit stale state. +        zookeeper->sync(zookeeper_path); + +        createRootNodes(zookeeper); +        refreshAllObjects(zookeeper); +    } + +    return zookeeper; +} + +void UserDefinedSQLObjectsZooKeeperStorage::initZooKeeperIfNeeded() +{ +    getZooKeeper(); +} + +void UserDefinedSQLObjectsZooKeeperStorage::resetAfterError() +{ +    zookeeper_getter.resetCache(); +} + + +void UserDefinedSQLObjectsZooKeeperStorage::loadObjects() +{ +    /// loadObjects() is called at start from Server::main(), so it's better not to stop here on no connection to ZooKeeper or any other error. +    /// However the watching thread must be started anyway in case the connection will be established later. +    if (!objects_loaded) +    { +        try +        { +            reloadObjects(); +        } +        catch (...) +        { +            tryLogCurrentException(log, "Failed to load user-defined objects"); +        } +    } +    startWatchingThread(); +} + + +void UserDefinedSQLObjectsZooKeeperStorage::processWatchQueue() +{ +    LOG_DEBUG(log, "Started watching thread"); +    setThreadName("UserDefObjWatch"); + +    while (watching_flag) +    { +        try +        { +            UserDefinedSQLObjectTypeAndName watched_object; + +            /// Re-initialize ZooKeeper session if expired and refresh objects +            initZooKeeperIfNeeded(); + +            if (!watch_queue->tryPop(watched_object, /* timeout_ms: */ 10000)) +                continue; + +            auto zookeeper = getZooKeeper(); +            const auto & [object_type, object_name] = watched_object; + +            if (object_name.empty()) +                syncObjects(zookeeper, object_type); +            else +                refreshObject(zookeeper, object_type, object_name); +        } +        catch (...) +        { +            tryLogCurrentException(log, "Will try to restart watching thread after error"); +            resetAfterError(); +            sleepForSeconds(5); +        } +    } + +    LOG_DEBUG(log, "Stopped watching thread"); +} + + +void UserDefinedSQLObjectsZooKeeperStorage::stopWatching() +{ +    stopWatchingThread(); +} + + +void UserDefinedSQLObjectsZooKeeperStorage::reloadObjects() +{ +    auto zookeeper = getZooKeeper(); +    refreshAllObjects(zookeeper); +    startWatchingThread(); +} + + +void UserDefinedSQLObjectsZooKeeperStorage::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) +{ +    auto zookeeper = getZooKeeper(); +    refreshObject(zookeeper, object_type, object_name); +} + + +void UserDefinedSQLObjectsZooKeeperStorage::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper) +{ +    zookeeper->createAncestors(zookeeper_path); +    zookeeper->createIfNotExists(zookeeper_path, ""); +} + +bool UserDefinedSQLObjectsZooKeeperStorage::storeObjectImpl( +    const ContextPtr & /*current_context*/, +    UserDefinedSQLObjectType object_type, +    const String & object_name, +    ASTPtr create_object_query, +    bool throw_if_exists, +    bool replace_if_exists, +    const Settings &) +{ +    String path = getNodePath(zookeeper_path, object_type, object_name); +    LOG_DEBUG(log, "Storing user-defined object {} at zk path {}", backQuote(object_name), path); + +    WriteBufferFromOwnString create_statement_buf; +    formatAST(*create_object_query, create_statement_buf, false); +    writeChar('\n', create_statement_buf); +    String create_statement = create_statement_buf.str(); + +    auto zookeeper = getZooKeeper(); + +    size_t num_attempts = 10; +    while (true) +    { +        auto code = zookeeper->tryCreate(path, create_statement, zkutil::CreateMode::Persistent); +        if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS)) +            throw zkutil::KeeperException::fromPath(code, path); + +        if (code == Coordination::Error::ZNODEEXISTS) +        { +            if (throw_if_exists) +                throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name); +            else if (!replace_if_exists) +                return false; + +            code = zookeeper->trySet(path, create_statement); +            if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE)) +                throw zkutil::KeeperException::fromPath(code, path); +        } + +        if (code == Coordination::Error::ZOK) +            break; + +        if (!--num_attempts) +            throw zkutil::KeeperException::fromPath(code, path); +    } +    LOG_DEBUG(log, "Object {} stored", backQuote(object_name)); + +    /// Refresh object and set watch for it. Because it can be replaced by another node after creation. +    refreshObject(zookeeper, object_type, object_name); + +    return true; +} + + +bool UserDefinedSQLObjectsZooKeeperStorage::removeObjectImpl( +    const ContextPtr & /*current_context*/, +    UserDefinedSQLObjectType object_type, +    const String & object_name, +    bool throw_if_not_exists) +{ +    String path = getNodePath(zookeeper_path, object_type, object_name); +    LOG_DEBUG(log, "Removing user-defined object {} at zk path {}", backQuote(object_name), path); + +    auto zookeeper = getZooKeeper(); + +    auto code = zookeeper->tryRemove(path); +    if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNONODE)) +        throw zkutil::KeeperException::fromPath(code, path); + +    if (code == Coordination::Error::ZNONODE) +    { +        if (throw_if_not_exists) +            throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined object '{}' doesn't exist", object_name); +        else +            return false; +    } + +    LOG_DEBUG(log, "Object {} removed", backQuote(object_name)); +    return true; +} + +bool UserDefinedSQLObjectsZooKeeperStorage::getObjectDataAndSetWatch( +    const zkutil::ZooKeeperPtr & zookeeper, +    String & data, +    const String & path, +    UserDefinedSQLObjectType object_type, +    const String & object_name) +{ +    const auto object_watcher = [my_watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response) +    { +        if (response.type == Coordination::Event::CHANGED) +        { +            [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, object_name); +            /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called). +        } +        /// Event::DELETED is processed as child event by getChildren watch +    }; + +    Coordination::Stat entity_stat; +    String object_create_query; +    return zookeeper->tryGetWatch(path, data, &entity_stat, object_watcher); +} + +ASTPtr UserDefinedSQLObjectsZooKeeperStorage::parseObjectData(const String & object_data, UserDefinedSQLObjectType object_type) +{ +    switch (object_type) +    { +        case UserDefinedSQLObjectType::Function: { +            ParserCreateFunctionQuery parser; +            ASTPtr ast = parseQuery( +                parser, +                object_data.data(), +                object_data.data() + object_data.size(), +                "", +                0, +                global_context->getSettingsRef().max_parser_depth); +            return ast; +        } +    } +    UNREACHABLE(); +} + +ASTPtr UserDefinedSQLObjectsZooKeeperStorage::tryLoadObject( +    const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name) +{ +    String path = getNodePath(zookeeper_path, object_type, object_name); +    LOG_DEBUG(log, "Loading user defined object {} from zk path {}", backQuote(object_name), path); + +    try +    { +        String object_data; +        bool exists = getObjectDataAndSetWatch(zookeeper, object_data, path, object_type, object_name); + +        if (!exists) +        { +            LOG_INFO(log, "User-defined object '{}' can't be loaded from path {}, because it doesn't exist", backQuote(object_name), path); +            return nullptr; +        } + +        return parseObjectData(object_data, object_type); +    } +    catch (...) +    { +        tryLogCurrentException(log, fmt::format("while loading user defined SQL object {}", backQuote(object_name))); +        return nullptr; /// Failed to load this sql object, will ignore it +    } +} + +Strings UserDefinedSQLObjectsZooKeeperStorage::getObjectNamesAndSetWatch( +    const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type) +{ +    auto object_list_watcher = [my_watch_queue = watch_queue, object_type](const Coordination::WatchResponse &) +    { +        [[maybe_unused]] bool inserted = my_watch_queue->emplace(object_type, ""); +        /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called). +    }; + +    Coordination::Stat stat; +    const auto node_names = zookeeper->getChildrenWatch(zookeeper_path, &stat, object_list_watcher); +    const auto prefix = getNodePrefix(object_type); + +    Strings object_names; +    object_names.reserve(node_names.size()); +    for (const auto & node_name : node_names) +    { +        if (node_name.starts_with(prefix) && node_name.ends_with(sql_extension)) +        { +            String object_name = unescapeForFileName(node_name.substr(prefix.length(), node_name.length() - prefix.length() - sql_extension.length())); +            if (!object_name.empty()) +                object_names.push_back(std::move(object_name)); +        } +    } + +    return object_names; +} + +void UserDefinedSQLObjectsZooKeeperStorage::refreshAllObjects(const zkutil::ZooKeeperPtr & zookeeper) +{ +    /// It doesn't make sense to keep the old watch events because we will reread everything in this function. +    watch_queue->clear(); + +    refreshObjects(zookeeper, UserDefinedSQLObjectType::Function); +    objects_loaded = true; +} + +void UserDefinedSQLObjectsZooKeeperStorage::refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type) +{ +    LOG_DEBUG(log, "Refreshing all user-defined {} objects", object_type); +    Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type); + +    /// Read & parse all SQL objects from ZooKeeper +    std::vector<std::pair<String, ASTPtr>> function_names_and_asts; +    for (const auto & function_name : object_names) +    { +        if (auto ast = tryLoadObject(zookeeper, UserDefinedSQLObjectType::Function, function_name)) +            function_names_and_asts.emplace_back(function_name, ast); +    } + +    setAllObjects(function_names_and_asts); + +    LOG_DEBUG(log, "All user-defined {} objects refreshed", object_type); +} + +void UserDefinedSQLObjectsZooKeeperStorage::syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type) +{ +    LOG_DEBUG(log, "Syncing user-defined {} objects", object_type); +    Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type); + +    getLock(); + +    /// Remove stale objects +    removeAllObjectsExcept(object_names); +    /// Read & parse only new SQL objects from ZooKeeper +    for (const auto & function_name : object_names) +    { +        if (!UserDefinedSQLFunctionFactory::instance().has(function_name)) +            refreshObject(zookeeper, UserDefinedSQLObjectType::Function, function_name); +    } + +    LOG_DEBUG(log, "User-defined {} objects synced", object_type); +} + +void UserDefinedSQLObjectsZooKeeperStorage::refreshObject( +    const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name) +{ +    auto ast = tryLoadObject(zookeeper, object_type, object_name); + +    if (ast) +        setObject(object_name, *ast); +    else +        removeObject(object_name); +} + +} diff --git a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h index 38e061fd4d9..9f41763c59c 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h +++ b/contrib/clickhouse/src/Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h @@ -1,6 +1,6 @@  #pragma once -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>  #include <Interpreters/Context_fwd.h>  #include <Parsers/IAST_fwd.h>  #include <Common/ConcurrentBoundedQueue.h> @@ -12,11 +12,11 @@ namespace DB  {  /// Loads user-defined sql objects from ZooKeeper. -class UserDefinedSQLObjectsLoaderFromZooKeeper : public IUserDefinedSQLObjectsLoader +class UserDefinedSQLObjectsZooKeeperStorage : public UserDefinedSQLObjectsStorageBase  {  public: -    UserDefinedSQLObjectsLoaderFromZooKeeper(const ContextPtr & global_context_, const String & zookeeper_path_); -    ~UserDefinedSQLObjectsLoaderFromZooKeeper() override; +    UserDefinedSQLObjectsZooKeeperStorage(const ContextPtr & global_context_, const String & zookeeper_path_); +    ~UserDefinedSQLObjectsZooKeeperStorage() override;      bool isReplicated() const override { return true; }      String getReplicationID() const override { return zookeeper_path; } @@ -26,16 +26,21 @@ public:      void reloadObjects() override;      void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override; -    bool storeObject( +private: +    bool storeObjectImpl( +        const ContextPtr & current_context,          UserDefinedSQLObjectType object_type,          const String & object_name, -        const IAST & create_object_query, +        ASTPtr create_object_query,          bool throw_if_exists,          bool replace_if_exists,          const Settings & settings) override; -    bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override; +    bool removeObjectImpl( +        const ContextPtr & current_context, +        UserDefinedSQLObjectType object_type, +        const String & object_name, +        bool throw_if_not_exists) override; -private:      void processWatchQueue();      zkutil::ZooKeeperPtr getZooKeeper(); diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp index b7ebc7abf14..e69de29bb2d 100644 --- a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp +++ b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp @@ -1,44 +0,0 @@ -#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h> -#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h> -#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h> -#include <Interpreters/Context.h> -#include <Poco/Util/AbstractConfiguration.h> -#include <filesystem> - -namespace fs = std::filesystem; - - -namespace DB -{ - - -namespace ErrorCodes -{ -    extern const int INVALID_CONFIG_PARAMETER; -} - -std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context) -{ -    const String zookeeper_path_key = "user_defined_zookeeper_path"; -    const String disk_path_key = "user_defined_path"; - -    const auto & config = global_context->getConfigRef(); -    if (config.has(zookeeper_path_key)) -    { -        if (config.has(disk_path_key)) -        { -            throw Exception( -                ErrorCodes::INVALID_CONFIG_PARAMETER, -                "'{}' and '{}' must not be both specified in the config", -                zookeeper_path_key, -                disk_path_key); -        } -        return std::make_unique<UserDefinedSQLObjectsLoaderFromZooKeeper>(global_context, config.getString(zookeeper_path_key)); -    } - -    String default_path = fs::path{global_context->getPath()} / "user_defined/"; -    String path = config.getString(disk_path_key, default_path); -    return std::make_unique<UserDefinedSQLObjectsLoaderFromDisk>(global_context, path); -} - -} diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h deleted file mode 100644 index b3a4623dba3..00000000000 --- a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include <Interpreters/Context_fwd.h> - - -namespace DB -{ -class IUserDefinedSQLObjectsLoader; - -std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context); - -} diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp new file mode 100644 index 00000000000..f8847024508 --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp @@ -0,0 +1,44 @@ +#include <Functions/UserDefined/createUserDefinedSQLObjectsStorage.h> +#include <Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h> +#include <Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h> +#include <Interpreters/Context.h> +#include <Poco/Util/AbstractConfiguration.h> +#include <filesystem> + +namespace fs = std::filesystem; + + +namespace DB +{ + + +namespace ErrorCodes +{ +    extern const int INVALID_CONFIG_PARAMETER; +} + +std::unique_ptr<IUserDefinedSQLObjectsStorage> createUserDefinedSQLObjectsStorage(const ContextMutablePtr & global_context) +{ +    const String zookeeper_path_key = "user_defined_zookeeper_path"; +    const String disk_path_key = "user_defined_path"; + +    const auto & config = global_context->getConfigRef(); +    if (config.has(zookeeper_path_key)) +    { +        if (config.has(disk_path_key)) +        { +            throw Exception( +                ErrorCodes::INVALID_CONFIG_PARAMETER, +                "'{}' and '{}' must not be both specified in the config", +                zookeeper_path_key, +                disk_path_key); +        } +        return std::make_unique<UserDefinedSQLObjectsZooKeeperStorage>(global_context, config.getString(zookeeper_path_key)); +    } + +    String default_path = fs::path{global_context->getPath()} / "user_defined/"; +    String path = config.getString(disk_path_key, default_path); +    return std::make_unique<UserDefinedSQLObjectsDiskStorage>(global_context, path); +} + +} diff --git a/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.h b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.h new file mode 100644 index 00000000000..01659372dec --- /dev/null +++ b/contrib/clickhouse/src/Functions/UserDefined/createUserDefinedSQLObjectsStorage.h @@ -0,0 +1,12 @@ +#pragma once + +#include <Interpreters/Context_fwd.h> + + +namespace DB +{ +class IUserDefinedSQLObjectsStorage; + +std::unique_ptr<IUserDefinedSQLObjectsStorage> createUserDefinedSQLObjectsStorage(const ContextMutablePtr & global_context); + +} diff --git a/contrib/clickhouse/src/Interpreters/Context.cpp b/contrib/clickhouse/src/Interpreters/Context.cpp index 63f59975749..93fe12431ff 100644 --- a/contrib/clickhouse/src/Interpreters/Context.cpp +++ b/contrib/clickhouse/src/Interpreters/Context.cpp @@ -58,8 +58,8 @@  #include <Interpreters/EmbeddedDictionaries.h>  #include <Interpreters/ExternalDictionariesLoader.h>  #include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h> -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> -#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h> +#include <Functions/UserDefined/createUserDefinedSQLObjectsStorage.h>  #include <Interpreters/ProcessList.h>  #include <Interpreters/InterserverCredentials.h>  #include <Interpreters/Cluster.h> @@ -238,7 +238,7 @@ struct ContextSharedPart : boost::noncopyable      ExternalLoaderXMLConfigRepository * user_defined_executable_functions_config_repository = nullptr;      scope_guard user_defined_executable_functions_xmls; -    mutable std::unique_ptr<IUserDefinedSQLObjectsLoader> user_defined_sql_objects_loader; +    mutable std::unique_ptr<IUserDefinedSQLObjectsStorage> user_defined_sql_objects_storage;  #if USE_NLP      mutable std::optional<SynonymsExtensions> synonyms_extensions; @@ -489,7 +489,7 @@ struct ContextSharedPart : boost::noncopyable          SHUTDOWN(log, "dictionaries loader", external_dictionaries_loader, enablePeriodicUpdates(false));          SHUTDOWN(log, "UDFs loader", external_user_defined_executable_functions_loader, enablePeriodicUpdates(false)); -        SHUTDOWN(log, "another UDFs loader", user_defined_sql_objects_loader, stopWatching()); +        SHUTDOWN(log, "another UDFs storage", user_defined_sql_objects_storage, stopWatching());          LOG_TRACE(log, "Shutting down named sessions");          Session::shutdownNamedSessions(); @@ -516,7 +516,7 @@ struct ContextSharedPart : boost::noncopyable          std::unique_ptr<EmbeddedDictionaries> delete_embedded_dictionaries;          std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;          std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader; -        std::unique_ptr<IUserDefinedSQLObjectsLoader> delete_user_defined_sql_objects_loader; +        std::unique_ptr<IUserDefinedSQLObjectsStorage> delete_user_defined_sql_objects_storage;          std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;          std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;          std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool; @@ -575,7 +575,7 @@ struct ContextSharedPart : boost::noncopyable              delete_embedded_dictionaries = std::move(embedded_dictionaries);              delete_external_dictionaries_loader = std::move(external_dictionaries_loader);              delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader); -            delete_user_defined_sql_objects_loader = std::move(user_defined_sql_objects_loader); +            delete_user_defined_sql_objects_storage = std::move(user_defined_sql_objects_storage);              delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);              delete_schedule_pool = std::move(schedule_pool);              delete_distributed_schedule_pool = std::move(distributed_schedule_pool); @@ -602,7 +602,7 @@ struct ContextSharedPart : boost::noncopyable          delete_embedded_dictionaries.reset();          delete_external_dictionaries_loader.reset();          delete_external_user_defined_executable_functions_loader.reset(); -        delete_user_defined_sql_objects_loader.reset(); +        delete_user_defined_sql_objects_storage.reset();          delete_ddl_worker.reset();          delete_buffer_flush_schedule_pool.reset();          delete_schedule_pool.reset(); @@ -2188,20 +2188,26 @@ void Context::loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::Abstr      shared->user_defined_executable_functions_xmls = external_user_defined_executable_functions_loader.addConfigRepository(std::move(repository));  } -const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() const +const IUserDefinedSQLObjectsStorage & Context::getUserDefinedSQLObjectsStorage() const  {      auto lock = getLock(); -    if (!shared->user_defined_sql_objects_loader) -        shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext()); -    return *shared->user_defined_sql_objects_loader; +    if (!shared->user_defined_sql_objects_storage) +        shared->user_defined_sql_objects_storage = createUserDefinedSQLObjectsStorage(getGlobalContext()); +    return *shared->user_defined_sql_objects_storage;  } -IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() +IUserDefinedSQLObjectsStorage & Context::getUserDefinedSQLObjectsStorage()  {      auto lock = getLock(); -    if (!shared->user_defined_sql_objects_loader) -        shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext()); -    return *shared->user_defined_sql_objects_loader; +    if (!shared->user_defined_sql_objects_storage) +        shared->user_defined_sql_objects_storage = createUserDefinedSQLObjectsStorage(getGlobalContext()); +    return *shared->user_defined_sql_objects_storage; +} + +void Context::setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage) +{ +    auto lock = getLock(); +    shared->user_defined_sql_objects_storage = std::move(storage);  }  #if USE_NLP diff --git a/contrib/clickhouse/src/Interpreters/Context.h b/contrib/clickhouse/src/Interpreters/Context.h index eba205a88c3..d9ce12d9ae0 100644 --- a/contrib/clickhouse/src/Interpreters/Context.h +++ b/contrib/clickhouse/src/Interpreters/Context.h @@ -65,7 +65,7 @@ enum class RowPolicyFilterType;  class EmbeddedDictionaries;  class ExternalDictionariesLoader;  class ExternalUserDefinedExecutableFunctionsLoader; -class IUserDefinedSQLObjectsLoader; +class IUserDefinedSQLObjectsStorage;  class InterserverCredentials;  using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;  class InterserverIOHandler; @@ -764,8 +764,9 @@ public:      const ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const;      ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader();      ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderUnlocked(); -    const IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader() const; -    IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader(); +    const IUserDefinedSQLObjectsStorage & getUserDefinedSQLObjectsStorage() const; +    IUserDefinedSQLObjectsStorage & getUserDefinedSQLObjectsStorage(); +    void setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage);      void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config);  #if USE_NLP diff --git a/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp b/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp index 3e87f4fe440..b155476fd79 100644 --- a/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp +++ b/contrib/clickhouse/src/Interpreters/InterpreterCreateFunctionQuery.cpp @@ -1,7 +1,7 @@  #include <Interpreters/InterpreterCreateFunctionQuery.h>  #include <Access/ContextAccess.h> -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>  #include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>  #include <Interpreters/Context.h>  #include <Interpreters/executeDDLQueryOnCluster.h> @@ -32,7 +32,7 @@ BlockIO InterpreterCreateFunctionQuery::execute()      if (!create_function_query.cluster.empty())      { -        if (current_context->getUserDefinedSQLObjectsLoader().isReplicated()) +        if (current_context->getUserDefinedSQLObjectsStorage().isReplicated())              throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");          DDLQueryOnClusterParams params; diff --git a/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp b/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp index af60d9c5df7..c2cd24044da 100644 --- a/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp +++ b/contrib/clickhouse/src/Interpreters/InterpreterDropFunctionQuery.cpp @@ -1,7 +1,7 @@  #include <Interpreters/InterpreterDropFunctionQuery.h>  #include <Access/ContextAccess.h> -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>  #include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>  #include <Interpreters/Context.h>  #include <Interpreters/FunctionNameNormalizer.h> @@ -32,7 +32,7 @@ BlockIO InterpreterDropFunctionQuery::execute()      if (!drop_function_query.cluster.empty())      { -        if (current_context->getUserDefinedSQLObjectsLoader().isReplicated()) +        if (current_context->getUserDefinedSQLObjectsStorage().isReplicated())              throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");          DDLQueryOnClusterParams params; diff --git a/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp b/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp index da3930d62a6..f8df03ed830 100644 --- a/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp +++ b/contrib/clickhouse/src/Interpreters/removeOnClusterClauseIfNeeded.cpp @@ -3,7 +3,7 @@  #include <Access/AccessControl.h>  #include <Access/ReplicatedAccessStorage.h>  #include <Common/logger_useful.h> -#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>  #include <Interpreters/Context.h>  #include <Parsers/ASTCreateFunctionQuery.h>  #include <Parsers/ASTDropFunctionQuery.h> @@ -47,7 +47,7 @@ ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, c      if ((isUserDefinedFunctionQuery(query)           && context->getSettings().ignore_on_cluster_for_replicated_udf_queries -         && context->getUserDefinedSQLObjectsLoader().isReplicated()) +         && context->getUserDefinedSQLObjectsStorage().isReplicated())          || (isAccessControlQuery(query)              && context->getSettings().ignore_on_cluster_for_replicated_access_entities_queries              && context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE))) diff --git a/contrib/clickhouse/src/ya.make b/contrib/clickhouse/src/ya.make index 1f5158a3ca9..4b91f7a155b 100644 --- a/contrib/clickhouse/src/ya.make +++ b/contrib/clickhouse/src/ya.make @@ -770,9 +770,13 @@ SRCS(      Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp      Functions/UserDefined/UserDefinedSQLFunctionVisitor.cpp      Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp +    Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.cpp      Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp      Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.cpp +    Functions/UserDefined/UserDefinedSQLObjectsStorageBase.cpp +    Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.cpp      Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp +    Functions/UserDefined/createUserDefinedSQLObjectsStorage.cpp      Functions/divide/divide.cpp      GLOBAL Functions/CRC.cpp      GLOBAL Functions/CastOverloadResolver.cpp  | 
