diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2024-04-27 15:18:01 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-27 15:18:01 +0500 |
commit | 1b8616baf3b031cbc1ebcb0a6eba49c3f6c37ddf (patch) | |
tree | 233a78819a316c519ce42c8c1812c43a3da3d522 | |
parent | 3d12c621bebc79d16bc50587d1e0076a20068f0a (diff) | |
download | ydb-1b8616baf3b031cbc1ebcb0a6eba49c3f6c37ddf.tar.gz |
Prohibit commit to the past for inactive partitions (#4170)
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 9 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/autoscaling_ut.cpp | 29 |
2 files changed, 38 insertions, 0 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 35ba494b10b..b0b268b72cb 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2452,6 +2452,15 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, */ } + if (!IsActive() && act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && static_cast<i64>(EndOffset) == userInfo.Offset && offset < EndOffset) { + TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); + ScheduleReplyError(act.Cookie, + NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST, + TStringBuilder() << "set offset " << act.Offset << " to past for consumer " << act.ClientId << " for inactive partition"); + + return; + } + EmulatePostProcessUserAct(act, userInfo, ctx); } diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp index 34ce7796bd2..6d033a07daf 100644 --- a/ydb/core/persqueue/ut/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp @@ -307,6 +307,35 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { readSession1.Close(); readSession2.Close(); } + + Y_UNIT_TEST(CommitTopPast) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100); + + TTopicClient client = setup.MakeClient(); + + auto writeSession = CreateWriteSession(client, "producer-1", 0); + UNIT_ASSERT(writeSession->Write(Msg("message_1", 2))); + UNIT_ASSERT(writeSession->Write(Msg("message_2", 3))); + + ui64 txId = 1023; + SplitPartition(setup, ++txId, 0, "a"); + + auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit"); + + status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 1).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward."); + + status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back."); + + status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 2).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition."); + + status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions."); + } } } // namespace NKikimr |