diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-14 16:25:56 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-14 16:25:56 +0300 |
commit | faffc8be37c9fe59ca686960f200a3358a083f1c (patch) | |
tree | 85acd38f1fe2dee09b552ac337df5aa44a042a14 | |
parent | a958a3fd80df0116b477ee7b64ea7a639bace22d (diff) | |
download | ydb-faffc8be37c9fe59ca686960f200a3358a083f1c.tar.gz |
YQ-356 Require force mode when topic has grown
ref:7d06cff5cd5a8b313faf54c2e14ae5812347007e
-rw-r--r-- | build/mapping.conf.json | 6 | ||||
-rwxr-xr-x | ya | 4 | ||||
-rw-r--r-- | ydb/library/yql/dq/state/dq_state_load_plan.cpp | 15 | ||||
-rw-r--r-- | ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp | 4 |
4 files changed, 18 insertions, 11 deletions
diff --git a/build/mapping.conf.json b/build/mapping.conf.json index 12f563c40a5..d9e361ebc6a 100644 --- a/build/mapping.conf.json +++ b/build/mapping.conf.json @@ -2918,6 +2918,12 @@ "2786406421": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2786406421", "2786406721": "https://storage.mds.yandex.net/get-devtools-opensource/233854/2786406721", "2786406951": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2786406951", + "2786636274": "https://storage.mds.yandex.net/get-devtools-opensource/250854/2786636274", + "2786636814": "https://storage.mds.yandex.net/get-devtools-opensource/373962/2786636814", + "2786636995": "https://storage.mds.yandex.net/get-devtools-opensource/479623/2786636995", + "2786659959": "https://storage.mds.yandex.net/get-devtools-opensource/479623/2786659959", + "2786660754": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2786660754", + "2786660966": "https://storage.mds.yandex.net/get-devtools-opensource/233854/2786660966", "309054781": "https://storage.mds.yandex.net/get-devtools-opensource/250854/309054781", "360916612": "https://storage.mds.yandex.net/get-devtools-opensource/233854/360916612", "412716868": "https://storage.mds.yandex.net/get-devtools-opensource/233854/412716868", @@ -4,8 +4,8 @@ import sys import platform import json -URLS = ["https://storage.mds.yandex.net/get-devtools-opensource/233854/8c07a75cc7b4d0fb6896b0376d449df8"] -MD5 = "8c07a75cc7b4d0fb6896b0376d449df8" +URLS = ["https://storage.mds.yandex.net/get-devtools-opensource/479623/d572a4841e2b2dcca14450f609635d2f"] +MD5 = "d572a4841e2b2dcca14450f609635d2f" RETRIES = 5 HASH_PREFIX = 10 diff --git a/ydb/library/yql/dq/state/dq_state_load_plan.cpp b/ydb/library/yql/dq/state/dq_state_load_plan.cpp index 79000f07a1d..22275a74c10 100644 --- a/ydb/library/yql/dq/state/dq_state_load_plan.cpp +++ b/ydb/library/yql/dq/state/dq_state_load_plan.cpp @@ -157,6 +157,8 @@ bool MakeContinueFromStreamingOffsetsPlan( THashMap<ui64, NDqProto::NDqStateLoadPlan::TTaskPlan>& plan, TIssues& issues) { +#define FORCE_MSG(msg) (force ? ". " msg : ". Use force mode to ignore this issue") + bool result = true; // Build src mapping TTopicsMapping srcMapping; @@ -196,7 +198,7 @@ bool MakeContinueFromStreamingOffsetsPlan( } const auto mappingInfoIt = srcMapping.find(TTopic{srcDesc.GetDatabaseId(), srcDesc.GetDatabase(), srcDesc.GetTopicPath()}); if (mappingInfoIt == srcMapping.end()) { - ISSUE("Topic `" << srcDesc.GetTopicPath() << "` is not found in previous query" << (force ? ". Query will use fresh offsets for its partitions" : ". Use force mode to ignore this issue")); + ISSUE("Topic `" << srcDesc.GetTopicPath() << "` is not found in previous query" << FORCE_MSG("Query will use fresh offsets for its partitions")); continue; } TTopicMappingInfo& mappingInfo = mappingInfoIt->second; @@ -209,13 +211,10 @@ bool MakeContinueFromStreamingOffsetsPlan( do { auto [taskBegin, taskEnd] = mappingInfo.PartitionsMapping.equal_range(currentPartition); if (taskBegin == taskEnd) { - // Normal case. Topic was extended. Print warning and continue. - TIssue issue(TStringBuilder() << "Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " is not found in previous query. Query will use fresh offsets for it"); - issue.SetCode(TIssuesIds::WARNING, TSeverityIds::S_WARNING); - issues.AddIssue(std::move(issue)); + ISSUE("Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " is not found in previous query" << FORCE_MSG("Query will use fresh offsets for it")); } else { if (std::distance(taskBegin, taskEnd) > 1) { - ISSUE("Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " has ambiguous offsets source in previous query checkpoint" << (force ? ". Query will use minimum offset to avoid skipping data" : ". Use force mode to ignore this issue")); + ISSUE("Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " has ambiguous offsets source in previous query checkpoint" << FORCE_MSG("Query will use minimum offset to avoid skipping data")); } for (; taskBegin != taskEnd; ++taskBegin) { tasksSet.insert(taskBegin->second); @@ -242,10 +241,12 @@ bool MakeContinueFromStreamingOffsetsPlan( } for (const auto& [topic, mappingInfo] : srcMapping) { if (!mappingInfo.Used) { - ISSUE("Topic `" << topic.TopicPath << "` is read in previous query but is not read in new query" << (force ? ". Reading offsets will be lost in next checkpoint" : ". Use force mode to ignore this issue")); + ISSUE("Topic `" << topic.TopicPath << "` is read in previous query but is not read in new query" << FORCE_MSG("Reading offsets will be lost in next checkpoint")); } } return result; + +#undef FORCE_MSG } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp b/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp index c690578f9c6..0c17f21f002 100644 --- a/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp +++ b/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp @@ -361,7 +361,7 @@ Y_UNIT_TEST_SUITE_F(TContinueFromStreamingOffsetsPlanTest, TTestCase) { AssertTaskPlanIsEmpty(2); } - Y_UNIT_TEST(NotMappedAllPartitionsIsOk) { + Y_UNIT_TEST(NotMappedAllPartitions) { SrcGraph .Task() .Input().TopicSource("t", 5, 1, 0).Build() @@ -375,7 +375,7 @@ Y_UNIT_TEST_SUITE_F(TContinueFromStreamingOffsetsPlanTest, TTestCase) { .Input().TopicSource("t", 10, 2, 1).Build() .Build(); - UNIT_ASSERT(MakePlan(false)); + UNIT_ASSERT(!MakePlan(false)); UNIT_ASSERT_UNEQUAL(Issues.Size(), 0); UNIT_ASSERT(MakePlan(true)); UNIT_ASSERT_UNEQUAL(Issues.Size(), 0); |