1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
#pragma once
#include "clickhouse_config.h"
#if USE_MYSQL
# include <mutex>
# include <Core/MySQL/MySQLClient.h>
# include <QueryPipeline/BlockIO.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/IDatabase.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Databases/MySQL/MaterializedMySQLSettings.h>
# include <Parsers/ASTCreateQuery.h>
# include <mysqlxx/Pool.h>
# include <mysqlxx/PoolWithFailover.h>
namespace DB
{
/** MySQL table structure and data synchronization thread.
*
* When an exception is caught, the thread always exits immediately.
* In this case, you need to execute DETACH DATABASE and ATTACH DATABASE after manual processing.
*
* The whole work of the thread consists of full data synchronization and real-time pulling of incremental data.
*
* Full data synchronization:
* The full data is synchronized when the database is first created, or when the binlog file is not found in MySQL after a restart.
*
* Real-time pulling of incremental data:
* Once the full data synchronization is completed, the MySQL binlog events are pulled, parsed and executed.
*/
class MaterializedMySQLSyncThread : WithContext
{
public:
    ~MaterializedMySQLSyncThread();

    /// The pool and the client are taken by rvalue reference; the settings are passed as a raw pointer.
    /// NOTE(review): presumably each `*_` argument initializes the member of the same name and the
    /// settings object must outlive this thread object -- confirm against the out-of-line definition.
    MaterializedMySQLSyncThread(
        ContextPtr context,
        const String & database_name_,
        const String & mysql_database_name_,
        mysqlxx::Pool && pool_,
        MySQLClient && client_,
        MaterializedMySQLSettings * settings_);

    /// Public control interface; all three are defined out of line.
    /// NOTE(review): judging by the names, start/stop launch and terminate the background
    /// synchronization, and assertMySQLAvailable() verifies that the MySQL server is reachable -- confirm in the .cpp.
    void stopSynchronization();
    void startSynchronization();
    void assertMySQLAvailable();

private:
    Poco::Logger * log;

    /// Two database names are kept: `database_name` and `mysql_database_name`.
    /// NOTE(review): presumably the local database and the source database on the MySQL side -- confirm.
    String database_name;
    String mysql_database_name;

    /// Declared `mutable`, so they may be modified from const member functions as well.
    mutable mysqlxx::Pool pool;
    mutable MySQLClient client;

    /// Non-owning raw pointer (nothing in this header deletes it).
    MaterializedMySQLSettings * settings;

    String query_prefix;
    NameSet materialized_tables_list;

    /// The error codes below are class-level compile-time constants instead of per-object
    /// `const` data members: the values never differ between instances, so they need no
    /// storage in every object and can be used in constant expressions.

    // USE MySQL ERROR CODE:
    // https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html
    static constexpr int ER_ACCESS_DENIED_ERROR = 1045; /// NOLINT
    static constexpr int ER_DBACCESS_DENIED_ERROR = 1044; /// NOLINT
    static constexpr int ER_BAD_DB_ERROR = 1049; /// NOLINT

    // https://dev.mysql.com/doc/mysql-errors/8.0/en/client-error-reference.html
    static constexpr int CR_SERVER_LOST = 2013; /// NOLINT

    /// Per-table block buffers together with row/byte counters and the thresholds they are checked against.
    struct Buffers
    {
        String database;

        /// thresholds
        size_t max_block_rows = 0;
        size_t max_block_bytes = 0;
        size_t total_blocks_rows = 0;
        size_t total_blocks_bytes = 0;

        /// A block paired with a list of indexes.
        /// NOTE(review): by the alias name, the indexes are the positions of the sorting columns in the block -- confirm.
        using BufferAndSortingColumns = std::pair<Block, std::vector<size_t>>;
        using BufferAndSortingColumnsPtr = std::shared_ptr<BufferAndSortingColumns>;

        /// NOTE(review): presumably keyed by table name (see getTableDataBuffer) -- confirm.
        std::unordered_map<String, BufferAndSortingColumnsPtr> data;

        explicit Buffers(const String & database_) : database(database_) {}

        /// The member functions below are defined out of line.
        void commit(ContextPtr context);

        void add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes);

        bool checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes) const;

        BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, ContextPtr context);
    };

    /// NOTE(review): presumably the body executed by the background thread -- confirm in the .cpp.
    void synchronization();

    /// Returns the current value of `sync_quit`, read with relaxed memory ordering.
    /// Const-qualified because it only reads the flag.
    bool isCancelled() const { return sync_quit.load(std::memory_order_relaxed); }

    /// Internal steps of the synchronization; defined out of line.
    bool prepareSynchronized(MaterializeMetadata & metadata);

    void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata);

    void onEvent(Buffers & buffers, const MySQLReplication::BinlogEventPtr & event, MaterializeMetadata & metadata);

    /// The flag that isCancelled() reports; starts out as `false`.
    std::atomic<bool> sync_quit{false};

    std::unique_ptr<ThreadFromGlobalPool> background_thread_pool;

    void executeDDLAtomic(const QueryEvent & query_event);

    void setSynchronizationThreadException(const std::exception_ptr & exception);
};
}
#endif
|