aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-11-28 09:30:11 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-11-28 09:51:56 +0300
commit86cac93557a380155b8a87abd5802e5d40c027bf (patch)
tree73f5784964fd8907e03406f69008b3ac0cf2a8bb /contrib/clickhouse/src/Storages
parentd4a181fef66ec965883429981b9b95203a3b3d40 (diff)
downloadydb-86cac93557a380155b8a87abd5802e5d40c027bf.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/clickhouse/src/Storages')
-rw-r--r--contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp55
-rw-r--r--contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h1
-rw-r--r--contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp8
3 files changed, 54 insertions, 10 deletions
diff --git a/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp b/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp
index bc0cba9541..4867d6353d 100644
--- a/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/contrib/clickhouse/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -65,8 +65,6 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION = 8;
-// Reserved for ALTER PRIMARY KEY
-// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 9;
std::string getEndpointId(const std::string & node_id)
{
@@ -351,22 +349,60 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
return data_checksums;
}
+bool wait_loop(UInt32 wait_timeout_ms, const std::function<bool()> & pred)
+{
+ static const UInt32 loop_delay_ms = 5;
+
+ /// this is sleep-based wait, it has to be short
+ chassert(wait_timeout_ms < 2000);
+
+ if (pred())
+ return true;
+
+ Stopwatch timer;
+ sleepForMilliseconds(loop_delay_ms);
+ while (!pred() && timer.elapsedMilliseconds() < wait_timeout_ms)
+ {
+ sleepForMilliseconds(loop_delay_ms);
+ }
+
+ return pred();
+}
+
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
/// It is important to include Outdated parts here because remote replicas cannot reliably
/// determine the local state of the part, so queries for the parts in these states are completely normal.
MergeTreeData::DataPartPtr part;
- /// Ephemeral zero-copy lock may be lost for PreActive parts
+ part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
+
+ if (!part)
+ throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
+
bool zero_copy_enabled = data.getSettings()->allow_remote_fs_zero_copy_replication;
- if (zero_copy_enabled)
- part = data.getPartIfExists(name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
- else
- part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
- if (part)
+ if (!zero_copy_enabled)
return part;
- throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
+ /// Ephemeral zero-copy lock may be lost for PreActive parts
+ /// do not expose PreActive parts for zero-copy
+
+ static const UInt32 wait_timeout_ms = 1000;
+ auto pred = [&] ()
+ {
+ auto lock = data.lockParts();
+ return part->getState() != MergeTreeDataPartState::PreActive;
+ };
+
+ bool pred_result = wait_loop(wait_timeout_ms, pred);
+ if (!pred_result)
+ throw Exception(
+ ErrorCodes::ABORTED,
+ "Could not exchange part {} as it's in preActive state ({} ms) and it uses zero copy replication. "
+ "This is expected behaviour and the client will retry fetching the part automatically.",
+ name, wait_timeout_ms);
+
+ return part;
}
Fetcher::Fetcher(StorageReplicatedMergeTree & data_)
@@ -493,7 +529,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host));
int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));
-
String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));
DiskPtr preffered_disk = disk;
diff --git a/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h b/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h
index 4b86045604..ff2c946d85 100644
--- a/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/contrib/clickhouse/src/Storages/MergeTree/MergeTreeSettings.h
@@ -85,6 +85,7 @@ struct Settings;
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
+ M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
\
/* Part removal settings. */ \
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \
diff --git a/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
index 2894760a94..cd6b38df52 100644
--- a/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
+++ b/contrib/clickhouse/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -910,6 +910,14 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
if (multi_code == Coordination::Error::ZOK)
{
+ auto sleep_before_commit_local_part_in_replicated_table_ms = storage.getSettings()->sleep_before_commit_local_part_in_replicated_table_ms;
+ if (sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds())
+ {
+ LOG_INFO(log, "committing part {}, triggered sleep_before_commit_local_part_in_replicated_table_ms {}",
+ part->name, sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds());
+ sleepForMilliseconds(sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds());
+ }
+
part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true;
transaction.commit();
storage.merge_selecting_task->schedule();