#include "StorageMergeTree.h" #include "Core/QueryProcessingStage.h" #include "Storages/MergeTree/IMergeTreeDataPart.h" #include <optional> #include <ranges> #include <base/sort.h> #include <Backups/BackupEntriesCollector.h> #include <Databases/IDatabase.h> #include <Common/MemoryTracker.h> #include <Common/escapeForFileName.h> #include <Common/ProfileEventsScope.h> #include <Common/typeid_cast.h> #include <Common/ThreadPool.h> #include <Interpreters/InterpreterAlterQuery.h> #include <Interpreters/PartLog.h> #include <Interpreters/MutationsInterpreter.h> #include <Interpreters/Context.h> #include <Interpreters/TransactionLog.h> #include <Interpreters/ClusterProxy/executeQuery.h> #include <Interpreters/ClusterProxy/SelectStreamFactory.h> #include <Interpreters/InterpreterSelectQueryAnalyzer.h> #include <IO/copyData.h> #include <Parsers/ASTCheckQuery.h> #include <Parsers/ASTFunction.h> #include <Parsers/ASTLiteral.h> #include <Parsers/ASTPartition.h> #include <Parsers/ASTSetQuery.h> #include <Parsers/queryToString.h> #include <Parsers/formatAST.h> #include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/ActiveDataPartSet.h> #include <Storages/AlterCommands.h> #include <Storages/PartitionCommands.h> #include <Storages/MergeTree/MergeTreeSink.h> #include <Storages/MergeTree/MergeTreeDataPartInMemory.h> #include <Storages/MergeTree/MergePlainMergeTreeTask.h> #include <Storages/MergeTree/PartitionPruner.h> #include <Storages/MergeTree/MergeList.h> #include <Storages/MergeTree/checkDataPart.h> #include <QueryPipeline/Pipe.h> #include <Processors/QueryPlan/QueryPlan.h> #include <Processors/QueryPlan/BuildQueryPipelineSettings.h> #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> #include <fmt/core.h> namespace DB { namespace ErrorCodes { extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; extern const int NOT_ENOUGH_SPACE; extern const int BAD_ARGUMENTS; extern const int INCORRECT_DATA; extern const int CANNOT_ASSIGN_OPTIMIZE; extern const int TIMEOUT_EXCEEDED; extern const int UNKNOWN_POLICY; extern const int NO_SUCH_DATA_PART; extern const int ABORTED; } namespace ActionLocks { extern const StorageActionBlockType PartsMerge; extern const StorageActionBlockType PartsTTLMerge; extern const StorageActionBlockType PartsMove; } static MergeTreeTransactionPtr tryGetTransactionForMutation(const MergeTreeMutationEntry & mutation, Poco::Logger * log = nullptr) { assert(!mutation.tid.isEmpty()); if (mutation.tid.isPrehistoric()) return {}; auto txn = TransactionLog::instance().tryGetRunningTransaction(mutation.tid.getHash()); if (txn) return txn; if (log) LOG_WARNING(log, "Cannot find transaction {} which had started mutation {}, probably it finished", mutation.tid, mutation.file_name); return {}; } StorageMergeTree::StorageMergeTree( const StorageID & table_id_, const String & relative_data_path_, const StorageInMemoryMetadata & metadata_, bool attach, ContextMutablePtr context_, const String & date_column_name, const MergingParams & merging_params_, std::unique_ptr<MergeTreeSettings> storage_settings_, bool has_force_restore_data_flag) : MergeTreeData( table_id_, metadata_, context_, date_column_name, merging_params_, std::move(storage_settings_), false, /// require_part_metadata attach) , reader(*this) , writer(*this) , merger_mutator(*this) { initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name); loadDataParts(has_force_restore_data_flag); if (!attach && !getDataPartsForInternalUsage().empty() && !isStaticStorage()) throw Exception(ErrorCodes::INCORRECT_DATA, "Data directory for table already containing data parts - probably " "it was unclean DROP table or manual intervention. " "You must either clear directory by hand or use ATTACH TABLE instead " "of CREATE TABLE if you need to use that parts."); increment.set(getMaxBlockNumber()); loadMutations(); loadDeduplicationLog(); } void StorageMergeTree::startup() { clearOldWriteAheadLogs(); clearEmptyParts(); /// Temporary directories contain incomplete results of merges (after forced restart) /// and don't allow to reinitialize them, so delete each of them immediately clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"}); /// NOTE background task will also do the above cleanups periodically. time_after_previous_cleanup_parts.restart(); time_after_previous_cleanup_temporary_directories.restart(); /// Do not schedule any background jobs if current storage has static data files. if (isStaticStorage()) return; try { background_operations_assignee.start(); startBackgroundMovesIfNeeded(); startOutdatedDataPartsLoadingTask(); } catch (...) { /// Exception safety: failed "startup" does not require a call to "shutdown" from the caller. /// And it should be able to safely destroy table after exception in "startup" method. /// It means that failed "startup" must not create any background tasks that we will have to wait. try { shutdown(); } catch (...) { std::terminate(); } /// Note: after failed "startup", the table will be in a state that only allows to destroy the object. throw; } } void StorageMergeTree::shutdown() { if (shutdown_called.exchange(true)) return; stopOutdatedDataPartsLoadingTask(); /// Unlock all waiting mutations { std::lock_guard lock(mutation_wait_mutex); mutation_wait_event.notify_all(); } merger_mutator.merges_blocker.cancelForever(); parts_mover.moves_blocker.cancelForever(); background_operations_assignee.finish(); background_moves_assignee.finish(); if (deduplication_log) deduplication_log->shutdown(); } StorageMergeTree::~StorageMergeTree() { shutdown(); } void StorageMergeTree::read( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, size_t num_streams) { if (!query_info.parallel_replicas_disabled && local_context->canUseParallelReplicasOnInitiator() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree) { auto table_id = getStorageID(); const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery( local_context, query_info.query, table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr); String cluster_for_parallel_replicas = local_context->getSettingsRef().cluster_for_parallel_replicas; auto cluster = local_context->getCluster(cluster_for_parallel_replicas); Block header; if (local_context->getSettingsRef().allow_experimental_analyzer) header = InterpreterSelectQueryAnalyzer::getSampleBlock(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()); else header = InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); ClusterProxy::SelectStreamFactory select_stream_factory = ClusterProxy::SelectStreamFactory( header, {}, storage_snapshot, processed_stage); ClusterProxy::executeQueryWithParallelReplicas( query_plan, getStorageID(), /*remove_table_function_ptr*/ nullptr, select_stream_factory, modified_query_ast, local_context, query_info, cluster); } else { const bool enable_parallel_reading = !query_info.parallel_replicas_disabled && local_context->canUseParallelReplicasOnFollower() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree; if (auto plan = reader.read( column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, nullptr, enable_parallel_reading)) query_plan = std::move(*plan); } /// Now, copy of parts that is required for the query, stored in the processors, /// while snapshot_data.parts includes all parts, even one that had been filtered out with partition pruning, /// reset them to avoid holding them. auto & snapshot_data = assert_cast<MergeTreeData::SnapshotData &>(*storage_snapshot->data); snapshot_data.parts = {}; snapshot_data.alter_conversions = {}; } std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const { return getTotalActiveSizeInRows(); } std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr local_context) const { auto parts = getVisibleDataPartsVector(local_context); return totalRowsByPartitionPredicateImpl(query_info, local_context, parts); } std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const { return getTotalActiveSizeInBytes(); } SinkToStoragePtr StorageMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) { const auto & settings = local_context->getSettingsRef(); return std::make_shared<MergeTreeSink>( *this, metadata_snapshot, settings.max_partitions_per_insert_block, local_context); } void StorageMergeTree::checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const { if (!supportsReplication() && isStaticStorage()) return; auto table_id = getStorageID(); getContext()->checkTableCanBeDropped(table_id.database_name, table_id.table_name, getTotalActiveSizeInBytes()); } void StorageMergeTree::drop() { shutdown(); /// In case there is read-only disk we cannot allow to call dropAllData(), but dropping tables is allowed. if (isStaticStorage()) return; dropAllData(); } void StorageMergeTree::alter( const AlterCommands & commands, ContextPtr local_context, AlterLockHolder & table_lock_holder) { if (local_context->getCurrentTransaction() && local_context->getSettingsRef().throw_on_unsupported_query_inside_transaction) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ALTER METADATA is not supported inside transactions"); auto table_id = getStorageID(); auto old_storage_settings = getSettings(); StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); StorageInMemoryMetadata old_metadata = getInMemoryMetadata(); auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, local_context->getSettingsRef().materialize_ttl_after_modify, local_context); if (!maybe_mutation_commands.empty()) delayMutationOrThrowIfNeeded(nullptr, local_context); Int64 mutation_version = -1; commands.apply(new_metadata, local_context); /// This alter can be performed at new_metadata level only if (commands.isSettingsAlter()) { changeSettings(new_metadata.settings_changes, table_lock_holder); DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata); } else { if (!maybe_mutation_commands.empty() && maybe_mutation_commands.containBarrierCommand()) { int64_t prev_mutation = 0; { std::lock_guard lock(currently_processing_in_background_mutex); auto it = current_mutations_by_version.rbegin(); if (it != current_mutations_by_version.rend()) prev_mutation = it->first; } if (prev_mutation != 0) { LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation); waitForMutation(prev_mutation, /* from_another_mutation */ true); LOG_DEBUG(log, "Mutation {} finished", prev_mutation); } } { changeSettings(new_metadata.settings_changes, table_lock_holder); checkTTLExpressions(new_metadata, old_metadata); /// Reinitialize primary key because primary key column types might have changed. setProperties(new_metadata, old_metadata, false, local_context); DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata); if (!maybe_mutation_commands.empty()) mutation_version = startMutation(maybe_mutation_commands, local_context); } { /// Reset Object columns, because column of type /// Object may be added or dropped by alter. auto parts_lock = lockParts(); resetObjectColumnsFromActiveParts(parts_lock); } /// Always execute required mutations synchronously, because alters /// should be executed in sequential order. if (!maybe_mutation_commands.empty()) waitForMutation(mutation_version, false); } { /// Some additional changes in settings auto new_storage_settings = getSettings(); if (old_storage_settings->non_replicated_deduplication_window != new_storage_settings->non_replicated_deduplication_window) { /// We cannot place this check into settings sanityCheck because it depends on format_version. /// sanityCheck must work event without storage. if (new_storage_settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Deduplication for non-replicated MergeTree in old syntax is not supported"); deduplication_log->setDeduplicationWindowSize(new_storage_settings->non_replicated_deduplication_window); } } } /// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem. CurrentlyMergingPartsTagger::CurrentlyMergingPartsTagger( FutureMergedMutatedPartPtr future_part_, size_t total_size, StorageMergeTree & storage_, const StorageMetadataPtr & metadata_snapshot, bool is_mutation) : future_part(future_part_), storage(storage_) { /// Assume mutex is already locked, because this method is called from mergeTask. /// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks if (is_mutation) { reserved_space = storage.tryReserveSpace(total_size, future_part->parts[0]->getDataPartStorage()); } else { IMergeTreeDataPart::TTLInfos ttl_infos; size_t max_volume_index = 0; for (auto & part_ptr : future_part->parts) { ttl_infos.update(part_ptr->ttl_infos); auto disk_name = part_ptr->getDataPartStorage().getDiskName(); size_t volume_index = storage.getStoragePolicy()->getVolumeIndexByDiskName(disk_name); max_volume_index = std::max(max_volume_index, volume_index); } reserved_space = storage.balancedReservation( metadata_snapshot, total_size, max_volume_index, future_part->name, future_part->part_info, future_part->parts, &tagger, &ttl_infos); if (!reserved_space) reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index); } if (!reserved_space) { if (is_mutation) throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for mutating part '{}'", future_part->parts[0]->name); else throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for merging parts"); } future_part->updatePath(storage, reserved_space.get()); for (const auto & part : future_part->parts) { if (storage.currently_merging_mutating_parts.contains(part)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged part {}. This is a bug.", part->name); } storage.currently_merging_mutating_parts.insert(future_part->parts.begin(), future_part->parts.end()); } CurrentlyMergingPartsTagger::~CurrentlyMergingPartsTagger() { std::lock_guard lock(storage.currently_processing_in_background_mutex); for (const auto & part : future_part->parts) { if (!storage.currently_merging_mutating_parts.contains(part)) std::terminate(); storage.currently_merging_mutating_parts.erase(part); } storage.currently_processing_in_background_condition.notify_all(); } Int64 StorageMergeTree::startMutation(const MutationCommands & commands, ContextPtr query_context) { /// Choose any disk, because when we load mutations we search them at each disk /// where storage can be placed. See loadMutations(). auto disk = getStoragePolicy()->getAnyDisk(); TransactionID current_tid = Tx::PrehistoricTID; String additional_info; auto txn = query_context->getCurrentTransaction(); if (txn) { current_tid = txn->tid; additional_info = fmt::format(" (TID: {}; TIDH: {})", current_tid, current_tid.getHash()); } Int64 version; { std::lock_guard lock(currently_processing_in_background_mutex); MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), current_tid, getContext()->getWriteSettings()); version = increment.get(); entry.commit(version); String mutation_id = entry.file_name; if (txn) txn->addMutation(shared_from_this(), mutation_id); bool inserted = current_mutations_by_version.try_emplace(version, std::move(entry)).second; if (!inserted) throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version); LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info); } background_operations_assignee.trigger(); return version; } void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part, bool is_successful, const String & exception_message) { /// Update the information about failed parts in the system.mutations table. Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion(); Int64 result_data_version = result_part->part_info.getDataVersion(); if (sources_data_version != result_data_version) { std::lock_guard lock(currently_processing_in_background_mutex); auto mutations_begin_it = current_mutations_by_version.upper_bound(sources_data_version); auto mutations_end_it = current_mutations_by_version.upper_bound(result_data_version); for (auto it = mutations_begin_it; it != mutations_end_it; ++it) { MergeTreeMutationEntry & entry = it->second; if (is_successful) { if (!entry.latest_failed_part.empty() && result_part->part_info.contains(entry.latest_failed_part_info)) { entry.latest_failed_part.clear(); entry.latest_failed_part_info = MergeTreePartInfo(); entry.latest_fail_time = 0; entry.latest_fail_reason.clear(); } } else { entry.latest_failed_part = result_part->parts.at(0)->name; entry.latest_failed_part_info = result_part->parts.at(0)->info; entry.latest_fail_time = time(nullptr); entry.latest_fail_reason = exception_message; } } } std::unique_lock lock(mutation_wait_mutex); mutation_wait_event.notify_all(); } void StorageMergeTree::waitForMutation(Int64 version, bool wait_for_another_mutation) { String mutation_id = MergeTreeMutationEntry::versionToFileName(version); waitForMutation(version, mutation_id, wait_for_another_mutation); } void StorageMergeTree::waitForMutation(const String & mutation_id, bool wait_for_another_mutation) { Int64 version = MergeTreeMutationEntry::parseFileName(mutation_id); waitForMutation(version, mutation_id, wait_for_another_mutation); } void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation) { LOG_INFO(log, "Waiting mutation: {}", mutation_id); { auto check = [version, wait_for_another_mutation, this]() { if (shutdown_called) return true; auto mutation_status = getIncompleteMutationsStatus(version, nullptr, wait_for_another_mutation); return !mutation_status || mutation_status->is_done || !mutation_status->latest_fail_reason.empty(); }; std::unique_lock lock(mutation_wait_mutex); mutation_wait_event.wait(lock, check); } /// At least we have our current mutation std::set<String> mutation_ids; mutation_ids.insert(mutation_id); auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids, wait_for_another_mutation); checkMutationStatus(mutation_status, mutation_ids); LOG_INFO(log, "Mutation {} done", mutation_id); } void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn) { LOG_INFO(log, "Writing CSN {} for mutation {}", csn, mutation_id); UInt64 version = MergeTreeMutationEntry::parseFileName(mutation_id); std::lock_guard lock(currently_processing_in_background_mutex); auto it = current_mutations_by_version.find(version); if (it == current_mutations_by_version.end()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find mutation {}", mutation_id); it->second.writeCSN(csn); } void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context) { delayMutationOrThrowIfNeeded(nullptr, query_context); /// Validate partition IDs (if any) before starting mutation getPartitionIdsAffectedByCommands(commands, query_context); Int64 version; { /// It's important to serialize order of mutations with alter queries because /// they can depend on each other. if (auto alter_lock = tryLockForAlter(query_context->getSettings().lock_acquire_timeout); alter_lock == std::nullopt) { throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot start mutation in {}ms because some metadata-changing ALTER (MODIFY|RENAME|ADD|DROP) is currently executing. " "You can change this timeout with `lock_acquire_timeout` setting", query_context->getSettings().lock_acquire_timeout.totalMilliseconds()); } version = startMutation(commands, query_context); } if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction()) waitForMutation(version, false); } bool StorageMergeTree::hasLightweightDeletedMask() const { return has_lightweight_delete_parts.load(std::memory_order_relaxed); } namespace { struct PartVersionWithName { Int64 version; String name; }; bool comparator(const PartVersionWithName & f, const PartVersionWithName & s) { return f.version < s.version; } } std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus( Int64 mutation_version, std::set<String> * mutation_ids, bool from_another_mutation) const { std::unique_lock lock(currently_processing_in_background_mutex); return getIncompleteMutationsStatusUnlocked(mutation_version, lock, mutation_ids, from_another_mutation); } std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatusUnlocked( Int64 mutation_version, std::unique_lock<std::mutex> & /*lock*/, std::set<String> * mutation_ids, bool from_another_mutation) const { auto current_mutation_it = current_mutations_by_version.find(mutation_version); /// Killed if (current_mutation_it == current_mutations_by_version.end()) return {}; MergeTreeMutationStatus result{.is_done = false}; const auto & mutation_entry = current_mutation_it->second; auto txn = tryGetTransactionForMutation(mutation_entry, log); /// There's no way a transaction may finish before a mutation that was started by the transaction. /// But sometimes we need to check status of an unrelated mutation, in this case we don't care about transactions. assert(txn || mutation_entry.tid.isPrehistoric() || from_another_mutation); auto data_parts = getVisibleDataPartsVector(txn); for (const auto & data_part : data_parts) { Int64 data_version = data_part->info.getDataVersion(); if (data_version < mutation_version) { if (!mutation_entry.latest_fail_reason.empty()) { result.latest_failed_part = mutation_entry.latest_failed_part; result.latest_fail_reason = mutation_entry.latest_fail_reason; result.latest_fail_time = mutation_entry.latest_fail_time; /// Fill all mutations which failed with the same error /// (we can execute several mutations together) if (mutation_ids) { auto mutations_begin_it = current_mutations_by_version.upper_bound(data_version); for (auto it = mutations_begin_it; it != current_mutations_by_version.end(); ++it) /// All mutations with the same failure if (it->second.latest_fail_reason == result.latest_fail_reason) mutation_ids->insert(it->second.file_name); } } else if (txn && !from_another_mutation) { /// Part is locked by concurrent transaction, most likely it will never be mutated TIDHash part_locked = data_part->version.removal_tid_lock.load(); if (part_locked && part_locked != mutation_entry.tid.getHash()) { result.latest_failed_part = data_part->name; result.latest_fail_reason = fmt::format("Serialization error: part {} is locked by transaction {}", data_part->name, part_locked); result.latest_fail_time = time(nullptr); } } return result; } } result.is_done = true; return result; } std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const { std::lock_guard lock(currently_processing_in_background_mutex); std::vector<PartVersionWithName> part_versions_with_names; auto data_parts = getDataPartsVectorForInternalUsage(); part_versions_with_names.reserve(data_parts.size()); for (const auto & part : data_parts) part_versions_with_names.emplace_back(PartVersionWithName{part->info.getDataVersion(), part->name}); std::sort(part_versions_with_names.begin(), part_versions_with_names.end(), comparator); std::vector<MergeTreeMutationStatus> result; for (const auto & kv : current_mutations_by_version) { Int64 mutation_version = kv.first; const MergeTreeMutationEntry & entry = kv.second; const PartVersionWithName needle{mutation_version, ""}; auto versions_it = std::lower_bound( part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator); size_t parts_to_do = versions_it - part_versions_with_names.begin(); Names parts_to_do_names; parts_to_do_names.reserve(parts_to_do); for (size_t i = 0; i < parts_to_do; ++i) parts_to_do_names.push_back(part_versions_with_names[i].name); std::map<String, Int64> block_numbers_map({{"", entry.block_number}}); for (const MutationCommand & command : entry.commands) { WriteBufferFromOwnString buf; formatAST(*command.ast, buf, false, true); result.push_back(MergeTreeMutationStatus { entry.file_name, buf.str(), entry.create_time, block_numbers_map, parts_to_do_names, /* is_done = */parts_to_do_names.empty(), entry.latest_failed_part, entry.latest_fail_time, entry.latest_fail_reason, }); } } return result; } CancellationCode StorageMergeTree::killMutation(const String & mutation_id) { LOG_TRACE(log, "Killing mutation {}", mutation_id); UInt64 mutation_version = MergeTreeMutationEntry::tryParseFileName(mutation_id); if (!mutation_version) return CancellationCode::NotFound; std::optional<MergeTreeMutationEntry> to_kill; { std::lock_guard lock(currently_processing_in_background_mutex); auto it = current_mutations_by_version.find(mutation_version); if (it != current_mutations_by_version.end()) { to_kill.emplace(std::move(it->second)); current_mutations_by_version.erase(it); } } if (!to_kill) return CancellationCode::NotFound; if (auto txn = tryGetTransactionForMutation(*to_kill, log)) { LOG_TRACE(log, "Cancelling transaction {} which had started mutation {}", to_kill->tid, mutation_id); TransactionLog::instance().rollbackTransaction(txn); } getContext()->getMergeList().cancelPartMutations(getStorageID(), {}, to_kill->block_number); to_kill->removeFile(); LOG_TRACE(log, "Cancelled part mutations and removed mutation file {}", mutation_id); { std::lock_guard lock(mutation_wait_mutex); mutation_wait_event.notify_all(); } /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately. background_operations_assignee.trigger(); return CancellationCode::CancelSent; } void StorageMergeTree::loadDeduplicationLog() { auto settings = getSettings(); if (settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Deduplication for non-replicated MergeTree in old syntax is not supported"); auto disk = getDisks()[0]; std::string path = fs::path(relative_data_path) / "deduplication_logs"; deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version, disk); deduplication_log->load(); } void StorageMergeTree::loadMutations() { for (const auto & disk : getDisks()) { for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next()) { if (startsWith(it->name(), "mutation_")) { MergeTreeMutationEntry entry(disk, relative_data_path, it->name()); UInt64 block_number = entry.block_number; LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size()); if (!entry.tid.isPrehistoric() && !entry.csn) { if (auto csn = TransactionLog::getCSN(entry.tid)) { /// Transaction is committed => mutation is finished, but let's load it anyway (so it will be shown in system.mutations) entry.writeCSN(csn); } else { TransactionLog::assertTIDIsNotOutdated(entry.tid); LOG_DEBUG(log, "Mutation entry {} was created by transaction {}, but it was not committed. Removing mutation entry", it->name(), entry.tid); disk->removeFile(it->path()); continue; } } auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second; if (!inserted) throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number); } else if (startsWith(it->name(), "tmp_mutation_")) { disk->removeFile(it->path()); } } } if (!current_mutations_by_version.empty()) increment.value = std::max(increment.value.load(), current_mutations_by_version.rbegin()->first); } MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String & out_disable_reason, TableLockHolder & /* table_lock_holder */, std::unique_lock<std::mutex> & lock, const MergeTreeTransactionPtr & txn, bool optimize_skip_merged_partitions, SelectPartsDecision * select_decision_out) { auto data_settings = getSettings(); auto future_part = std::make_shared<FutureMergedMutatedPart>(); if (storage_settings.get()->assign_part_uuids) future_part->uuid = UUIDHelpers::generateV4(); /// You must call destructor with unlocked `currently_processing_in_background_mutex`. CurrentlyMergingPartsTaggerPtr merging_tagger; MergeList::EntryPtr merge_entry; auto can_merge = [this, &lock](const DataPartPtr & left, const DataPartPtr & right, const MergeTreeTransaction * tx, String & disable_reason) -> bool { if (tx) { /// Cannot merge parts if some of them are not visible in current snapshot /// TODO Transactions: We can use simplified visibility rules (without CSN lookup) here if ((left && !left->version.isVisible(tx->getSnapshot(), Tx::EmptyTID)) || (right && !right->version.isVisible(tx->getSnapshot(), Tx::EmptyTID))) { disable_reason = "Some part is not visible in transaction"; return false; } /// Do not try to merge parts that are locked for removal (merge will probably fail) if ((left && left->version.isRemovalTIDLocked()) || (right && right->version.isRemovalTIDLocked())) { disable_reason = "Some part is locked for removal in another cuncurrent transaction"; return false; } } /// This predicate is checked for the first part of each range. /// (left = nullptr, right = "first part of partition") if (!left) { if (currently_merging_mutating_parts.contains(right)) { disable_reason = "Some part currently in a merging or mutating process"; return false; } else return true; } if (currently_merging_mutating_parts.contains(left) || currently_merging_mutating_parts.contains(right)) { disable_reason = "Some part currently in a merging or mutating process"; return false; } if (getCurrentMutationVersion(left, lock) != getCurrentMutationVersion(right, lock)) { disable_reason = "Some parts have different mutation version"; return false; } if (!partsContainSameProjections(left, right)) { disable_reason = "Some parts contains differ projections"; return false; } auto max_possible_level = getMaxLevelInBetween(left, right); if (max_possible_level > std::max(left->info.level, right->info.level)) { disable_reason = fmt::format("There is an outdated part in a gap between two active parts ({}, {}) with merge level {} higher than these active parts have", left->name, right->name, max_possible_level); return false; } return true; }; SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT; auto is_background_memory_usage_ok = [](String & disable_reason) -> bool { if (canEnqueueBackgroundTask()) return true; disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})", formatReadableSizeWithBinarySuffix(background_memory_tracker.get()), formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit())); return false; }; if (partition_id.empty()) { if (is_background_memory_usage_ok(out_disable_reason)) { UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool; /// TTL requirements is much more strict than for regular merge, so /// if regular not possible, than merge with ttl is not also not /// possible. if (max_source_parts_size > 0) { select_decision = merger_mutator.selectPartsToMerge( future_part, aggressive, max_source_parts_size, can_merge, merge_with_ttl_allowed, txn, out_disable_reason); } else out_disable_reason = "Current value of max_source_parts_size is zero"; } } else { while (true) { auto timeout_ms = getSettings()->lock_acquire_timeout_for_background_operations.totalMilliseconds(); auto timeout = std::chrono::milliseconds(timeout_ms); if (!is_background_memory_usage_ok(out_disable_reason)) { constexpr auto poll_interval = std::chrono::seconds(1); Int64 attempts = timeout / poll_interval; bool ok = false; for (Int64 i = 0; i < attempts; ++i) { std::this_thread::sleep_for(poll_interval); if (is_background_memory_usage_ok(out_disable_reason)) { ok = true; break; } } if (!ok) break; } select_decision = merger_mutator.selectAllPartsToMergeWithinPartition( future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions); /// If final - we will wait for currently processing merges to finish and continue. if (final && select_decision != SelectPartsDecision::SELECTED && !currently_merging_mutating_parts.empty()) { LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL", currently_merging_mutating_parts.size()); if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(lock, timeout)) { out_disable_reason = fmt::format("Timeout ({} ms) while waiting for already running merges before running OPTIMIZE with FINAL", timeout_ms); break; } } else break; } } /// In case of final we need to know the decision of select in StorageMergeTree::merge /// to treat NOTHING_TO_MERGE as successful merge (otherwise optimize final will be uncompleted) if (select_decision_out) *select_decision_out = select_decision; if (select_decision != SelectPartsDecision::SELECTED) { if (!out_disable_reason.empty()) out_disable_reason += ". "; out_disable_reason += "Cannot select parts for optimization"; return {}; } /// Account TTL merge here to avoid exceeding the max_number_of_merges_with_ttl_in_pool limit if (isTTLMergeType(future_part->merge_type)) getContext()->getMergeList().bookMergeWithTTL(); merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false); return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>()); } bool StorageMergeTree::merge( bool aggressive, const String & partition_id, bool final, bool deduplicate, const Names & deduplicate_by_columns, bool cleanup, const MergeTreeTransactionPtr & txn, String & out_disable_reason, bool optimize_skip_merged_partitions) { auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); auto metadata_snapshot = getInMemoryMetadataPtr(); SelectPartsDecision select_decision; MergeMutateSelectedEntryPtr merge_mutate_entry; { std::unique_lock lock(currently_processing_in_background_mutex); if (merger_mutator.merges_blocker.isCancelled()) throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); merge_mutate_entry = selectPartsToMerge( metadata_snapshot, aggressive, partition_id, final, out_disable_reason, table_lock_holder, lock, txn, optimize_skip_merged_partitions, &select_decision); } /// If there is nothing to merge then we treat this merge as successful (needed for optimize final optimization) if (select_decision == SelectPartsDecision::NOTHING_TO_MERGE) return true; if (!merge_mutate_entry) return false; /// Copying a vector of columns `deduplicate by columns. IExecutableTask::TaskResultCallback f = [](bool) {}; auto task = std::make_shared<MergePlainMergeTreeTask>( *this, metadata_snapshot, deduplicate, deduplicate_by_columns, cleanup, merge_mutate_entry, table_lock_holder, f); task->setCurrentTransaction(MergeTreeTransactionHolder{}, MergeTreeTransactionPtr{txn}); executeHere(task); return true; } bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const { std::lock_guard background_processing_lock(currently_processing_in_background_mutex); return currently_merging_mutating_parts.contains(part); } MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( const StorageMetadataPtr & metadata_snapshot, String & /* disable_reason */, TableLockHolder & /* table_lock_holder */, std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/) { if (current_mutations_by_version.empty()) return {}; size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation(); if (max_source_part_size == 0) { LOG_DEBUG( log, "Not enough idle threads to apply mutations at the moment. See settings 'number_of_free_entries_in_pool_to_execute_mutation' " "and 'background_pool_size'"); return {}; } size_t max_ast_elements = getContext()->getSettingsRef().max_expanded_ast_elements; auto future_part = std::make_shared<FutureMergedMutatedPart>(); if (storage_settings.get()->assign_part_uuids) future_part->uuid = UUIDHelpers::generateV4(); CurrentlyMergingPartsTaggerPtr tagger; auto mutations_end_it = current_mutations_by_version.end(); for (const auto & part : getDataPartsVectorForInternalUsage()) { if (currently_merging_mutating_parts.contains(part)) continue; auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); if (mutations_begin_it == mutations_end_it) continue; if (max_source_part_size < part->getBytesOnDisk()) { LOG_DEBUG( log, "Current max source part size for mutation is {} but part size {}. Will not mutate part {} yet", max_source_part_size, part->getBytesOnDisk(), part->name); continue; } TransactionID first_mutation_tid = mutations_begin_it->second.tid; MergeTreeTransactionPtr txn; if (!first_mutation_tid.isPrehistoric()) { /// Mutate visible parts only /// NOTE Do not mutate visible parts in Outdated state, because it does not make sense: /// mutation will fail anyway due to serialization error. /// It's possible that both mutation and transaction are already finished, /// because that part should not be mutated because it was not visible for that transaction. if (!part->version.isVisible(first_mutation_tid.start_csn, first_mutation_tid)) continue; txn = tryGetTransactionForMutation(mutations_begin_it->second, log); if (!txn) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find transaction {} that has started mutation {} " "that is going to be applied to part {}", first_mutation_tid, mutations_begin_it->second.file_name, part->name); } auto commands = std::make_shared<MutationCommands>(); size_t current_ast_elements = 0; auto last_mutation_to_apply = mutations_end_it; for (auto it = mutations_begin_it; it != mutations_end_it; ++it) { /// Do not squash mutations from different transactions to be able to commit/rollback them independently. if (first_mutation_tid != it->second.tid) break; size_t commands_size = 0; MutationCommands commands_for_size_validation; for (const auto & command : it->second.commands) { if (command.type != MutationCommand::Type::DROP_COLUMN && command.type != MutationCommand::Type::DROP_INDEX && command.type != MutationCommand::Type::DROP_PROJECTION && command.type != MutationCommand::Type::RENAME_COLUMN) { commands_for_size_validation.push_back(command); } else { commands_size += command.ast->size(); } } if (!commands_for_size_validation.empty()) { try { auto fake_query_context = Context::createCopy(getContext()); fake_query_context->makeQueryContext(); fake_query_context->setCurrentQueryId(""); MutationsInterpreter::Settings settings(false); MutationsInterpreter interpreter( shared_from_this(), metadata_snapshot, commands_for_size_validation, fake_query_context, settings); commands_size += interpreter.evaluateCommandsSize(); } catch (...) { tryLogCurrentException(log); MergeTreeMutationEntry & entry = it->second; entry.latest_fail_time = time(nullptr); entry.latest_fail_reason = getCurrentExceptionMessage(false); /// NOTE we should not skip mutations, because exception may be retryable (e.g. MEMORY_LIMIT_EXCEEDED) break; } } if (current_ast_elements + commands_size >= max_ast_elements) break; const auto & single_mutation_commands = it->second.commands; if (single_mutation_commands.containBarrierCommand()) { if (commands->empty()) { commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end()); last_mutation_to_apply = it; } break; } else { current_ast_elements += commands_size; commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end()); last_mutation_to_apply = it; } } assert(commands->empty() == (last_mutation_to_apply == mutations_end_it)); if (!commands->empty()) { auto new_part_info = part->info; new_part_info.mutation = last_mutation_to_apply->first; future_part->parts.push_back(part); future_part->part_info = new_part_info; future_part->name = part->getNewName(new_part_info); future_part->part_format = part->getFormat(); tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn); } } return {}; } UInt32 StorageMergeTree::getMaxLevelInBetween(const DataPartPtr & left, const DataPartPtr & right) const { auto parts_lock = lockParts(); auto begin = data_parts_by_info.find(left->info); if (begin == data_parts_by_info.end()) throw Exception(ErrorCodes::LOGICAL_ERROR, "unable to find left part, left part {}. It's a bug", left->name); auto end = data_parts_by_info.find(right->info); if (end == data_parts_by_info.end()) throw Exception(ErrorCodes::LOGICAL_ERROR, "unable to find right part, right part {}. It's a bug", right->name); UInt32 level = 0; for (auto it = begin++; it != end; ++it) { if (it == data_parts_by_info.end()) throw Exception(ErrorCodes::LOGICAL_ERROR, "left and right parts in the wrong order, left part {}, right part {}. It's a bug", left->name, right->name); level = std::max(level, (*it)->info.level); } return level; } bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) { if (shutdown_called) return false; assert(!isStaticStorage()); auto metadata_snapshot = getInMemoryMetadataPtr(); MergeMutateSelectedEntryPtr merge_entry, mutate_entry; auto shared_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); MergeTreeTransactionHolder transaction_for_merge; MergeTreeTransactionPtr txn; if (transactions_enabled.load(std::memory_order_relaxed)) { /// TODO Transactions: avoid beginning transaction if there is nothing to merge. txn = TransactionLog::instance().beginTransaction(); transaction_for_merge = MergeTreeTransactionHolder{txn, /* autocommit = */ false}; } bool has_mutations = false; { std::unique_lock lock(currently_processing_in_background_mutex); if (merger_mutator.merges_blocker.isCancelled()) return false; String out_reason; merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, out_reason, shared_lock, lock, txn); if (!merge_entry && !current_mutations_by_version.empty()) mutate_entry = selectPartsToMutate(metadata_snapshot, out_reason, shared_lock, lock); has_mutations = !current_mutations_by_version.empty(); } if (merge_entry) { auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, /* deduplicate */ false, Names{}, /* cleanup */ false, merge_entry, shared_lock, common_assignee_trigger); task->setCurrentTransaction(std::move(transaction_for_merge), std::move(txn)); bool scheduled = assignee.scheduleMergeMutateTask(task); /// The problem that we already booked a slot for TTL merge, but a merge list entry will be created only in a prepare method /// in MergePlainMergeTreeTask. So, this slot will never be freed. if (!scheduled && isTTLMergeType(merge_entry->future_part->merge_type)) getContext()->getMergeList().cancelMergeWithTTL(); return scheduled; } if (mutate_entry) { /// We take new metadata snapshot here. It's because mutation commands can be executed only with metadata snapshot /// which is equal or more fresh than commands themselves. In extremely rare case it can happen that we will have alter /// in between we took snapshot above and selected commands. That is why we take new snapshot here. auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, getInMemoryMetadataPtr(), mutate_entry, shared_lock, common_assignee_trigger); return assignee.scheduleMergeMutateTask(task); } if (has_mutations) { /// Notify in case of errors if no mutation was successfully selected. /// Otherwise, notification will occur after any of mutations complete. std::lock_guard lock(mutation_wait_mutex); mutation_wait_event.notify_all(); } bool scheduled = false; if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred( getSettings()->merge_tree_clear_old_temporary_directories_interval_seconds)) { assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>( [this, shared_lock] () { return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds()); }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); scheduled = true; } if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred( getSettings()->merge_tree_clear_old_parts_interval_seconds)) { assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>( [this, shared_lock] () { /// All use relative_data_path which changes during rename /// so execute under share lock. size_t cleared_count = 0; cleared_count += clearOldPartsFromFilesystem(); cleared_count += clearOldWriteAheadLogs(); cleared_count += clearOldMutations(); cleared_count += clearEmptyParts(); cleared_count += clearOldBrokenPartsFromDetachedDirectory(); return cleared_count; /// TODO maybe take into account number of cleared objects when calculating backoff }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); scheduled = true; } return scheduled; } size_t StorageMergeTree::getNumberOfUnfinishedMutations() const { std::unique_lock lock(currently_processing_in_background_mutex); size_t count = 0; for (const auto & [version, _] : current_mutations_by_version | std::views::reverse) { auto status = getIncompleteMutationsStatusUnlocked(version, lock, nullptr, true); if (!status) continue; if (status->is_done) break; ++count; } return count; } UInt64 StorageMergeTree::getCurrentMutationVersion( const DataPartPtr & part, std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/) const { auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); if (it == current_mutations_by_version.begin()) return 0; --it; return it->first; } size_t StorageMergeTree::clearOldMutations(bool truncate) { size_t finished_mutations_to_keep = truncate ? 0 : getSettings()->finished_mutations_to_keep; std::vector<MergeTreeMutationEntry> mutations_to_delete; { std::lock_guard lock(currently_processing_in_background_mutex); if (current_mutations_by_version.size() <= finished_mutations_to_keep) return 0; auto end_it = current_mutations_by_version.end(); auto begin_it = current_mutations_by_version.begin(); if (std::optional<Int64> min_version = getMinPartDataVersion()) end_it = current_mutations_by_version.upper_bound(*min_version); size_t done_count = std::distance(begin_it, end_it); if (done_count <= finished_mutations_to_keep) return 0; for (auto it = begin_it; it != end_it; ++it) { if (!it->second.tid.isPrehistoric()) { done_count = std::distance(begin_it, it); break; } } if (done_count <= finished_mutations_to_keep) return 0; size_t to_delete_count = done_count - finished_mutations_to_keep; auto it = begin_it; for (size_t i = 0; i < to_delete_count; ++i) { const auto & tid = it->second.tid; if (!tid.isPrehistoric() && !TransactionLog::getCSN(tid)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove mutation {}, because transaction {} is not committed. It's a bug", it->first, tid); mutations_to_delete.push_back(std::move(it->second)); it = current_mutations_by_version.erase(it); } } for (auto & mutation : mutations_to_delete) { LOG_TRACE(log, "Removing mutation: {}", mutation.file_name); mutation.removeFile(); } return mutations_to_delete.size(); } bool StorageMergeTree::optimize( const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const ASTPtr & partition, bool final, bool deduplicate, const Names & deduplicate_by_columns, bool cleanup, ContextPtr local_context) { if (deduplicate) { if (deduplicate_by_columns.empty()) LOG_DEBUG(log, "DEDUPLICATE BY all columns"); else LOG_DEBUG(log, "DEDUPLICATE BY ('{}')", fmt::join(deduplicate_by_columns, "', '")); } auto txn = local_context->getCurrentTransaction(); String disable_reason; if (!partition && final) { if (cleanup && this->merging_params.mode != MergingParams::Mode::Replacing) { constexpr const char * message = "Cannot OPTIMIZE with CLEANUP table: {}"; disable_reason = "only ReplacingMergeTree can be CLEANUP"; throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason); } DataPartsVector data_parts = getVisibleDataPartsVector(local_context); std::unordered_set<String> partition_ids; for (const DataPartPtr & part : data_parts) partition_ids.emplace(part->info.partition_id); for (const String & partition_id : partition_ids) { if (!merge( true, partition_id, true, deduplicate, deduplicate_by_columns, cleanup, txn, disable_reason, local_context->getSettingsRef().optimize_skip_merged_partitions)) { constexpr auto message = "Cannot OPTIMIZE table: {}"; if (disable_reason.empty()) disable_reason = "unknown reason"; LOG_INFO(log, message, disable_reason); if (local_context->getSettingsRef().optimize_throw_if_noop) throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason); return false; } } } else { String partition_id; if (partition) partition_id = getPartitionIDFromQuery(partition, local_context); if (!merge( true, partition_id, final, deduplicate, deduplicate_by_columns, cleanup, txn, disable_reason, local_context->getSettingsRef().optimize_skip_merged_partitions)) { constexpr auto message = "Cannot OPTIMIZE table: {}"; if (disable_reason.empty()) disable_reason = "unknown reason"; LOG_INFO(log, message, disable_reason); if (local_context->getSettingsRef().optimize_throw_if_noop) throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason); return false; } } return true; } ActionLock StorageMergeTree::stopMergesAndWait() { /// TODO allow to stop merges in specific partition only (like it's done in ReplicatedMergeTree) std::unique_lock lock(currently_processing_in_background_mutex); /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. auto merge_blocker = merger_mutator.merges_blocker.cancel(); while (!currently_merging_mutating_parts.empty()) { LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now)", currently_merging_mutating_parts.size()); if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for( lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC))) { throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout while waiting for already running merges"); } } return merge_blocker; } MergeTreeDataPartPtr StorageMergeTree::outdatePart(MergeTreeTransaction * txn, const String & part_name, bool force, bool clear_without_timeout) { if (force) { /// Forcefully stop merges and make part outdated auto merge_blocker = stopMergesAndWait(); auto parts_lock = lockParts(); auto part = getPartIfExistsUnlocked(part_name, {MergeTreeDataPartState::Active}, parts_lock); if (!part) throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name); removePartsFromWorkingSet(txn, {part}, clear_without_timeout, &parts_lock); return part; } else { /// Wait merges selector std::unique_lock lock(currently_processing_in_background_mutex); auto parts_lock = lockParts(); auto part = getPartIfExistsUnlocked(part_name, {MergeTreeDataPartState::Active}, parts_lock); /// It's okay, part was already removed if (!part) return nullptr; /// Part will be "removed" by merge or mutation, it's OK in case of some /// background cleanup processes like removing of empty parts. if (currently_merging_mutating_parts.contains(part)) return nullptr; removePartsFromWorkingSet(txn, {part}, clear_without_timeout, &parts_lock); return part; } } void StorageMergeTree::dropPartNoWaitNoThrow(const String & part_name) { if (auto part = outdatePart(NO_TRANSACTION_RAW, part_name, /*force=*/ false, /*clear_without_timeout=*/ false)) { if (deduplication_log) { deduplication_log->dropPart(part->info); } /// Need to destroy part objects before clearing them from filesystem. part.reset(); clearOldPartsFromFilesystem(); } /// Else nothing to do, part was removed in some different way } struct FutureNewEmptyPart { MergeTreePartInfo part_info; MergeTreePartition partition; std::string part_name; StorageMergeTree::MutableDataPartPtr data_part; }; using FutureNewEmptyParts = std::vector<FutureNewEmptyPart>; Strings getPartsNames(const FutureNewEmptyParts & parts) { Strings part_names; for (const auto & p : parts) part_names.push_back(p.part_name); return part_names; } FutureNewEmptyParts initCoverageWithNewEmptyParts(const DataPartsVector & old_parts) { FutureNewEmptyParts future_parts; for (const auto & old_part : old_parts) { future_parts.emplace_back(); auto & new_part = future_parts.back(); new_part.part_info = old_part->info; new_part.part_info.level += 1; new_part.partition = old_part->partition; new_part.part_name = old_part->getNewName(new_part.part_info); } return future_parts; } std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> createEmptyDataParts( MergeTreeData & data, FutureNewEmptyParts & future_parts, const MergeTreeTransactionPtr & txn) { std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> data_parts; for (auto & part: future_parts) { auto [new_data_part, tmp_dir_holder] = data.createEmptyPart(part.part_info, part.partition, part.part_name, txn); data_parts.first.emplace_back(std::move(new_data_part)); data_parts.second.emplace_back(std::move(tmp_dir_holder)); } return data_parts; } void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction) { DataPartsVector covered_parts; for (auto & part: new_parts) { DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction); if (covered_parts_by_one_part.size() > 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} expected to cover not more then 1 part. " "{} covered parts have been found. This is a bug.", part->name, covered_parts_by_one_part.size()); std::move(covered_parts_by_one_part.begin(), covered_parts_by_one_part.end(), std::back_inserter(covered_parts)); } LOG_INFO(log, "Remove {} parts by covering them with empty {} parts. With txn {}.", covered_parts.size(), new_parts.size(), transaction.getTID()); transaction.commit(); /// Remove covered parts without waiting for old_parts_lifetime seconds. for (auto & part: covered_parts) part->remove_time.store(0, std::memory_order_relaxed); if (deduplication_log) for (const auto & part : covered_parts) deduplication_log->dropPart(part->info); } void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &) { { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. waitForOutdatedPartsToBeLoaded(); auto merge_blocker = stopMergesAndWait(); Stopwatch watch; ProfileEventsScope profile_events_scope; auto txn = query_context->getCurrentTransaction(); if (txn) { auto data_parts_lock = lockParts(); auto parts_to_remove = getVisibleDataPartsVectorUnlocked(query_context, data_parts_lock); removePartsFromWorkingSet(txn.get(), parts_to_remove, true, data_parts_lock); LOG_INFO(log, "Removed {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", ")); } else { MergeTreeData::Transaction transaction(*this, txn.get()); auto operation_data_parts_lock = lockOperationsWithParts(); auto parts = getVisibleDataPartsVector(query_context); auto future_parts = initCoverageWithNewEmptyParts(parts); LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}", future_parts.size(), parts.size(), fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "), transaction.getTID()); auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn); renameAndCommitEmptyParts(new_data_parts, transaction); PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot())); LOG_INFO(log, "Truncated table with {} parts by replacing them with new empty {} parts. With txn {}", parts.size(), future_parts.size(), transaction.getTID()); } } /// Old parts are needed to be destroyed before clearing them from filesystem. clearOldMutations(true); clearOldPartsFromFilesystem(); clearEmptyParts(); } void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPtr query_context) { { /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. auto merge_blocker = stopMergesAndWait(); Stopwatch watch; ProfileEventsScope profile_events_scope; /// It's important to create it outside of lock scope because /// otherwise it can lock parts in destructor and deadlock is possible. auto txn = query_context->getCurrentTransaction(); if (txn) { if (auto part = outdatePart(txn.get(), part_name, /*force=*/ true)) dropPartsImpl({part}, detach); } else { MergeTreeData::Transaction transaction(*this, txn.get()); auto operation_data_parts_lock = lockOperationsWithParts(); auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active}); if (!part) throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name); if (detach) { auto metadata_snapshot = getInMemoryMetadataPtr(); String part_dir = part->getDataPartStorage().getPartDirectory(); LOG_INFO(log, "Detaching {}", part_dir); auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir); part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {}); } { auto future_parts = initCoverageWithNewEmptyParts({part}); LOG_TEST(log, "Made {} empty parts in order to cover {} part. With txn {}", fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames({part}), ", "), transaction.getTID()); auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn); renameAndCommitEmptyParts(new_data_parts, transaction); PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot())); const auto * op = detach ? "Detached" : "Dropped"; LOG_INFO(log, "{} {} part by replacing it with new empty {} part. With txn {}", op, part->name, future_parts[0].part_name, transaction.getTID()); } } } /// Old part objects is needed to be destroyed before clearing them from filesystem. clearOldMutations(true); clearOldPartsFromFilesystem(); clearEmptyParts(); } void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) { { const auto * partition_ast = partition->as<ASTPartition>(); /// Asks to complete merges and does not allow them to start. /// This protects against "revival" of data for a removed partition after completion of merge. auto merge_blocker = stopMergesAndWait(); Stopwatch watch; ProfileEventsScope profile_events_scope; /// It's important to create it outside of lock scope because /// otherwise it can lock parts in destructor and deadlock is possible. auto txn = query_context->getCurrentTransaction(); if (txn) { DataPartsVector parts_to_remove; { auto data_parts_lock = lockParts(); if (partition_ast && partition_ast->all) parts_to_remove = getVisibleDataPartsVectorUnlocked(query_context, data_parts_lock); else { String partition_id = getPartitionIDFromQuery(partition, query_context, &data_parts_lock); parts_to_remove = getVisibleDataPartsVectorInPartition(query_context, partition_id, data_parts_lock); } removePartsFromWorkingSet(txn.get(), parts_to_remove, true, data_parts_lock); } dropPartsImpl(std::move(parts_to_remove), detach); } else { MergeTreeData::Transaction transaction(*this, txn.get()); auto operation_data_parts_lock = lockOperationsWithParts(); DataPartsVector parts; { if (partition_ast && partition_ast->all) parts = getVisibleDataPartsVector(query_context); else { String partition_id = getPartitionIDFromQuery(partition, query_context); parts = getVisibleDataPartsVectorInPartition(query_context, partition_id); } } if (detach) { for (const auto & part : parts) { auto metadata_snapshot = getInMemoryMetadataPtr(); String part_dir = part->getDataPartStorage().getPartDirectory(); LOG_INFO(log, "Detaching {}", part_dir); auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir); part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {}); } } auto future_parts = initCoverageWithNewEmptyParts(parts); LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}. With txn {}", future_parts.size(), parts.size(), fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "), transaction.getTID()); auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn); renameAndCommitEmptyParts(new_data_parts, transaction); PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot())); const auto * op = detach ? "Detached" : "Dropped"; LOG_INFO(log, "{} partition with {} parts by replacing them with new empty {} parts. With txn {}", op, parts.size(), future_parts.size(), transaction.getTID()); } } /// Old parts are needed to be destroyed before clearing them from filesystem. clearOldMutations(true); clearOldPartsFromFilesystem(); clearEmptyParts(); } void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool detach) { auto metadata_snapshot = getInMemoryMetadataPtr(); if (detach) { /// If DETACH clone parts to detached/ directory /// NOTE: no race with background cleanup until we hold pointers to parts for (const auto & part : parts_to_remove) { String part_dir = part->getDataPartStorage().getPartDirectory(); LOG_INFO(log, "Detaching {}", part_dir); auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir); part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {}); } } if (deduplication_log) { for (const auto & part : parts_to_remove) deduplication_log->dropPart(part->info); } if (detach) LOG_INFO(log, "Detached {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", ")); else LOG_INFO(log, "Removed {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", ")); } PartitionCommandsResultInfo StorageMergeTree::attachPartition( const ASTPtr & partition, const StorageMetadataPtr & /* metadata_snapshot */, bool attach_part, ContextPtr local_context) { PartitionCommandsResultInfo results; PartsTemporaryRename renamed_parts(*this, "detached/"); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, local_context, renamed_parts); for (size_t i = 0; i < loaded_parts.size(); ++i) { LOG_INFO(log, "Attaching part {} from {}", loaded_parts[i]->name, renamed_parts.old_and_new_names[i].new_name); /// We should write version metadata on part creation to distinguish it from parts that were created without transaction. auto txn = local_context->getCurrentTransaction(); TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID; loaded_parts[i]->version.setCreationTID(tid, nullptr); loaded_parts[i]->storeVersionMetadata(); String old_name = renamed_parts.old_and_new_names[i].old_name; /// It's important to create it outside of lock scope because /// otherwise it can lock parts in destructor and deadlock is possible. MergeTreeData::Transaction transaction(*this, local_context->getCurrentTransaction().get()); { auto lock = lockParts(); fillNewPartName(loaded_parts[i], lock); renameTempPartAndAdd(loaded_parts[i], transaction, lock); transaction.commit(&lock); } renamed_parts.old_and_new_names[i].old_name.clear(); results.push_back(PartitionCommandResultInfo{ .command_type = "ATTACH_PART", .partition_id = loaded_parts[i]->info.partition_id, .part_name = loaded_parts[i]->name, .old_part_name = old_name, }); LOG_INFO(log, "Finished attaching part"); } /// New parts with other data may appear in place of deleted parts. local_context->clearCaches(); return results; } void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr local_context) { auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); auto lock2 = source_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); auto merges_blocker = stopMergesAndWait(); auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr(); auto my_metadata_snapshot = getInMemoryMetadataPtr(); Stopwatch watch; ProfileEventsScope profile_events_scope; MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot); String partition_id = getPartitionIDFromQuery(partition, local_context); DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); MutableDataPartsVector dst_parts; std::vector<scope_guard> dst_parts_locks; static const String TMP_PREFIX = "tmp_replace_from_"; for (const DataPartPtr & src_part : src_parts) { if (!canReplacePartition(src_part)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot replace partition '{}' because part '{}' has inconsistent granularity with table", partition_id, src_part->name); /// This will generate unique name in scope of current server process. Int64 temp_index = insert_increment.get(); MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()}; auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, clone_params); dst_parts.emplace_back(std::move(dst_part)); dst_parts_locks.emplace_back(std::move(part_lock)); } /// ATTACH empty part set if (!replace && dst_parts.empty()) return; MergeTreePartInfo drop_range; if (replace) { drop_range.partition_id = partition_id; drop_range.min_block = 0; drop_range.max_block = increment.get(); // there will be a "hole" in block numbers drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max(); } /// Atomically add new parts and remove old ones try { { /// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible /// and we should be able to rollback already added (Precomitted) parts Transaction transaction(*this, local_context->getCurrentTransaction().get()); auto data_parts_lock = lockParts(); /** It is important that obtaining new block number and adding that block to parts set is done atomically. * Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part. */ for (auto part : dst_parts) { fillNewPartName(part, data_parts_lock); renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock); } /// Populate transaction transaction.commit(&data_parts_lock); /// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block if (replace) removePartsInRangeFromWorkingSet(local_context->getCurrentTransaction().get(), drop_range, data_parts_lock); } /// Note: same elapsed time and profile events for all parts is used PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot())); } catch (...) { PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true)); throw; } } void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr local_context) { auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); auto lock2 = dest_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); auto merges_blocker = stopMergesAndWait(); auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table); if (!dest_table_storage) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table {} supports movePartitionToTable only for MergeTree family of table engines. Got {}", getStorageID().getNameForLogs(), dest_table->getName()); if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy()) throw Exception(ErrorCodes::UNKNOWN_POLICY, "Destination table {} should have the same storage policy of source table {}. {}: {}, {}: {}", dest_table_storage->getStorageID().getNameForLogs(), getStorageID().getNameForLogs(), getStorageID().getNameForLogs(), this->getStoragePolicy()->getName(), dest_table_storage->getStorageID().getNameForLogs(), dest_table_storage->getStoragePolicy()->getName()); auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr(); auto metadata_snapshot = getInMemoryMetadataPtr(); Stopwatch watch; ProfileEventsScope profile_events_scope; MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this, metadata_snapshot, dest_metadata_snapshot); String partition_id = getPartitionIDFromQuery(partition, local_context); DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); MutableDataPartsVector dst_parts; std::vector<scope_guard> dst_parts_locks; static const String TMP_PREFIX = "tmp_move_from_"; for (const DataPartPtr & src_part : src_parts) { if (!dest_table_storage->canReplacePartition(src_part)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot move partition '{}' because part '{}' has inconsistent granularity with table", partition_id, src_part->name); /// This will generate unique name in scope of current server process. Int64 temp_index = insert_increment.get(); MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()}; auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params); dst_parts.emplace_back(std::move(dst_part)); dst_parts_locks.emplace_back(std::move(part_lock)); } /// empty part set if (dst_parts.empty()) return; /// Move new parts to the destination table. NOTE It doesn't look atomic. try { { Transaction transaction(*dest_table_storage, local_context->getCurrentTransaction().get()); auto src_data_parts_lock = lockParts(); auto dest_data_parts_lock = dest_table_storage->lockParts(); for (auto & part : dst_parts) { dest_table_storage->fillNewPartName(part, dest_data_parts_lock); dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock); } removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, src_data_parts_lock); transaction.commit(&src_data_parts_lock); } clearOldPartsFromFilesystem(); /// Note: same elapsed time and profile events for all parts is used PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot())); } catch (...) { PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true)); throw; } } ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) { if (action_type == ActionLocks::PartsMerge) return merger_mutator.merges_blocker.cancel(); else if (action_type == ActionLocks::PartsTTLMerge) return merger_mutator.ttl_merges_blocker.cancel(); else if (action_type == ActionLocks::PartsMove) return parts_mover.moves_blocker.cancel(); return {}; } void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type) { if (action_type == ActionLocks::PartsMerge || action_type == ActionLocks::PartsTTLMerge) background_operations_assignee.trigger(); else if (action_type == ActionLocks::PartsMove) background_moves_assignee.trigger(); } CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context) { CheckResults results; DataPartsVector data_parts; if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition) { String partition_id = getPartitionIDFromQuery(check_query.partition, local_context); data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id); } else data_parts = getVisibleDataPartsVector(local_context); for (auto & part : data_parts) { /// If the checksums file is not present, calculate the checksums and write them to disk. static constexpr auto checksums_path = "checksums.txt"; if (part->isStoredOnDisk() && !part->getDataPartStorage().exists(checksums_path)) { try { auto calculated_checksums = checkDataPart(part, false); calculated_checksums.checkEqual(part->checksums, true); auto & part_mutable = const_cast<IMergeTreeDataPart &>(*part); part_mutable.writeChecksums(part->checksums, local_context->getWriteSettings()); part->checkMetadata(); results.emplace_back(part->name, true, "Checksums recounted and written to disk."); } catch (const Exception & ex) { tryLogCurrentException(log, __PRETTY_FUNCTION__); results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'"); } } else { try { checkDataPart(part, true); part->checkMetadata(); results.emplace_back(part->name, true, ""); } catch (const Exception & ex) { results.emplace_back(part->name, false, ex.message()); } } } return results; } void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) { const auto & backup_settings = backup_entries_collector.getBackupSettings(); const auto & read_settings = backup_entries_collector.getReadSettings(); auto local_context = backup_entries_collector.getContext(); DataPartsVector data_parts; if (partitions) data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(*partitions, local_context)); else data_parts = getVisibleDataPartsVector(local_context); Int64 min_data_version = std::numeric_limits<Int64>::max(); for (const auto & data_part : data_parts) min_data_version = std::min(min_data_version, data_part->info.getDataVersion() + 1); auto parts_backup_entries = backupParts(data_parts, data_path_in_backup, backup_settings, read_settings, local_context); for (auto & part_backup_entries : parts_backup_entries) backup_entries_collector.addBackupEntries(std::move(part_backup_entries.backup_entries)); backup_entries_collector.addBackupEntries(backupMutations(min_data_version, data_path_in_backup)); } BackupEntries StorageMergeTree::backupMutations(UInt64 version, const String & data_path_in_backup) const { std::lock_guard lock(currently_processing_in_background_mutex); fs::path mutations_path_in_backup = fs::path{data_path_in_backup} / "mutations"; BackupEntries backup_entries; for (auto it = current_mutations_by_version.lower_bound(version); it != current_mutations_by_version.end(); ++it) backup_entries.emplace_back(mutations_path_in_backup / fmt::format("{:010}.txt", it->first), it->second.backup()); return backup_entries; } void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts) { for (auto part : parts) { /// It's important to create it outside of lock scope because /// otherwise it can lock parts in destructor and deadlock is possible. MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW); { auto lock = lockParts(); fillNewPartName(part, lock); renameTempPartAndAdd(part, transaction, lock); transaction.commit(&lock); } } } std::map<int64_t, MutationCommands> StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const { std::lock_guard lock(currently_processing_in_background_mutex); UInt64 part_data_version = part->info.getDataVersion(); std::map<int64_t, MutationCommands> result; for (const auto & [mutation_version, entry] : current_mutations_by_version | std::views::reverse) { if (mutation_version > part_data_version) result[mutation_version] = entry.commands; else break; } return result; } void StorageMergeTree::startBackgroundMovesIfNeeded() { if (areBackgroundMovesNeeded()) background_moves_assignee.start(); } std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const { return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings()); } PreparedSetsCachePtr StorageMergeTree::getPreparedSetsCache(Int64 mutation_id) { auto l = std::lock_guard(mutation_prepared_sets_cache_mutex); /// Cleanup stale entries where the shared_ptr is expired. while (!mutation_prepared_sets_cache.empty()) { auto it = mutation_prepared_sets_cache.begin(); if (it->second.lock()) break; mutation_prepared_sets_cache.erase(it); } /// Look up an existing entry. auto it = mutation_prepared_sets_cache.find(mutation_id); if (it != mutation_prepared_sets_cache.end()) { /// If the entry is still alive, return it. auto existing_set_cache = it->second.lock(); if (existing_set_cache) return existing_set_cache; } /// Create new entry. auto cache = std::make_shared<PreparedSetsCache>(); mutation_prepared_sets_cache[mutation_id] = cache; return cache; } void StorageMergeTree::fillNewPartName(MutableDataPartPtr & part, DataPartsLock &) { part->info.min_block = part->info.max_block = increment.get(); part->info.mutation = 0; part->setName(part->getNewName(part->info)); } }