aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-14 16:25:56 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-14 16:25:56 +0300
commitfaffc8be37c9fe59ca686960f200a3358a083f1c (patch)
tree85acd38f1fe2dee09b552ac337df5aa44a042a14
parenta958a3fd80df0116b477ee7b64ea7a639bace22d (diff)
downloadydb-faffc8be37c9fe59ca686960f200a3358a083f1c.tar.gz
YQ-356 Require force mode when topic has grown
ref:7d06cff5cd5a8b313faf54c2e14ae5812347007e
-rw-r--r--build/mapping.conf.json6
-rwxr-xr-xya4
-rw-r--r--ydb/library/yql/dq/state/dq_state_load_plan.cpp15
-rw-r--r--ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp4
4 files changed, 18 insertions, 11 deletions
diff --git a/build/mapping.conf.json b/build/mapping.conf.json
index 12f563c40a5..d9e361ebc6a 100644
--- a/build/mapping.conf.json
+++ b/build/mapping.conf.json
@@ -2918,6 +2918,12 @@
"2786406421": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2786406421",
"2786406721": "https://storage.mds.yandex.net/get-devtools-opensource/233854/2786406721",
"2786406951": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2786406951",
+ "2786636274": "https://storage.mds.yandex.net/get-devtools-opensource/250854/2786636274",
+ "2786636814": "https://storage.mds.yandex.net/get-devtools-opensource/373962/2786636814",
+ "2786636995": "https://storage.mds.yandex.net/get-devtools-opensource/479623/2786636995",
+ "2786659959": "https://storage.mds.yandex.net/get-devtools-opensource/479623/2786659959",
+ "2786660754": "https://storage.mds.yandex.net/get-devtools-opensource/471749/2786660754",
+ "2786660966": "https://storage.mds.yandex.net/get-devtools-opensource/233854/2786660966",
"309054781": "https://storage.mds.yandex.net/get-devtools-opensource/250854/309054781",
"360916612": "https://storage.mds.yandex.net/get-devtools-opensource/233854/360916612",
"412716868": "https://storage.mds.yandex.net/get-devtools-opensource/233854/412716868",
diff --git a/ya b/ya
index 16a7f4ab3e6..6ee223ac482 100755
--- a/ya
+++ b/ya
@@ -4,8 +4,8 @@ import sys
import platform
import json
-URLS = ["https://storage.mds.yandex.net/get-devtools-opensource/233854/8c07a75cc7b4d0fb6896b0376d449df8"]
-MD5 = "8c07a75cc7b4d0fb6896b0376d449df8"
+URLS = ["https://storage.mds.yandex.net/get-devtools-opensource/479623/d572a4841e2b2dcca14450f609635d2f"]
+MD5 = "d572a4841e2b2dcca14450f609635d2f"
RETRIES = 5
HASH_PREFIX = 10
diff --git a/ydb/library/yql/dq/state/dq_state_load_plan.cpp b/ydb/library/yql/dq/state/dq_state_load_plan.cpp
index 79000f07a1d..22275a74c10 100644
--- a/ydb/library/yql/dq/state/dq_state_load_plan.cpp
+++ b/ydb/library/yql/dq/state/dq_state_load_plan.cpp
@@ -157,6 +157,8 @@ bool MakeContinueFromStreamingOffsetsPlan(
THashMap<ui64, NDqProto::NDqStateLoadPlan::TTaskPlan>& plan,
TIssues& issues)
{
+#define FORCE_MSG(msg) (force ? ". " msg : ". Use force mode to ignore this issue")
+
bool result = true;
// Build src mapping
TTopicsMapping srcMapping;
@@ -196,7 +198,7 @@ bool MakeContinueFromStreamingOffsetsPlan(
}
const auto mappingInfoIt = srcMapping.find(TTopic{srcDesc.GetDatabaseId(), srcDesc.GetDatabase(), srcDesc.GetTopicPath()});
if (mappingInfoIt == srcMapping.end()) {
- ISSUE("Topic `" << srcDesc.GetTopicPath() << "` is not found in previous query" << (force ? ". Query will use fresh offsets for its partitions" : ". Use force mode to ignore this issue"));
+ ISSUE("Topic `" << srcDesc.GetTopicPath() << "` is not found in previous query" << FORCE_MSG("Query will use fresh offsets for its partitions"));
continue;
}
TTopicMappingInfo& mappingInfo = mappingInfoIt->second;
@@ -209,13 +211,10 @@ bool MakeContinueFromStreamingOffsetsPlan(
do {
auto [taskBegin, taskEnd] = mappingInfo.PartitionsMapping.equal_range(currentPartition);
if (taskBegin == taskEnd) {
- // Normal case. Topic was extended. Print warning and continue.
- TIssue issue(TStringBuilder() << "Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " is not found in previous query. Query will use fresh offsets for it");
- issue.SetCode(TIssuesIds::WARNING, TSeverityIds::S_WARNING);
- issues.AddIssue(std::move(issue));
+ ISSUE("Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " is not found in previous query" << FORCE_MSG("Query will use fresh offsets for it"));
} else {
if (std::distance(taskBegin, taskEnd) > 1) {
- ISSUE("Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " has ambiguous offsets source in previous query checkpoint" << (force ? ". Query will use minimum offset to avoid skipping data" : ". Use force mode to ignore this issue"));
+ ISSUE("Topic `" << srcDesc.GetTopicPath() << "` partition " << currentPartition << " has ambiguous offsets source in previous query checkpoint" << FORCE_MSG("Query will use minimum offset to avoid skipping data"));
}
for (; taskBegin != taskEnd; ++taskBegin) {
tasksSet.insert(taskBegin->second);
@@ -242,10 +241,12 @@ bool MakeContinueFromStreamingOffsetsPlan(
}
for (const auto& [topic, mappingInfo] : srcMapping) {
if (!mappingInfo.Used) {
- ISSUE("Topic `" << topic.TopicPath << "` is read in previous query but is not read in new query" << (force ? ". Reading offsets will be lost in next checkpoint" : ". Use force mode to ignore this issue"));
+ ISSUE("Topic `" << topic.TopicPath << "` is read in previous query but is not read in new query" << FORCE_MSG("Reading offsets will be lost in next checkpoint"));
}
}
return result;
+
+#undef FORCE_MSG
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp b/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp
index c690578f9c6..0c17f21f002 100644
--- a/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp
+++ b/ydb/library/yql/dq/state/ut/dq_state_load_plan_ut.cpp
@@ -361,7 +361,7 @@ Y_UNIT_TEST_SUITE_F(TContinueFromStreamingOffsetsPlanTest, TTestCase) {
AssertTaskPlanIsEmpty(2);
}
- Y_UNIT_TEST(NotMappedAllPartitionsIsOk) {
+ Y_UNIT_TEST(NotMappedAllPartitions) {
SrcGraph
.Task()
.Input().TopicSource("t", 5, 1, 0).Build()
@@ -375,7 +375,7 @@ Y_UNIT_TEST_SUITE_F(TContinueFromStreamingOffsetsPlanTest, TTestCase) {
.Input().TopicSource("t", 10, 2, 1).Build()
.Build();
- UNIT_ASSERT(MakePlan(false));
+ UNIT_ASSERT(!MakePlan(false));
UNIT_ASSERT_UNEQUAL(Issues.Size(), 0);
UNIT_ASSERT(MakePlan(true));
UNIT_ASSERT_UNEQUAL(Issues.Size(), 0);