diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 | 
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 | 
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/programs/server/Server.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/programs/server/Server.cpp')
| -rw-r--r-- | contrib/clickhouse/programs/server/Server.cpp | 2542 | 
1 files changed, 2542 insertions, 0 deletions
diff --git a/contrib/clickhouse/programs/server/Server.cpp b/contrib/clickhouse/programs/server/Server.cpp new file mode 100644 index 00000000000..ec6c499e5a1 --- /dev/null +++ b/contrib/clickhouse/programs/server/Server.cpp @@ -0,0 +1,2542 @@ +#include "Server.h" + +#include <memory> +#include <sys/resource.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <pwd.h> +#include <unistd.h> +#include <Poco/Net/HTTPServer.h> +#include <Poco/Net/NetException.h> +#include <Poco/Util/HelpFormatter.h> +#include <Poco/Environment.h> +#include <Common/scope_guard_safe.h> +#include <Common/logger_useful.h> +#include <base/phdr_cache.h> +#include <Common/ErrorHandlers.h> +#include <base/getMemoryAmount.h> +#include <base/getAvailableMemoryAmount.h> +#include <base/errnoToString.h> +#include <base/coverage.h> +#include <base/getFQDNOrHostName.h> +#include <base/safeExit.h> +#include <Common/MemoryTracker.h> +#include <Common/ClickHouseRevision.h> +#include <Common/DNSResolver.h> +#include <Common/CurrentMetrics.h> +#include <Common/ConcurrencyControl.h> +#include <Common/Macros.h> +#include <Common/ShellCommand.h> +#include <Common/ZooKeeper/ZooKeeper.h> +#include <Common/ZooKeeper/ZooKeeperNodeCache.h> +#include <Common/formatReadable.h> +#include <Common/getMultipleKeysFromConfig.h> +#include <Common/getNumberOfPhysicalCPUCores.h> +#include <Common/getExecutablePath.h> +#include <Common/ProfileEvents.h> +#include <Common/ThreadProfileEvents.h> +#include <Common/ThreadStatus.h> +#include <Common/getMappedArea.h> +#include <Common/remapExecutable.h> +#include <Common/TLDListsHolder.h> +#include <Common/Config/AbstractConfigurationComparison.h> +#include <Common/assertProcessUserMatchesDataOwner.h> +#include <Common/makeSocketAddress.h> +#include <Server/waitServersToFinish.h> +#include <Core/ServerUUID.h> +#include <IO/ReadHelpers.h> +#include <IO/ReadBufferFromFile.h> +#include <IO/SharedThreadPools.h> +#include <IO/UseSSL.h> +#include 
<Interpreters/ServerAsynchronousMetrics.h> +#include <Interpreters/DDLWorker.h> +#include <Interpreters/DNSCacheUpdater.h> +#include <Interpreters/DatabaseCatalog.h> +#include <Interpreters/ExternalDictionariesLoader.h> +#include <Interpreters/ProcessList.h> +#include <Interpreters/loadMetadata.h> +#include <Interpreters/JIT/CompiledExpressionCache.h> +#include <Access/AccessControl.h> +#include <Storages/StorageReplicatedMergeTree.h> +#include <Storages/System/attachSystemTables.h> +#include <Storages/System/attachInformationSchemaTables.h> +#include <Storages/Cache/ExternalDataSourceCache.h> +#include <Storages/Cache/registerRemoteFileMetadatas.h> +#include <Common/NamedCollections/NamedCollectionUtils.h> +#include <AggregateFunctions/registerAggregateFunctions.h> +#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h> +#include <Functions/registerFunctions.h> +#include <TableFunctions/registerTableFunctions.h> +#include <Formats/registerFormats.h> +#include <Storages/registerStorages.h> +#include <Dictionaries/registerDictionaries.h> +#include <Disks/registerDisks.h> +#include <IO/Resource/registerSchedulerNodes.h> +#include <IO/Resource/registerResourceManagers.h> +#include <Common/Config/ConfigReloader.h> +#include <Server/HTTPHandlerFactory.h> +#include "MetricsTransmitter.h" +#include <Common/StatusFile.h> +#include <Server/TCPHandlerFactory.h> +#include <Server/TCPServer.h> +#include <Common/SensitiveDataMasker.h> +#include <Common/ThreadFuzzer.h> +#include <Common/getHashOfLoadedBinary.h> +#include <Common/filesystemHelpers.h> +#include <Compression/CompressionCodecEncrypted.h> +#include <Server/HTTP/HTTPServerConnectionFactory.h> +#include <Server/MySQLHandlerFactory.h> +#include <Server/PostgreSQLHandlerFactory.h> +#include <Server/ProxyV1HandlerFactory.h> +#include <Server/TLSHandlerFactory.h> +#include <Server/ProtocolServerAdapter.h> +#include <Server/HTTP/HTTPServer.h> +#include <Interpreters/AsynchronousInsertQueue.h> +#include 
<Core/ServerSettings.h> +#include <filesystem> +#include <unordered_set> + +#include "clickhouse_config.h" +#include "config_version.h" + +#if defined(OS_LINUX) +#    include <cstdlib> +#    include <sys/un.h> +#    include <sys/mman.h> +#    include <sys/ptrace.h> +#    include <Common/hasLinuxCapability.h> +#endif + +#if USE_SSL +#    include <Poco/Net/SecureServerSocket.h> +#    include <Server/CertificateReloader.h> +#endif + +#if USE_GRPC +#   include <Server/GRPCServer.h> +#endif + +#if USE_NURAFT +#    include <Coordination/FourLetterCommand.h> +#    error #include <Server/KeeperTCPHandlerFactory.h> +#endif + +#if USE_JEMALLOC +#    include <jemalloc/jemalloc.h> +#endif + +#if USE_AZURE_BLOB_STORAGE +#   error #include <azure/storage/common/internal/xml_wrapper.hpp> +#endif + +#include <incbin.h> +/// A minimal file used when the server is run without installation +INCBIN(resource_embedded_xml, SOURCE_DIR "/programs/server/embedded.xml"); + +namespace CurrentMetrics +{ +    extern const Metric Revision; +    extern const Metric VersionInteger; +    extern const Metric MemoryTracking; +    extern const Metric MergesMutationsMemoryTracking; +    extern const Metric MaxDDLEntryID; +    extern const Metric MaxPushedDDLEntryID; +} + +namespace ProfileEvents +{ +    extern const Event MainConfigLoads; +    extern const Event ServerStartupMilliseconds; +} + +namespace fs = std::filesystem; + +#if USE_JEMALLOC +static bool jemallocOptionEnabled(const char *name) +{ +    bool value; +    size_t size = sizeof(value); + +    if (mallctl(name, reinterpret_cast<void *>(&value), &size, /* newp= */ nullptr, /* newlen= */ 0)) +        throw Poco::SystemException("mallctl() failed"); + +    return value; +} +#else +static bool jemallocOptionEnabled(const char *) { return 0; } +#endif + +int mainEntryClickHouseServer(int argc, char ** argv) +{ +    DB::Server app; + +    if (jemallocOptionEnabled("opt.background_thread")) +    { +        LOG_ERROR(&app.logger(), +            
"jemalloc.background_thread was requested, " +            "however ClickHouse uses percpu_arena and background_thread most likely will not give any benefits, " +            "and also background_thread is not compatible with ClickHouse watchdog " +            "(that can be disabled with CLICKHOUSE_WATCHDOG_ENABLE=0)"); +    } + +    /// Do not fork separate process from watchdog if we attached to terminal. +    /// Otherwise it breaks gdb usage. +    /// Can be overridden by environment variable (cannot use server config at this moment). +    if (argc > 0) +    { +        const char * env_watchdog = getenv("CLICKHOUSE_WATCHDOG_ENABLE"); // NOLINT(concurrency-mt-unsafe) +        if (env_watchdog) +        { +            if (0 == strcmp(env_watchdog, "1")) +                app.shouldSetupWatchdog(argv[0]); + +            /// Other values disable watchdog explicitly. +        } +        else if (!isatty(STDIN_FILENO) && !isatty(STDOUT_FILENO) && !isatty(STDERR_FILENO)) +            app.shouldSetupWatchdog(argv[0]); +    } + +    try +    { +        return app.run(argc, argv); +    } +    catch (...) +    { +        std::cerr << DB::getCurrentExceptionMessage(true) << "\n"; +        auto code = DB::getCurrentExceptionCode(); +        return code ? 
code : 1; +    } +} + +namespace DB +{ + +namespace ErrorCodes +{ +    extern const int NO_ELEMENTS_IN_CONFIG; +    extern const int SUPPORT_IS_DISABLED; +    extern const int ARGUMENT_OUT_OF_BOUND; +    extern const int EXCESSIVE_ELEMENT_IN_CONFIG; +    extern const int INVALID_CONFIG_PARAMETER; +    extern const int NETWORK_ERROR; +    extern const int CORRUPTED_DATA; +} + + +static std::string getCanonicalPath(std::string && path) +{ +    Poco::trimInPlace(path); +    if (path.empty()) +        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "path configuration parameter is empty"); +    if (path.back() != '/') +        path += '/'; +    return std::move(path); +} + +Poco::Net::SocketAddress Server::socketBindListen( +    const Poco::Util::AbstractConfiguration & config, +    Poco::Net::ServerSocket & socket, +    const std::string & host, +    UInt16 port, +    [[maybe_unused]] bool secure) const +{ +    auto address = makeSocketAddress(host, port, &logger()); +    socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config.getBool("listen_reuse_port", false)); +    /// If caller requests any available port from the OS, discover it after binding. 
+    if (port == 0) +    { +        address = socket.address(); +        LOG_DEBUG(&logger(), "Requested any available port (port == 0), actual port is {:d}", address.port()); +    } + +    socket.listen(/* backlog = */ config.getUInt("listen_backlog", 4096)); + +    return address; +} + +Strings getListenHosts(const Poco::Util::AbstractConfiguration & config) +{ +    auto listen_hosts = DB::getMultipleValuesFromConfig(config, "", "listen_host"); +    if (listen_hosts.empty()) +    { +        listen_hosts.emplace_back("::1"); +        listen_hosts.emplace_back("127.0.0.1"); +    } +    return listen_hosts; +} + +Strings getInterserverListenHosts(const Poco::Util::AbstractConfiguration & config) +{ +    auto interserver_listen_hosts = DB::getMultipleValuesFromConfig(config, "", "interserver_listen_host"); +    if (!interserver_listen_hosts.empty()) +      return interserver_listen_hosts; + +    /// Use more general restriction in case of emptiness +    return getListenHosts(config); +} + +bool getListenTry(const Poco::Util::AbstractConfiguration & config) +{ +    bool listen_try = config.getBool("listen_try", false); +    if (!listen_try) +    { +        Poco::Util::AbstractConfiguration::Keys protocols; +        config.keys("protocols", protocols); +        listen_try = +            DB::getMultipleValuesFromConfig(config, "", "listen_host").empty() && +            std::none_of(protocols.begin(), protocols.end(), [&](const auto & protocol) +            { +                return config.has("protocols." + protocol + ".host") && config.has("protocols." 
+ protocol + ".port"); +            }); +    } +    return listen_try; +} + + +void Server::createServer( +    Poco::Util::AbstractConfiguration & config, +    const std::string & listen_host, +    const char * port_name, +    bool listen_try, +    bool start_server, +    std::vector<ProtocolServerAdapter> & servers, +    CreateServerFunc && func) const +{ +    /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file. +    if (config.getString(port_name, "").empty()) +        return; + +    /// If we already have an active server for this listen_host/port_name, don't create it again +    for (const auto & server : servers) +    { +        if (!server.isStopping() && server.getListenHost() == listen_host && server.getPortName() == port_name) +            return; +    } + +    auto port = config.getInt(port_name); +    try +    { +        servers.push_back(func(port)); +        if (start_server) +        { +            servers.back().start(); +            LOG_INFO(&logger(), "Listening for {}", servers.back().getDescription()); +        } +        global_context->registerServerPort(port_name, port); +    } +    catch (const Poco::Exception &) +    { +        if (listen_try) +        { +            LOG_WARNING(&logger(), "Listen [{}]:{} failed: {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, " +                "then consider to " +                "specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration " +                "file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ." 
+                " Example for disabled IPv4: <listen_host>::</listen_host>", +                listen_host, port, getCurrentExceptionMessage(false)); +        } +        else +        { +            throw Exception(ErrorCodes::NETWORK_ERROR, "Listen [{}]:{} failed: {}", listen_host, port, getCurrentExceptionMessage(false)); +        } +    } +} + + +#if defined(OS_LINUX) +namespace +{ + +void setOOMScore(int value, Poco::Logger * log) +{ +    try +    { +        std::string value_string = std::to_string(value); +        DB::WriteBufferFromFile buf("/proc/self/oom_score_adj"); +        buf.write(value_string.c_str(), value_string.size()); +        buf.next(); +        buf.close(); +    } +    catch (const Poco::Exception & e) +    { +        LOG_WARNING(log, "Failed to adjust OOM score: '{}'.", e.displayText()); +        return; +    } +    LOG_INFO(log, "Set OOM score adjustment to {}", value); +} + +} +#endif + + +void Server::uninitialize() +{ +    logger().information("shutting down"); +    BaseDaemon::uninitialize(); +} + +int Server::run() +{ +    if (config().hasOption("help")) +    { +        Poco::Util::HelpFormatter help_formatter(Server::options()); +        auto header_str = fmt::format("{} [OPTION] [-- [ARG]...]\n" +                                      "positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010", +                                      commandName()); +        help_formatter.setHeader(header_str); +        help_formatter.format(std::cout); +        return 0; +    } +    if (config().hasOption("version")) +    { +        std::cout << VERSION_NAME << " server version " << VERSION_STRING << VERSION_OFFICIAL << "." 
<< std::endl; +        return 0; +    } +    return Application::run(); // NOLINT +} + +void Server::initialize(Poco::Util::Application & self) +{ +    ConfigProcessor::registerEmbeddedConfig("config.xml", std::string_view(reinterpret_cast<const char *>(gresource_embedded_xmlData), gresource_embedded_xmlSize)); +    BaseDaemon::initialize(self); +    logger().information("starting up"); + +    LOG_INFO(&logger(), "OS name: {}, version: {}, architecture: {}", +        Poco::Environment::osName(), +        Poco::Environment::osVersion(), +        Poco::Environment::osArchitecture()); +} + +std::string Server::getDefaultCorePath() const +{ +    return getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH)) + "cores"; +} + +void Server::defineOptions(Poco::Util::OptionSet & options) +{ +    options.addOption( +        Poco::Util::Option("help", "h", "show help and exit") +            .required(false) +            .repeatable(false) +            .binding("help")); +    options.addOption( +        Poco::Util::Option("version", "V", "show version and exit") +            .required(false) +            .repeatable(false) +            .binding("version")); +    BaseDaemon::defineOptions(options); +} + + +void checkForUsersNotInMainConfig( +    const Poco::Util::AbstractConfiguration & config, +    const std::string & config_path, +    const std::string & users_config_path, +    Poco::Logger * log) +{ +    if (config.getBool("skip_check_for_incorrect_settings", false)) +        return; + +    if (config.has("users") || config.has("profiles") || config.has("quotas")) +    { +        /// We cannot throw exception here, because we have support for obsolete 'conf.d' directory +        /// (that does not correspond to config.d or users.d) but substitute configuration to both of them. + +        LOG_ERROR(log, "The <users>, <profiles> and <quotas> elements should be located in users config file: {} not in main config {}." 
+            " Also note that you should place configuration changes to the appropriate *.d directory like 'users.d'.", +            users_config_path, config_path); +    } +} + +/// Unused in other builds +#if defined(OS_LINUX) +static String readLine(const String & path) +{ +    ReadBufferFromFile in(path); +    String contents; +    readStringUntilNewlineInto(contents, in); +    return contents; +} + +static int readNumber(const String & path) +{ +    ReadBufferFromFile in(path); +    int result; +    readText(result, in); +    return result; +} + +#endif + +static void sanityChecks(Server & server) +{ +    std::string data_path = getCanonicalPath(server.config().getString("path", DBMS_DEFAULT_PATH)); +    std::string logs_path = server.config().getString("logger.log", ""); + +    if (server.logger().is(Poco::Message::PRIO_TEST)) +        server.context()->addWarningMessage("Server logging level is set to 'test' and performance is degraded. This cannot be used in production."); + +#if defined(OS_LINUX) +    try +    { +        const std::unordered_set<std::string> fastClockSources = { +            // ARM clock +            "arch_sys_counter", +            // KVM guest clock +            "kvm-clock", +            // X86 clock +            "tsc", +        }; +        const char * filename = "/sys/devices/system/clocksource/clocksource0/current_clocksource"; +        if (!fastClockSources.contains(readLine(filename))) +            server.context()->addWarningMessage("Linux is not using a fast clock source. Performance can be degraded. Check " + String(filename)); +    } +    catch (...) +    { +    } + +    try +    { +        const char * filename = "/proc/sys/vm/overcommit_memory"; +        if (readNumber(filename) == 2) +            server.context()->addWarningMessage("Linux memory overcommit is disabled. Check " + String(filename)); +    } +    catch (...) 
+    { +    } + +    try +    { +        const char * filename = "/sys/kernel/mm/transparent_hugepage/enabled"; +        if (readLine(filename).find("[always]") != std::string::npos) +            server.context()->addWarningMessage("Linux transparent hugepages are set to \"always\". Check " + String(filename)); +    } +    catch (...) +    { +    } + +    try +    { +        const char * filename = "/proc/sys/kernel/pid_max"; +        if (readNumber(filename) < 30000) +            server.context()->addWarningMessage("Linux max PID is too low. Check " + String(filename)); +    } +    catch (...) +    { +    } + +    try +    { +        const char * filename = "/proc/sys/kernel/threads-max"; +        if (readNumber(filename) < 30000) +            server.context()->addWarningMessage("Linux threads max count is too low. Check " + String(filename)); +    } +    catch (...) +    { +    } + +    std::string dev_id = getBlockDeviceId(data_path); +    if (getBlockDeviceType(dev_id) == BlockDeviceType::ROT && getBlockDeviceReadAheadBytes(dev_id) == 0) +        server.context()->addWarningMessage("Rotational disk with disabled readahead is in use. Performance can be degraded. Used for data: " + String(data_path)); +#endif + +    try +    { +        if (getAvailableMemoryAmount() < (2l << 30)) +            server.context()->addWarningMessage("Available memory at server startup is too low (2GiB)."); +    } +    catch (...) +    { +    } + +    try +    { +        if (!enoughSpaceInDirectory(data_path, 1ull << 30)) +            server.context()->addWarningMessage("Available disk space for data at server startup is too low (1GiB): " + String(data_path)); +    } +    catch (...) 
+    { +    } + +    try +    { +        if (!logs_path.empty()) +        { +            auto logs_parent = fs::path(logs_path).parent_path(); +            if (!enoughSpaceInDirectory(logs_parent, 1ull << 30)) +                server.context()->addWarningMessage("Available disk space for logs at server startup is too low (1GiB): " + String(logs_parent)); +        } +    } +    catch (...) +    { +    } + +    if (server.context()->getMergeTreeSettings().allow_remote_fs_zero_copy_replication) +    { +        server.context()->addWarningMessage("The setting 'allow_remote_fs_zero_copy_replication' is enabled for MergeTree tables." +            " But the feature of 'zero-copy replication' is under development and is not ready for production." +            " The usage of this feature can lead to data corruption and loss. The setting should be disabled in production."); +    } +} + +int Server::main(const std::vector<std::string> & /*args*/) +try +{ +    Stopwatch startup_watch; + +    Poco::Logger * log = &logger(); + +    UseSSL use_ssl; + +    MainThreadStatus::getInstance(); + +    ServerSettings server_settings; +    server_settings.loadSettingsFromConfig(config()); + +    StackTrace::setShowAddresses(server_settings.show_addresses_in_stack_traces); + +#if USE_HDFS +    /// This will point libhdfs3 to the right location for its config. +    /// Note: this has to be done once at server initialization, because 'setenv' is not thread-safe. 
+ +    String libhdfs3_conf = config().getString("hdfs.libhdfs3_conf", ""); +    if (!libhdfs3_conf.empty()) +    { +        if (std::filesystem::path{libhdfs3_conf}.is_relative() && !std::filesystem::exists(libhdfs3_conf)) +        { +            const String config_path = config().getString("config-file", "config.xml"); +            const auto config_dir = std::filesystem::path{config_path}.remove_filename(); +            if (std::filesystem::exists(config_dir / libhdfs3_conf)) +                libhdfs3_conf = std::filesystem::absolute(config_dir / libhdfs3_conf); +        } +        setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), true /* overwrite */); // NOLINT +    } +#endif + +#if USE_OPENSSL_INTREE +    /// When building openssl into clickhouse, clickhouse owns the configuration +    /// Therefore, the clickhouse openssl configuration should be kept separate from +    /// the OS. Default to the one in the standard config directory, unless overridden +    /// by a key in the config. 
+    if (config().has("opensslconf")) +    { +        std::string opensslconf_path = config().getString("opensslconf"); +        setenv("OPENSSL_CONF", opensslconf_path.c_str(), true); +    } +    else +    { +        const String config_path = config().getString("config-file", "config.xml"); +        const auto config_dir = std::filesystem::path{config_path}.replace_filename("openssl.conf"); +        setenv("OPENSSL_CONF", config_dir.c_str(), true); +    } +#endif + +    registerFunctions(); +    registerAggregateFunctions(); +    registerTableFunctions(); +    registerStorages(); +    registerDictionaries(); +    registerDisks(/* global_skip_access_check= */ false); +    registerFormats(); +    registerRemoteFileMetadatas(); +    registerSchedulerNodes(); +    registerResourceManagers(); + +    CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision()); +    CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger()); + +    /** Context contains all that query execution is dependent: +      *  settings, available functions, data types, aggregate functions, databases, ... +      */ +    auto shared_context = Context::createShared(); +    global_context = Context::createGlobal(shared_context.get()); + +    global_context->makeGlobalContext(); +    global_context->setApplicationType(Context::ApplicationType::SERVER); + +#if !defined(NDEBUG) || !defined(__OPTIMIZE__) +    global_context->addWarningMessage("Server was built in debug mode. It will work slowly."); +#endif + +    if (ThreadFuzzer::instance().isEffective()) +        global_context->addWarningMessage("ThreadFuzzer is enabled. Application will run slowly and unstable."); + +#if defined(SANITIZER) +    global_context->addWarningMessage("Server was built with sanitizer. 
It will work slowly."); +#endif + +    const size_t physical_server_memory = getMemoryAmount(); + +    LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.", +        formatReadableSizeWithBinarySuffix(physical_server_memory), +        getNumberOfPhysicalCPUCores(),  // on ARM processors it can show only enabled at current moment cores +        std::thread::hardware_concurrency()); + +    sanityChecks(*this); + +    // Initialize global thread pool. Do it before we fetch configs from zookeeper +    // nodes (`from_zk`), because ZooKeeper interface uses the pool. We will +    // ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well. +    GlobalThreadPool::initialize( +        server_settings.max_thread_pool_size, +        server_settings.max_thread_pool_free_size, +        server_settings.thread_pool_queue_size); + +#if USE_AZURE_BLOB_STORAGE +    /// It makes sense to deinitialize libxml after joining of all threads +    /// in global pool because libxml uses thread-local memory allocations via +    /// 'pthread_key_create' and 'pthread_setspecific' which should be deallocated +    /// at 'pthread_exit'. Deinitialization of libxml leads to call of 'pthread_key_delete' +    /// and if it is done before joining of threads, allocated memory will not be freed +    /// and there may be memory leaks in threads that used libxml. 
+    GlobalThreadPool::instance().addOnDestroyCallback([] +    { +        Azure::Storage::_internal::XmlGlobalDeinitialize(); +    }); +#endif + +    getIOThreadPool().initialize( +        server_settings.max_io_thread_pool_size, +        server_settings.max_io_thread_pool_free_size, +        server_settings.io_thread_pool_queue_size); + +    getBackupsIOThreadPool().initialize( +        server_settings.max_backups_io_thread_pool_size, +        server_settings.max_backups_io_thread_pool_free_size, +        server_settings.backups_io_thread_pool_queue_size); + +    getActivePartsLoadingThreadPool().initialize( +        server_settings.max_active_parts_loading_thread_pool_size, +        0, // We don't need any threads once all the parts will be loaded +        server_settings.max_active_parts_loading_thread_pool_size); + +    getOutdatedPartsLoadingThreadPool().initialize( +        server_settings.max_outdated_parts_loading_thread_pool_size, +        0, // We don't need any threads once all the parts will be loaded +        server_settings.max_outdated_parts_loading_thread_pool_size); + +    /// It could grow if we need to synchronously wait until all the data parts will be loaded. +    getOutdatedPartsLoadingThreadPool().setMaxTurboThreads( +        server_settings.max_active_parts_loading_thread_pool_size +    ); + +    getPartsCleaningThreadPool().initialize( +        server_settings.max_parts_cleaning_thread_pool_size, +        0, // We don't need any threads once all the parts will be deleted +        server_settings.max_parts_cleaning_thread_pool_size); + +    /// Initialize global local cache for remote filesystem. 
+    if (config().has("local_cache_for_remote_fs")) +    { +        bool enable = config().getBool("local_cache_for_remote_fs.enable", false); +        if (enable) +        { +            String root_dir = config().getString("local_cache_for_remote_fs.root_dir"); +            UInt64 limit_size = config().getUInt64("local_cache_for_remote_fs.limit_size"); +            UInt64 bytes_read_before_flush +                = config().getUInt64("local_cache_for_remote_fs.bytes_read_before_flush", DBMS_DEFAULT_BUFFER_SIZE); +            ExternalDataSourceCache::instance().initOnce(global_context, root_dir, limit_size, bytes_read_before_flush); +        } +    } + +    Poco::ThreadPool server_pool(3, server_settings.max_connections); +    std::mutex servers_lock; +    std::vector<ProtocolServerAdapter> servers; +    std::vector<ProtocolServerAdapter> servers_to_start_before_tables; +    /// This object will periodically calculate some metrics. +    ServerAsynchronousMetrics async_metrics( +        global_context, +        server_settings.asynchronous_metrics_update_period_s, +        server_settings.asynchronous_heavy_metrics_update_period_s, +        [&]() -> std::vector<ProtocolServerMetrics> +        { +            std::vector<ProtocolServerMetrics> metrics; + +            std::lock_guard lock(servers_lock); +            metrics.reserve(servers_to_start_before_tables.size() + servers.size()); + +            for (const auto & server : servers_to_start_before_tables) +                metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); + +            for (const auto & server : servers) +                metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); +            return metrics; +        } +    ); + +    zkutil::validateZooKeeperConfig(config()); +    bool has_zookeeper = zkutil::hasZooKeeperConfig(config()); + +    zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return 
global_context->getZooKeeper(); }); +    zkutil::EventPtr main_config_zk_changed_event = std::make_shared<Poco::Event>(); +    if (loaded_config.has_zk_includes) +    { +        auto old_configuration = loaded_config.configuration; +        ConfigProcessor config_processor(config_path); +        loaded_config = config_processor.loadConfigWithZooKeeperIncludes( +            main_config_zk_node_cache, main_config_zk_changed_event, /* fallback_to_preprocessed = */ true); +        config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH)); +        config().removeConfiguration(old_configuration.get()); +        config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false); +    } + +    Settings::checkNoSettingNamesAtTopLevel(config(), config_path); + +    /// We need to reload server settings because config could be updated via zookeeper. +    server_settings.loadSettingsFromConfig(config()); + +#if defined(OS_LINUX) +    std::string executable_path = getExecutablePath(); + +    if (!executable_path.empty()) +    { +        /// Integrity check based on checksum of the executable code. +        /// Note: it is not intended to protect from malicious party, +        /// because the reference checksum can be easily modified as well. +        /// And we don't involve asymmetric encryption with PKI yet. +        /// It's only intended to protect from faulty hardware. +        /// Note: it is only based on machine code. +        /// But there are other sections of the binary (e.g. exception handling tables) +        /// that are interpreted (not executed) but can alter the behaviour of the program as well. 
+ +        /// Please keep the below log messages in-sync with the ones in daemon/BaseDaemon.cpp +        if (stored_binary_hash.empty()) +        { +            LOG_WARNING(log, "Integrity check of the executable skipped because the reference checksum could not be read."); +        } +        else +        { +            String calculated_binary_hash = getHashOfLoadedBinaryHex(); +            if (calculated_binary_hash == stored_binary_hash) +            { +                LOG_INFO(log, "Integrity check of the executable successfully passed (checksum: {})", calculated_binary_hash); +            } +            else +            { +                /// If program is run under debugger, ptrace will fail. +                if (ptrace(PTRACE_TRACEME, 0, nullptr, nullptr) == -1) +                { +                    /// Program is run under debugger. Modification of its binary image is ok for breakpoints. +                    global_context->addWarningMessage(fmt::format( +                        "Server is run under debugger and its binary image is modified (most likely with breakpoints).", +                        calculated_binary_hash)); +                } +                else +                { +                    throw Exception( +                        ErrorCodes::CORRUPTED_DATA, +                        "Calculated checksum of the executable ({0}) does not correspond" +                        " to the reference checksum stored in the executable ({1})." 
+                        " This may indicate one of the following:" +                        " - the executable {2} was changed just after startup;" +                        " - the executable {2} was corrupted on disk due to faulty hardware;" +                        " - the loaded executable was corrupted in memory due to faulty hardware;" +                        " - the file {2} was intentionally modified;" +                        " - a logical error in the code.", +                        calculated_binary_hash, +                        stored_binary_hash, +                        executable_path); +                } +            } +        } +    } +    else +        executable_path = "/usr/bin/clickhouse";    /// It is used for information messages. + +    /// After full config loaded +    { +        if (config().getBool("remap_executable", false)) +        { +            LOG_DEBUG(log, "Will remap executable in memory."); +            size_t size = remapExecutable(); +            LOG_DEBUG(log, "The code ({}) in memory has been successfully remapped.", ReadableSize(size)); +        } + +        if (config().getBool("mlock_executable", false)) +        { +            if (hasLinuxCapability(CAP_IPC_LOCK)) +            { +                try +                { +                    /// Get the memory area with (current) code segment. +                    /// It's better to lock only the code segment instead of calling "mlockall", +                    /// because otherwise debug info will be also locked in memory, and it can be huge. +                    auto [addr, len] = getMappedArea(reinterpret_cast<void *>(mainEntryClickHouseServer)); + +                    LOG_TRACE(log, "Will do mlock to prevent executable memory from being paged out. 
It may take a few seconds."); +                    if (0 != mlock(addr, len)) +                        LOG_WARNING(log, "Failed mlock: {}", errnoToString()); +                    else +                        LOG_TRACE(log, "The memory map of clickhouse executable has been mlock'ed, total {}", ReadableSize(len)); +                } +                catch (...) +                { +                    LOG_WARNING(log, "Cannot mlock: {}", getCurrentExceptionMessage(false)); +                } +            } +            else +            { +                LOG_INFO(log, "It looks like the process has no CAP_IPC_LOCK capability, binary mlock will be disabled." +                    " It could happen due to incorrect ClickHouse package installation." +                    " You could resolve the problem manually with 'sudo setcap cap_ipc_lock=+ep {}'." +                    " Note that it will not work on 'nosuid' mounted filesystems.", executable_path); +            } +        } +    } + +    int default_oom_score = 0; + +#if !defined(NDEBUG) +    /// In debug version on Linux, increase oom score so that clickhouse is killed +    /// first, instead of some service. Use a carefully chosen random score of 555: +    /// the maximum is 1000, and chromium uses 300 for its tab processes. Ignore +    /// whatever errors that occur, because it's just a debugging aid and we don't +    /// care if it breaks. +    default_oom_score = 555; +#endif + +    int oom_score = config().getInt("oom_score", default_oom_score); +    if (oom_score) +        setOOMScore(oom_score, log); +#endif + +    global_context->setRemoteHostFilter(config()); +    global_context->setHTTPHeaderFilter(config()); + +    std::string path_str = getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH)); +    fs::path path = path_str; +    std::string default_database = server_settings.default_database.toString(); + +    /// Check that the process user id matches the owner of the data. 
+    assertProcessUserMatchesDataOwner(path_str, [&](const std::string & message){ global_context->addWarningMessage(message); }); + +    global_context->setPath(path_str); + +    StatusFile status{path / "status", StatusFile::write_full_info}; + +    ServerUUID::load(path / "uuid", log); + +    /// Try to increase limit on number of open files. +    { +        rlimit rlim; +        if (getrlimit(RLIMIT_NOFILE, &rlim)) +            throw Poco::Exception("Cannot getrlimit"); + +        if (rlim.rlim_cur == rlim.rlim_max) +        { +            LOG_DEBUG(log, "rlimit on number of file descriptors is {}", rlim.rlim_cur); +        } +        else +        { +            rlim_t old = rlim.rlim_cur; +            rlim.rlim_cur = config().getUInt("max_open_files", static_cast<unsigned>(rlim.rlim_max)); +            int rc = setrlimit(RLIMIT_NOFILE, &rlim); +            if (rc != 0) +                LOG_WARNING(log, "Cannot set max number of file descriptors to {}. Try to specify max_open_files according to your system limits. error: {}", rlim.rlim_cur, errnoToString()); +            else +                LOG_DEBUG(log, "Set max number of file descriptors to {} (was {}).", rlim.rlim_cur, old); +        } +    } + +    /// Try to increase limit on number of threads. +    { +        rlimit rlim; +        if (getrlimit(RLIMIT_NPROC, &rlim)) +            throw Poco::Exception("Cannot getrlimit"); + +        if (rlim.rlim_cur == rlim.rlim_max) +        { +            LOG_DEBUG(log, "rlimit on number of threads is {}", rlim.rlim_cur); +        } +        else +        { +            rlim_t old = rlim.rlim_cur; +            rlim.rlim_cur = rlim.rlim_max; +            int rc = setrlimit(RLIMIT_NPROC, &rlim); +            if (rc != 0) +            { +                LOG_WARNING(log, "Cannot set max number of threads to {}. 
error: {}", rlim.rlim_cur, errnoToString()); +                rlim.rlim_cur = old; +            } +            else +            { +                LOG_DEBUG(log, "Set max number of threads to {} (was {}).", rlim.rlim_cur, old); +            } +        } + +        if (rlim.rlim_cur < 30000) +        { +            global_context->addWarningMessage("Maximum number of threads is lower than 30000. There could be problems with handling a lot of simultaneous queries."); +        } +    } + +    static ServerErrorHandler error_handler; +    Poco::ErrorHandler::set(&error_handler); + +    /// Initialize DateLUT early, to not interfere with running time of first query. +    LOG_DEBUG(log, "Initializing DateLUT."); +    DateLUT::serverTimezoneInstance(); +    LOG_TRACE(log, "Initialized DateLUT with time zone '{}'.", DateLUT::serverTimezoneInstance().getTimeZone()); + +    /// Storage with temporary data for processing of heavy queries. +    if (!server_settings.tmp_policy.value.empty()) +    { +        global_context->setTemporaryStoragePolicy(server_settings.tmp_policy, server_settings.max_temporary_data_on_disk_size); +    } +    else if (!server_settings.temporary_data_in_cache.value.empty()) +    { +        global_context->setTemporaryStorageInCache(server_settings.temporary_data_in_cache, server_settings.max_temporary_data_on_disk_size); +    } +    else +    { +        std::string temporary_path = config().getString("tmp_path", path / "tmp/"); +        global_context->setTemporaryStoragePath(temporary_path, server_settings.max_temporary_data_on_disk_size); +    } + +    /** Directory with 'flags': files indicating temporary settings for the server set by system administrator. +      * Flags may be cleared automatically after being applied by the server. +      * Examples: do repair of local data; clone all replicated tables from replica. 
+      */ +    { +        auto flags_path = path / "flags/"; +        fs::create_directories(flags_path); +        global_context->setFlagsPath(flags_path); +    } + +    /** Directory with user provided files that are usable by 'file' table function. +      */ +    { + +        std::string user_files_path = config().getString("user_files_path", path / "user_files/"); +        global_context->setUserFilesPath(user_files_path); +        fs::create_directories(user_files_path); +    } + +    { +        std::string dictionaries_lib_path = config().getString("dictionaries_lib_path", path / "dictionaries_lib/"); +        global_context->setDictionariesLibPath(dictionaries_lib_path); +        fs::create_directories(dictionaries_lib_path); +    } + +    { +        std::string user_scripts_path = config().getString("user_scripts_path", path / "user_scripts/"); +        global_context->setUserScriptsPath(user_scripts_path); +        fs::create_directories(user_scripts_path); +    } + +    /// top_level_domains_lists +    { +        const std::string & top_level_domains_path = config().getString("top_level_domains_path", path / "top_level_domains/"); +        TLDListsHolder::getInstance().parseConfig(fs::path(top_level_domains_path) / "", config()); +    } + +    { +        fs::create_directories(path / "data/"); +        fs::create_directories(path / "metadata/"); + +        /// Directory with metadata of tables, which was marked as dropped by Atomic database +        fs::create_directories(path / "metadata_dropped/"); +    } + +#if USE_ROCKSDB +    /// Initialize merge tree metadata cache +    if (config().has("merge_tree_metadata_cache")) +    { +        global_context->addWarningMessage("The setting 'merge_tree_metadata_cache' is enabled." +            " But the feature of 'metadata cache in RocksDB' is experimental and is not ready for production." +            " The usage of this feature can lead to data corruption and loss. 
The setting should be disabled in production." +            " See the corresponding report at https://github.com/ClickHouse/ClickHouse/issues/51182"); + +        fs::create_directories(path / "rocksdb/"); +        size_t size = config().getUInt64("merge_tree_metadata_cache.lru_cache_size", 256 << 20); +        bool continue_if_corrupted = config().getBool("merge_tree_metadata_cache.continue_if_corrupted", false); +        try +        { +            LOG_DEBUG(log, "Initializing MergeTree metadata cache, lru_cache_size: {} continue_if_corrupted: {}", +                ReadableSize(size), continue_if_corrupted); +            global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size); +        } +        catch (...) +        { +            if (continue_if_corrupted) +            { +                /// Rename rocksdb directory and reinitialize merge tree metadata cache +                time_t now = time(nullptr); +                fs::rename(path / "rocksdb", path / ("rocksdb.old." 
+ std::to_string(now))); +                global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size); +            } +            else +            { +                throw; +            } +        } +    } +#endif + +    if (config().has("interserver_http_port") && config().has("interserver_https_port")) +        throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "Both http and https interserver ports are specified"); + +    static const auto interserver_tags = +    { +        std::make_tuple("interserver_http_host", "interserver_http_port", "http"), +        std::make_tuple("interserver_https_host", "interserver_https_port", "https") +    }; + +    for (auto [host_tag, port_tag, scheme] : interserver_tags) +    { +        if (config().has(port_tag)) +        { +            String this_host = config().getString(host_tag, ""); + +            if (this_host.empty()) +            { +                this_host = getFQDNOrHostName(); +                LOG_DEBUG(log, "Configuration parameter '{}' doesn't exist or exists and empty. Will use '{}' as replica host.", +                    host_tag, this_host); +            } + +            String port_str = config().getString(port_tag); +            int port = parse<int>(port_str); + +            if (port < 0 || port > 0xFFFF) +                throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Out of range '{}': {}", String(port_tag), port); + +            global_context->setInterserverIOAddress(this_host, port); +            global_context->setInterserverScheme(scheme); +        } +    } + +    LOG_DEBUG(log, "Initializing interserver credentials."); +    global_context->updateInterserverCredentials(config()); + +    if (config().has("macros")) +        global_context->setMacros(std::make_unique<Macros>(config(), "macros", log)); + +    /// Set up caches. 
+ +    const size_t max_cache_size = static_cast<size_t>(physical_server_memory * server_settings.cache_size_to_ram_max_ratio); + +    String uncompressed_cache_policy = server_settings.uncompressed_cache_policy; +    size_t uncompressed_cache_size = server_settings.uncompressed_cache_size; +    double uncompressed_cache_size_ratio = server_settings.uncompressed_cache_size_ratio; +    if (uncompressed_cache_size > max_cache_size) +    { +        uncompressed_cache_size = max_cache_size; +        LOG_INFO(log, "Lowered uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size)); +    } +    global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size, uncompressed_cache_size_ratio); + +    String mark_cache_policy = server_settings.mark_cache_policy; +    size_t mark_cache_size = server_settings.mark_cache_size; +    double mark_cache_size_ratio = server_settings.mark_cache_size_ratio; +    if (mark_cache_size > max_cache_size) +    { +        mark_cache_size = max_cache_size; +        LOG_INFO(log, "Lowered mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(mark_cache_size)); +    } +    global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio); + +    String index_uncompressed_cache_policy = server_settings.index_uncompressed_cache_policy; +    size_t index_uncompressed_cache_size = server_settings.index_uncompressed_cache_size; +    double index_uncompressed_cache_size_ratio = server_settings.index_uncompressed_cache_size_ratio; +    if (index_uncompressed_cache_size > max_cache_size) +    { +        index_uncompressed_cache_size = max_cache_size; +        LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size)); +    } +    global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, 
index_uncompressed_cache_size, index_uncompressed_cache_size_ratio); + +    String index_mark_cache_policy = server_settings.index_mark_cache_policy; +    size_t index_mark_cache_size = server_settings.index_mark_cache_size; +    double index_mark_cache_size_ratio = server_settings.index_mark_cache_size_ratio; +    if (index_mark_cache_size > max_cache_size) +    { +        index_mark_cache_size = max_cache_size; +        LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size)); +    } +    global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio); + +    size_t mmap_cache_size = server_settings.mmap_cache_size; +    if (mmap_cache_size > max_cache_size) +    { +        mmap_cache_size = max_cache_size; +        LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size)); +    } +    global_context->setMMappedFileCache(mmap_cache_size); + +    size_t query_cache_max_size_in_bytes = config().getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE); +    size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES); +    size_t query_cache_query_cache_max_entry_size_in_bytes = config().getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES); +    size_t query_cache_max_entry_size_in_rows = config().getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS); +    if (query_cache_max_size_in_bytes > max_cache_size) +    { +        query_cache_max_size_in_bytes = max_cache_size; +        LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size)); +    } +    global_context->setQueryCache(query_cache_max_size_in_bytes, 
query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows); + +#if USE_EMBEDDED_COMPILER +    size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE); +    size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES); +    CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements); +#endif + +    /// Initialize main config reloader. +    std::string include_from_path = config().getString("include_from", "/etc/metrika.xml"); + +    if (config().has("query_masking_rules")) +    { +        SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules")); +    } + +    const std::string cert_path = config().getString("openSSL.server.certificateFile", ""); +    const std::string key_path = config().getString("openSSL.server.privateKeyFile", ""); + +    std::vector<std::string> extra_paths = {include_from_path}; +    if (!cert_path.empty()) +        extra_paths.emplace_back(cert_path); +    if (!key_path.empty()) +        extra_paths.emplace_back(key_path); + +    auto main_config_reloader = std::make_unique<ConfigReloader>( +        config_path, +        extra_paths, +        config().getString("path", ""), +        std::move(main_config_zk_node_cache), +        main_config_zk_changed_event, +        [&](ConfigurationPtr config, bool initial_loading) +        { +            Settings::checkNoSettingNamesAtTopLevel(*config, config_path); + +            ServerSettings server_settings_; +            server_settings_.loadSettingsFromConfig(*config); + +            size_t max_server_memory_usage = server_settings_.max_server_memory_usage; +            double max_server_memory_usage_to_ram_ratio = 
server_settings_.max_server_memory_usage_to_ram_ratio; + +            size_t current_physical_server_memory = getMemoryAmount(); /// With cgroups, the amount of memory available to the server can be changed dynamically. +            size_t default_max_server_memory_usage = static_cast<size_t>(current_physical_server_memory * max_server_memory_usage_to_ram_ratio); + +            if (max_server_memory_usage == 0) +            { +                max_server_memory_usage = default_max_server_memory_usage; +                LOG_INFO(log, "Setting max_server_memory_usage was set to {}" +                    " ({} available * {:.2f} max_server_memory_usage_to_ram_ratio)", +                    formatReadableSizeWithBinarySuffix(max_server_memory_usage), +                    formatReadableSizeWithBinarySuffix(current_physical_server_memory), +                    max_server_memory_usage_to_ram_ratio); +            } +            else if (max_server_memory_usage > default_max_server_memory_usage) +            { +                max_server_memory_usage = default_max_server_memory_usage; +                LOG_INFO(log, "Setting max_server_memory_usage was lowered to {}" +                    " because the system has low amount of memory. 
The amount was" +                    " calculated as {} available" +                    " * {:.2f} max_server_memory_usage_to_ram_ratio", +                    formatReadableSizeWithBinarySuffix(max_server_memory_usage), +                    formatReadableSizeWithBinarySuffix(current_physical_server_memory), +                    max_server_memory_usage_to_ram_ratio); +            } + +            total_memory_tracker.setHardLimit(max_server_memory_usage); +            total_memory_tracker.setDescription("(total)"); +            total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); + +            size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit; + +            size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(current_physical_server_memory * server_settings_.merges_mutations_memory_usage_to_ram_ratio); +            if (merges_mutations_memory_usage_soft_limit == 0) +            { +                merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage; +                LOG_INFO(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}" +                    " ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)", +                    formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit), +                    formatReadableSizeWithBinarySuffix(current_physical_server_memory), +                    server_settings_.merges_mutations_memory_usage_to_ram_ratio); +            } +            else if (merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage) +            { +                merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage; +                LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}" +                    " ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)", +            
        formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit), +                    formatReadableSizeWithBinarySuffix(current_physical_server_memory), +                    server_settings_.merges_mutations_memory_usage_to_ram_ratio); +            } + +            LOG_INFO(log, "Merges and mutations memory limit is set to {}", +                formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit)); +            background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit); +            background_memory_tracker.setDescription("(background)"); +            background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking); + +            total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory); + +            auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker(); +            total_memory_tracker.setOvercommitTracker(global_overcommit_tracker); + +            // FIXME logging-related things need synchronization -- see the 'Logger * log' saved +            // in a lot of places. For now, disable updating log configuration without server restart. +            //setTextLog(global_context->getTextLog()); +            updateLevels(*config, logger()); +            global_context->setClustersConfig(config, has_zookeeper); +            global_context->setMacros(std::make_unique<Macros>(*config, "macros", log)); +            global_context->setExternalAuthenticatorsConfig(*config); + +            if (global_context->isServerCompletelyStarted()) +            { +                /// It does not make sense to reload anything before server has started. +                /// Moreover, it may break initialization order. 
