summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp')
-rw-r--r--contrib/clickhouse/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp169
1 files changed, 169 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp b/contrib/clickhouse/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp
new file mode 100644
index 00000000000..6049e7df2e1
--- /dev/null
+++ b/contrib/clickhouse/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp
@@ -0,0 +1,169 @@
+#include "clickhouse_config.h"
+
+#if USE_MYSQL
+
+# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
+
+# include <Interpreters/Context.h>
+# include <Databases/MySQL/DatabaseMaterializedTablesIterator.h>
+# include <Databases/MySQL/MaterializedMySQLSyncThread.h>
+# include <Parsers/ASTCreateQuery.h>
+# include <Storages/StorageMaterializedMySQL.h>
+# include <Common/setThreadName.h>
+# include <filesystem>
+
+namespace fs = std::filesystem;
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int NOT_IMPLEMENTED;
+}
+
+DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
+ ContextPtr context_,
+ const String & database_name_,
+ const String & metadata_path_,
+ UUID uuid,
+ const String & mysql_database_name_,
+ mysqlxx::Pool && pool_,
+ MySQLClient && client_,
+ std::unique_ptr<MaterializedMySQLSettings> settings_)
+ : DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL(" + database_name_ + ")", context_)
+ , settings(std::move(settings_))
+ , materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
+{
+}
+
+void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const
+{
+ std::lock_guard lock(mutex);
+
+ if (!settings->allows_query_when_mysql_lost && exception)
+ {
+ try
+ {
+ std::rethrow_exception(exception);
+ }
+ catch (Exception & ex)
+ {
+ /// This method can be called from multiple threads
+ /// and Exception can be modified concurrently by calling addMessage(...),
+ /// so we rethrow a copy.
+ throw Exception(ex);
+ }
+ }
+}
+
+void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exception_)
+{
+ std::lock_guard lock(mutex);
+ exception = exception_;
+}
+
+void DatabaseMaterializedMySQL::startupTables(ThreadPool & thread_pool, LoadingStrictnessLevel mode)
+{
+ LOG_TRACE(log, "Starting MaterializeMySQL tables");
+ DatabaseAtomic::startupTables(thread_pool, mode);
+
+ if (mode < LoadingStrictnessLevel::FORCE_ATTACH)
+ materialize_thread.assertMySQLAvailable();
+
+ materialize_thread.startSynchronization();
+ started_up = true;
+}
+
+void DatabaseMaterializedMySQL::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query)
+{
+ checkIsInternalQuery(context_, "CREATE TABLE");
+ DatabaseAtomic::createTable(context_, name, table, query);
+}
+
+void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & name, bool sync)
+{
+ checkIsInternalQuery(context_, "DROP TABLE");
+ DatabaseAtomic::dropTable(context_, name, sync);
+}
+
+void DatabaseMaterializedMySQL::attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
+{
+ checkIsInternalQuery(context_, "ATTACH TABLE");
+ DatabaseAtomic::attachTable(context_, name, table, relative_table_path);
+}
+
+StoragePtr DatabaseMaterializedMySQL::detachTable(ContextPtr context_, const String & name)
+{
+ checkIsInternalQuery(context_, "DETACH TABLE");
+ return DatabaseAtomic::detachTable(context_, name);
+}
+
+void DatabaseMaterializedMySQL::renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary)
+{
+ checkIsInternalQuery(context_, "RENAME TABLE");
+
+ if (exchange)
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database does not support EXCHANGE TABLE.");
+
+ if (dictionary)
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database does not support RENAME DICTIONARY.");
+
+ if (to_database.getDatabaseName() != DatabaseAtomic::getDatabaseName())
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot rename with other database for MaterializedMySQL database.");
+
+ DatabaseAtomic::renameTable(context_, name, *this, to_name, exchange, dictionary);
+}
+
+void DatabaseMaterializedMySQL::alterTable(ContextPtr context_, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
+{
+ checkIsInternalQuery(context_, "ALTER TABLE");
+ DatabaseAtomic::alterTable(context_, table_id, metadata);
+}
+
+void DatabaseMaterializedMySQL::drop(ContextPtr context_)
+{
+ LOG_TRACE(log, "Dropping MaterializeMySQL database");
+ /// Remove metadata info
+ fs::path metadata(getMetadataPath() + "/.metadata");
+
+ if (fs::exists(metadata))
+ fs::remove(metadata);
+
+ DatabaseAtomic::drop(context_);
+}
+
+StoragePtr DatabaseMaterializedMySQL::tryGetTable(const String & name, ContextPtr context_) const
+{
+ StoragePtr nested_storage = DatabaseAtomic::tryGetTable(name, context_);
+ if (context_->isInternalQuery())
+ return nested_storage;
+ if (nested_storage)
+ return std::make_shared<StorageMaterializedMySQL>(std::move(nested_storage), this);
+ return nullptr;
+}
+
+DatabaseTablesIteratorPtr
+DatabaseMaterializedMySQL::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
+{
+ DatabaseTablesIteratorPtr iterator = DatabaseAtomic::getTablesIterator(context_, filter_by_table_name);
+ if (context_->isInternalQuery())
+ return iterator;
+ return std::make_unique<DatabaseMaterializedTablesIterator>(std::move(iterator), this);
+}
+
+void DatabaseMaterializedMySQL::checkIsInternalQuery(ContextPtr context_, const char * method) const
+{
+ if (started_up && context_ && !context_->isInternalQuery())
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database does not support {}", method);
+}
+
+void DatabaseMaterializedMySQL::stopReplication()
+{
+ materialize_thread.stopSynchronization();
+ started_up = false;
+}
+
+}
+
+#endif