summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
diff options
context:
space:
mode:
authorAlexSm <[email protected]>2024-01-04 15:09:05 +0100
committerGitHub <[email protected]>2024-01-04 15:09:05 +0100
commitdab291146f6cd7d35684e3a1150e5bb1c412982c (patch)
tree36ef35f6cacb6432845a4a33f940c95871036b32 /contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
parent63660ad5e7512029fd0218e7a636580695a24e1f (diff)
Library import 5, delete go dependencies (#832)
* Library import 5, delete go dependencies * Fix yt client
Diffstat (limited to 'contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp')
-rw-r--r--contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp510
1 files changed, 0 insertions, 510 deletions
diff --git a/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp b/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
deleted file mode 100644
index 282dae63a0f..00000000000
--- a/contrib/clickhouse/src/Databases/MySQL/DatabaseMySQL.cpp
+++ /dev/null
@@ -1,510 +0,0 @@
-#include "clickhouse_config.h"
-
-#if USE_MYSQL
-# include <string>
-# include <DataTypes/DataTypeDateTime.h>
-# include <DataTypes/DataTypeNullable.h>
-# include <DataTypes/DataTypeString.h>
-# include <DataTypes/DataTypesNumber.h>
-# include <DataTypes/convertMySQLDataType.h>
-# include <Databases/MySQL/DatabaseMySQL.h>
-# include <Databases/MySQL/FetchTablesColumnsList.h>
-# error #include <Processors/Sources/MySQLSource.h>
-# include <Processors/Executors/PullingPipelineExecutor.h>
-# include <QueryPipeline/QueryPipelineBuilder.h>
-# include <IO/Operators.h>
-# include <Interpreters/Context.h>
-# include <Parsers/ASTCreateQuery.h>
-# include <Parsers/ASTFunction.h>
-# include <Parsers/ParserCreateQuery.h>
-# include <Parsers/parseQuery.h>
-# include <Parsers/queryToString.h>
-# include <Storages/StorageMySQL.h>
-# include <Storages/MySQL/MySQLSettings.h>
-# include <Common/escapeForFileName.h>
-# include <Common/parseAddress.h>
-# include <Common/setThreadName.h>
-# include <filesystem>
-# include <Common/filesystemHelpers.h>
-# include <Parsers/ASTIdentifier.h>
-
-namespace fs = std::filesystem;
-
-namespace DB
-{
-
-namespace ErrorCodes
-{
- extern const int NOT_IMPLEMENTED;
- extern const int LOGICAL_ERROR;
- extern const int UNKNOWN_TABLE;
- extern const int TABLE_IS_DROPPED;
- extern const int TABLE_ALREADY_EXISTS;
- extern const int UNEXPECTED_AST_STRUCTURE;
-}
-
-constexpr static const auto suffix = ".remove_flag";
-static constexpr const std::chrono::seconds cleaner_sleep_time{30};
-static const std::chrono::seconds lock_acquire_timeout{10};
-
-DatabaseMySQL::DatabaseMySQL(
- ContextPtr context_,
- const String & database_name_,
- const String & metadata_path_,
- const ASTStorage * database_engine_define_,
- const String & database_name_in_mysql_,
- std::unique_ptr<MySQLSettings> settings_,
- mysqlxx::PoolWithFailover && pool,
- bool attach)
- : IDatabase(database_name_)
- , WithContext(context_->getGlobalContext())
- , metadata_path(metadata_path_)
- , database_engine_define(database_engine_define_->clone())
- , database_name_in_mysql(database_name_in_mysql_)
- , mysql_settings(std::move(settings_))
- , mysql_pool(std::move(pool)) /// NOLINT
-{
- try
- {
- /// Test that the database is working fine; it will also fetch tables.
- empty(); // NOLINT(bugprone-standalone-empty)
- }
- catch (...)
- {
- if (attach)
- tryLogCurrentException("DatabaseMySQL");
- else
- throw;
- }
-
- fs::create_directories(metadata_path);
-
- thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
-}
-
-bool DatabaseMySQL::empty() const
-{
- std::lock_guard lock(mutex);
-
- fetchTablesIntoLocalCache(getContext());
-
- if (local_tables_cache.empty())
- return true;
-
- for (const auto & [table_name, storage_info] : local_tables_cache)
- if (!remove_or_detach_tables.contains(table_name))
- return false;
-
- return true;
-}
-
-DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name) const
-{
- Tables tables;
- std::lock_guard lock(mutex);
-
- fetchTablesIntoLocalCache(local_context);
-
- for (const auto & [table_name, modify_time_and_storage] : local_tables_cache)
- if (!remove_or_detach_tables.contains(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
- tables[table_name] = modify_time_and_storage.second;
-
- return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
-}
-
-bool DatabaseMySQL::isTableExist(const String & name, ContextPtr local_context) const
-{
- return bool(tryGetTable(name, local_context));
-}
-
-StoragePtr DatabaseMySQL::tryGetTable(const String & mysql_table_name, ContextPtr local_context) const
-{
- std::lock_guard lock(mutex);
-
- fetchTablesIntoLocalCache(local_context);
-
- if (!remove_or_detach_tables.contains(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
- return local_tables_cache[mysql_table_name].second;
-
- return StoragePtr{};
-}
-
-ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
-{
- std::lock_guard lock(mutex);
-
- fetchTablesIntoLocalCache(local_context);
-
- if (local_tables_cache.find(table_name) == local_tables_cache.end())
- {
- if (throw_on_error)
- throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {}.{} doesn't exist.", database_name_in_mysql, table_name);
- return nullptr;
- }
-
- auto storage = local_tables_cache[table_name].second;
- auto table_storage_define = database_engine_define->clone();
- {
- ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
- ast_storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
- ASTs storage_children = ast_storage->children;
- auto storage_engine_arguments = ast_storage->engine->arguments;
-
- /// Add table_name to engine arguments
- if (typeid_cast<ASTIdentifier *>(storage_engine_arguments->children[0].get()))
- {
- storage_engine_arguments->children.push_back(
- makeASTFunction("equals", std::make_shared<ASTIdentifier>("table"), std::make_shared<ASTLiteral>(table_name)));
- }
- else
- {
- auto mysql_table_name = std::make_shared<ASTLiteral>(table_name);
- storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
- }
-
- /// Unset settings
- std::erase_if(storage_children, [&](const ASTPtr & element) { return element.get() == ast_storage->settings; });
- ast_storage->settings = nullptr;
- }
-
- unsigned max_parser_depth = static_cast<unsigned>(getContext()->getSettingsRef().max_parser_depth);
- auto create_table_query = DB::getCreateQueryFromStorage(storage,
- table_storage_define,
- true,
- max_parser_depth,
- throw_on_error);
- return create_table_query;
-}
-
-time_t DatabaseMySQL::getObjectMetadataModificationTime(const String & table_name) const
-{
- std::lock_guard lock(mutex);
-
- fetchTablesIntoLocalCache(getContext());
-
- if (local_tables_cache.find(table_name) == local_tables_cache.end())
- throw Exception(ErrorCodes::UNKNOWN_TABLE, "MySQL table {}.{} doesn't exist.", database_name_in_mysql, table_name);
-
- return time_t(local_tables_cache[table_name].first);
-}
-
-ASTPtr DatabaseMySQL::getCreateDatabaseQuery() const
-{
- const auto & create_query = std::make_shared<ASTCreateQuery>();
- create_query->setDatabase(getDatabaseName());
- create_query->set(create_query->storage, database_engine_define);
-
- if (const auto comment_value = getDatabaseComment(); !comment_value.empty())
- create_query->set(create_query->comment, std::make_shared<ASTLiteral>(comment_value));
-
- return create_query;
-}
-
-void DatabaseMySQL::fetchTablesIntoLocalCache(ContextPtr local_context) const
-{
- const auto & tables_with_modification_time = fetchTablesWithModificationTime(local_context);
-
- destroyLocalCacheExtraTables(tables_with_modification_time);
- fetchLatestTablesStructureIntoCache(tables_with_modification_time, local_context);
-}
-
-void DatabaseMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const
-{
- for (auto iterator = local_tables_cache.begin(); iterator != local_tables_cache.end();)
- {
- if (tables_with_modification_time.find(iterator->first) != tables_with_modification_time.end())
- ++iterator;
- else
- {
- outdated_tables.emplace_back(iterator->second.second);
- iterator = local_tables_cache.erase(iterator);
- }
- }
-}
-
-void DatabaseMySQL::fetchLatestTablesStructureIntoCache(
- const std::map<String, UInt64> & tables_modification_time, ContextPtr local_context) const
-{
- std::vector<String> wait_update_tables_name;
- for (const auto & table_modification_time : tables_modification_time)
- {
- const auto & it = local_tables_cache.find(table_modification_time.first);
-
- /// Outdated or new table structures
- if (it == local_tables_cache.end() || table_modification_time.second > it->second.first)
- wait_update_tables_name.emplace_back(table_modification_time.first);
- }
-
- std::map<String, ColumnsDescription> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name, local_context);
-
- for (const auto & table_and_columns : tables_and_columns)
- {
- const auto & table_name = table_and_columns.first;
- const auto & columns_name_and_type = table_and_columns.second;
- const auto & table_modification_time = tables_modification_time.at(table_name);
-
- const auto & iterator = local_tables_cache.find(table_name);
- if (iterator != local_tables_cache.end())
- {
- outdated_tables.emplace_back(iterator->second.second);
- local_tables_cache.erase(iterator);
- }
-
- local_tables_cache[table_name] = std::make_pair(
- table_modification_time,
- std::make_shared<StorageMySQL>(
- StorageID(database_name, table_name),
- std::move(mysql_pool),
- database_name_in_mysql,
- table_name,
- /* replace_query_ */ false,
- /* on_duplicate_clause = */ "",
- ColumnsDescription{columns_name_and_type},
- ConstraintsDescription{},
- String{},
- getContext(),
- MySQLSettings{}));
- }
-}
-
-std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime(ContextPtr local_context) const
-{
- Block tables_status_sample_block
- {
- { std::make_shared<DataTypeString>(), "table_name" },
- { std::make_shared<DataTypeDateTime>(), "modification_time" },
- };
-
- WriteBufferFromOwnString query;
- query << "SELECT"
- " TABLE_NAME AS table_name, "
- " CREATE_TIME AS modification_time "
- " FROM INFORMATION_SCHEMA.TABLES "
- " WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql;
-
- std::map<String, UInt64> tables_with_modification_time;
- StreamSettings mysql_input_stream_settings(local_context->getSettingsRef());
- auto result = std::make_unique<MySQLSource>(mysql_pool.get(), query.str(), tables_status_sample_block, mysql_input_stream_settings);
- QueryPipeline pipeline(std::move(result));
-
- Block block;
- PullingPipelineExecutor executor(pipeline);
- while (executor.pull(block))
- {
- size_t rows = block.rows();
- for (size_t index = 0; index < rows; ++index)
- {
- String table_name = (*block.getByPosition(0).column)[index].safeGet<String>();
- tables_with_modification_time[table_name] = (*block.getByPosition(1).column)[index].safeGet<UInt64>();
- }
- }
-
- return tables_with_modification_time;
-}
-
-std::map<String, ColumnsDescription>
-DatabaseMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr local_context) const
-{
- const auto & settings = local_context->getSettingsRef();
-
- return DB::fetchTablesColumnsList(
- mysql_pool,
- database_name_in_mysql,
- tables_name,
- settings,
- mysql_settings->mysql_datatypes_support_level);
-}
-
-void DatabaseMySQL::shutdown()
-{
- std::map<String, ModifyTimeAndStorage> tables_snapshot;
- {
- std::lock_guard lock(mutex);
- tables_snapshot = local_tables_cache;
- }
-
- for (const auto & [table_name, modify_time_and_storage] : tables_snapshot)
- modify_time_and_storage.second->flushAndShutdown();
-
- std::lock_guard lock(mutex);
- local_tables_cache.clear();
-}
-
-void DatabaseMySQL::drop(ContextPtr /*context*/)
-{
- fs::remove_all(getMetadataPath());
-}
-
-void DatabaseMySQL::cleanOutdatedTables()
-{
- setThreadName("MySQLDBCleaner");
-
- std::unique_lock lock{mutex};
-
- while (!quit.load(std::memory_order_relaxed))
- {
- for (auto iterator = outdated_tables.begin(); iterator != outdated_tables.end();)
- {
- if (!iterator->unique())
- ++iterator;
- else
- {
- const auto table_lock = (*iterator)->lockExclusively(RWLockImpl::NO_QUERY, lock_acquire_timeout);
-
- (*iterator)->flushAndShutdown();
- (*iterator)->is_dropped = true;
- iterator = outdated_tables.erase(iterator);
- }
- }
-
- cond.wait_for(lock, cleaner_sleep_time);
- }
-}
-
-void DatabaseMySQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
-{
- std::lock_guard lock{mutex};
-
- if (!local_tables_cache.contains(table_name))
- throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot attach table {}.{} because it does not exist.",
- backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
-
- if (!remove_or_detach_tables.contains(table_name))
- throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Cannot attach table {}.{} because it already exists.",
- backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
-
- /// We use the new storage to replace the original storage, because the original storage may have been dropped
- /// Although we still keep its
- local_tables_cache[table_name].second = storage;
-
- remove_or_detach_tables.erase(table_name);
- fs::path remove_flag = fs::path(getMetadataPath()) / (escapeForFileName(table_name) + suffix);
-
- if (fs::exists(remove_flag))
- fs::remove(remove_flag);
-}
-
-StoragePtr DatabaseMySQL::detachTable(ContextPtr /* context */, const String & table_name)
-{
- std::lock_guard lock{mutex};
-
- if (remove_or_detach_tables.contains(table_name))
- throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped",
- backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
-
- if (!local_tables_cache.contains(table_name))
- throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist.",
- backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
-
- remove_or_detach_tables.emplace(table_name);
- return local_tables_cache[table_name].second;
-}
-
-String DatabaseMySQL::getMetadataPath() const
-{
- return metadata_path;
-}
-
-void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, LoadingStrictnessLevel /*mode*/)
-{
-
- std::lock_guard lock{mutex};
- fs::directory_iterator iter(getMetadataPath());
-
- for (fs::directory_iterator end; iter != end; ++iter)
- {
- if (fs::is_regular_file(iter->path()) && endsWith(iter->path().filename(), suffix))
- {
- const auto & filename = iter->path().filename().string();
- const auto & table_name = unescapeForFileName(filename.substr(0, filename.size() - strlen(suffix)));
- remove_or_detach_tables.emplace(table_name);
- }
- }
-}
-
-void DatabaseMySQL::detachTablePermanently(ContextPtr, const String & table_name)
-{
- std::lock_guard lock{mutex};
-
- fs::path remove_flag = fs::path(getMetadataPath()) / (escapeForFileName(table_name) + suffix);
-
- if (remove_or_detach_tables.contains(table_name))
- throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped", backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
-
- if (fs::exists(remove_flag))
- throw Exception(ErrorCodes::LOGICAL_ERROR, "The remove flag file already exists but the {}.{} does not exists remove tables, it is bug.",
- backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
-
- auto table_iter = local_tables_cache.find(table_name);
- if (table_iter == local_tables_cache.end())
- throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
-
- remove_or_detach_tables.emplace(table_name);
-
- try
- {
- table_iter->second.second->drop();
- FS::createFile(remove_flag);
- }
- catch (...)
- {
- remove_or_detach_tables.erase(table_name);
- throw;
- }
- table_iter->second.second->is_detached = true;
-}
-
-void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
-{
- detachTablePermanently(local_context, table_name);
-}
-
-DatabaseMySQL::~DatabaseMySQL()
-{
- try
- {
- if (!quit)
- {
- {
- quit = true;
- std::lock_guard lock{mutex};
- }
- cond.notify_one();
- thread.join();
- }
-
- shutdown();
- }
- catch (...)
- {
- tryLogCurrentException(__PRETTY_FUNCTION__);
- }
-}
-
-void DatabaseMySQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
-{
- const auto & create = create_query->as<ASTCreateQuery>();
-
- if (!create->attach)
- throw Exception(ErrorCodes::NOT_IMPLEMENTED,
- "MySQL database engine does not support create table. "
- "for tables that were detach or dropped before, you can use attach "
- "to add them back to the MySQL database");
-
- /// XXX: hack
- /// In order to prevent users from broken the table structure by executing attach table database_name.table_name (...)
- /// we should compare the old and new create_query to make them completely consistent
- const auto & origin_create_query = getCreateTableQuery(table_name, getContext());
- origin_create_query->as<ASTCreateQuery>()->attach = true;
-
- if (queryToString(origin_create_query) != queryToString(create_query))
- throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE,
- "The MySQL database engine can only execute attach statements "
- "of type attach table database_name.table_name");
-
- attachTable(local_context, table_name, storage, {});
-}
-
-}
-
-#endif