diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-28 09:30:11 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-28 09:51:56 +0300 |
commit | 86cac93557a380155b8a87abd5802e5d40c027bf (patch) | |
tree | 73f5784964fd8907e03406f69008b3ac0cf2a8bb /contrib/clickhouse/src/Storages | |
parent | d4a181fef66ec965883429981b9b95203a3b3d40 (diff) | |
download | ydb-86cac93557a380155b8a87abd5802e5d40c027bf.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/clickhouse/src/Storages')
3 files changed, 54 insertions, 10 deletions
diff --git a/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp b/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp index bc0cba9541..4867d6353d 100644 --- a/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp @@ -65,8 +65,6 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5; constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6; constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7; constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION = 8; -// Reserved for ALTER PRIMARY KEY -// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 9; std::string getEndpointId(const std::string & node_id) { @@ -351,22 +349,60 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk( return data_checksums; } +bool wait_loop(UInt32 wait_timeout_ms, const std::function<bool()> & pred) +{ + static const UInt32 loop_delay_ms = 5; + + /// this is sleep-based wait, it has to be short + chassert(wait_timeout_ms < 2000); + + if (pred()) + return true; + + Stopwatch timer; + sleepForMilliseconds(loop_delay_ms); + while (!pred() && timer.elapsedMilliseconds() < wait_timeout_ms) + { + sleepForMilliseconds(loop_delay_ms); + } + + return pred(); +} + MergeTreeData::DataPartPtr Service::findPart(const String & name) { /// It is important to include Outdated parts here because remote replicas cannot reliably /// determine the local state of the part, so queries for the parts in these states are completely normal. MergeTreeData::DataPartPtr part; - /// Ephemeral zero-copy lock may be lost for PreActive parts + part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}); + + if (!part) + throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name); + bool zero_copy_enabled = data.getSettings()->allow_remote_fs_zero_copy_replication; - if (zero_copy_enabled) - part = data.getPartIfExists(name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}); - else - part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}); - if (part) + if (!zero_copy_enabled) return part; - throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name); + /// Ephemeral zero-copy lock may be lost for PreActive parts + /// do not expose PreActive parts for zero-copy + + static const UInt32 wait_timeout_ms = 1000; + auto pred = [&] () + { + auto lock = data.lockParts(); + return part->getState() != MergeTreeDataPartState::PreActive; + }; + + bool pred_result = wait_loop(wait_timeout_ms, pred); + if (!pred_result) + throw Exception( + ErrorCodes::ABORTED, + "Could not exchange part {} as it's in preActive state ({} ms) and it uses zero copy replication. " + "This is expected behaviour and the client will retry fetching the part automatically.", + name, wait_timeout_ms); + + return part; } Fetcher::Fetcher(StorageReplicatedMergeTree & data_) @@ -493,7 +529,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host)); int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0")); - String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", "")); DiskPtr preffered_disk = disk; diff --git a/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h b/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h index 4b86045604..ff2c946d85 100644 --- a/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h +++ b/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h @@ -85,6 +85,7 @@ struct Settings; M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \ M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \ M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \ + M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \ \ /* Part removal settings. */ \ M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \ diff --git a/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 2894760a94..cd6b38df52 100644 --- a/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -910,6 +910,14 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>:: Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT if (multi_code == Coordination::Error::ZOK) { + auto sleep_before_commit_local_part_in_replicated_table_ms = storage.getSettings()->sleep_before_commit_local_part_in_replicated_table_ms; + if (sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds()) + { + LOG_INFO(log, "committing part {}, triggered sleep_before_commit_local_part_in_replicated_table_ms {}", + part->name, sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds()); + sleepForMilliseconds(sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds()); + } + part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true; transaction.commit(); storage.merge_selecting_task->schedule(); |