diff options
| author | AlexSm <[email protected]> | 2024-01-04 15:09:05 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-04 15:09:05 +0100 |
| commit | dab291146f6cd7d35684e3a1150e5bb1c412982c (patch) | |
| tree | 36ef35f6cacb6432845a4a33f940c95871036b32 /contrib/clickhouse/src/Databases/DatabaseFactory.cpp | |
| parent | 63660ad5e7512029fd0218e7a636580695a24e1f (diff) | |
Library import 5, delete go dependencies (#832)
* Library import 5, delete go dependencies
* Fix yt client
Diffstat (limited to 'contrib/clickhouse/src/Databases/DatabaseFactory.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Databases/DatabaseFactory.cpp | 501 |
1 files changed, 0 insertions, 501 deletions
diff --git a/contrib/clickhouse/src/Databases/DatabaseFactory.cpp b/contrib/clickhouse/src/Databases/DatabaseFactory.cpp deleted file mode 100644 index fb7d46cd441..00000000000 --- a/contrib/clickhouse/src/Databases/DatabaseFactory.cpp +++ /dev/null @@ -1,501 +0,0 @@ -#include <Databases/DatabaseFactory.h> - -#include <filesystem> -#include <Databases/DatabaseAtomic.h> -#include <Databases/DatabaseDictionary.h> -#include <Databases/DatabaseFilesystem.h> -#include <Databases/DatabaseLazy.h> -#include <Databases/DatabaseMemory.h> -#include <Databases/DatabaseOrdinary.h> -#include <Databases/DatabaseReplicated.h> -#include <Interpreters/Context.h> -#include <Interpreters/evaluateConstantExpression.h> -#include <Parsers/ASTCreateQuery.h> -#include <Parsers/ASTFunction.h> -#include <Parsers/ASTLiteral.h> -#include <Parsers/queryToString.h> -#include <Storages/NamedCollectionsHelpers.h> -#include <Common/logger_useful.h> -#include <Common/Macros.h> -#include <Common/filesystemHelpers.h> - -#include "clickhouse_config.h" - -#if USE_MYSQL -# include <Core/MySQL/MySQLClient.h> -# include <Databases/MySQL/DatabaseMySQL.h> -# include <Databases/MySQL/MaterializedMySQLSettings.h> -# include <Storages/MySQL/MySQLHelpers.h> -# include <Storages/MySQL/MySQLSettings.h> -# include <Storages/StorageMySQL.h> -# include <Databases/MySQL/DatabaseMaterializedMySQL.h> -# error #include <mysqlxx/Pool.h> -#endif - -#if USE_MYSQL || USE_LIBPQXX -#include <Common/parseRemoteDescription.h> -#include <Common/parseAddress.h> -#endif - -#if USE_LIBPQXX -#error #include <Databases/PostgreSQL/DatabasePostgreSQL.h> -#error #include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h> -#error #include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h> -#include <Storages/StoragePostgreSQL.h> -#endif - -#if USE_SQLITE -#error #include <Databases/SQLite/DatabaseSQLite.h> -#endif - -#if USE_AWS_S3 -#include <Databases/DatabaseS3.h> -#endif - -#if USE_HDFS -#include <Databases/DatabaseHDFS.h> -#endif - -namespace fs = std::filesystem; - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int UNKNOWN_ELEMENT_IN_AST; - extern const int BAD_ARGUMENTS; - extern const int UNKNOWN_DATABASE_ENGINE; - extern const int CANNOT_CREATE_DATABASE; - extern const int NOT_IMPLEMENTED; -} - -void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & metadata_path) -{ - const String & engine_name = create.storage->engine->name; - const String & database_name = create.getDatabase(); - - if (engine_name != "Ordinary") - return; - - if (!FS::isSymlink(metadata_path)) - return; - - String target_path = FS::readSymlink(metadata_path).string(); - fs::path path_to_remove = metadata_path; - if (path_to_remove.filename().empty()) - path_to_remove = path_to_remove.parent_path(); - - /// Before 20.7 metadata/db_name.sql file might absent and Ordinary database was attached if there's metadata/db_name/ dir. - /// Between 20.7 and 22.7 metadata/db_name.sql was created in this case as well. - /// Since 20.7 `default` database is created with Atomic engine on the very first server run. - /// The problem is that if server crashed during the very first run and metadata/db_name/ -> store/whatever symlink was created - /// then it's considered as Ordinary database. And it even works somehow - /// until background task tries to remove unused dir from store/... - throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, - "Metadata directory {} for Ordinary database {} is a symbolic link to {}. " - "It may be a result of manual intervention, crash on very first server start or a bug. " - "Database cannot be attached (it's kind of protection from potential data loss). " - "Metadata directory must not be a symlink and must contain tables metadata files itself. " - "You have to resolve this manually. It can be done like this: rm {}; sudo -u clickhouse mv {} {};", - metadata_path, database_name, target_path, - quoteString(path_to_remove.string()), quoteString(target_path), quoteString(path_to_remove.string())); - -} - -DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) -{ - cckMetadataPathForOrdinary(create, metadata_path); - - DatabasePtr impl = getImpl(create, metadata_path, context); - - if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries) - context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Database, impl->getEngineName()); - - /// Attach database metadata - if (impl && create.comment) - impl->setDatabaseComment(create.comment->as<ASTLiteral>()->value.safeGet<String>()); - - return impl; -} - -template <typename ValueType> -static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) -{ - if (!ast || !ast->as<ASTLiteral>()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); - - return ast->as<ASTLiteral>()->value.safeGet<ValueType>(); -} - -DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) -{ - auto * engine_define = create.storage; - const String & database_name = create.getDatabase(); - const String & engine_name = engine_define->engine->name; - const UUID & uuid = create.uuid; - - static const std::unordered_set<std::string_view> database_engines{"Ordinary", "Atomic", "Memory", - "Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL", - "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; - - if (!database_engines.contains(engine_name)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name); - - static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", - "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; - - static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"}; - bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); - - if (engine_define->engine->arguments && !engine_may_have_arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name); - - bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || - engine_define->primary_key || engine_define->order_by || - engine_define->sample_by; - bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL"; - - if (has_unexpected_element || (!may_have_settings && engine_define->settings)) - throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST, - "Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name); - - if (create.table_overrides && !engines_with_table_overrides.contains(engine_name)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name); - - if (engine_name == "Ordinary") - { - if (!create.attach && !context->getSettingsRef().allow_deprecated_database_ordinary) - throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, - "Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)"); - - return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context); - } - - if (engine_name == "Atomic") - return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context); - else if (engine_name == "Memory") - return std::make_shared<DatabaseMemory>(database_name, context); - else if (engine_name == "Dictionary") - return std::make_shared<DatabaseDictionary>(database_name, context); - -#if USE_MYSQL - - else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL") - { - const ASTFunction * engine = engine_define->engine; - if (!engine->arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); - - StorageMySQL::Configuration configuration; - ASTs & arguments = engine->arguments->children; - auto mysql_settings = std::make_unique<MySQLSettings>(); - - if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context)) - { - configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false); - } - else - { - if (arguments.size() != 4) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments."); - - - arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context); - const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name); - - if (engine_name == "MySQL") - { - size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; - configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); - } - else - { - const auto & [remote_host, remote_port] = parseAddress(host_port, 3306); - configuration.host = remote_host; - configuration.port = remote_port; - } - - configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name); - configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name); - configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name); - } - - try - { - if (engine_name == "MySQL") - { - mysql_settings->loadFromQueryContext(context, *engine_define); - if (engine_define->settings) - mysql_settings->loadFromQuery(*engine_define); - - auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings); - - return std::make_shared<DatabaseMySQL>( - context, database_name, metadata_path, engine_define, configuration.database, - std::move(mysql_settings), std::move(mysql_pool), create.attach); - } - - MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password); - auto mysql_pool = mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port); - - auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>(); - - if (engine_define->settings) - materialize_mode_settings->loadFromQuery(*engine_define); - - if (uuid == UUIDHelpers::Nil) - { - auto print_create_ast = create.clone(); - print_create_ast->as<ASTCreateQuery>()->attach = false; - throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete " - "the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}", - metadata_path, - queryToString(print_create_ast)); - } - - return std::make_shared<DatabaseMaterializedMySQL>( - context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool), - std::move(client), std::move(materialize_mode_settings)); - } - catch (...) - { - const auto & exception_message = getCurrentExceptionMessage(true); - throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message); - } - } -#endif - - else if (engine_name == "Lazy") - { - const ASTFunction * engine = engine_define->engine; - - if (!engine->arguments || engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument"); - - const auto & arguments = engine->arguments->children; - - const auto cache_expiration_time_seconds = safeGetLiteralValue<UInt64>(arguments[0], "Lazy"); - return std::make_shared<DatabaseLazy>(database_name, metadata_path, cache_expiration_time_seconds, context); - } - - else if (engine_name == "Replicated") - { - const ASTFunction * engine = engine_define->engine; - - if (!engine->arguments || engine->arguments->children.size() != 3) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name"); - - auto & arguments = engine->arguments->children; - for (auto & engine_arg : arguments) - engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); - - String zookeeper_path = safeGetLiteralValue<String>(arguments[0], "Replicated"); - String shard_name = safeGetLiteralValue<String>(arguments[1], "Replicated"); - String replica_name = safeGetLiteralValue<String>(arguments[2], "Replicated"); - - zookeeper_path = context->getMacros()->expand(zookeeper_path); - shard_name = context->getMacros()->expand(shard_name); - replica_name = context->getMacros()->expand(replica_name); - - DatabaseReplicatedSettings database_replicated_settings{}; - if (engine_define->settings) - database_replicated_settings.loadFromQuery(*engine_define); - - return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid, - zookeeper_path, shard_name, replica_name, - std::move(database_replicated_settings), context); - } - -#if USE_LIBPQXX - - else if (engine_name == "PostgreSQL") - { - const ASTFunction * engine = engine_define->engine; - if (!engine->arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); - - ASTs & engine_args = engine->arguments->children; - auto use_table_cache = false; - StoragePostgreSQL::Configuration configuration; - - if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) - { - configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, false); - use_table_cache = named_collection->getOrDefault<UInt64>("use_table_cache", 0); - } - else - { - if (engine_args.size() < 4 || engine_args.size() > 6) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "PostgreSQL Database require `host:port`, `database_name`, `username`, `password`" - "[, `schema` = "", `use_table_cache` = 0"); - - for (auto & engine_arg : engine_args) - engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); - - const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name); - size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; - - configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432); - configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name); - configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name); - configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name); - - bool is_deprecated_syntax = false; - if (engine_args.size() >= 5) - { - auto arg_value = engine_args[4]->as<ASTLiteral>()->value; - if (arg_value.getType() == Field::Types::Which::String) - { - configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name); - } - else - { - use_table_cache = safeGetLiteralValue<UInt8>(engine_args[4], engine_name); - LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used"); - is_deprecated_syntax = true; - } - } - - if (!is_deprecated_syntax && engine_args.size() >= 6) - use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name); - } - - const auto & settings = context->getSettingsRef(); - auto pool = std::make_shared<postgres::PoolWithFailover>( - configuration, - settings.postgresql_connection_pool_size, - settings.postgresql_connection_pool_wait_timeout, - POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, - settings.postgresql_connection_pool_auto_close_connection); - - return std::make_shared<DatabasePostgreSQL>( - context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache); - } - else if (engine_name == "MaterializedPostgreSQL") - { - const ASTFunction * engine = engine_define->engine; - if (!engine->arguments) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); - - ASTs & engine_args = engine->arguments->children; - StoragePostgreSQL::Configuration configuration; - - if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) - { - configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, false); - } - else - { - if (engine_args.size() != 4) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`."); - - for (auto & engine_arg : engine_args) - engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); - - auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432); - - configuration.host = parsed_host_port.first; - configuration.port = parsed_host_port.second; - configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name); - configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name); - configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name); - } - - auto connection_info = postgres::formatConnectionString( - configuration.database, configuration.host, configuration.port, configuration.username, configuration.password); - - auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>(); - if (engine_define->settings) - postgresql_replica_settings->loadFromQuery(*engine_define); - - return std::make_shared<DatabaseMaterializedPostgreSQL>( - context, metadata_path, uuid, create.attach, - database_name, configuration.database, connection_info, - std::move(postgresql_replica_settings)); - } - - -#endif - -#if USE_SQLITE - else if (engine_name == "SQLite") - { - const ASTFunction * engine = engine_define->engine; - - if (!engine->arguments || engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path"); - - const auto & arguments = engine->arguments->children; - - String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite"); - - return std::make_shared<DatabaseSQLite>(context, engine_define, create.attach, database_path); - } -#endif - - else if (engine_name == "Filesystem") - { - const ASTFunction * engine = engine_define->engine; - - /// If init_path is empty, then the current path will be used - std::string init_path; - - if (engine->arguments && !engine->arguments->children.empty()) - { - if (engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path"); - - const auto & arguments = engine->arguments->children; - init_path = safeGetLiteralValue<String>(arguments[0], engine_name); - } - - return std::make_shared<DatabaseFilesystem>(database_name, init_path, context); - } - -#if USE_AWS_S3 - else if (engine_name == "S3") - { - const ASTFunction * engine = engine_define->engine; - - DatabaseS3::Configuration config; - - if (engine->arguments && !engine->arguments->children.empty()) - { - ASTs & engine_args = engine->arguments->children; - config = DatabaseS3::parseArguments(engine_args, context); - } - - return std::make_shared<DatabaseS3>(database_name, config, context); - } -#endif - -#if USE_HDFS - else if (engine_name == "HDFS") - { - const ASTFunction * engine = engine_define->engine; - - /// If source_url is empty, then table name must contain full url - std::string source_url; - - if (engine->arguments && !engine->arguments->children.empty()) - { - if (engine->arguments->children.size() != 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url"); - - const auto & arguments = engine->arguments->children; - source_url = safeGetLiteralValue<String>(arguments[0], engine_name); - } - - return std::make_shared<DatabaseHDFS>(database_name, source_url, context); - } -#endif - - throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name); -} - -} |
