summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp')
-rw-r--r--contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp510
1 files changed, 510 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp b/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
new file mode 100644
index 00000000000..282dae63a0f
--- /dev/null
+++ b/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
@@ -0,0 +1,510 @@
+#include "clickhouse_config.h"
+
+#if USE_MYSQL
+# include <string>
+# include <DataTypes/DataTypeDateTime.h>
+# include <DataTypes/DataTypeNullable.h>
+# include <DataTypes/DataTypeString.h>
+# include <DataTypes/DataTypesNumber.h>
+# include <DataTypes/convertMySQLDataType.h>
+# include <Databases/MySQL/DatabaseMySQL.h>
+# include <Databases/MySQL/FetchTablesColumnsList.h>
+# error #include <Processors/Sources/MySQLSource.h>
+# include <Processors/Executors/PullingPipelineExecutor.h>
+# include <QueryPipeline/QueryPipelineBuilder.h>
+# include <IO/Operators.h>
+# include <Interpreters/Context.h>
+# include <Parsers/ASTCreateQuery.h>
+# include <Parsers/ASTFunction.h>
+# include <Parsers/ParserCreateQuery.h>
+# include <Parsers/parseQuery.h>
+# include <Parsers/queryToString.h>
+# include <Storages/StorageMySQL.h>
+# include <Storages/MySQL/MySQLSettings.h>
+# include <Common/escapeForFileName.h>
+# include <Common/parseAddress.h>
+# include <Common/setThreadName.h>
+# include <filesystem>
+# include <Common/filesystemHelpers.h>
+# include <Parsers/ASTIdentifier.h>
+
+namespace fs = std::filesystem;
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int NOT_IMPLEMENTED;
+ extern const int LOGICAL_ERROR;
+ extern const int UNKNOWN_TABLE;
+ extern const int TABLE_IS_DROPPED;
+ extern const int TABLE_ALREADY_EXISTS;
+ extern const int UNEXPECTED_AST_STRUCTURE;
+}
+
+constexpr static const auto suffix = ".remove_flag";
+static constexpr const std::chrono::seconds cleaner_sleep_time{30};
+static const std::chrono::seconds lock_acquire_timeout{10};
+
+DatabaseMySQL::DatabaseMySQL(
+ ContextPtr context_,
+ const String & database_name_,
+ const String & metadata_path_,
+ const ASTStorage * database_engine_define_,
+ const String & database_name_in_mysql_,
+ std::unique_ptr<MySQLSettings> settings_,
+ mysqlxx::PoolWithFailover && pool,
+ bool attach)
+ : IDatabase(database_name_)
+ , WithContext(context_->getGlobalContext())
+ , metadata_path(metadata_path_)
+ , database_engine_define(database_engine_define_->clone())
+ , database_name_in_mysql(database_name_in_mysql_)
+ , mysql_settings(std::move(settings_))
+ , mysql_pool(std::move(pool)) /// NOLINT
+{
+ try
+ {
+ /// Test that the database is working fine; it will also fetch tables.
+ empty(); // NOLINT(bugprone-standalone-empty)
+ }
+ catch (...)
+ {
+ if (attach)
+ tryLogCurrentException("DatabaseMySQL");
+ else
+ throw;
+ }
+
+ fs::create_directories(metadata_path);
+
+ thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
+}
+
+bool DatabaseMySQL::empty() const
+{
+ std::lock_guard lock(mutex);
+
+ fetchTablesIntoLocalCache(getContext());
+
+ if (local_tables_cache.empty())
+ return true;
+
+ for (const auto & [table_name, storage_info] : local_tables_cache)
+ if (!remove_or_detach_tables.contains(table_name))
+ return false;
+
+ return true;
+}
+
+DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name) const
+{
+ Tables tables;
+ std::lock_guard lock(mutex);
+
+ fetchTablesIntoLocalCache(local_context);
+
+ for (const auto & [table_name, modify_time_and_storage] : local_tables_cache)
+ if (!remove_or_detach_tables.contains(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
+ tables[table_name] = modify_time_and_storage.second;
+
+ return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
+}
+
+bool DatabaseMySQL::isTableExist(const String & name, ContextPtr local_context) const
+{
+ return bool(tryGetTable(name, local_context));
+}
+
+StoragePtr DatabaseMySQL::tryGetTable(const String & mysql_table_name, ContextPtr local_context) const
+{
+ std::lock_guard lock(mutex);
+
+ fetchTablesIntoLocalCache(local_context);
+
+ if (!remove_or_detach_tables.contains(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
+ return local_tables_cache[mysql_table_name].second;
+
+ return StoragePtr{};
+}
+
+ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
+{
+ std::lock_guard lock(mutex);
+
+ fetchTablesIntoLocalCache(local_context);
+
+ if (local_tables_cache.find(table_name) == local_tables_cache.end())
+ {
+ if (throw_on_error)
+ throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {}.{} doesn't exist.", database_name_in_mysql, table_name);
+ return nullptr;
+ }
+
+ auto storage = local_tables_cache[table_name].second;
+ auto table_storage_define = database_engine_define->clone();
+ {
+ ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
+ ast_storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
+ ASTs storage_children = ast_storage->children;
+ auto storage_engine_arguments = ast_storage->engine->arguments;
+
+ /// Add table_name to engine arguments
+ if (typeid_cast<ASTIdentifier *>(storage_engine_arguments->children[0].get()))
+ {
+ storage_engine_arguments->children.push_back(
+ makeASTFunction("equals", std::make_shared<ASTIdentifier>("table"), std::make_shared<ASTLiteral>(table_name)));
+ }
+ else
+ {
+ auto mysql_table_name = std::make_shared<ASTLiteral>(table_name);
+ storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
+ }
+
+ /// Unset settings
+ std::erase_if(storage_children, [&](const ASTPtr & element) { return element.get() == ast_storage->settings; });
+ ast_storage->settings = nullptr;
+ }
+
+ unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
+ auto create_table_query = DB::getCreateQueryFromStorage(storage,
+ table_storage_define,
+ true,
+ max_parser_depth,
+ throw_on_error);
+ return create_table_query;
+}
+
+time_t DatabaseMySQL::getObjectMetadataModificationTime(const String & table_name) const
+{
+ std::lock_guard lock(mutex);
+
+ fetchTablesIntoLocalCache(getContext());
+
+ if (local_tables_cache.find(table_name) == local_tables_cache.end())
+ throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {}.{} doesn't exist.", database_name_in_mysql, table_name);
+
+ return time_t(local_tables_cache[table_name].first);
+}
+
+ASTPtr DatabaseMySQL::getCreateDatabaseQuery() const
+{
+ const auto & create_query = std::make_shared<ASTCreateQuery>();
+ create_query->setDatabase(getDatabaseName());
+ create_query->set(create_query->storage, database_engine_define);
+
+ if (const auto comment_value = getDatabaseComment(); !comment_value.empty())
+ create_query->set(create_query->comment, std::make_shared<ASTLiteral>(comment_value));
+
+ return create_query;
+}
+
+void DatabaseMySQL::fetchTablesIntoLocalCache(ContextPtr local_context) const
+{
+ const auto & tables_with_modification_time = fetchTablesWithModificationTime(local_context);
+
+ destroyLocalCacheExtraTables(tables_with_modification_time);
+ fetchLatestTablesStructureIntoCache(tables_with_modification_time, local_context);
+}
+
+void DatabaseMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const
+{
+ for (auto iterator = local_tables_cache.begin(); iterator != local_tables_cache.end();)
+ {
+ if (tables_with_modification_time.find(iterator->first) != tables_with_modification_time.end())
+ ++iterator;
+ else
+ {
+ outdated_tables.emplace_back(iterator->second.second);
+ iterator = local_tables_cache.erase(iterator);
+ }
+ }
+}
+
+void DatabaseMySQL::fetchLatestTablesStructureIntoCache(
+ const std::map<String, UInt64> & tables_modification_time, ContextPtr local_context) const
+{
+ std::vector<String> wait_update_tables_name;
+ for (const auto & table_modification_time : tables_modification_time)
+ {
+ const auto & it = local_tables_cache.find(table_modification_time.first);
+
+ /// Outdated or new table structures
+ if (it == local_tables_cache.end() || table_modification_time.second > it->second.first)
+ wait_update_tables_name.emplace_back(table_modification_time.first);
+ }
+
+ std::map<String, ColumnsDescription> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name, local_context);
+
+ for (const auto & table_and_columns : tables_and_columns)
+ {
+ const auto & table_name = table_and_columns.first;
+ const auto & columns_name_and_type = table_and_columns.second;
+ const auto & table_modification_time = tables_modification_time.at(table_name);
+
+ const auto & iterator = local_tables_cache.find(table_name);
+ if (iterator != local_tables_cache.end())
+ {
+ outdated_tables.emplace_back(iterator->second.second);
+ local_tables_cache.erase(iterator);
+ }
+
+ local_tables_cache[table_name] = std::make_pair(
+ table_modification_time,
+ std::make_shared<StorageMySQL>(
+ StorageID(database_name, table_name),
+ std::move(mysql_pool),
+ database_name_in_mysql,
+ table_name,
+ /* replace_query_ */ false,
+ /* on_duplicate_clause = */ "",
+ ColumnsDescription{columns_name_and_type},
+ ConstraintsDescription{},
+ String{},
+ getContext(),
+ MySQLSettings{}));
+ }
+}
+
+std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime(ContextPtr local_context) const
+{
+ Block tables_status_sample_block
+ {
+ { std::make_shared<DataTypeString>(), "table_name" },
+ { std::make_shared<DataTypeDateTime>(), "modification_time" },
+ };
+
+ WriteBufferFromOwnString query;
+ query << "SELECT"
+ " TABLE_NAME AS table_name, "
+ " CREATE_TIME AS modification_time "
+ " FROM INFORMATION_SCHEMA.TABLES "
+ " WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql;
+
+ std::map<String, UInt64> tables_with_modification_time;
+ StreamSettings mysql_input_stream_settings(local_context->getSettingsRef());
+ auto result = std::make_unique<MySQLSource>(mysql_pool.get(), query.str(), tables_status_sample_block, mysql_input_stream_settings);
+ QueryPipeline pipeline(std::move(result));
+
+ Block block;
+ PullingPipelineExecutor executor(pipeline);
+ while (executor.pull(block))
+ {
+ size_t rows = block.rows();
+ for (size_t index = 0; index < rows; ++index)
+ {
+ String table_name = (*block.getByPosition(0).column)[index].safeGet<String>();
+ tables_with_modification_time[table_name] = (*block.getByPosition(1).column)[index].safeGet<UInt64>();
+ }
+ }
+
+ return tables_with_modification_time;
+}
+
+std::map<String, ColumnsDescription>
+DatabaseMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr local_context) const
+{
+ const auto & settings = local_context->getSettingsRef();
+
+ return DB::fetchTablesColumnsList(
+ mysql_pool,
+ database_name_in_mysql,
+ tables_name,
+ settings,
+ mysql_settings->mysql_datatypes_support_level);
+}
+
+void DatabaseMySQL::shutdown()
+{
+ std::map<String, ModifyTimeAndStorage> tables_snapshot;
+ {
+ std::lock_guard lock(mutex);
+ tables_snapshot = local_tables_cache;
+ }
+
+ for (const auto & [table_name, modify_time_and_storage] : tables_snapshot)
+ modify_time_and_storage.second->flushAndShutdown();
+
+ std::lock_guard lock(mutex);
+ local_tables_cache.clear();
+}
+
+void DatabaseMySQL::drop(ContextPtr /*context*/)
+{
+ fs::remove_all(getMetadataPath());
+}
+
+void DatabaseMySQL::cleanOutdatedTables()
+{
+ setThreadName("MySQLDBCleaner");
+
+ std::unique_lock lock{mutex};
+
+ while (!quit.load(std::memory_order_relaxed))
+ {
+ for (auto iterator = outdated_tables.begin(); iterator != outdated_tables.end();)
+ {
+ if (!iterator->unique())
+ ++iterator;
+ else
+ {
+ const auto table_lock = (*iterator)->lockExclusively(RWLockImpl::NO_QUERY, lock_acquire_timeout);
+
+ (*iterator)->flushAndShutdown();
+ (*iterator)->is_dropped = true;
+ iterator = outdated_tables.erase(iterator);
+ }
+ }
+
+ cond.wait_for(lock, cleaner_sleep_time);
+ }
+}
+
+void DatabaseMySQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
+{
+ std::lock_guard lock{mutex};
+
+ if (!local_tables_cache.contains(table_name))
+ throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot attach table {}.{} because it does not exist.",
+ backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
+
+ if (!remove_or_detach_tables.contains(table_name))
+ throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Cannot attach table {}.{} because it already exists.",
+ backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
+
+ /// We use the new storage to replace the original storage, because the original storage may have been dropped
+ /// Although we still keep its
+ local_tables_cache[table_name].second = storage;
+
+ remove_or_detach_tables.erase(table_name);
+ fs::path remove_flag = fs::path(getMetadataPath()) / (escapeForFileName(table_name) + suffix);
+
+ if (fs::exists(remove_flag))
+ fs::remove(remove_flag);
+}
+
+StoragePtr DatabaseMySQL::detachTable(ContextPtr /* context */, const String & table_name)
+{
+ std::lock_guard lock{mutex};
+
+ if (remove_or_detach_tables.contains(table_name))
+ throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped",
+ backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
+
+ if (!local_tables_cache.contains(table_name))
+ throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist.",
+ backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
+
+ remove_or_detach_tables.emplace(table_name);
+ return local_tables_cache[table_name].second;
+}
+
+String DatabaseMySQL::getMetadataPath() const
+{
+ return metadata_path;
+}
+
+void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, LoadingStrictnessLevel /*mode*/)
+{
+
+ std::lock_guard lock{mutex};
+ fs::directory_iterator iter(getMetadataPath());
+
+ for (fs::directory_iterator end; iter != end; ++iter)
+ {
+ if (fs::is_regular_file(iter->path()) && endsWith(iter->path().filename(), suffix))
+ {
+ const auto & filename = iter->path().filename().string();
+ const auto & table_name = unescapeForFileName(filename.substr(0, filename.size() - strlen(suffix)));
+ remove_or_detach_tables.emplace(table_name);
+ }
+ }
+}
+
+void DatabaseMySQL::detachTablePermanently(ContextPtr, const String & table_name)
+{
+ std::lock_guard lock{mutex};
+
+ fs::path remove_flag = fs::path(getMetadataPath()) / (escapeForFileName(table_name) + suffix);
+
+ if (remove_or_detach_tables.contains(table_name))
+ throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped", backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
+
+ if (fs::exists(remove_flag))
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "The remove flag file already exists but the {}.{} does not exists remove tables, it is bug.",
+ backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
+
+ auto table_iter = local_tables_cache.find(table_name);
+ if (table_iter == local_tables_cache.end())
+ throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
+
+ remove_or_detach_tables.emplace(table_name);
+
+ try
+ {
+ table_iter->second.second->drop();
+ FS::createFile(remove_flag);
+ }
+ catch (...)
+ {
+ remove_or_detach_tables.erase(table_name);
+ throw;
+ }
+ table_iter->second.second->is_detached = true;
+}
+
+void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
+{
+ detachTablePermanently(local_context, table_name);
+}
+
+DatabaseMySQL::~DatabaseMySQL()
+{
+ try
+ {
+ if (!quit)
+ {
+ {
+ quit = true;
+ std::lock_guard lock{mutex};
+ }
+ cond.notify_one();
+ thread.join();
+ }
+
+ shutdown();
+ }
+ catch (...)
+ {
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ }
+}
+
+void DatabaseMySQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
+{
+ const auto & create = create_query->as<ASTCreateQuery>();
+
+ if (!create->attach)
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED,
+ "MySQL database engine does not support create table. "
+ "for tables that were detach or dropped before, you can use attach "
+ "to add them back to the MySQL database");
+
+ /// XXX: hack
+ /// In order to prevent users from broken the table structure by executing attach table database_name.table_name (...)
+ /// we should compare the old and new create_query to make them completely consistent
+ const auto & origin_create_query = getCreateTableQuery(table_name, getContext());
+ origin_create_query->as<ASTCreateQuery>()->attach = true;
+
+ if (queryToString(origin_create_query) != queryToString(create_query))
+ throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE,
+ "The MySQL database engine can only execute attach statements "
+ "of type attach table database_name.table_name");
+
+ attachTable(local_context, table_name, storage, {});
+}
+
+}
+
+#endif