aboutsummaryrefslogblamecommitdiffstats
path: root/contrib/clickhouse/src/Databases/MySQL/MaterializeMetadata.cpp
blob: 51951ddbc9fa7bb2b8029c01e8f2417b11a2d5f3 (plain) (tree)




























































































































































































































































































































                                                                                                                                                                
#include <Databases/MySQL/MaterializeMetadata.h>

#if USE_MYSQL

#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#error #include <Processors/Sources/MySQLSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Common/quoteString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <filesystem>
#include <base/FnTraits.h>

namespace fs = std::filesystem;

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int SYNC_MYSQL_USER_ACCESS_ERROR;
}

static std::unordered_map<String, String> fetchTablesCreateQuery(
    const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name,
    const std::vector<String> & fetch_tables, std::unordered_set<String> & materialized_tables_list,
    const Settings & global_settings)
{
    std::unordered_map<String, String> tables_create_query;
    for (const auto & fetch_table_name : fetch_tables)
    {
        if (!materialized_tables_list.empty() && !materialized_tables_list.contains(fetch_table_name))
            continue;

        Block show_create_table_header{
            {std::make_shared<DataTypeString>(), "Table"},
            {std::make_shared<DataTypeString>(), "Create Table"},
        };

        StreamSettings mysql_input_stream_settings(global_settings, false, true);
        auto show_create_table = std::make_unique<MySQLSource>(
            connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_table_name),
            show_create_table_header, mysql_input_stream_settings);

        QueryPipeline pipeline(std::move(show_create_table));

        Block create_query_block;
        PullingPipelineExecutor executor(pipeline);
        executor.pull(create_query_block);
        if (!create_query_block || create_query_block.rows() != 1)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "LOGICAL ERROR mysql show create return more rows.");

        tables_create_query[fetch_table_name] = create_query_block.getByName("Create Table").column->getDataAt(0).toString();
    }

    return tables_create_query;
}


static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database, const Settings & global_settings)
{
    Block header{{std::make_shared<DataTypeString>(), "table_name"}};
    String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES  WHERE TABLE_TYPE != 'VIEW' AND TABLE_SCHEMA = " + quoteString(database);

    std::vector<String> tables_in_db;
    StreamSettings mysql_input_stream_settings(global_settings);
    auto input = std::make_unique<MySQLSource>(connection, query, header, mysql_input_stream_settings);

    QueryPipeline pipeline(std::move(input));

    Block block;
    PullingPipelineExecutor executor(pipeline);
    while (executor.pull(block))
    {
        tables_in_db.reserve(tables_in_db.size() + block.rows());
        for (size_t index = 0; index < block.rows(); ++index)
            tables_in_db.emplace_back((*block.getByPosition(0).column)[index].safeGet<String>());
    }

    return tables_in_db;
}

void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection)
{
    Block header{
        {std::make_shared<DataTypeString>(), "File"},
        {std::make_shared<DataTypeUInt64>(), "Position"},
        {std::make_shared<DataTypeString>(), "Binlog_Do_DB"},
        {std::make_shared<DataTypeString>(), "Binlog_Ignore_DB"},
        {std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
    };

    StreamSettings mysql_input_stream_settings(settings, false, true);
    auto input = std::make_unique<MySQLSource>(connection, "SHOW MASTER STATUS;", header, mysql_input_stream_settings);

    QueryPipeline pipeline(std::move(input));

    Block master_status;
    PullingPipelineExecutor executor(pipeline);
    executor.pull(master_status);

    if (!master_status || master_status.rows() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unable to get master status from MySQL.");

    data_version = 1;
    binlog_file = (*master_status.getByPosition(0).column)[0].safeGet<String>();
    binlog_position = (*master_status.getByPosition(1).column)[0].safeGet<UInt64>();
    binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet<String>();
    binlog_ignore_db = (*master_status.getByPosition(3).column)[0].safeGet<String>();
    executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet<String>();
}

void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailover::Entry & connection)
{
    Block variables_header{
        {std::make_shared<DataTypeString>(), "Variable_name"},
        {std::make_shared<DataTypeString>(), "Value"}
    };

    const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'";
    StreamSettings mysql_input_stream_settings(settings, false, true);
    auto variables_input = std::make_unique<MySQLSource>(connection, fetch_query, variables_header, mysql_input_stream_settings);
    QueryPipeline pipeline(std::move(variables_input));

    Block variables_block;
    PullingPipelineExecutor executor(pipeline);
    while (executor.pull(variables_block))
    {
        ColumnPtr variables_name = variables_block.getByName("Variable_name").column;
        ColumnPtr variables_value = variables_block.getByName("Value").column;

        for (size_t index = 0; index < variables_block.rows(); ++index)
        {
            if (variables_name->getDataAt(index) == "binlog_checksum")
                binlog_checksum = variables_value->getDataAt(index).toString();
        }
    }
}

static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & connection, const Settings & global_settings, WriteBuffer & out)
{
    Block sync_user_privs_header
    {
        {std::make_shared<DataTypeString>(), "current_user_grants"}
    };

    String grants_query, sub_privs;
    StreamSettings mysql_input_stream_settings(global_settings);
    auto input = std::make_unique<MySQLSource>(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, mysql_input_stream_settings);
    QueryPipeline pipeline(std::move(input));

    Block block;
    PullingPipelineExecutor executor(pipeline);
    while (executor.pull(block))
    {
        for (size_t index = 0; index < block.rows(); ++index)
        {
            grants_query = (*block.getByPosition(0).column)[index].safeGet<String>();
            out << grants_query << "; ";
            sub_privs = grants_query.substr(0, grants_query.find(" ON "));
            if (sub_privs.find("ALL PRIVILEGES") == std::string::npos)
            {
                if ((sub_privs.find("RELOAD") != std::string::npos and
                    sub_privs.find("REPLICATION SLAVE") != std::string::npos and
                    sub_privs.find("REPLICATION CLIENT") != std::string::npos))
                    return true;
            }
            else
            {
                return true;
            }
        }
    }
    return false;
}