+                global_context->loadOrReloadDictionaries(*config); +                global_context->loadOrReloadUserDefinedExecutableFunctions(*config); +            } + +            global_context->setRemoteHostFilter(*config); +            global_context->setHTTPHeaderFilter(*config); + +            global_context->setMaxTableSizeToDrop(server_settings_.max_table_size_to_drop); +            global_context->setMaxPartitionSizeToDrop(server_settings_.max_partition_size_to_drop); + +            ConcurrencyControl::SlotCount concurrent_threads_soft_limit = ConcurrencyControl::Unlimited; +            if (server_settings_.concurrent_threads_soft_limit_num > 0 && server_settings_.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit) +                concurrent_threads_soft_limit = server_settings_.concurrent_threads_soft_limit_num; +            if (server_settings_.concurrent_threads_soft_limit_ratio_to_cores > 0) +            { +                auto value = server_settings_.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency(); +                if (value > 0 && value < concurrent_threads_soft_limit) +                    concurrent_threads_soft_limit = value; +            } +            ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit); + +            global_context->getProcessList().setMaxSize(server_settings_.max_concurrent_queries); +            global_context->getProcessList().setMaxInsertQueriesAmount(server_settings_.max_concurrent_insert_queries); +            global_context->getProcessList().setMaxSelectQueriesAmount(server_settings_.max_concurrent_select_queries); + +            if (config->has("keeper_server")) +                global_context->updateKeeperConfiguration(*config); + +            /// Reload the number of threads for global pools. 
+            /// Note: If you specified it in the top level config (not it config of default profile) +            /// then ClickHouse will use it exactly. +            /// This is done for backward compatibility. +            if (global_context->areBackgroundExecutorsInitialized()) +            { +                auto new_pool_size = server_settings_.background_pool_size; +                auto new_ratio = server_settings_.background_merges_mutations_concurrency_ratio; +                global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, static_cast<size_t>(new_pool_size * new_ratio)); +                global_context->getMergeMutateExecutor()->updateSchedulingPolicy(server_settings_.background_merges_mutations_scheduling_policy.toString()); +            } + +            if (global_context->areBackgroundExecutorsInitialized()) +            { +                auto new_pool_size = server_settings_.background_move_pool_size; +                global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); +            } + +            if (global_context->areBackgroundExecutorsInitialized()) +            { +                auto new_pool_size = server_settings_.background_fetches_pool_size; +                global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); +            } + +            if (global_context->areBackgroundExecutorsInitialized()) +            { +                auto new_pool_size = server_settings_.background_common_pool_size; +                global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); +            } + +            global_context->getBufferFlushSchedulePool().increaseThreadsCount(server_settings_.background_buffer_flush_schedule_pool_size); +            global_context->getSchedulePool().increaseThreadsCount(server_settings_.background_schedule_pool_size); +            
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(server_settings_.background_message_broker_schedule_pool_size); +            global_context->getDistributedSchedulePool().increaseThreadsCount(server_settings_.background_distributed_schedule_pool_size); + +            getIOThreadPool().reloadConfiguration( +                server_settings.max_io_thread_pool_size, +                server_settings.max_io_thread_pool_free_size, +                server_settings.io_thread_pool_queue_size); + +            getBackupsIOThreadPool().reloadConfiguration( +                server_settings.max_backups_io_thread_pool_size, +                server_settings.max_backups_io_thread_pool_free_size, +                server_settings.backups_io_thread_pool_queue_size); + +            getActivePartsLoadingThreadPool().reloadConfiguration( +                server_settings.max_active_parts_loading_thread_pool_size, +                0, // We don't need any threads once all the parts will be loaded +                server_settings.max_active_parts_loading_thread_pool_size); + +            getOutdatedPartsLoadingThreadPool().reloadConfiguration( +                server_settings.max_outdated_parts_loading_thread_pool_size, +                0, // We don't need any threads once all the parts will be loaded +                server_settings.max_outdated_parts_loading_thread_pool_size); + +            /// It could grow if we need to synchronously wait until all the data parts will be loaded. 
+            getOutdatedPartsLoadingThreadPool().setMaxTurboThreads( +                server_settings.max_active_parts_loading_thread_pool_size +            ); + +            getPartsCleaningThreadPool().reloadConfiguration( +                server_settings.max_parts_cleaning_thread_pool_size, +                0, // We don't need any threads one all the parts will be deleted +                server_settings.max_parts_cleaning_thread_pool_size); + +            if (config->has("resources")) +            { +                global_context->getResourceManager()->updateConfiguration(*config); +            } + +            if (!initial_loading) +            { +                /// We do not load ZooKeeper configuration on the first config loading +                /// because TestKeeper server is not started yet. +                if (zkutil::hasZooKeeperConfig(*config)) +                    global_context->reloadZooKeeperIfChanged(config); + +                global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config); + +                std::lock_guard lock(servers_lock); +                updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables); +            } + +            global_context->updateStorageConfiguration(*config); +            global_context->updateInterserverCredentials(*config); + +            global_context->updateUncompressedCacheConfiguration(*config); +            global_context->updateMarkCacheConfiguration(*config); +            global_context->updateIndexUncompressedCacheConfiguration(*config); +            global_context->updateIndexMarkCacheConfiguration(*config); +            global_context->updateMMappedFileCacheConfiguration(*config); +            global_context->updateQueryCacheConfiguration(*config); + +            CompressionCodecEncrypted::Configuration::instance().tryLoad(*config, "encryption_codecs"); +#if USE_SSL +            CertificateReloader::instance().tryLoad(*config); +#endif +            
NamedCollectionUtils::reloadFromConfig(*config); + +            ProfileEvents::increment(ProfileEvents::MainConfigLoads); + +            /// Must be the last. +            latest_config = config; +        }, +        /* already_loaded = */ false);  /// Reload it right now (initial loading) + +    const auto listen_hosts = getListenHosts(config()); +    const auto interserver_listen_hosts = getInterserverListenHosts(config()); +    const auto listen_try = getListenTry(config()); + +    if (config().has("keeper_server")) +    { +#if USE_NURAFT +        //// If we don't have configured connection probably someone trying to use clickhouse-server instead +        //// of clickhouse-keeper, so start synchronously. +        bool can_initialize_keeper_async = false; + +        if (has_zookeeper) /// We have configured connection to some zookeeper cluster +        { +            /// If we cannot connect to some other node from our cluster then we have to wait our Keeper start +            /// synchronously. +            can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster(); +        } +        /// Initialize keeper RAFT. 
+        global_context->initializeKeeperDispatcher(can_initialize_keeper_async); +        FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher()); + +        auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration & +        { +            return global_context->getConfigRef(); +        }; + +        for (const auto & listen_host : listen_hosts) +        { +            /// TCP Keeper +            const char * port_name = "keeper_server.tcp_port"; +            createServer( +                config(), listen_host, port_name, listen_try, /* start_server: */ false, +                servers_to_start_before_tables, +                [&](UInt16 port) -> ProtocolServerAdapter +                { +                    Poco::Net::ServerSocket socket; +                    auto address = socketBindListen(config(), socket, listen_host, port); +                    socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0)); +                    socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0)); +                    return ProtocolServerAdapter( +                        listen_host, +                        port_name, +                        "Keeper (tcp): " + address.toString(), +                        std::make_unique<TCPServer>( +                            new KeeperTCPHandlerFactory( +                                config_getter, global_context->getKeeperDispatcher(), +                                global_context->getSettingsRef().receive_timeout.totalSeconds(), +                                global_context->getSettingsRef().send_timeout.totalSeconds(), +                                false), server_pool, socket)); +                }); + +            const char * secure_port_name = "keeper_server.tcp_port_secure"; +            createServer( +                
config(), listen_host, secure_port_name, listen_try, /* start_server: */ false, +                servers_to_start_before_tables, +                [&](UInt16 port) -> ProtocolServerAdapter +                { +#if USE_SSL +                    Poco::Net::SecureServerSocket socket; +                    auto address = socketBindListen(config(), socket, listen_host, port, /* secure = */ true); +                    socket.setReceiveTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0)); +                    socket.setSendTimeout(Poco::Timespan(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0)); +                    return ProtocolServerAdapter( +                        listen_host, +                        secure_port_name, +                        "Keeper with secure protocol (tcp_secure): " + address.toString(), +                        std::make_unique<TCPServer>( +                            new KeeperTCPHandlerFactory( +                                config_getter, global_context->getKeeperDispatcher(), +                                global_context->getSettingsRef().receive_timeout.totalSeconds(), +                                global_context->getSettingsRef().send_timeout.totalSeconds(), true), server_pool, socket)); +#else +                    UNUSED(port); +                    throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); +#endif +                }); +        } +#else +        throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination."); +#endif + +    } + +    { +        std::lock_guard lock(servers_lock); +        /// We should start interserver communications before (and more imporant shutdown after) tables. 
+        /// Because server can wait for a long-running queries (for example in tcp_handler) after interserver handler was already shut down. +        /// In this case we will have replicated tables which are unable to send any parts to other replicas, but still can +        /// communicate with zookeeper, execute merges, etc. +        createInterserverServers( +            config(), +            interserver_listen_hosts, +            listen_try, +            server_pool, +            async_metrics, +            servers_to_start_before_tables, +            /* start_servers= */ false); + + +        for (auto & server : servers_to_start_before_tables) +        { +            server.start(); +            LOG_INFO(log, "Listening for {}", server.getDescription()); +        } +    } + +    /// Initialize access storages. +    auto & access_control = global_context->getAccessControl(); +    try +    { +        access_control.setUpFromMainConfig(config(), config_path, [&] { return global_context->getZooKeeper(); }); +    } +    catch (...) +    { +        tryLogCurrentException(log, "Caught exception while setting up access control."); +        throw; +    } + +    /// Reload config in SYSTEM RELOAD CONFIG query. +    global_context->setConfigReloadCallback([&]() +    { +        main_config_reloader->reload(); +        access_control.reload(AccessControl::ReloadMode::USERS_CONFIG_ONLY); +    }); + +    global_context->setStopServersCallback([&](const ServerType & server_type) +    { +        stopServers(servers, server_type); +    }); + +    global_context->setStartServersCallback([&](const ServerType & server_type) +    { +        createServers( +            config(), +            listen_hosts, +            listen_try, +            server_pool, +            async_metrics, +            servers, +            /* start_servers= */ true, +            server_type); +    }); + +    /// Limit on total number of concurrently executed queries. 
+    global_context->getProcessList().setMaxSize(server_settings.max_concurrent_queries); + +    /// Load global settings from default_profile and system_profile. +    global_context->setDefaultProfiles(config()); + +    /// Initialize background executors after we load default_profile config. +    /// This is needed to load proper values of background_pool_size etc. +    global_context->initializeBackgroundExecutorsIfNeeded(); + +    if (server_settings.async_insert_threads) +    { +        global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>( +            global_context, +            server_settings.async_insert_threads, +            server_settings.async_insert_queue_flush_on_shutdown)); +    } + +    /// Set path for format schema files +    fs::path format_schema_path(config().getString("format_schema_path", path / "format_schemas/")); +    global_context->setFormatSchemaPath(format_schema_path); +    fs::create_directories(format_schema_path); + +    /// Set path for filesystem caches +    fs::path filesystem_caches_path(config().getString("filesystem_caches_path", "")); +    if (!filesystem_caches_path.empty()) +        global_context->setFilesystemCachesPath(filesystem_caches_path); + +    /// Check sanity of MergeTreeSettings on server startup +    { +        size_t background_pool_tasks = global_context->getMergeMutateExecutor()->getMaxTasksCount(); +        global_context->getMergeTreeSettings().sanityCheck(background_pool_tasks); +        global_context->getReplicatedMergeTreeSettings().sanityCheck(background_pool_tasks); +    } +    /// try set up encryption. There are some errors in config, error will be printed and server wouldn't start. +    CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs"); + +    SCOPE_EXIT({ +        async_metrics.stop(); + +        /** Ask to cancel background jobs all table engines, +          *  and also query_log. 
+          * It is important to do early, not in destructor of Context, because +          *  table engines could use Context on destroy. +          */ +        LOG_INFO(log, "Shutting down storages."); + +        global_context->shutdown(); + +        LOG_DEBUG(log, "Shut down storages."); + +        if (!servers_to_start_before_tables.empty()) +        { +            LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish."); +            size_t current_connections = 0; +            { +                std::lock_guard lock(servers_lock); +                for (auto & server : servers_to_start_before_tables) +                { +                    server.stop(); +                    current_connections += server.currentConnections(); +                } +            } + +            if (current_connections) +                LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections); +            else +                LOG_INFO(log, "Closed all listening sockets."); + +            if (current_connections > 0) +                current_connections = waitServersToFinish(servers_to_start_before_tables, servers_lock, config().getInt("shutdown_wait_unfinished", 5)); + +            if (current_connections) +                LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections); +            else +                LOG_INFO(log, "Closed connections to servers for tables."); + +            global_context->shutdownKeeperDispatcher(); +        } + +        /// Wait server pool to avoid use-after-free of destroyed context in the handlers +        server_pool.joinAll(); + +        /** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available. +          * At this moment, no one could own shared part of Context. 
+          */ +        global_context.reset(); +        shared_context.reset(); +        LOG_DEBUG(log, "Destroyed global context."); +    }); + +    /// DNSCacheUpdater uses BackgroundSchedulePool which lives in shared context +    /// and thus this object must be created after the SCOPE_EXIT object where shared +    /// context is destroyed. +    /// In addition this object has to be created before the loading of the tables. +    std::unique_ptr<DNSCacheUpdater> dns_cache_updater; +    if (server_settings.disable_internal_dns_cache) +    { +        /// Disable DNS caching at all +        DNSResolver::instance().setDisableCacheFlag(); +        LOG_DEBUG(log, "DNS caching disabled"); +    } +    else +    { +        /// Initialize a watcher periodically updating DNS cache +        dns_cache_updater = std::make_unique<DNSCacheUpdater>( +            global_context, server_settings.dns_cache_update_period, server_settings.dns_max_consecutive_failures); +    } + +    if (dns_cache_updater) +        dns_cache_updater->start(); + +    /// Set current database name before loading tables and databases because +    /// system logs may copy global context. +    global_context->setCurrentDatabaseNameInGlobalContext(default_database); + +    LOG_INFO(log, "Loading metadata from {}", path_str); + +    try +    { +        auto & database_catalog = DatabaseCatalog::instance(); +        /// We load temporary database first, because projections need it. +        database_catalog.initializeAndLoadTemporaryDatabase(); +        loadMetadataSystem(global_context); +        maybeConvertSystemDatabase(global_context); +        /// This has to be done before the initialization of system logs, +        /// otherwise there is a race condition between the system database initialization +        /// and creation of new tables in the database. +        startupSystemTables(); +        /// After attaching system databases we can initialize system log. 
+        global_context->initializeSystemLogs(); +        global_context->setSystemZooKeeperLogAfterInitializationIfNeeded(); +        /// Build loggers before tables startup to make log messages from tables +        /// attach available in system.text_log +        buildLoggers(config(), logger()); +        /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) +        attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper); +        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA)); +        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)); +        /// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread, +        /// that may execute DROP before loadMarkedAsDroppedTables() in background, +        /// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap. +        database_catalog.loadMarkedAsDroppedTables(); +        database_catalog.createBackgroundTasks(); +        /// Then, load remaining databases +        loadMetadata(global_context, default_database); +        convertDatabasesEnginesIfNeed(global_context); +        database_catalog.startupBackgroundCleanup(); +        /// After loading validate that default database exists +        database_catalog.assertDatabaseExists(default_database); +        /// Load user-defined SQL functions. +        global_context->getUserDefinedSQLObjectsLoader().loadObjects(); +    } +    catch (...) +    { +        tryLogCurrentException(log, "Caught exception while loading metadata"); +        throw; +    } +    LOG_DEBUG(log, "Loaded metadata."); + +    /// Init trace collector only after trace_log system table was created +    /// Disable it if we collect test coverage information, because it will work extremely slow. 
+#if !WITH_COVERAGE +    /// Profilers cannot work reliably with any other libunwind or without PHDR cache. +    if (hasPHDRCache()) +    { +        global_context->initializeTraceCollector(); + +        /// Set up server-wide memory profiler (for total memory tracker). +        if (server_settings.total_memory_profiler_step) +        { +            total_memory_tracker.setProfilerStep(server_settings.total_memory_profiler_step); +        } + +        if (server_settings.total_memory_tracker_sample_probability > 0.0) +        { +            total_memory_tracker.setSampleProbability(server_settings.total_memory_tracker_sample_probability); +        } + +        if (server_settings.total_memory_profiler_sample_min_allocation_size) +        { +            total_memory_tracker.setSampleMinAllocationSize(server_settings.total_memory_profiler_sample_min_allocation_size); +        } + +        if (server_settings.total_memory_profiler_sample_max_allocation_size) +        { +            total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size); +        } + +    } +#endif + +    /// Describe multiple reasons when query profiler cannot work. + +#if WITH_COVERAGE +    LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they work extremely slow with test coverage."); +#endif + +#if defined(SANITIZER) +    LOG_INFO(log, "Query Profiler disabled because they cannot work under sanitizers" +        " when two different stack unwinding methods will interfere with each other."); +#endif + +#if !defined(__x86_64__) +    LOG_INFO(log, "Query Profiler and TraceCollector is only tested on x86_64. 
It also known to not work under qemu-user."); +#endif + +    if (!hasPHDRCache()) +        LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they require PHDR cache to be created" +            " (otherwise the function 'dl_iterate_phdr' is not lock free and not async-signal safe)."); + +#if defined(OS_LINUX) +    auto tasks_stats_provider = TasksStatsCounters::findBestAvailableProvider(); +    if (tasks_stats_provider == TasksStatsCounters::MetricsProvider::None) +    { +        LOG_INFO(log, "It looks like this system does not have procfs mounted at /proc location," +            " neither clickhouse-server process has CAP_NET_ADMIN capability." +            " 'taskstats' performance statistics will be disabled." +            " It could happen due to incorrect ClickHouse package installation." +            " You can try to resolve the problem manually with 'sudo setcap cap_net_admin=+ep {}'." +            " Note that it will not work on 'nosuid' mounted filesystems." +            " It also doesn't work if you run clickhouse-server inside network namespace as it happens in some containers.", +            executable_path); +    } +    else +    { +        LOG_INFO(log, "Tasks stats provider: {}", TasksStatsCounters::metricsProviderString(tasks_stats_provider)); +    } + +    if (!hasLinuxCapability(CAP_SYS_NICE)) +    { +        LOG_INFO(log, "It looks like the process has no CAP_SYS_NICE capability, the setting 'os_thread_priority' will have no effect." +            " It could happen due to incorrect ClickHouse package installation." +            " You could resolve the problem manually with 'sudo setcap cap_sys_nice=+ep {}'." +            " Note that it will not work on 'nosuid' mounted filesystems.", +            executable_path); +    } +#else +    LOG_INFO(log, "TaskStats is not implemented for this OS. 
IO accounting will be disabled."); +#endif + +    { +        attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics); + +        { +            std::lock_guard lock(servers_lock); +            createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers); +            if (servers.empty()) +                throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, +                                "No servers started (add valid listen_host and 'tcp_port' or 'http_port' " +                                "to configuration file.)"); +        } + +        if (servers.empty()) +             throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, +                             "No servers started (add valid listen_host and 'tcp_port' or 'http_port' " +                             "to configuration file.)"); + +#if USE_SSL +        CertificateReloader::instance().tryLoad(config()); +#endif + +        /// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread. +        async_metrics.start(); + +        main_config_reloader->start(); +        access_control.startPeriodicReloading(); + +        /// try to load dictionaries immediately, throw on error and die +        try +        { +            global_context->loadOrReloadDictionaries(config()); +        } +        catch (...) +        { +            tryLogCurrentException(log, "Caught exception while loading dictionaries."); +            throw; +        } + +        /// try to load embedded dictionaries immediately, throw on error and die +        try +        { +            global_context->tryCreateEmbeddedDictionaries(config()); +        } +        catch (...) 
+        { +            tryLogCurrentException(log, "Caught exception while loading embedded dictionaries."); +            throw; +        } + +        /// try to load user defined executable functions, throw on error and die +        try +        { +            global_context->loadOrReloadUserDefinedExecutableFunctions(config()); +        } +        catch (...) +        { +            tryLogCurrentException(log, "Caught exception while loading user defined executable functions."); +            throw; +        } + +        if (has_zookeeper && config().has("distributed_ddl")) +        { +            /// DDL worker should be started after all tables were loaded +            String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/"); +            int pool_size = config().getInt("distributed_ddl.pool_size", 1); +            if (pool_size < 1) +                throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "distributed_ddl.pool_size should be greater then 0"); +            global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(), +                                                                     "distributed_ddl", "DDLWorker", +                                                                     &CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID)); +        } + +        { +            std::lock_guard lock(servers_lock); +            for (auto & server : servers) +            { +                server.start(); +                LOG_INFO(log, "Listening for {}", server.getDescription()); +            } + +            global_context->setServerCompletelyStarted(); +            LOG_INFO(log, "Ready for connections."); +        } + +        startup_watch.stop(); +        ProfileEvents::increment(ProfileEvents::ServerStartupMilliseconds, startup_watch.elapsedMilliseconds()); + +        try +        { +            global_context->startClusterDiscovery(); 
+        } +        catch (...) +        { +            tryLogCurrentException(log, "Caught exception while starting cluster discovery"); +        } + +#if defined(OS_LINUX) +        /// Tell the service manager that service startup is finished. +        /// NOTE: the parent clickhouse-watchdog process must do systemdNotify("MAINPID={}\n", child_pid); before +        /// the child process notifies 'READY=1'. +        systemdNotify("READY=1\n"); +#endif + +        SCOPE_EXIT_SAFE({ +            LOG_DEBUG(log, "Received termination signal."); + +            /// Stop reloading of the main config. This must be done before everything else because it +            /// can try to access/modify already deleted objects. +            /// E.g. it can recreate new servers or it may pass a changed config to some destroyed parts of ContextSharedPart. +            main_config_reloader.reset(); +            access_control.stopPeriodicReloading(); + +            is_cancelled = true; + +            LOG_DEBUG(log, "Waiting for current connections to close."); + +            size_t current_connections = 0; +            { +                std::lock_guard lock(servers_lock); +                for (auto & server : servers) +                { +                    server.stop(); +                    current_connections += server.currentConnections(); +                } +            } + +            if (current_connections) +                LOG_WARNING(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections); +            else +                LOG_INFO(log, "Closed all listening sockets."); + +            /// Killing remaining queries. 
+            if (!server_settings.shutdown_wait_unfinished_queries) +                global_context->getProcessList().killAllQueries(); + +            if (current_connections) +                current_connections = waitServersToFinish(servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5)); + +            if (current_connections) +                LOG_WARNING(log, "Closed connections. But {} remain." +                    " Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>", current_connections); +            else +                LOG_INFO(log, "Closed connections."); + +            dns_cache_updater.reset(); + +            if (current_connections) +            { +                /// There is no better way to force connections to close in Poco. +                /// Otherwise connection handlers will continue to live +                /// (they are effectively dangling objects, but they use global thread pool +                ///  and global thread pool destructor will wait for threads, preventing server shutdown). + +                /// Dump coverage here, because std::atexit callback would not be called. +                dumpCoverageReportIfPossible(); +                LOG_WARNING(log, "Will shutdown forcefully."); +                safeExit(0); +            } +        }); + +        std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters; +        for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite")) +        { +            metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>( +                global_context->getConfigRef(), graphite_key, async_metrics)); +        } + +        waitForTerminationRequest(); +    } + +    return Application::EXIT_OK; +} +catch (...) +{ +    /// Poco does not provide stacktrace. 
+    tryLogCurrentException("Application"); +    throw; +} + +std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig( +    const Poco::Util::AbstractConfiguration & config, +    const std::string & protocol, +    Poco::Net::HTTPServerParams::Ptr http_params, +    AsynchronousMetrics & async_metrics, +    bool & is_secure) +{ +    auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr +    { +        if (type == "tcp") +            return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false)); + +        if (type == "tls") +#if USE_SSL +            return TCPServerConnectionFactory::Ptr(new TLSHandlerFactory(*this, conf_name)); +#else +            throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); +#endif + +        if (type == "proxy1") +            return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(*this, conf_name)); +        if (type == "mysql") +            return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this)); +        if (type == "postgres") +            return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this)); +        if (type == "http") +            return TCPServerConnectionFactory::Ptr( +                new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory")) +            ); +        if (type == "prometheus") +            return TCPServerConnectionFactory::Ptr( +                new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory")) +            ); +        if (type == "interserver") +            return TCPServerConnectionFactory::Ptr( +                new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, 
config, async_metrics, "InterserverIOHTTPHandler-factory")) +            ); + +        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type); +    }; + +    std::string conf_name = "protocols." + protocol; +    std::string prefix = conf_name + "."; +    std::unordered_set<std::string> pset {conf_name}; + +    auto stack = std::make_unique<TCPProtocolStackFactory>(*this, conf_name); + +    while (true) +    { +        // if there is no "type" - it's a reference to another protocol and this is just an endpoint +        if (config.has(prefix + "type")) +        { +            std::string type = config.getString(prefix + "type"); +            if (type == "tls") +            { +                if (is_secure) +                    throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' contains more than one TLS layer", protocol); +                is_secure = true; +            } + +            stack->append(create_factory(type, conf_name)); +        } + +        if (!config.has(prefix + "impl")) +            break; + +        conf_name = "protocols." 
+ config.getString(prefix + "impl"); +        prefix = conf_name + "."; + +        if (!pset.insert(conf_name).second) +            throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name); +    } + +    return stack; +} + +HTTPContextPtr Server::httpContext() const +{ +    return std::make_shared<HTTPContext>(context()); +} + +void Server::createServers( +    Poco::Util::AbstractConfiguration & config, +    const Strings & listen_hosts, +    bool listen_try, +    Poco::ThreadPool & server_pool, +    AsynchronousMetrics & async_metrics, +    std::vector<ProtocolServerAdapter> & servers, +    bool start_servers, +    const ServerType & server_type) +{ +    const Settings & settings = global_context->getSettingsRef(); + +    Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0); +    Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; +    http_params->setTimeout(settings.http_receive_timeout); +    http_params->setKeepAliveTimeout(keep_alive_timeout); + +    Poco::Util::AbstractConfiguration::Keys protocols; +    config.keys("protocols", protocols); + +    for (const auto & protocol : protocols) +    { +        if (!server_type.shouldStart(ServerType::Type::CUSTOM, protocol)) +            continue; + +        std::string prefix = "protocols." 
+ protocol + "."; +        std::string port_name = prefix + "port"; +        std::string description {"<undefined> protocol"}; +        if (config.has(prefix + "description")) +            description = config.getString(prefix + "description"); + +        if (!config.has(prefix + "port")) +            continue; + +        std::vector<std::string> hosts; +        if (config.has(prefix + "host")) +            hosts.push_back(config.getString(prefix + "host")); +        else +            hosts = listen_hosts; + +        for (const auto & host : hosts) +        { +            bool is_secure = false; +            auto stack = buildProtocolStackFromConfig(config, protocol, http_params, async_metrics, is_secure); + +            if (stack->empty()) +                throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' stack empty", protocol); + +            createServer(config, host, port_name.c_str(), listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, host, port, is_secure); +                socket.setReceiveTimeout(settings.receive_timeout); +                socket.setSendTimeout(settings.send_timeout); + +                return ProtocolServerAdapter( +                    host, +                    port_name.c_str(), +                    description + ": " + address.toString(), +                    std::make_unique<TCPServer>( +                        stack.release(), +                        server_pool, +                        socket, +                        new Poco::Net::TCPServerParams)); +            }); +        } +    } + +    for (const auto & listen_host : listen_hosts) +    { +        const char * port_name; + +        if (server_type.shouldStart(ServerType::Type::HTTP)) +        { +            /// HTTP +            port_name = "http_port"; +            createServer(config, listen_host, 
port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port); +                socket.setReceiveTimeout(settings.http_receive_timeout); +                socket.setSendTimeout(settings.http_send_timeout); + +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "http://" + address.toString(), +                    std::make_unique<HTTPServer>( +                        httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); +            }); +        } + +        if (server_type.shouldStart(ServerType::Type::HTTPS)) +        { +            /// HTTPS +            port_name = "https_port"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +#if USE_SSL +                Poco::Net::SecureServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); +                socket.setReceiveTimeout(settings.http_receive_timeout); +                socket.setSendTimeout(settings.http_send_timeout); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "https://" + address.toString(), +                    std::make_unique<HTTPServer>( +                        httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); +#else +                UNUSED(port); +                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "HTTPS protocol is disabled because Poco library was built without NetSSL support."); +#endif +            }); +        } + +        if 
(server_type.shouldStart(ServerType::Type::TCP)) +        { +            /// TCP +            port_name = "tcp_port"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port); +                socket.setReceiveTimeout(settings.receive_timeout); +                socket.setSendTimeout(settings.send_timeout); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "native protocol (tcp): " + address.toString(), +                    std::make_unique<TCPServer>( +                        new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), +                        server_pool, +                        socket, +                        new Poco::Net::TCPServerParams)); +            }); +        } + +        if (server_type.shouldStart(ServerType::Type::TCP_WITH_PROXY)) +        { +            /// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt +            port_name = "tcp_with_proxy_port"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port); +                socket.setReceiveTimeout(settings.receive_timeout); +                socket.setSendTimeout(settings.send_timeout); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "native protocol (tcp) with PROXY: " + address.toString(), +                    std::make_unique<TCPServer>( +                        new TCPHandlerFactory(*this, /* 
secure */ false, /* proxy protocol */ true), +                        server_pool, +                        socket, +                        new Poco::Net::TCPServerParams)); +            }); +        } + +        if (server_type.shouldStart(ServerType::Type::TCP_SECURE)) +        { +            /// TCP with SSL +            port_name = "tcp_port_secure"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +    #if USE_SSL +                Poco::Net::SecureServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); +                socket.setReceiveTimeout(settings.receive_timeout); +                socket.setSendTimeout(settings.send_timeout); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "secure native protocol (tcp_secure): " + address.toString(), +                    std::make_unique<TCPServer>( +                        new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), +                        server_pool, +                        socket, +                        new Poco::Net::TCPServerParams)); +    #else +                UNUSED(port); +                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); +    #endif +            }); +        } + +        if (server_type.shouldStart(ServerType::Type::MYSQL)) +        { +            port_name = "mysql_port"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); +                
socket.setReceiveTimeout(Poco::Timespan()); +                socket.setSendTimeout(settings.send_timeout); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "MySQL compatibility protocol: " + address.toString(), +                    std::make_unique<TCPServer>(new MySQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); +            }); +        } + +        if (server_type.shouldStart(ServerType::Type::POSTGRESQL)) +        { +            port_name = "postgresql_port"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port, /* secure = */ true); +                socket.setReceiveTimeout(Poco::Timespan()); +                socket.setSendTimeout(settings.send_timeout); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "PostgreSQL compatibility protocol: " + address.toString(), +                    std::make_unique<TCPServer>(new PostgreSQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); +            }); +        } + +#if USE_GRPC +        if (server_type.shouldStart(ServerType::Type::GRPC)) +        { +            port_name = "grpc_port"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::SocketAddress server_address(listen_host, port); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "gRPC protocol: " + server_address.toString(), +                    std::make_unique<GRPCServer>(*this, 
makeSocketAddress(listen_host, port, &logger()))); +            }); +        } +#endif +        if (server_type.shouldStart(ServerType::Type::PROMETHEUS)) +        { +            /// Prometheus (if defined and not setup yet with http_port) +            port_name = "prometheus.port"; +            createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, listen_host, port); +                socket.setReceiveTimeout(settings.http_receive_timeout); +                socket.setSendTimeout(settings.http_send_timeout); +                return ProtocolServerAdapter( +                    listen_host, +                    port_name, +                    "Prometheus: http://" + address.toString(), +                    std::make_unique<HTTPServer>( +                        httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); +            }); +        } +    } +} + +void Server::createInterserverServers( +    Poco::Util::AbstractConfiguration & config, +    const Strings & interserver_listen_hosts, +    bool listen_try, +    Poco::ThreadPool & server_pool, +    AsynchronousMetrics & async_metrics, +    std::vector<ProtocolServerAdapter> & servers, +    bool start_servers, +    const ServerType & server_type) +{ +    const Settings & settings = global_context->getSettingsRef(); + +    Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0); +    Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; +    http_params->setTimeout(settings.http_receive_timeout); +    http_params->setKeepAliveTimeout(keep_alive_timeout); + +    /// Now iterate over interserver_listen_hosts +    for (const auto & interserver_listen_host : interserver_listen_hosts) +    { +        const 
char * port_name; + +        if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTP)) +        { +            /// Interserver IO HTTP +            port_name = "interserver_http_port"; +            createServer(config, interserver_listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +                Poco::Net::ServerSocket socket; +                auto address = socketBindListen(config, socket, interserver_listen_host, port); +                socket.setReceiveTimeout(settings.http_receive_timeout); +                socket.setSendTimeout(settings.http_send_timeout); +                return ProtocolServerAdapter( +                    interserver_listen_host, +                    port_name, +                    "replica communication (interserver): http://" + address.toString(), +                    std::make_unique<HTTPServer>( +                        httpContext(), +                        createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"), +                        server_pool, +                        socket, +                        http_params)); +            }); +        } + +        if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTPS)) +        { +            port_name = "interserver_https_port"; +            createServer(config, interserver_listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter +            { +#if USE_SSL +                Poco::Net::SecureServerSocket socket; +                auto address = socketBindListen(config, socket, interserver_listen_host, port, /* secure = */ true); +                socket.setReceiveTimeout(settings.http_receive_timeout); +                socket.setSendTimeout(settings.http_send_timeout); +                return ProtocolServerAdapter( +                    interserver_listen_host, +                    port_name, +                    "secure replica 
communication (interserver): https://" + address.toString(), +                    std::make_unique<HTTPServer>( +                        httpContext(), +                        createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPSHandler-factory"), +                        server_pool, +                        socket, +                        http_params)); +#else +                UNUSED(port); +                throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support."); +#endif +            }); +        } +    } +} + +void Server::stopServers( +    std::vector<ProtocolServerAdapter> & servers, +    const ServerType & server_type +) const +{ +    Poco::Logger * log = &logger(); + +    /// Remove servers once all their connections are closed +    auto check_server = [&log](const char prefix[], auto & server) +    { +        if (!server.isStopping()) +            return false; +        size_t current_connections = server.currentConnections(); +        LOG_DEBUG(log, "Server {}{}: {} ({} connections)", +            server.getDescription(), +            prefix, +            !current_connections ? 
"finished" : "waiting", +            current_connections); +        return !current_connections; +    }; + +    std::erase_if(servers, std::bind_front(check_server, " (from one of previous remove)")); + +    for (auto & server : servers) +    { +        if (!server.isStopping()) +        { +            const std::string server_port_name = server.getPortName(); + +            if (server_type.shouldStop(server_port_name)) +                server.stop(); +        } +    } + +    std::erase_if(servers, std::bind_front(check_server, "")); +} + +void Server::updateServers( +    Poco::Util::AbstractConfiguration & config, +    Poco::ThreadPool & server_pool, +    AsynchronousMetrics & async_metrics, +    std::vector<ProtocolServerAdapter> & servers, +    std::vector<ProtocolServerAdapter> & servers_to_start_before_tables) +{ +    Poco::Logger * log = &logger(); + +    const auto listen_hosts = getListenHosts(config); +    const auto interserver_listen_hosts = getInterserverListenHosts(config); +    const auto listen_try = getListenTry(config); + +    /// Remove servers once all their connections are closed +    auto check_server = [&log](const char prefix[], auto & server) +    { +        if (!server.isStopping()) +            return false; +        size_t current_connections = server.currentConnections(); +        LOG_DEBUG(log, "Server {}{}: {} ({} connections)", +            server.getDescription(), +            prefix, +            !current_connections ? "finished" : "waiting", +            current_connections); +        return !current_connections; +    }; + +    std::erase_if(servers, std::bind_front(check_server, " (from one of previous reload)")); + +    Poco::Util::AbstractConfiguration & previous_config = latest_config ? 
*latest_config : this->config(); + +    std::vector<ProtocolServerAdapter *> all_servers; +    all_servers.reserve(servers.size() + servers_to_start_before_tables.size()); +    for (auto & server : servers) +        all_servers.push_back(&server); + +    for (auto & server : servers_to_start_before_tables) +        all_servers.push_back(&server); + +    for (auto * server : all_servers) +    { +        if (!server->isStopping()) +        { +            std::string port_name = server->getPortName(); +            bool has_host = false; +            bool is_http = false; +            if (port_name.starts_with("protocols.")) +            { +                std::string protocol = port_name.substr(0, port_name.find_last_of('.')); +                has_host = config.has(protocol + ".host"); + +                std::string conf_name = protocol; +                std::string prefix = protocol + "."; +                std::unordered_set<std::string> pset {conf_name}; +                while (true) +                { +                    if (config.has(prefix + "type")) +                    { +                        std::string type = config.getString(prefix + "type"); +                        if (type == "http") +                        { +                            is_http = true; +                            break; +                        } +                    } + +                    if (!config.has(prefix + "impl")) +                        break; + +                    conf_name = "protocols." 
+ config.getString(prefix + "impl"); +                    prefix = conf_name + "."; + +                    if (!pset.insert(conf_name).second) +                        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name); +                } +            } +            else +            { +                /// NOTE: better to compare using getPortName() over using +                /// dynamic_cast<> since HTTPServer is also used for prometheus and +                /// internal replication communications. +                is_http = server->getPortName() == "http_port" || server->getPortName() == "https_port"; +            } + +            if (!has_host) +                has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server->getListenHost()) != listen_hosts.end(); +            bool has_port = !config.getString(port_name, "").empty(); +            bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers"); +            if (force_restart) +                LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server->getDescription()); + +            if (!has_host || !has_port || config.getInt(server->getPortName()) != server->portNumber() || force_restart) +            { +                server->stop(); +                LOG_INFO(log, "Stopped listening for {}", server->getDescription()); +            } +        } +    } + +    createServers(config, listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true); +    createInterserverServers(config, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers_to_start_before_tables, /* start_servers= */ true); + +    std::erase_if(servers, std::bind_front(check_server, "")); +    std::erase_if(servers_to_start_before_tables, std::bind_front(check_server, "")); +} + +}  | 
