diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Databases/DatabaseFactory.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Databases/DatabaseFactory.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Databases/DatabaseFactory.cpp | 501 |
1 files changed, 501 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Databases/DatabaseFactory.cpp b/contrib/clickhouse/src/Databases/DatabaseFactory.cpp new file mode 100644 index 00000000000..fb7d46cd441 --- /dev/null +++ b/contrib/clickhouse/src/Databases/DatabaseFactory.cpp @@ -0,0 +1,501 @@ +#include <Databases/DatabaseFactory.h> + +#include <filesystem> +#include <Databases/DatabaseAtomic.h> +#include <Databases/DatabaseDictionary.h> +#include <Databases/DatabaseFilesystem.h> +#include <Databases/DatabaseLazy.h> +#include <Databases/DatabaseMemory.h> +#include <Databases/DatabaseOrdinary.h> +#include <Databases/DatabaseReplicated.h> +#include <Interpreters/Context.h> +#include <Interpreters/evaluateConstantExpression.h> +#include <Parsers/ASTCreateQuery.h> +#include <Parsers/ASTFunction.h> +#include <Parsers/ASTLiteral.h> +#include <Parsers/queryToString.h> +#include <Storages/NamedCollectionsHelpers.h> +#include <Common/logger_useful.h> +#include <Common/Macros.h> +#include <Common/filesystemHelpers.h> + +#include "clickhouse_config.h" + +#if USE_MYSQL +# include <Core/MySQL/MySQLClient.h> +# include <Databases/MySQL/DatabaseMySQL.h> +# include <Databases/MySQL/MaterializedMySQLSettings.h> +# include <Storages/MySQL/MySQLHelpers.h> +# include <Storages/MySQL/MySQLSettings.h> +# include <Storages/StorageMySQL.h> +# include <Databases/MySQL/DatabaseMaterializedMySQL.h> +# error #include <mysqlxx/Pool.h> +#endif + +#if USE_MYSQL || USE_LIBPQXX +#include <Common/parseRemoteDescription.h> +#include <Common/parseAddress.h> +#endif + +#if USE_LIBPQXX +#error #include <Databases/PostgreSQL/DatabasePostgreSQL.h> +#error #include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h> +#error #include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h> +#include <Storages/StoragePostgreSQL.h> +#endif + +#if USE_SQLITE +#error #include <Databases/SQLite/DatabaseSQLite.h> +#endif + +#if USE_AWS_S3 +#include <Databases/DatabaseS3.h> +#endif + +#if USE_HDFS +#include <Databases/DatabaseHDFS.h> +#endif + +namespace fs = std::filesystem; + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_ELEMENT_IN_AST; + extern const int BAD_ARGUMENTS; + extern const int UNKNOWN_DATABASE_ENGINE; + extern const int CANNOT_CREATE_DATABASE; + extern const int NOT_IMPLEMENTED; +} + +void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & metadata_path) +{ + const String & engine_name = create.storage->engine->name; + const String & database_name = create.getDatabase(); + + if (engine_name != "Ordinary") + return; + + if (!FS::isSymlink(metadata_path)) + return; + + String target_path = FS::readSymlink(metadata_path).string(); + fs::path path_to_remove = metadata_path; + if (path_to_remove.filename().empty()) + path_to_remove = path_to_remove.parent_path(); + + /// Before 20.7 metadata/db_name.sql file might absent and Ordinary database was attached if there's metadata/db_name/ dir. + /// Between 20.7 and 22.7 metadata/db_name.sql was created in this case as well. + /// Since 20.7 `default` database is created with Atomic engine on the very first server run. + /// The problem is that if server crashed during the very first run and metadata/db_name/ -> store/whatever symlink was created + /// then it's considered as Ordinary database. And it even works somehow + /// until background task tries to remove unused dir from store/... + throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, + "Metadata directory {} for Ordinary database {} is a symbolic link to {}. " + "It may be a result of manual intervention, crash on very first server start or a bug. " + "Database cannot be attached (it's kind of protection from potential data loss). " + "Metadata directory must not be a symlink and must contain tables metadata files itself. " + "You have to resolve this manually. It can be done like this: rm {}; sudo -u clickhouse mv {} {};", + metadata_path, database_name, target_path, + quoteString(path_to_remove.string()), quoteString(target_path), quoteString(path_to_remove.string())); + +} + +DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) +{ + cckMetadataPathForOrdinary(create, metadata_path); + + DatabasePtr impl = getImpl(create, metadata_path, context); + + if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries) + context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Database, impl->getEngineName()); + + /// Attach database metadata + if (impl && create.comment) + impl->setDatabaseComment(create.comment->as<ASTLiteral>()->value.safeGet<String>()); + + return impl; +} + +template <typename ValueType> +static inline ValueType safeGetLiteralValue(const ASTPtr &ast, const String &engine_name) +{ + if (!ast || !ast->as<ASTLiteral>()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} requested literal argument.", engine_name); + + return ast->as<ASTLiteral>()->value.safeGet<ValueType>(); +} + +DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context) +{ + auto * engine_define = create.storage; + const String & database_name = create.getDatabase(); + const String & engine_name = engine_define->engine->name; + const UUID & uuid = create.uuid; + + static const std::unordered_set<std::string_view> database_engines{"Ordinary", "Atomic", "Memory", + "Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL", + "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; + + if (!database_engines.contains(engine_name)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name); + + static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", + "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; + + static const std::unordered_set<std::string_view> engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"}; + bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); + + if (engine_define->engine->arguments && !engine_may_have_arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name); + + bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || + engine_define->primary_key || engine_define->order_by || + engine_define->sample_by; + bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL"; + + if (has_unexpected_element || (!may_have_settings && engine_define->settings)) + throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST, + "Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name); + + if (create.table_overrides && !engines_with_table_overrides.contains(engine_name)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name); + + if (engine_name == "Ordinary") + { + if (!create.attach && !context->getSettingsRef().allow_deprecated_database_ordinary) + throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, + "Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)"); + + return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context); + } + + if (engine_name == "Atomic") + return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context); + else if (engine_name == "Memory") + return std::make_shared<DatabaseMemory>(database_name, context); + else if (engine_name == "Dictionary") + return std::make_shared<DatabaseDictionary>(database_name, context); + +#if USE_MYSQL + + else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL") + { + const ASTFunction * engine = engine_define->engine; + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + + StorageMySQL::Configuration configuration; + ASTs & arguments = engine->arguments->children; + auto mysql_settings = std::make_unique<MySQLSettings>(); + + if (auto named_collection = tryGetNamedCollectionWithOverrides(arguments, context)) + { + configuration = StorageMySQL::processNamedCollectionResult(*named_collection, *mysql_settings, context, false); + } + else + { + if (arguments.size() != 4) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments."); + + + arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context); + const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name); + + if (engine_name == "MySQL") + { + size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; + configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); + } + else + { + const auto & [remote_host, remote_port] = parseAddress(host_port, 3306); + configuration.host = remote_host; + configuration.port = remote_port; + } + + configuration.database = safeGetLiteralValue<String>(arguments[1], engine_name); + configuration.username = safeGetLiteralValue<String>(arguments[2], engine_name); + configuration.password = safeGetLiteralValue<String>(arguments[3], engine_name); + } + + try + { + if (engine_name == "MySQL") + { + mysql_settings->loadFromQueryContext(context, *engine_define); + if (engine_define->settings) + mysql_settings->loadFromQuery(*engine_define); + + auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings); + + return std::make_shared<DatabaseMySQL>( + context, database_name, metadata_path, engine_define, configuration.database, + std::move(mysql_settings), std::move(mysql_pool), create.attach); + } + + MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password); + auto mysql_pool = mysqlxx::Pool(configuration.database, configuration.host, configuration.username, configuration.password, configuration.port); + + auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>(); + + if (engine_define->settings) + materialize_mode_settings->loadFromQuery(*engine_define); + + if (uuid == UUIDHelpers::Nil) + { + auto print_create_ast = create.clone(); + print_create_ast->as<ASTCreateQuery>()->attach = false; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "The MaterializedMySQL database engine no longer supports Ordinary databases. To re-create the database, delete " + "the old one by executing \"rm -rf {}{{,.sql}}\", then re-create the database with the following query: {}", + metadata_path, + queryToString(print_create_ast)); + } + + return std::make_shared<DatabaseMaterializedMySQL>( + context, database_name, metadata_path, uuid, configuration.database, std::move(mysql_pool), + std::move(client), std::move(materialize_mode_settings)); + } + catch (...) + { + const auto & exception_message = getCurrentExceptionMessage(true); + throw Exception(ErrorCodes::CANNOT_CREATE_DATABASE, "Cannot create MySQL database, because {}", exception_message); + } + } +#endif + + else if (engine_name == "Lazy") + { + const ASTFunction * engine = engine_define->engine; + + if (!engine->arguments || engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Lazy database require cache_expiration_time_seconds argument"); + + const auto & arguments = engine->arguments->children; + + const auto cache_expiration_time_seconds = safeGetLiteralValue<UInt64>(arguments[0], "Lazy"); + return std::make_shared<DatabaseLazy>(database_name, metadata_path, cache_expiration_time_seconds, context); + } + + else if (engine_name == "Replicated") + { + const ASTFunction * engine = engine_define->engine; + + if (!engine->arguments || engine->arguments->children.size() != 3) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replicated database requires 3 arguments: zookeeper path, shard name and replica name"); + + auto & arguments = engine->arguments->children; + for (auto & engine_arg : arguments) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); + + String zookeeper_path = safeGetLiteralValue<String>(arguments[0], "Replicated"); + String shard_name = safeGetLiteralValue<String>(arguments[1], "Replicated"); + String replica_name = safeGetLiteralValue<String>(arguments[2], "Replicated"); + + zookeeper_path = context->getMacros()->expand(zookeeper_path); + shard_name = context->getMacros()->expand(shard_name); + replica_name = context->getMacros()->expand(replica_name); + + DatabaseReplicatedSettings database_replicated_settings{}; + if (engine_define->settings) + database_replicated_settings.loadFromQuery(*engine_define); + + return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid, + zookeeper_path, shard_name, replica_name, + std::move(database_replicated_settings), context); + } + +#if USE_LIBPQXX + + else if (engine_name == "PostgreSQL") + { + const ASTFunction * engine = engine_define->engine; + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + + ASTs & engine_args = engine->arguments->children; + auto use_table_cache = false; + StoragePostgreSQL::Configuration configuration; + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) + { + configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, false); + use_table_cache = named_collection->getOrDefault<UInt64>("use_table_cache", 0); + } + else + { + if (engine_args.size() < 4 || engine_args.size() > 6) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "PostgreSQL Database require `host:port`, `database_name`, `username`, `password`" + "[, `schema` = "", `use_table_cache` = 0"); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); + + const auto & host_port = safeGetLiteralValue<String>(engine_args[0], engine_name); + size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; + + configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 5432); + configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name); + configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name); + configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name); + + bool is_deprecated_syntax = false; + if (engine_args.size() >= 5) + { + auto arg_value = engine_args[4]->as<ASTLiteral>()->value; + if (arg_value.getType() == Field::Types::Which::String) + { + configuration.schema = safeGetLiteralValue<String>(engine_args[4], engine_name); + } + else + { + use_table_cache = safeGetLiteralValue<UInt8>(engine_args[4], engine_name); + LOG_WARNING(&Poco::Logger::get("DatabaseFactory"), "A deprecated syntax of PostgreSQL database engine is used"); + is_deprecated_syntax = true; + } + } + + if (!is_deprecated_syntax && engine_args.size() >= 6) + use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name); + } + + const auto & settings = context->getSettingsRef(); + auto pool = std::make_shared<postgres::PoolWithFailover>( + configuration, + settings.postgresql_connection_pool_size, + settings.postgresql_connection_pool_wait_timeout, + POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, + settings.postgresql_connection_pool_auto_close_connection); + + return std::make_shared<DatabasePostgreSQL>( + context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache); + } + else if (engine_name == "MaterializedPostgreSQL") + { + const ASTFunction * engine = engine_define->engine; + if (!engine->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name); + + ASTs & engine_args = engine->arguments->children; + StoragePostgreSQL::Configuration configuration; + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context)) + { + configuration = StoragePostgreSQL::processNamedCollectionResult(*named_collection, false); + } + else + { + if (engine_args.size() != 4) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "MaterializedPostgreSQL Database require `host:port`, `database_name`, `username`, `password`."); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context); + + auto parsed_host_port = parseAddress(safeGetLiteralValue<String>(engine_args[0], engine_name), 5432); + + configuration.host = parsed_host_port.first; + configuration.port = parsed_host_port.second; + configuration.database = safeGetLiteralValue<String>(engine_args[1], engine_name); + configuration.username = safeGetLiteralValue<String>(engine_args[2], engine_name); + configuration.password = safeGetLiteralValue<String>(engine_args[3], engine_name); + } + + auto connection_info = postgres::formatConnectionString( + configuration.database, configuration.host, configuration.port, configuration.username, configuration.password); + + auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>(); + if (engine_define->settings) + postgresql_replica_settings->loadFromQuery(*engine_define); + + return std::make_shared<DatabaseMaterializedPostgreSQL>( + context, metadata_path, uuid, create.attach, + database_name, configuration.database, connection_info, + std::move(postgresql_replica_settings)); + } + + +#endif + +#if USE_SQLITE + else if (engine_name == "SQLite") + { + const ASTFunction * engine = engine_define->engine; + + if (!engine->arguments || engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "SQLite database requires 1 argument: database path"); + + const auto & arguments = engine->arguments->children; + + String database_path = safeGetLiteralValue<String>(arguments[0], "SQLite"); + + return std::make_shared<DatabaseSQLite>(context, engine_define, create.attach, database_path); + } +#endif + + else if (engine_name == "Filesystem") + { + const ASTFunction * engine = engine_define->engine; + + /// If init_path is empty, then the current path will be used + std::string init_path; + + if (engine->arguments && !engine->arguments->children.empty()) + { + if (engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path"); + + const auto & arguments = engine->arguments->children; + init_path = safeGetLiteralValue<String>(arguments[0], engine_name); + } + + return std::make_shared<DatabaseFilesystem>(database_name, init_path, context); + } + +#if USE_AWS_S3 + else if (engine_name == "S3") + { + const ASTFunction * engine = engine_define->engine; + + DatabaseS3::Configuration config; + + if (engine->arguments && !engine->arguments->children.empty()) + { + ASTs & engine_args = engine->arguments->children; + config = DatabaseS3::parseArguments(engine_args, context); + } + + return std::make_shared<DatabaseS3>(database_name, config, context); + } +#endif + +#if USE_HDFS + else if (engine_name == "HDFS") + { + const ASTFunction * engine = engine_define->engine; + + /// If source_url is empty, then table name must contain full url + std::string source_url; + + if (engine->arguments && !engine->arguments->children.empty()) + { + if (engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url"); + + const auto & arguments = engine->arguments->children; + source_url = safeGetLiteralValue<String>(arguments[0], engine_name); + } + + return std::make_shared<DatabaseHDFS>(database_name, source_url, context); + } +#endif + + throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name); +} + +} |
