aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp
blob: 65cf8bbce725ddb8dfdc6be45a9687a9ad937909 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ZooKeeper/IKeeper.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int SUPPORT_IS_DISABLED;
    extern const int REPLICA_STATUS_CHANGED;
}

ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_)
    : storage(storage_)
    , log_name(storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeAttachThread)")
    , log(&Poco::Logger::get(log_name))
{
    task = storage.getContext()->getSchedulePool().createTask(log_name, [this] { run(); });
    const auto storage_settings = storage.getSettings();
    retry_period = storage_settings->initialization_retry_period.totalSeconds();
}

ReplicatedMergeTreeAttachThread::~ReplicatedMergeTreeAttachThread()
{
    shutdown();
}

void ReplicatedMergeTreeAttachThread::shutdown()
{
    if (!shutdown_called.exchange(true))
    {
        task->deactivate();
        LOG_INFO(log, "Attach thread finished");
    }
}

void ReplicatedMergeTreeAttachThread::run()
{
    bool needs_retry{false};
    try
    {
        // we delay the first reconnect if the storage failed to connect to ZK initially
        if (!first_try_done && !storage.current_zookeeper)
        {
            needs_retry = true;
        }
        else
        {
            runImpl();
            finalizeInitialization();
        }
    }
    catch (const Exception & e)
    {
        if (const auto * coordination_exception = dynamic_cast<const Coordination::Exception *>(&e))
            needs_retry = Coordination::isHardwareError(coordination_exception->code);
        else if (e.code() == ErrorCodes::REPLICA_STATUS_CHANGED)
            needs_retry = true;

        if (needs_retry)
        {
            LOG_ERROR(log, "Initialization failed. Error: {}", getCurrentExceptionMessage(/* with_stacktrace */ true));
        }
        else
        {
            LOG_ERROR(log, "Initialization failed, table will remain readonly. Error: {}", getCurrentExceptionMessage(/* with_stacktrace */ true));
            storage.initialization_done = true;
        }
    }

    if (!first_try_done.exchange(true))
        first_try_done.notify_one();

    if (shutdown_called)
    {
        LOG_WARNING(log, "Shutdown called, cancelling initialization");
        return;
    }

    if (needs_retry)
    {
        LOG_INFO(log, "Will retry initialization in {}s", retry_period);
        task->scheduleAfter(retry_period * 1000);
    }
}

void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const String & replica_path)
{
    /// Since 20.4 and until 22.9 "/metadata" node was created on replica startup and "/metadata_version" was created on ALTER.
    /// Since 21.12 we could use "/metadata" to check if replica is dropped (see StorageReplicatedMergeTree::dropReplica),
    /// but it did not work correctly, because "/metadata" node was re-created on server startup.
    /// Since 22.9 we do not recreate these nodes and use "/host" to check if replica is dropped.

    String replica_metadata;
    const bool replica_metadata_exists = zookeeper->tryGet(replica_path + "/metadata", replica_metadata);
    if (!replica_metadata_exists || replica_metadata.empty())
    {
        throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Upgrade from 20.3 and older to 22.9 and newer "
                        "should be done through an intermediate version (failed to get metadata or metadata_version for {},"
                        "assuming it's because of upgrading)", replica_path);
    }
}

void ReplicatedMergeTreeAttachThread::runImpl()
{
    storage.setZooKeeper();

    auto zookeeper = storage.getZooKeeper();
    const auto & zookeeper_path = storage.zookeeper_path;
    bool metadata_exists = zookeeper->exists(zookeeper_path + "/metadata");
    if (!metadata_exists)
    {
        LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will stay in readonly mode.", zookeeper_path);
        storage.has_metadata_in_zookeeper = false;
        return;
    }

    auto metadata_snapshot = storage.getInMemoryMetadataPtr();

    const auto & replica_path = storage.replica_path;
    /// May it be ZK lost not the whole root, so the upper check passed, but only the /replicas/replica
    /// folder.
    bool replica_path_exists = zookeeper->exists(replica_path);
    if (!replica_path_exists)
    {
        LOG_WARNING(log, "No metadata in ZooKeeper for {}: table will stay in readonly mode", replica_path);
        storage.has_metadata_in_zookeeper = false;
        return;
    }

    bool host_node_exists = zookeeper->exists(replica_path + "/host");
    if (!host_node_exists)
    {
        LOG_WARNING(log, "Replica {} is dropped (but metadata is not completely removed from ZooKeeper), "
                         "table will stay in readonly mode", replica_path);
        storage.has_metadata_in_zookeeper = false;
        return;
    }

    storage.has_metadata_in_zookeeper = true;

    checkHasReplicaMetadataInZooKeeper(zookeeper, replica_path);

    /// Just in case it was not removed earlier due to connection loss
    zookeeper->tryRemove(replica_path + "/flags/force_restore_data");

    String replica_metadata_version;
    const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version);
    if (replica_metadata_version_exists)
    {
        storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse<int>(replica_metadata_version)));
    }
    else
    {
        /// Table was created before 20.4 and was never altered,
        /// let's initialize replica metadata version from global metadata version.
        Coordination::Stat table_metadata_version_stat;
        zookeeper->get(zookeeper_path + "/metadata", &table_metadata_version_stat);

        Coordination::Requests ops;
        ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/metadata", table_metadata_version_stat.version));
        ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(table_metadata_version_stat.version), zkutil::CreateMode::Persistent));

        Coordination::Responses res;
        auto code = zookeeper->tryMulti(ops, res);

        if (code == Coordination::Error::ZBADVERSION)
            throw Exception(ErrorCodes::REPLICA_STATUS_CHANGED, "Failed to initialize metadata_version "
                                                                "because table was concurrently altered, will retry");

        zkutil::KeeperMultiException::check(code, ops, res);
    }

    storage.checkTableStructure(replica_path, metadata_snapshot);
    storage.checkParts(skip_sanity_checks);

    /// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
    /// don't allow to reinitialize them, delete each of them immediately.
    storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
    storage.clearOldWriteAheadLogs();

    storage.createNewZooKeeperNodes();
    storage.syncPinnedPartUUIDs();

    std::lock_guard lock(storage.table_shared_id_mutex);
    storage.createTableSharedID();
};

void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS
{
    storage.startupImpl(/* from_attach_thread */ true);
    storage.initialization_done = true;
    LOG_INFO(log, "Table is initialized");
}

void ReplicatedMergeTreeAttachThread::setSkipSanityChecks(bool skip_sanity_checks_)
{
    skip_sanity_checks = skip_sanity_checks_;
}

}