summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Databases/MySQL/MaterializedMySQLSyncThread.h
blob: a1602c089a6873f96b17401e40458b8c4974946e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#pragma once

#include "clickhouse_config.h"

#if USE_MYSQL

#    include <mutex>
#    include <Core/MySQL/MySQLClient.h>
#    include <QueryPipeline/BlockIO.h>
#    include <DataTypes/DataTypeString.h>
#    include <DataTypes/DataTypesNumber.h>
#    include <Databases/DatabaseOrdinary.h>
#    include <Databases/IDatabase.h>
#    include <Databases/MySQL/MaterializeMetadata.h>
#    include <Databases/MySQL/MaterializedMySQLSettings.h>
#    include <Parsers/ASTCreateQuery.h>
#    error #include <mysqlxx/Pool.h>
#    error #include <mysqlxx/PoolWithFailover.h>


namespace DB
{

/** MySQL table structure and data synchronization thread
 *
 *  When an exception is caught, the thread always exits immediately.
 *  In this case, you need to execute DETACH DATABASE and ATTACH DATABASE after resolving the problem manually.
 *
 *  The whole work of the thread includes synchronous full data and real-time pull incremental data
 *
 *  synchronous full data:
 *      The full data is synchronized when the database is first created, or when the binlog file is not found in MySQL after a restart.
 *
 *  real-time pull incremental data:
 *      Once the full data synchronization is completed, MySQL binlog events are pulled, parsed and executed.
 */
class MaterializedMySQLSyncThread : WithContext
{
public:
    ~MaterializedMySQLSyncThread();

    /// @param context               Context handed to the WithContext base.
    /// @param database_name_        Name stored in `database_name`.
    /// @param mysql_database_name_  Name stored in `mysql_database_name`.
    /// @param pool_                 MySQL connection pool, taken by rvalue reference.
    /// @param client_               MySQL client, taken by rvalue reference.
    /// @param settings_             Raw, non-owning pointer to the settings.
    ///                              NOTE(review): presumably must outlive this object - confirm with the owner.
    MaterializedMySQLSyncThread(
        ContextPtr context,
        const String & database_name_,
        const String & mysql_database_name_,
        mysqlxx::Pool && pool_,
        MySQLClient && client_,
        MaterializedMySQLSettings * settings_);

    /// Implemented out of line.
    /// NOTE(review): presumably requests cancellation via `sync_quit` and waits for the background thread - confirm.
    void stopSynchronization();

    /// Implemented out of line.
    /// NOTE(review): presumably launches `background_thread_pool` running `synchronization()` - confirm.
    void startSynchronization();

    /// Implemented out of line.
    /// NOTE(review): presumably throws when the MySQL server cannot be reached or used - confirm.
    void assertMySQLAvailable();

private:
    Poco::Logger * log;

    String database_name;
    String mysql_database_name;

    mutable mysqlxx::Pool pool;
    mutable MySQLClient client;
    MaterializedMySQLSettings * settings;
    String query_prefix;
    NameSet materialized_tables_list;

    /// The error codes below are compile-time constants, so they are shared by the whole class
    /// instead of being stored in every instance.

    // USE MySQL ERROR CODE:
    // https://dev.mysql.com/doc/mysql-errors/5.7/en/server-error-reference.html
    static constexpr int ER_ACCESS_DENIED_ERROR = 1045; /// NOLINT
    static constexpr int ER_DBACCESS_DENIED_ERROR = 1044; /// NOLINT
    static constexpr int ER_BAD_DB_ERROR = 1049; /// NOLINT

    // https://dev.mysql.com/doc/mysql-errors/8.0/en/client-error-reference.html
    static constexpr int CR_SERVER_LOST = 2013; /// NOLINT

    /// Buffered blocks together with size counters.
    struct Buffers
    {
        /// Database name, copied from the constructor argument.
        String database;

        /// thresholds
        /// NOTE(review): these counters are presumably updated by add() and compared by checkThresholds() - confirm.
        size_t max_block_rows = 0;
        size_t max_block_bytes = 0;
        size_t total_blocks_rows = 0;
        size_t total_blocks_bytes = 0;

        /// A block paired with a list of column indexes.
        using BufferAndSortingColumns = std::pair<Block, std::vector<size_t>>;
        using BufferAndSortingColumnsPtr = std::shared_ptr<BufferAndSortingColumns>;

        /// Buffers indexed by a string key.
        /// NOTE(review): the key is presumably the table name, as in getTableDataBuffer() - confirm.
        std::unordered_map<String, BufferAndSortingColumnsPtr> data;

        explicit Buffers(const String & database_) : database(database_) {}

        void commit(ContextPtr context);

        void add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes);

        bool checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes) const;

        BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, ContextPtr context);
    };

    void synchronization();

    /// Returns the current value of `sync_quit`, read with a relaxed atomic load.
    /// Const-qualified because it only reads the flag.
    bool isCancelled() const { return sync_quit.load(std::memory_order_relaxed); }

    bool prepareSynchronized(MaterializeMetadata & metadata);

    void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata);

    void onEvent(Buffers & buffers, const MySQLReplication::BinlogEventPtr & event, MaterializeMetadata & metadata);

    /// Flag read by isCancelled(); starts as false.
    std::atomic<bool> sync_quit{false};
    std::unique_ptr<ThreadFromGlobalPool> background_thread_pool;
    void executeDDLAtomic(const QueryEvent & query_event);

    void setSynchronizationThreadException(const std::exception_ptr & exception);
};

}

#endif