1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
#include "clickhouse_config.h"
#if USE_MYSQL
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Interpreters/Context.h>
# include <Databases/MySQL/DatabaseMaterializedTablesIterator.h>
# include <Databases/MySQL/MaterializedMySQLSyncThread.h>
# include <Parsers/ASTCreateQuery.h>
# include <Storages/StorageMaterializedMySQL.h>
# include <Common/setThreadName.h>
# include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
/// Error codes referenced in this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
/// Thrown by this file for operations the MaterializedMySQL database refuses to perform.
extern const int NOT_IMPLEMENTED;
}
/// Constructs the database on top of DatabaseAtomic and prepares (but does not start) the MySQL sync thread.
///
/// - context_, database_name_, metadata_path_, uuid are forwarded to the DatabaseAtomic base.
/// - mysql_database_name_, pool_ and client_ are handed to the sync thread; pool_ and client_ are moved from.
/// - settings_ is taken over by this object; the sync thread only receives a raw pointer to it.
DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
ContextPtr context_,
const String & database_name_,
const String & metadata_path_,
UUID uuid,
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
std::unique_ptr<MaterializedMySQLSettings> settings_)
/// The string argument is "DatabaseMaterializedMySQL(<database name>)" -- presumably the logger name; confirm against DatabaseAtomic.
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL(" + database_name_ + ")", context_)
, settings(std::move(settings_))
/// NOTE(review): settings.get() is read here, so `settings` has to be declared before `materialize_thread`
/// in the class for the pointer to be valid -- confirm in the header.
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{
}
void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const
{
std::lock_guard lock(mutex);
if (!settings->allows_query_when_mysql_lost && exception)
{
try
{
std::rethrow_exception(exception);
}
catch (Exception & ex)
{
/// This method can be called from multiple threads
/// and Exception can be modified concurrently by calling addMessage(...),
/// so we rethrow a copy.
throw Exception(ex);
}
}
}
/// Stores `exception_` (under `mutex`) so that rethrowExceptionIfNeeded() can report it later.
/// Any previously stored exception is overwritten.
void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exception_)
{
    std::lock_guard guard(mutex);
    exception = exception_;
}
/// Starts up the tables through DatabaseAtomic and then launches synchronization with MySQL.
/// For loading modes below FORCE_ATTACH, assertMySQLAvailable() is called before the sync starts.
/// Sets `started_up` once synchronization has been started.
void DatabaseMaterializedMySQL::startupTables(ThreadPool & thread_pool, LoadingStrictnessLevel mode)
{
    LOG_TRACE(log, "Starting MaterializeMySQL tables");
    DatabaseAtomic::startupTables(thread_pool, mode);

    /// The availability check is skipped for FORCE_ATTACH and stricter-bypassing modes.
    const bool must_check_mysql = mode < LoadingStrictnessLevel::FORCE_ATTACH;
    if (must_check_mysql)
    {
        materialize_thread.assertMySQLAvailable();
    }

    materialize_thread.startSynchronization();
    started_up = true;
}
/// CREATE TABLE: passes through checkIsInternalQuery() first (which may throw NOT_IMPLEMENTED),
/// then delegates to DatabaseAtomic.
void DatabaseMaterializedMySQL::createTable(
    ContextPtr context_,
    const String & name,
    const StoragePtr & table,
    const ASTPtr & query)
{
    checkIsInternalQuery(context_, "CREATE TABLE");

    DatabaseAtomic::createTable(context_, name, table, query);
}
/// DROP TABLE: passes through checkIsInternalQuery() first (which may throw NOT_IMPLEMENTED),
/// then delegates to DatabaseAtomic.
void DatabaseMaterializedMySQL::dropTable(
    ContextPtr context_,
    const String & name,
    bool sync)
{
    checkIsInternalQuery(context_, "DROP TABLE");

    DatabaseAtomic::dropTable(context_, name, sync);
}
/// ATTACH TABLE: passes through checkIsInternalQuery() first (which may throw NOT_IMPLEMENTED),
/// then delegates to DatabaseAtomic.
void DatabaseMaterializedMySQL::attachTable(
    ContextPtr context_,
    const String & name,
    const StoragePtr & table,
    const String & relative_table_path)
{
    checkIsInternalQuery(context_, "ATTACH TABLE");

    DatabaseAtomic::attachTable(context_, name, table, relative_table_path);
}
/// DETACH TABLE: passes through checkIsInternalQuery() first (which may throw NOT_IMPLEMENTED),
/// then returns whatever DatabaseAtomic::detachTable() returns.
StoragePtr DatabaseMaterializedMySQL::detachTable(
    ContextPtr context_,
    const String & name)
{
    checkIsInternalQuery(context_, "DETACH TABLE");

    return DatabaseAtomic::detachTable(context_, name);
}
/// RENAME TABLE within this database only.
///
/// Throws NOT_IMPLEMENTED when:
///  - checkIsInternalQuery() rejects the query;
///  - `exchange` is set (EXCHANGE TABLE);
///  - `dictionary` is set (RENAME DICTIONARY);
///  - the target database name differs from this database's name.
/// Otherwise delegates to DatabaseAtomic, using this database as the target.
void DatabaseMaterializedMySQL::renameTable(
    ContextPtr context_,
    const String & name,
    IDatabase & to_database,
    const String & to_name,
    bool exchange,
    bool dictionary)
{
    checkIsInternalQuery(context_, "RENAME TABLE");

    if (exchange)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database does not support EXCHANGE TABLE.");
    }

    if (dictionary)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database does not support RENAME DICTIONARY.");
    }

    /// Moving a table to another database is not allowed.
    const bool same_database = to_database.getDatabaseName() == DatabaseAtomic::getDatabaseName();
    if (!same_database)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot rename with other database for MaterializedMySQL database.");
    }

    DatabaseAtomic::renameTable(context_, name, *this, to_name, exchange, dictionary);
}
/// ALTER TABLE: passes through checkIsInternalQuery() first (which may throw NOT_IMPLEMENTED),
/// then delegates to DatabaseAtomic.
void DatabaseMaterializedMySQL::alterTable(
    ContextPtr context_,
    const StorageID & table_id,
    const StorageInMemoryMetadata & metadata)
{
    checkIsInternalQuery(context_, "ALTER TABLE");

    DatabaseAtomic::alterTable(context_, table_id, metadata);
}
/// Drops the database: removes the `.metadata` file located under the database metadata path
/// (if it is there) and then delegates to DatabaseAtomic::drop().
///
/// Throws std::filesystem::filesystem_error if the file exists but cannot be removed.
void DatabaseMaterializedMySQL::drop(ContextPtr context_)
{
    LOG_TRACE(log, "Dropping MaterializeMySQL database");

    /// Remove metadata info.
    /// fs::remove() returns false without throwing when the path does not exist,
    /// so a separate fs::exists() probe (an extra stat and a check-then-act window) is unnecessary.
    const fs::path metadata(getMetadataPath() + "/.metadata");
    fs::remove(metadata);

    DatabaseAtomic::drop(context_);
}
/// Looks up a table through DatabaseAtomic.
///
/// Internal queries get the nested storage as is (possibly null).
/// For other queries an existing table is wrapped into StorageMaterializedMySQL bound to this database;
/// a missing table yields nullptr.
StoragePtr DatabaseMaterializedMySQL::tryGetTable(const String & name, ContextPtr context_) const
{
    StoragePtr found = DatabaseAtomic::tryGetTable(name, context_);

    /// Either the caller is internal and wants the raw storage, or there is nothing to wrap.
    if (context_->isInternalQuery() || !found)
        return found;

    return std::make_shared<StorageMaterializedMySQL>(std::move(found), this);
}
/// Returns an iterator over the tables of this database.
///
/// Internal queries receive the DatabaseAtomic iterator unchanged; for all other queries
/// it is wrapped into DatabaseMaterializedTablesIterator bound to this database.
DatabaseTablesIteratorPtr DatabaseMaterializedMySQL::getTablesIterator(
    ContextPtr context_,
    const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
{
    DatabaseTablesIteratorPtr underlying = DatabaseAtomic::getTablesIterator(context_, filter_by_table_name);

    if (!context_->isInternalQuery())
    {
        return std::make_unique<DatabaseMaterializedTablesIterator>(std::move(underlying), this);
    }

    return underlying;
}
/// Rejects an operation issued by a non-internal query.
///
/// Throws NOT_IMPLEMENTED (mentioning `method` in the message) when the database has started up,
/// `context_` is non-null and the query is not internal. In every other case it returns silently.
void DatabaseMaterializedMySQL::checkIsInternalQuery(ContextPtr context_, const char * method) const
{
    /// Before startup (or without a context) everything is allowed.
    if (!started_up || !context_)
        return;

    if (context_->isInternalQuery())
        return;

    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database does not support {}", method);
}
/// Stops synchronization with MySQL and clears `started_up`.
/// With the flag cleared, checkIsInternalQuery() no longer rejects non-internal queries.
void DatabaseMaterializedMySQL::stopReplication()
{
    /// Order matters: the sync thread is stopped before the flag is reset.
    materialize_thread.stopSynchronization();

    started_up = false;
}
}
#endif
|