aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2024-04-27 15:18:01 +0500
committerGitHub <noreply@github.com>2024-04-27 15:18:01 +0500
commit1b8616baf3b031cbc1ebcb0a6eba49c3f6c37ddf (patch)
tree233a78819a316c519ce42c8c1812c43a3da3d522
parent3d12c621bebc79d16bc50587d1e0076a20068f0a (diff)
downloadydb-1b8616baf3b031cbc1ebcb0a6eba49c3f6c37ddf.tar.gz
Prohibit commit to the past for inactive partitions (#4170)
-rw-r--r--ydb/core/persqueue/partition.cpp9
-rw-r--r--ydb/core/persqueue/ut/autoscaling_ut.cpp29
2 files changed, 38 insertions, 0 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 35ba494b10b..b0b268b72cb 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2452,6 +2452,15 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
*/
}
+ if (!IsActive() && act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && static_cast<i64>(EndOffset) == userInfo.Offset && offset < EndOffset) {
+ TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
+ ScheduleReplyError(act.Cookie,
+ NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST,
+ TStringBuilder() << "set offset " << act.Offset << " to past for consumer " << act.ClientId << " for inactive partition");
+
+ return;
+ }
+
EmulatePostProcessUserAct(act, userInfo, ctx);
}
diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp
index 34ce7796bd2..6d033a07daf 100644
--- a/ydb/core/persqueue/ut/autoscaling_ut.cpp
+++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp
@@ -307,6 +307,35 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
readSession1.Close();
readSession2.Close();
}
+
+ Y_UNIT_TEST(CommitTopPast) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
+
+ TTopicClient client = setup.MakeClient();
+
+ auto writeSession = CreateWriteSession(client, "producer-1", 0);
+ UNIT_ASSERT(writeSession->Write(Msg("message_1", 2)));
+ UNIT_ASSERT(writeSession->Write(Msg("message_2", 3)));
+
+ ui64 txId = 1023;
+ SplitPartition(setup, ++txId, 0, "a");
+
+ auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit");
+
+ status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 1).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward.");
+
+ status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back.");
+
+ status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 2).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition.");
+
+ status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
+ }
}
} // namespace NKikimr