static void checkSyncUserPriv(const mysqlxx::PoolWithFailover::Entry & connection, const Settings & global_settings)
{
    WriteBufferFromOwnString out;

    if (!checkSyncUserPrivImpl(connection, global_settings, out))
        throw Exception(ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR, "MySQL SYNC USER ACCESS ERR: "
                        "mysql sync user needs at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' "
                        "and SELECT PRIVILEGE on MySQL Database.But the SYNC USER grant query is: {}", out.str());
}

bool MaterializeMetadata::checkBinlogFileExists(const mysqlxx::PoolWithFailover::Entry & connection) const
{
    if (binlog_file.empty())
        return false;

    Block logs_header {
        {std::make_shared<DataTypeString>(), "Log_name"},
        {std::make_shared<DataTypeUInt64>(), "File_size"}
    };

    StreamSettings mysql_input_stream_settings(settings, false, true);
    auto input = std::make_unique<MySQLSource>(connection, "SHOW MASTER LOGS", logs_header, mysql_input_stream_settings);
    QueryPipeline pipeline(std::move(input));

    Block block;
    PullingPipelineExecutor executor(pipeline);
    while (executor.pull(block))
    {
        for (size_t index = 0; index < block.rows(); ++index)
        {
            const auto log_name = (*block.getByPosition(0).column)[index].safeGet<String>();
            if (log_name == binlog_file)
                return true;
        }
    }
    return false;
}

void commitMetadata(Fn<void()> auto && function, const String & persistent_tmp_path, const String & persistent_path)
{
    try
    {
        function();
        fs::rename(persistent_tmp_path, persistent_path);
    }
    catch (...)
    {
        fs::remove(persistent_tmp_path);
        throw;
    }
}

void MaterializeMetadata::transaction(const MySQLReplication::Position & position, const std::function<void()> & fun)
{
    binlog_file = position.binlog_name;
    binlog_position = position.binlog_pos;
    executed_gtid_set = position.gtid_sets.toString();

    String persistent_tmp_path = persistent_path + ".tmp";

    {
        WriteBufferFromFile out(persistent_tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);

        /// TSV format metadata file.
        writeString("Version:\t" + toString(meta_version), out);
        writeString("\nBinlog File:\t" + binlog_file, out);
        writeString("\nExecuted GTID:\t" + executed_gtid_set, out);
        writeString("\nBinlog Position:\t" + toString(binlog_position), out);
        writeString("\nData Version:\t" + toString(data_version), out);

        out.next();
        out.sync();
        out.close();
    }

    commitMetadata(fun, persistent_tmp_path, persistent_path);
}

MaterializeMetadata::MaterializeMetadata(const String & path_, const Settings & settings_) : persistent_path(path_), settings(settings_)
{
    if (fs::exists(persistent_path))
    {
        ReadBufferFromFile in(persistent_path, DBMS_DEFAULT_BUFFER_SIZE);
        assertString("Version:\t" + toString(meta_version), in);
        assertString("\nBinlog File:\t", in);
        readString(binlog_file, in);
        assertString("\nExecuted GTID:\t", in);
        readString(executed_gtid_set, in);
        assertString("\nBinlog Position:\t", in);
        readIntText(binlog_position, in);
        assertString("\nData Version:\t", in);
        readIntText(data_version, in);

    }
}

void MaterializeMetadata::startReplication(
    mysqlxx::PoolWithFailover::Entry & connection, const String & database,
    bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables,
    std::unordered_set<String> & materialized_tables_list)
{
    checkSyncUserPriv(connection, settings);

    if (checkBinlogFileExists(connection))
      return;

    bool locked_tables = false;

    try
    {
        connection->query("FLUSH TABLES;").execute();
        connection->query("FLUSH TABLES WITH READ LOCK;").execute();

        locked_tables = true;
        fetchMasterStatus(connection);
        fetchMasterVariablesValue(connection);
        connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
        connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();

        opened_transaction = true;
        need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database, settings), materialized_tables_list, settings);
        connection->query("UNLOCK TABLES;").execute();
    }
    catch (...)
    {
        if (locked_tables)
            connection->query("UNLOCK TABLES;").execute();

        throw;
    }
}

}

#endif