| author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
| download | ydb-c2b2dfd9827a400a8495e172a56343462e3ceb82.tar.gz | |
YQ Connector: move tests from yql to ydb (OSS)
Move the folder with Connector tests from the yql folder to the ydb folder (synchronized with github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp')
-rw-r--r-- | contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp | 394 |
1 file changed, 394 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
new file mode 100644
index 0000000000..79054ef46d
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
@@ -0,0 +1,394 @@
+#include <IO/Operators.h>
+#include <Storages/StorageReplicatedMergeTree.h>
+#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
+#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
+#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
+#include <Interpreters/Context.h>
+#include <Common/ZooKeeper/KeeperException.h>
+#include <Common/randomSeed.h>
+#include <Core/ServerUUID.h>
+#include <boost/algorithm/string/replace.hpp>
+
+
+namespace CurrentMetrics
+{
+    extern const Metric ReadonlyReplica;
+}
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int REPLICA_IS_ALREADY_ACTIVE;
+}
+
+/// Used to check whether it was us who set the `is_active` node, or not.
+static String generateActiveNodeIdentifier()
+{
+    return Field(ServerUUID::get()).dump();
+}
+
+ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
+    : storage(storage_)
+    , log_name(storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeRestartingThread)")
+    , log(&Poco::Logger::get(log_name))
+    , active_node_identifier(generateActiveNodeIdentifier())
+{
+    const auto storage_settings = storage.getSettings();
+    check_period_ms = storage_settings->zookeeper_session_expiration_check_period.totalSeconds() * 1000;
+
+    task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ run(); });
+}
+
+void ReplicatedMergeTreeRestartingThread::run()
+{
+    if (need_stop)
+        return;
+
+    /// In case of any exceptions we want to rerun this task as fast as possible, but we also don't want to keep retrying immediately
+    /// in a tight loop (as fast as tasks can be processed), so we'll retry between 100 and 10000 ms.
+    const size_t backoff_ms = 100 * ((consecutive_check_failures + 1) * (consecutive_check_failures + 2)) / 2;
+    const size_t next_failure_retry_ms = std::min(size_t{10000}, backoff_ms);
+
+    try
+    {
+        bool replica_is_active = runImpl();
+        if (replica_is_active)
+        {
+            consecutive_check_failures = 0;
+            task->scheduleAfter(check_period_ms);
+        }
+        else
+        {
+            consecutive_check_failures++;
+            task->scheduleAfter(next_failure_retry_ms);
+        }
+    }
+    catch (...)
+    {
+        consecutive_check_failures++;
+        task->scheduleAfter(next_failure_retry_ms);
+
+        /// We couldn't activate the table, so let's set it into readonly mode if necessary.
+        /// We do this after scheduling the task in case it throws.
+        partialShutdown();
+        tryLogCurrentException(log, "Failed to restart the table. Will try again");
+    }
+
+    if (first_time)
+    {
+        if (storage.is_readonly)
+        {
+            /// We failed to start replication, table is still readonly, so we should increment the metric. See also setNotReadonly().
+            CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
+        }
+        /// It does not matter if replication is actually started or not, just notify after the first attempt.
+        storage.startup_event.set();
+        first_time = false;
+    }
+}
+
+bool ReplicatedMergeTreeRestartingThread::runImpl()
+{
+    if (!storage.is_readonly && !storage.getZooKeeper()->expired())
+        return true;
+
+    if (first_time)
+    {
+        LOG_DEBUG(log, "Activating replica.");
+        assert(storage.is_readonly);
+    }
+    else if (storage.is_readonly)
+    {
+        LOG_WARNING(log, "Table was in readonly mode. Will try to activate it.");
+    }
+    else if (storage.getZooKeeper()->expired())
+    {
+        LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
+        partialShutdown();
+    }
+    else
+    {
+        UNREACHABLE();
+    }
+
+    try
+    {
+        storage.setZooKeeper();
+    }
+    catch (const Coordination::Exception &)
+    {
+        /// An exception during zookeeper_init usually happens if DNS does not work or the connection with ZK fails.
+        tryLogCurrentException(log, "Failed to establish a new ZK connection. Will try again");
+        assert(storage.is_readonly);
+        return false;
+    }
+
+    if (need_stop)
+        return false;
+
+    if (!tryStartup())
+    {
+        assert(storage.is_readonly);
+        return false;
+    }
+
+    setNotReadonly();
+
+    /// Start queue processing
+    storage.background_operations_assignee.start();
+    storage.queue_updating_task->activateAndSchedule();
+    storage.mutations_updating_task->activateAndSchedule();
+    storage.mutations_finalizing_task->activateAndSchedule();
+    storage.merge_selecting_task->activateAndSchedule();
+    storage.cleanup_thread.start();
+    storage.async_block_ids_cache.start();
+    storage.part_check_thread.start();
+
+    LOG_DEBUG(log, "Table started successfully");
+    return true;
+}
+
+
+bool ReplicatedMergeTreeRestartingThread::tryStartup()
+{
+    LOG_DEBUG(log, "Trying to start the replica up");
+    try
+    {
+        removeFailedQuorumParts();
+        activateReplica();
+
+        const auto & zookeeper = storage.getZooKeeper();
+        const auto storage_settings = storage.getSettings();
+
+        storage.cloneReplicaIfNeeded(zookeeper);
+
+        try
+        {
+            storage.queue.initialize(zookeeper);
+            storage.queue.load(zookeeper);
+            storage.queue.createLogEntriesToFetchBrokenParts();
+
+            /// Call pullLogsToQueue() after we mark the replica 'is_active' (and after we repair it if it was lost),
+            /// because cleanup_thread doesn't delete the log_pointer of active replicas.
+            storage.queue.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::LOAD);
+        }
+        catch (...)
+        {
+            std::lock_guard lock(storage.last_queue_update_exception_lock);
+            storage.last_queue_update_exception = getCurrentExceptionMessage(false);
+            throw;
+        }
+
+        storage.queue.removeCurrentPartsFromMutations();
+        storage.last_queue_update_finish_time.store(time(nullptr));
+
+        updateQuorumIfWeHavePart();
+
+        /// Anything above can throw a KeeperException if something is wrong with ZK.
+        /// Anything below should not throw exceptions.
+
+        storage.partial_shutdown_called = false;
+        storage.partial_shutdown_event.reset();
+        return true;
+    }
+    catch (...)
+    {
+        storage.replica_is_active_node = nullptr;
+
+        try
+        {
+            throw;
+        }
+        catch (const Coordination::Exception & e)
+        {
+            LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
+            return false;
+        }
+        catch (const Exception & e)
+        {
+            if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
+                throw;
+
+            LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
+            return false;
+        }
+    }
+}
+
+
+void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
+{
+    auto zookeeper = storage.getZooKeeper();
+
+    Strings failed_parts;
+    if (zookeeper->tryGetChildren(storage.zookeeper_path + "/quorum/failed_parts", failed_parts) != Coordination::Error::ZOK)
+        return;
+
+    /// Firstly, remove parts from ZooKeeper
+    storage.removePartsFromZooKeeperWithRetries(failed_parts);
+
+    for (const auto & part_name : failed_parts)
+    {
+        auto part = storage.getPartIfExists(
+            part_name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
+
+        if (part)
+        {
+            LOG_DEBUG(log, "Found part {} with failed quorum. Moving to detached. This shouldn't happen often.", part_name);
+            storage.forcefullyMovePartToDetachedAndRemoveFromMemory(part, "noquorum");
+            storage.queue.removeFailedQuorumPart(part->info);
+        }
+    }
+}
+
+
+void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
+{
+    auto zookeeper = storage.getZooKeeper();
+
+    String quorum_str;
+    if (zookeeper->tryGet(fs::path(storage.zookeeper_path) / "quorum" / "status", quorum_str))
+    {
+        ReplicatedMergeTreeQuorumEntry quorum_entry(quorum_str);
+
+        if (!quorum_entry.replicas.contains(storage.replica_name)
+            && storage.getActiveContainingPart(quorum_entry.part_name))
+        {
+            LOG_WARNING(log, "We have part {} but we are not in quorum. Updating quorum. This shouldn't happen often.", quorum_entry.part_name);
+            storage.updateQuorum(quorum_entry.part_name, false);
+        }
+    }
+
+    Strings part_names;
+    String parallel_quorum_parts_path = fs::path(storage.zookeeper_path) / "quorum" / "parallel";
+    if (zookeeper->tryGetChildren(parallel_quorum_parts_path, part_names) == Coordination::Error::ZOK)
+    {
+        for (auto & part_name : part_names)
+        {
+            if (zookeeper->tryGet(fs::path(parallel_quorum_parts_path) / part_name, quorum_str))
+            {
+                ReplicatedMergeTreeQuorumEntry quorum_entry(quorum_str);
+                if (!quorum_entry.replicas.contains(storage.replica_name)
+                    && storage.getActiveContainingPart(part_name))
+                {
+                    LOG_WARNING(log, "We have part {} but we are not in quorum. Updating quorum. This shouldn't happen often.", part_name);
+                    storage.updateQuorum(part_name, true);
+                }
+            }
+        }
+    }
+}
+
+
+void ReplicatedMergeTreeRestartingThread::activateReplica()
+{
+    auto zookeeper = storage.getZooKeeper();
+
+    /// How other replicas can access this one.
+    ReplicatedMergeTreeAddress address = storage.getReplicatedMergeTreeAddress();
+
+    String is_active_path = fs::path(storage.replica_path) / "is_active";
+    zookeeper->handleEphemeralNodeExistence(is_active_path, active_node_identifier);
+
+    /// Simultaneously declare that this replica is active, and update the host.
+    Coordination::Requests ops;
+    ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));
+    ops.emplace_back(zkutil::makeSetRequest(fs::path(storage.replica_path) / "host", address.toString(), -1));
+
+    try
+    {
+        zookeeper->multi(ops);
+    }
+    catch (const Coordination::Exception & e)
+    {
+        String existing_replica_host;
+        zookeeper->tryGet(fs::path(storage.replica_path) / "host", existing_replica_host);
+
+        if (existing_replica_host.empty())
+            existing_replica_host = "without host node";
+        else
+            boost::replace_all(existing_replica_host, "\n", ", ");
+
+        if (e.code == Coordination::Error::ZNODEEXISTS)
+            throw Exception(ErrorCodes::REPLICA_IS_ALREADY_ACTIVE,
+                "Replica {} appears to be already active ({}). If you're sure it's not, "
+                "try again in a minute or remove znode {}/is_active manually",
+                storage.replica_path, existing_replica_host, storage.replica_path);
+
+        throw;
+    }
+
+    /// `current_zookeeper` lives for the lifetime of `replica_is_active_node`,
+    /// since before changing `current_zookeeper`, the `replica_is_active_node` object is destroyed in the `partialShutdown` method.
+    storage.replica_is_active_node = zkutil::EphemeralNodeHolder::existing(is_active_path, *storage.current_zookeeper);
+}
+
+
+void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shutdown)
+{
+    setReadonly(/* on_shutdown = */ part_of_full_shutdown);
+    storage.partialShutdown();
+}
+
+
+void ReplicatedMergeTreeRestartingThread::shutdown(bool part_of_full_shutdown)
+{
+    /// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
+    need_stop = true;
+    task->deactivate();
+
+    /// Explicitly set the event, because the restarting thread will not set it again.
+    if (part_of_full_shutdown)
+        storage.startup_event.set();
+
+    LOG_TRACE(log, "Restarting thread finished");
+
+    setReadonly(part_of_full_shutdown);
+}
+
+void ReplicatedMergeTreeRestartingThread::setReadonly(bool on_shutdown)
+{
+    bool old_val = false;
+    bool became_readonly = storage.is_readonly.compare_exchange_strong(old_val, true);
+
+    /// Do not increment the metric if replica became readonly due to shutdown.
+    if (became_readonly && on_shutdown)
+        return;
+
+    if (became_readonly)
+        CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
+
+    /// Replica was already readonly, but we should decrement the metric because we are detaching/dropping the table.
+    /// If the first pass wasn't done, we don't have to decrement, because the metric wasn't incremented in the first place.
+    /// The task should be deactivated if it's a full shutdown, so no race is present.
+    if (!first_time && on_shutdown)
+    {
+        CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
+        assert(CurrentMetrics::get(CurrentMetrics::ReadonlyReplica) >= 0);
+    }
+
+    if (storage.since_metadata_err_incr_readonly_metric)
+    {
+        CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
+        assert(CurrentMetrics::get(CurrentMetrics::ReadonlyReplica) >= 0);
+    }
+}
+
+void ReplicatedMergeTreeRestartingThread::setNotReadonly()
+{
+    bool old_val = true;
+    /// is_readonly is true on startup, but the ReadonlyReplica metric is not incremented,
+    /// because we don't want to change this metric if replication is started successfully.
+    /// So we should not decrement it when the replica stops being readonly on startup.
+    if (storage.is_readonly.compare_exchange_strong(old_val, false) && !first_time)
+    {
+        CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
+        assert(CurrentMetrics::get(CurrentMetrics::ReadonlyReplica) >= 0);
+    }
+}
+
+}
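For reference, the retry schedule computed at the top of `run()` is a triangular-number backoff capped at 10 seconds: after n consecutive failures the task is rescheduled in 100 · (n+1)(n+2)/2 ms. A minimal standalone sketch of that schedule (the formula is copied from the code above; the `nextFailureRetryMs` helper and the `main` driver are illustrative, not part of the file):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

/// Same formula as in ReplicatedMergeTreeRestartingThread::run():
/// triangular growth 100 * (n+1)*(n+2)/2 ms, clamped to 10000 ms.
static size_t nextFailureRetryMs(size_t consecutive_check_failures)
{
    const size_t backoff_ms = 100 * ((consecutive_check_failures + 1) * (consecutive_check_failures + 2)) / 2;
    return std::min(size_t{10000}, backoff_ms);
}

int main()
{
    /// Prints 100, 300, 600, 1000, 1500, ... 9100, then stays at 10000.
    for (size_t failures = 0; failures < 16; ++failures)
        std::printf("after %zu failure(s): retry in %zu ms\n", failures + 1, nextFailureRetryMs(failures));
    return 0;
}
```

So a replica that keeps failing its activation check reaches the 10-second ceiling after 14 consecutive failures, while a single transient failure costs only a 100 ms delay before the next attempt.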