aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-04-21 19:10:23 +0500
committerGitHub <noreply@github.com>2025-04-21 17:10:23 +0300
commit2530f5d7f7203e24b274c8f1083d0009f15b83a3 (patch)
tree3dc99cbc354e6ad11abdc0b235240fe715a157c6
parent9a6920b87c8f27fc166301651184e83e11684115 (diff)
downloadydb-2530f5d7f7203e24b274c8f1083d0009f15b83a3.tar.gz
Fixed errors of the distributed commit offset to the partition (#17423)
-rw-r--r--ydb/core/persqueue/partition.cpp41
-rw-r--r--ydb/core/persqueue/partition.h1
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp7
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp495
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp623
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp87
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/ya.make1
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp20
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h4
-rw-r--r--ydb/services/persqueue_v1/actors/commit_offset_actor.cpp38
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.cpp4
11 files changed, 709 insertions, 612 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 9a4ea983ba2..cd384de2734 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2378,18 +2378,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
bool isAffectedConsumer = AffectedUsers.contains(consumer);
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
- if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
- PQ_LOG_D("Partition " << Partition <<
- " Consumer '" << consumer << "'" <<
- " Bad request (session already dead) " <<
- " RequestSessionId '" << operation.GetReadSessionId() <<
- " CurrentSessionId '" << userInfo.Session <<
- "'");
- result = false;
- } else if (operation.GetOnlyCheckCommitedToFinish()) {
+ if (operation.GetOnlyCheckCommitedToFinish()) {
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
result = false;
}
+ } else if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
+ if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || userInfo.Offset != i64(EndOffset)) {
+ PQ_LOG_D("Partition " << Partition <<
+ " Consumer '" << consumer << "'" <<
+ " Bad request (session already dead) " <<
+ " RequestSessionId '" << operation.GetReadSessionId() <<
+ " CurrentSessionId '" << userInfo.Session <<
+ "'");
+ result = false;
+ }
} else {
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
PQ_LOG_D("Partition " << Partition <<
@@ -2423,6 +2425,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
consumers.insert(consumer);
}
}
+
if (result) {
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
}
@@ -2913,6 +2916,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
"incorrect offset range (begin > end)");
return EProcessResult::ContinueDrop;
}
+
consumers.insert(user);
}
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
@@ -2937,6 +2941,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
return;
}
for (const auto& operation : record.GetData().GetOperations()) {
+ if (operation.GetOnlyCheckCommitedToFinish()) {
+ continue;
+ }
+
if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
continue; //Write operation - handled separately via WriteInfo
}
@@ -2977,6 +2985,21 @@ void TPartition::ExecImmediateTx(TTransaction& t)
"incorrect offset range (commit to the future)");
return;
}
+
+ if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != pendingUserInfo.Session) {
+ if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || pendingUserInfo.Offset != i64(EndOffset)) {
+ ScheduleReplyPropose(record,
+ NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
+ NKikimrPQ::TError::BAD_REQUEST,
+ "session already dead");
+ return;
+ }
+ }
+
+ if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) {
+ continue; // this is stale request, answer ok for it
+ }
+
pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
}
CommitWriteOperations(t);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 358260cc245..acf82db4d90 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -273,7 +273,6 @@ private:
void ConsumeBlobQuota();
void UpdateAfterWriteCounters(bool writeComplete);
-
void UpdateUserInfoEndOffset(const TInstant& now);
void UpdateWriteBufferIsFullState(const TInstant& now);
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index 3a98fd2c687..a7d5385d657 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -21,6 +21,11 @@
#include "make_config.h"
+template<>
+void Out<NKikimrPQ::TEvProposeTransactionResult_EStatus>(IOutputStream& out, NKikimrPQ::TEvProposeTransactionResult_EStatus v) {
+ out << NKikimrPQ::TEvProposeTransactionResult::EStatus_Name(v);
+}
+
namespace NKikimr::NPQ {
namespace NHelpers {
@@ -1007,7 +1012,7 @@ void TPartitionFixture::WaitProposeTransactionResponse(const TProposeTransaction
if (matcher.Status) {
UNIT_ASSERT(event->Record.HasStatus());
- UNIT_ASSERT(*matcher.Status == event->Record.GetStatus());
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus());
}
}
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
index d560b8181d0..39dac6d578c 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
+++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
@@ -622,98 +622,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
readSession2->Close();
}
- Y_UNIT_TEST(PartitionSplit_OffsetCommit) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopicWithAutoscale();
- TTopicClient client = setup.MakeClient();
-
- setup.Write("message-1", 0);
- setup.Write("message-2", 0);
- setup.Write("message-3", 0);
- setup.Write("message-4", 0);
- setup.Write("message-5", 0);
- setup.Write("message-6", 0);
-
- {
- ui64 txId = 1006;
- SplitPartition(setup, ++txId, 0, "a");
-
- auto describe = setup.DescribeTopic();
- UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3);
- }
-
- setup.Write("message-7", 1);
- setup.Write("message-8", 1);
- setup.Write("message-9", 1);
- setup.Write("message-10", 1);
-
- {
- ui64 txId = 1007;
- SplitPartition(setup, ++txId, 1, "0");
-
- auto describe = setup.DescribeTopic();
- UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5);
- }
-
- setup.Write("message-11", 3);
- setup.Write("message-12", 3);
-
- auto assertCommittedOffset = [&](size_t partition, size_t expectedOffset, const std::string& msg = "") {
- auto description = setup.DescribeConsumer();
- auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats();
- UNIT_ASSERT(stats);
- UNIT_ASSERT_VALUES_EQUAL_C(expectedOffset, stats->GetCommittedOffset(), "Partition " << partition << ": " << msg);
- };
-
- {
- static constexpr size_t commited = 2;
- auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, commited);
- UNIT_ASSERT(status.IsSuccess());
-
- assertCommittedOffset(0, 6, "Must be commited to the partition end because it is the parent");
- assertCommittedOffset(1, commited);
- assertCommittedOffset(3, 0);
- }
-
- {
- static constexpr size_t commited = 3;
- auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, commited);
- UNIT_ASSERT(status.IsSuccess());
-
- assertCommittedOffset(0, commited);
- assertCommittedOffset(1, 0, "Must be commited to the partition begin because it is the child");
- assertCommittedOffset(3, 0);
- }
- }
-
- Y_UNIT_TEST(CommitTopPast) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopicWithAutoscale();
-
- TTopicClient client = setup.MakeClient();
-
- setup.Write("message_1", 0);
- setup.Write("message_2", 0);
-
- ui64 txId = 1023;
- SplitPartition(setup, ++txId, 0, "a");
-
- auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit");
-
- status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 1).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward.");
-
- status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back.");
-
- status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 2).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition.");
-
- status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit an offset for inactive, read-to-the-end partitions.");
- }
-
Y_UNIT_TEST(ControlPlane_CreateAlterDescribe) {
auto autoscalingTestTopic = "autoscalit-topic";
TTopicSdkTestSetup setup = CreateSetup();
@@ -932,409 +840,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
}
- Y_UNIT_TEST(PartitionSplit_DistributedTxCommit) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopicWithAutoscale();
- TTopicClient client = setup.MakeClient();
-
- setup.Write("message-1", 0, "producer-1", 1);
- setup.Write("message-2", 0, "producer-1", 2);
- setup.Write("message-3", 0, "producer-1", 3);
- setup.Write("message-4", 0, "producer-1", 4);
- setup.Write("message-5", 0, "producer-1", 5);
- setup.Write("message-6", 0, "producer-2", 6);
-
- {
- ui64 txId = 1006;
- SplitPartition(setup, ++txId, 0, "a");
-
- auto describe = setup.DescribeTopic();
- UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3);
- }
-
- setup.Write("message-7", 1, "producer-1", 7);
- setup.Write("message-8", 1, "producer-1", 8);
- setup.Write("message-9", 1, "producer-1", 9);
- setup.Write("message-10", 1, "producer-2", 10);
-
- {
- ui64 txId = 1007;
- SplitPartition(setup, ++txId, 1, "0");
-
- auto describe = setup.DescribeTopic();
- UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5);
- }
-
- auto count = 0;
- const auto expected = 10;
-
- auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
- auto& messages = x.GetMessages();
- for (size_t i = 0u; i < messages.size(); ++i) {
- ++count;
- auto& message = messages[i];
- Cerr << "SESSION EVENT read message: " << count << " from partition: " << message.GetPartitionSession()->GetPartitionId() << Endl << Flush;
- message.Commit();
- }
-
- return true;
- });
-
- UNIT_ASSERT(result.Timeout);
- UNIT_ASSERT_VALUES_EQUAL(count, expected);
-
- auto description = setup.DescribeConsumer();
- UNIT_ASSERT(description.GetPartitions().size() == 5);
-
- auto stats1 = description.GetPartitions().at(1).GetPartitionConsumerStats();
- UNIT_ASSERT(stats1);
- UNIT_ASSERT(stats1->GetCommittedOffset() == 4);
- }
-
- Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_ChildFirst) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopicWithAutoscale();
-
- TTopicClient client = setup.MakeClient();
-
- setup.Write("message-1", 0, "producer-1", 1);
- setup.Write("message-2", 0, "producer-1", 2);
- setup.Write("message-3", 0, "producer-1", 3);
- setup.Write("message-4", 0, "producer-1", 4);
- setup.Write("message-5", 0, "producer-1", 5);
- setup.Write("message-6", 0, "producer-2", 6);
-
- {
- ui64 txId = 1006;
- SplitPartition(setup, ++txId, 0, "a");
-
- auto describe = setup.DescribeTopic();
- UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3);
- }
-
- setup.Write("message-7", 1, "producer-1", 7);
- setup.Write("message-8", 1, "producer-1", 8);
- setup.Write("message-9", 1, "producer-1", 9);
- setup.Write("message-10", 1, "producer-2", 10);
-
- {
- ui64 txId = 1007;
- SplitPartition(setup, ++txId, 1, "0");
-
- auto describe = setup.DescribeTopic();
- UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5);
- }
-
-
- auto count = 0;
- const auto expected = 10;
-
- std::vector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage> partition0Messages;
-
- auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
- auto& messages = x.GetMessages();
- for (size_t i = 0u; i < messages.size(); ++i) {
- auto& message = messages[i];
- count++;
- int partitionId = message.GetPartitionSession()->GetPartitionId();
- Cerr << "SESSION EVENT read message: " << count << " from partition: " << partitionId << Endl << Flush;
- if (partitionId == 1) {
- // Commit messages from partition 1 immediately
- message.Commit();
- } else if (partitionId == 0) {
- // Store messages from partition 0 for later
- partition0Messages.push_back(message);
- }
- }
-
- return true;
- });
-
- UNIT_ASSERT(result.Timeout);
- UNIT_ASSERT_VALUES_EQUAL(count, expected);
-
- Sleep(TDuration::Seconds(5));
-
- {
- auto description = setup.DescribeConsumer();
- auto stats = description.GetPartitions().at(1).GetPartitionConsumerStats();
- UNIT_ASSERT(stats);
-
- // Messages in the parent partition hasn't been committed
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 0);
- }
-
- for (auto& message : partition0Messages) {
- message.Commit();
- }
-
- Sleep(TDuration::Seconds(5));
-
- {
- auto description = setup.DescribeConsumer();
- auto stats = description.GetPartitions().at(1).GetPartitionConsumerStats();
- UNIT_ASSERT(stats);
-
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 4);
- }
- }
-
- Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckSessionResetAfterCommit) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopicWithAutoscale();
-
- TTopicClient client = setup.MakeClient();
-
- auto seqNo = 1;
-
- setup.Write("message-1", 0, "producer-1", seqNo++);
- setup.Write("message-2", 0, "producer-1", seqNo++);
- setup.Write("message-3", 0, "producer-1", seqNo++);
- setup.Write("message-4", 0, "producer-1", seqNo++);
- setup.Write("message-5", 0, "producer-1", seqNo++);
- setup.Write("message-6", 0, "producer-2", seqNo++);
-
- {
- ui64 txId = 1006;
- SplitPartition(setup, ++txId, 0, "a");
-
- auto describe = setup.DescribeTopic();
- UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3);
- }
-
- setup.Write("message-7", 1, "producer-2", seqNo++);
- setup.Write("message-8", 1, "producer-2", seqNo++);
-
- std::vector<size_t> counters;
- counters.resize(seqNo - 1);
-
- auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
- auto& messages = x.GetMessages();
- for (size_t i = 0u; i < messages.size(); ++i) {
- auto& message = messages[i];
- message.Commit();
- Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush;
- auto count = ++counters[message.GetSeqNo() - 1];
-
- // check we get this SeqNo two times
- if (message.GetSeqNo() == 6 && count == 1) {
- Sleep(TDuration::MilliSeconds(300));
- auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3);
- UNIT_ASSERT(status.IsSuccess());
- }
- }
-
- return true;
- });
-
- UNIT_ASSERT_VALUES_EQUAL_C(1, counters[0], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters[0] << " times") ;
- UNIT_ASSERT_VALUES_EQUAL_C(1, counters[1], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 1 message has been read " << counters[1] << " times") ;
- UNIT_ASSERT_VALUES_EQUAL_C(1, counters[2], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 2 message has been read " << counters[2] << " times") ;
-
- UNIT_ASSERT_VALUES_EQUAL_C(2, counters[3], TStringBuilder() << "Message 1 must be read two times, but 3 message has been read " << counters[3] << " times") ;
- UNIT_ASSERT_VALUES_EQUAL_C(2, counters[4], TStringBuilder() << "Message 1 must be read two times, but 4 message has been read " << counters[4] << " times") ;
- UNIT_ASSERT_VALUES_EQUAL_C(2, counters[5], TStringBuilder() << "Message 1 must be read two times, but 5 message has been read " << counters[5] << " times") ;
-
- {
- auto s = result.StartPartitionSessionEvents[0];
- UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId());
- UNIT_ASSERT_VALUES_EQUAL(0, s.GetCommittedOffset());
- UNIT_ASSERT_VALUES_EQUAL(6, s.GetEndOffset());
- }
- {
- auto s = result.StartPartitionSessionEvents[3];
- UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId());
- UNIT_ASSERT_VALUES_EQUAL(3, s.GetCommittedOffset());
- UNIT_ASSERT_VALUES_EQUAL(6, s.GetEndOffset());
- }
- }
-
- Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_SplitedTopic) {
- TTopicSdkTestSetup setup = CreateSetup();
- TTopicClient client = setup.MakeClient();
-
- setup.CreateTopicWithAutoscale();
-
- auto commit = [&](const std::string& sessionId, ui64 offset) {
- return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId);
- };
-
- auto getConsumerState = [&](ui32 partition) {
- auto description = setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER);
-
- auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats();
- UNIT_ASSERT(stats);
- return stats;
- };
-
- setup.Write("message-1", 0, "producer-1", 1);
- setup.Write("message-2", 0, "producer-1", 2);
- setup.Write("message-3", 0, "producer-1", 3);
- setup.Write("message-4", 0, "producer-1", 4);
- setup.Write("message-5", 0, "producer-1", 5);
- setup.Write("message-6", 0, "producer-1", 6);
- setup.Write("message-7", 0, "producer-1", 7);
- setup.Write("message-8", 0, "producer-2", 8);
-
- {
- ui64 txId = 1006;
- SplitPartition(setup, ++txId, 0, "a");
-
- auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
- UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
- }
-
- setup.Write("message-9", 1, "producer-2", 9);
- setup.Write("message-10", 1, "producer-2", 10);
-
- auto commitSent = false;
- TString readSessionId = "";
-
- setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
- auto& messages = x.GetMessages();
- for (size_t i = 0u; i < messages.size(); ++i) {
- auto& message = messages[i];
- Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush;
-
- if (commitSent) {
- // read session not changed
- UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId());
- }
-
- // check we NOT get this SeqNo two times
- if (message.GetSeqNo() == 6) {
- if (!commitSent) {
- commitSent = true;
-
- readSessionId = message.GetPartitionSession()->GetReadSessionId();
-
- {
- auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 8);
- UNIT_ASSERT(status.IsSuccess());
-
- auto stats = getConsumerState(0);
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
- }
-
- {
- // must be ignored, because commit to past
- auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 0);
- UNIT_ASSERT(status.IsSuccess());
-
- auto stats = getConsumerState(0);
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
- }
-
- /* TODO uncomment this
- {
- // must be ignored, because wrong sessionid
- auto status = commit("random session", 0);
- UNIT_ASSERT(!status.IsSuccess());
-
- Sleep(TDuration::MilliSeconds(500));
-
- auto stats = getConsumerState(0);
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
- }
- */
- } else {
- UNIT_ASSERT(false);
- }
- } else {
- message.Commit();
- }
- }
-
- return true;
- });
- }
-
- Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_NotSplitedTopic) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopicWithAutoscale();
- TTopicClient client = setup.MakeClient();
-
- auto commit = [&](const std::string& sessionId, ui64 offset) {
- return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId);
- };
-
- auto getConsumerState = [&](ui32 partition) {
- auto description = setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER);
-
- auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats();
- UNIT_ASSERT(stats);
- return stats;
- };
-
- setup.Write("message-1", 0, "producer-1", 1);
- setup.Write("message-2", 0, "producer-1", 2);
- setup.Write("message-3", 0, "producer-1", 3);
- setup.Write("message-4", 0, "producer-1", 4);
- setup.Write("message-5", 0, "producer-1", 5);
- setup.Write("message-6", 0, "producer-1", 6);
- setup.Write("message-7", 0, "producer-1", 7);
- setup.Write("message-8", 0, "producer-2", 8);
-
- auto commitSent = false;
- TString readSessionId = "";
-
- setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
- auto& messages = x.GetMessages();
- for (size_t i = 0u; i < messages.size(); ++i) {
- auto& message = messages[i];
-
- if (commitSent) {
- // read session not changed
- UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId());
- }
-
- // check we NOT get this SeqNo two times
- if (message.GetSeqNo() == 6) {
- if (!commitSent) {
- commitSent = true;
- readSessionId = message.GetPartitionSession()->GetReadSessionId();
-
- Sleep(TDuration::MilliSeconds(300));
-
- {
- auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 8);
- UNIT_ASSERT(status.IsSuccess());
-
- auto stats = getConsumerState(0);
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
- }
-
- {
- // must be ignored, because commit to past
- auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 0);
- UNIT_ASSERT(status.IsSuccess());
-
- auto stats = getConsumerState(0);
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
- }
-
- {
- // must be ignored, because wrong sessionid
- auto status = commit("random session", 0);
- UNIT_ASSERT(!status.IsSuccess());
-
- Sleep(TDuration::MilliSeconds(500));
-
- auto stats = getConsumerState(0);
- UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8);
- }
- } else {
- UNIT_ASSERT(false);
- }
- } else {
- message.Commit();
- }
- }
-
- return true;
- });
- }
-
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp
new file mode 100644
index 00000000000..cd80af0b1ec
--- /dev/null
+++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp
@@ -0,0 +1,623 @@
+#include <ydb/core/persqueue/ut/common/autoscaling_ut_common.h>
+
+#include <ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
+#include <ydb/core/persqueue/partition_scale_manager.h>
+#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/tx/schemeshard/ut_helpers/test_env.h>
+
+#include <util/stream/output.h>
+
+namespace NKikimr {
+
+using namespace NYdb::NTopic;
+using namespace NYdb::NTopic::NTests;
+using namespace NSchemeShardUT_Private;
+using namespace NKikimr::NPQ::NTest;
+
+Y_UNIT_TEST_SUITE(CommitOffset) {
+
+ void PrepareFlatTopic(TTopicSdkTestSetup& setup) {
+ setup.CreateTopic();
+
+ setup.Write("message-1");
+ setup.Write("message-2");
+ setup.Write("message-3");
+ }
+
+ void PrepareAutopartitionedTopic(TTopicSdkTestSetup& setup) {
+ setup.CreateTopicWithAutoscale();
+
+ // Creating partition hierarchy
+ // 0 ──┬──> 1 ──┬──> 3
+ // │ └──> 4
+ // └──> 2
+ //
+ // Each partition has 3 messages
+
+ setup.Write("message-0-1", 0);
+ setup.Write("message-0-2", 0);
+ setup.Write("message-0-3", 0);
+
+ {
+ ui64 txId = 1006;
+ SplitPartition(setup, ++txId, 0, "a");
+ }
+
+ setup.Write("message-1-1", 1);
+ setup.Write("message-1-2", 1);
+ setup.Write("message-1-3", 1);
+
+ setup.Write("message-2-1", 2);
+ setup.Write("message-2-2", 2);
+ setup.Write("message-2-3", 2);
+
+ {
+ ui64 txId = 1007;
+ SplitPartition(setup, ++txId, 1, "0");
+ }
+
+ setup.Write("message-3-1", 3);
+ setup.Write("message-3-2", 3);
+ setup.Write("message-3-3", 3);
+
+ setup.Write("message-4-1", 4);
+ setup.Write("message-4-2", 4);
+ setup.Write("message-4-3", 4);
+ }
+
+ ui64 GetCommittedOffset(TTopicSdkTestSetup& setup, size_t partition) {
+ auto description = setup.DescribeConsumer();
+ auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats();
+ UNIT_ASSERT(stats);
+ return stats->GetCommittedOffset();
+ }
+
+ Y_UNIT_TEST(Commit_Flat_WithWrongSession) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareFlatTopic(setup);
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1, "wrong-read-session-id");
+ UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0));
+ }
+ }
+
+ Y_UNIT_TEST(Commit_Flat_WithWrongSession_ToPast) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareFlatTopic(setup);
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 2);
+ UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");
+ UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0));
+ }
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id");
+ UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
+ UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0));
+ }
+ }
+
+ Y_UNIT_TEST(Commit_WithoutSession_TopPast) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0);
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit");
+
+ status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1);
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward.");
+
+ status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0);
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back.");
+
+ status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3);
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition.");
+
+ status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0);
+ UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit an offset for inactive, read-to-the-end partitions.");
+ }
+
+ Y_UNIT_TEST(Commit_WithWrongSession_ToParent) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+ setup.CreateTopicWithAutoscale();
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1);
+ UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0));
+ }
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1);
+ UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1));
+ }
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id");
+ UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
+ UNIT_ASSERT_VALUES_EQUAL_C(3, GetCommittedOffset(setup, 0), "Offset doesn`t changed");
+ UNIT_ASSERT_VALUES_EQUAL_C(1, GetCommittedOffset(setup, 1), "Offset doesn`t changed");
+ }
+ }
+
+ Y_UNIT_TEST(Commit_WithoutSession_ParentNotFinished) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ {
+ // Commit parent partition to non end
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1);
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1);
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+ }
+
+ Y_UNIT_TEST(Commit_WithoutSession_ToPastParentPartition) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ {
+ // Commit child partition to non end
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1);
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+
+ {
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1);
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+ }
+
+ Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_SameSession) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ {
+ // Commit parent partition to non end
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1);
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+
+ {
+ auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
+ return x.GetMessages().back().GetData() != "message-3-3";
+ });
+
+ // Commit parent to middle
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId());
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+ }
+
+ Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+ TTopicClient client(setup.MakeDriver());
+
+ auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0);
+ auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1);
+
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1,
+ r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId());
+ UNIT_ASSERT(!result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+
+ Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession_ParentCommittedToEnd) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+ TTopicClient client(setup.MakeDriver());
+
+ setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3);
+
+ auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0);
+ auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1);
+
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1,
+ r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId());
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+
+ Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ {
+ // Commit parent partition to non end
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1);
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+
+ {
+ auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) {
+ return false;
+ });
+
+ // Commit parent to middle
+ auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId());
+ UNIT_ASSERT(result.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+ }
+
+ Y_UNIT_TEST(Commit_FromSession_ToNewChild_WithoutCommitToParent) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ setup.CreateTopicWithAutoscale();
+
+ setup.Write("message-0-0", 0);
+ setup.Write("message-0-1", 0);
+ setup.Write("message-0-2", 0);
+
+ {
+ bool committed = false;
+
+ auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
+ for (auto & m: x.GetMessages()) {
+ if (x.GetPartitionSession()->GetPartitionId() == 0 && m.GetOffset() == 1) {
+ ui64 txId = 1006;
+ SplitPartition(setup, ++txId, 0, "a");
+
+ setup.Write("message-1-0", 1);
+ setup.Write("message-1-1", 1);
+ setup.Write("message-1-2", 1);
+ } else if (x.GetPartitionSession()->GetPartitionId() == 1 && m.GetOffset() == 0) {
+ m.Commit();
+ committed = true;
+ return false;
+ }
+ }
+
+ return true;
+ });
+
+ UNIT_ASSERT(committed);
+
+ Sleep(TDuration::Seconds(3));
+
+ // Commit hasn`t applyed because messages from the parent partitions has`t been committed
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ }
+ }
+
+ Y_UNIT_TEST(PartitionSplit_OffsetCommit) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ {
+ static constexpr size_t commited = 2;
+ auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, commited);
+ UNIT_ASSERT(status.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL_C(3, GetCommittedOffset(setup, 0), "Must be commited to the partition end because it is the parent");
+ UNIT_ASSERT_VALUES_EQUAL(commited, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ }
+
+ {
+ static constexpr size_t commited = 3;
+ auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, commited);
+ UNIT_ASSERT(status.IsSuccess());
+
+ UNIT_ASSERT_VALUES_EQUAL(commited, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL_C(0, GetCommittedOffset(setup, 1), "Must be commited to the partition begin because it is the child");
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+
+ }
+ }
+
+ Y_UNIT_TEST(DistributedTxCommit) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ auto count = 0;
+ const auto expected = 15;
+
+ auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
+ auto& messages = x.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ ++count;
+ auto& message = messages[i];
+ Cerr << "SESSION EVENT read message: " << count << " from partition: " << message.GetPartitionSession()->GetPartitionId() << Endl << Flush;
+ message.Commit();
+ }
+
+ return true;
+ });
+
+ UNIT_ASSERT(result.Timeout);
+ UNIT_ASSERT_VALUES_EQUAL(count, expected);
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 4));
+ }
+
+ Y_UNIT_TEST(DistributedTxCommit_ChildFirst) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ auto count = 0;
+ const auto expected = 15;
+
+ std::vector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage> partition0Messages;
+
+ auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
+ auto& messages = x.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+ count++;
+ int partitionId = message.GetPartitionSession()->GetPartitionId();
+ Cerr << "SESSION EVENT read message: " << count << " from partition: " << partitionId << Endl << Flush;
+ if (partitionId == 1) {
+ // Commit messages from partition 1 immediately
+ message.Commit();
+ } else if (partitionId == 0) {
+ // Store messages from partition 0 for later
+ partition0Messages.push_back(message);
+ }
+ }
+
+ return true;
+ });
+
+ UNIT_ASSERT(result.Timeout);
+ UNIT_ASSERT_VALUES_EQUAL(count, expected);
+
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+
+ for (auto& message : partition0Messages) {
+ message.Commit();
+ }
+
+ Sleep(TDuration::Seconds(5));
+
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
+ UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
+ UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
+ }
+
+ Y_UNIT_TEST(DistributedTxCommit_CheckSessionResetAfterCommit) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ std::unordered_map<std::string, size_t> counters;
+
+ auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
+ auto& messages = x.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+ message.Commit();
+
+ auto count = ++counters[message.GetData()];
+
+ // check we get this SeqNo two times
+ if (message.GetData() == "message-0-3" && count == 1) {
+ Sleep(TDuration::MilliSeconds(300));
+ auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1);
+ UNIT_ASSERT(status.IsSuccess());
+ }
+ }
+
+ return true;
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL_C(1, counters["message-0-1"], "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters["message-0-1"] << " times") ;
+ UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-2"], "Message must be read 2 times because reset commit to offset 1, but 1 message has been read " << counters["message-0-2"] << " times") ;
+ UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-3"], "Message must be read 2 times because reset commit to offset 1, but 2 message has been read " << counters["message-0-3"] << " times") ;
+
+ {
+ auto s = result.StartPartitionSessionEvents[0];
+ UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId());
+ UNIT_ASSERT_VALUES_EQUAL(0, s.GetCommittedOffset());
+ UNIT_ASSERT_VALUES_EQUAL(3, s.GetEndOffset());
+ }
+ {
+ auto s = result.StartPartitionSessionEvents[3];
+ UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId());
+ UNIT_ASSERT_VALUES_EQUAL(1, s.GetCommittedOffset());
+ UNIT_ASSERT_VALUES_EQUAL(3, s.GetEndOffset());
+ }
+ }
+
+ Y_UNIT_TEST(DistributedTxCommit_CheckOffsetCommitForDifferentCases) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareAutopartitionedTopic(setup);
+
+ auto commit = [&](const std::string& sessionId, ui64 offset) {
+ return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId);
+ };
+
+ auto commitSent = false;
+ TString readSessionId = "";
+
+ setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
+ auto& messages = x.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+ if (commitSent) {
+ // read session not changed
+ UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId());
+ }
+
+ // check we NOT get this SeqNo two times
+ if (message.GetData() == "message-0-2") {
+ if (!commitSent) {
+ commitSent = true;
+
+ readSessionId = message.GetPartitionSession()->GetReadSessionId();
+
+ {
+ auto status = commit(readSessionId, 3);
+ UNIT_ASSERT(status.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3);
+ }
+
+ {
+ // must be ignored, because commit to past
+ auto status = commit(readSessionId, 0);
+ UNIT_ASSERT(status.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3);
+ }
+
+ {
+ // must be ignored, because wrong sessionid
+ auto status = commit("random session", 0);
+ UNIT_ASSERT(!status.IsSuccess());
+ Sleep(TDuration::MilliSeconds(500));
+ UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3);
+ }
+ } else {
+ UNIT_ASSERT(false);
+ }
+ } else {
+ message.Commit();
+ }
+ }
+
+ return true;
+ });
+ }
+
+ Y_UNIT_TEST(DistributedTxCommit_Flat_CheckOffsetCommitForDifferentCases) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ PrepareFlatTopic(setup);
+
+ auto commit = [&](const std::string& sessionId, ui64 offset) {
+ return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId);
+ };
+
+ auto commitSent = false;
+ TString readSessionId = "";
+
+ setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
+ auto& messages = x.GetMessages();
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+
+ if (commitSent) {
+ // read session not changed
+ UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId());
+ }
+
+ // check we NOT get this message two times
+ if (message.GetData() == "message-0-2") {
+ if (!commitSent) {
+ commitSent = true;
+
+ readSessionId = message.GetPartitionSession()->GetReadSessionId();
+
+ {
+ auto status = commit(readSessionId, 3);
+ UNIT_ASSERT(status.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3);
+ }
+
+ {
+ // must be ignored, because commit to past
+ auto status = commit(readSessionId, 0);
+ UNIT_ASSERT(status.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3);
+ }
+
+ {
+ // must be ignored, because wrong sessionid
+ auto status = commit("random session", 0);
+ UNIT_ASSERT(!status.IsSuccess());
+ Sleep(TDuration::MilliSeconds(500));
+ UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3);
+ }
+ } else {
+ UNIT_ASSERT(false);
+ }
+ } else {
+ message.Commit();
+ }
+ }
+
+ return true;
+ });
+ }
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
index 5cc8256aaa0..aadf1ad1627 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
+++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
@@ -162,93 +162,6 @@ Y_UNIT_TEST_SUITE(WithSDK) {
UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset());
}
}
-
- Y_UNIT_TEST(CommitWithWrongSessionId) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1);
-
- setup.Write("message-1");
- setup.Write("message-2");
- setup.Write("message-3");
-
- {
- auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1, "wrong-read-session-id");
- UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
-
- auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
- UNIT_ASSERT_VALUES_EQUAL(0, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
- }
- }
-
- Y_UNIT_TEST(CommitToPastWithWrongSessionId) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1);
-
- setup.Write("message-1");
- setup.Write("message-2");
- setup.Write("message-3");
-
- {
- auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 2);
- UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");
-
- auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
- UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
- }
-
- {
- auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id");
- UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
-
- auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
- UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
- }
- }
-
- /* TODO Uncomment this test
- Y_UNIT_TEST(CommitToParentPartitionWithWrongSessionId) {
- TTopicSdkTestSetup setup = CreateSetup();
- setup.CreateTopicWithAutoscale();
-
- setup.Write("message-1", 0);
-
- {
- ui64 txId = 1006;
- SplitPartition(setup, ++txId, 0, "a");
- }
-
- setup.Write("message-2", 1);
-
- Cerr << ">>>>> BEGIN 0" << Endl << Flush;
- {
- auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1);
- UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");
-
- auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
- UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset());
- }
-
- Cerr << ">>>>> BEGIN 1" << Endl << Flush;
- {
- auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 1, 1);
- UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");
-
- auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
- UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(1).GetPartitionConsumerStats()->GetCommittedOffset());
- }
-
- Cerr << ">>>>> BEGIN 2" << Endl << Flush;
- {
- auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id");
- UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
-
- auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER);
- UNIT_ASSERT_VALUES_EQUAL_C(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset(), "Offset doesn`t changed");
- }
- Cerr << ">>>>> END" << Endl << Flush;
-
- }
- */
}
} // namespace NKikimr
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/ya.make b/ydb/core/persqueue/ut/ut_with_sdk/ya.make
index eb370dff024..c837ecdfa64 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/ya.make
+++ b/ydb/core/persqueue/ut/ut_with_sdk/ya.make
@@ -31,6 +31,7 @@ YQL_LAST_ABI_VERSION()
SRCS(
autoscaling_ut.cpp
balancing_ut.cpp
+ commitoffset_ut.cpp
mirrorer_ut.cpp
topic_ut.cpp
)
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp
index 1110c0f038b..0055dd8543c 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp
@@ -98,14 +98,22 @@ void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, con
session->Close(TDuration::Seconds(5));
}
-TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout) {
+TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer,
+ std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler,
+ std::optional<size_t> partition, const TDuration timeout) {
TTopicClient client(MakeDriver());
- auto reader = client.CreateReadSession(
- TReadSessionSettings()
- .AutoPartitioningSupport(true)
- .AppendTopics(TTopicReadSettings(topic))
- .ConsumerName(consumer));
+ auto topicSettings = TTopicReadSettings(topic);
+ if (partition) {
+ topicSettings.AppendPartitionIds(partition.value());
+ }
+
+ auto settins = TReadSessionSettings()
+ .AutoPartitioningSupport(true)
+ .AppendTopics(topicSettings)
+ .ConsumerName(consumer);
+
+ auto reader = client.CreateReadSession(settins);
TInstant deadlineTime = TInstant::Now() + timeout;
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h
index 9502b9fd4ff..190fb3bd624 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h
@@ -32,7 +32,9 @@ public:
std::vector<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent> StartPartitionSessionEvents;
};
- ReadResult Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout = TDuration::Seconds(5));
+ ReadResult Read(const std::string& topic, const std::string& consumer,
+ std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler,
+ std::optional<size_t> partition = std::nullopt, const TDuration timeout = TDuration::Seconds(5));
TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId = std::nullopt);
TString GetEndpoint() const;
diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
index 471162c5f9a..415e99d676f 100644
--- a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp
@@ -106,24 +106,42 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc
if (partitionNode->AllParents.size() == 0 && partitionNode->DirectChildren.size() == 0) {
SendCommit(topicInitInfo, commitRequest, ctx);
} else {
- auto killReadSession = commitRequest->read_session_id().empty();
+ auto hasReadSession = !commitRequest->read_session_id().empty();
+ auto killReadSession = !hasReadSession;
+ const TString& readSessionId = commitRequest->read_session_id();
+
std::vector<TDistributedCommitHelper::TCommitInfo> commits;
for (auto& parent: partitionNode->AllParents) {
- TDistributedCommitHelper::TCommitInfo commit {.PartitionId = parent->Id, .Offset = Max<i64>(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false};
+ TDistributedCommitHelper::TCommitInfo commit {
+ .PartitionId = parent->Id,
+ .Offset = Max<i64>(),
+ .KillReadSession = killReadSession,
+ .OnlyCheckCommitedToFinish = false,
+ .ReadSessionId = readSessionId
+ };
commits.push_back(commit);
}
- for (auto& child: partitionNode->AllChildren) {
- TDistributedCommitHelper::TCommitInfo commit {.PartitionId = child->Id, .Offset = 0, .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false};
- commits.push_back(commit);
+ if (!hasReadSession) {
+ for (auto& child: partitionNode->AllChildren) {
+ TDistributedCommitHelper::TCommitInfo commit {
+ .PartitionId = child->Id,
+ .Offset = 0,
+ .KillReadSession = true,
+ .OnlyCheckCommitedToFinish = false
+ };
+ commits.push_back(commit);
+ }
}
- TDistributedCommitHelper::TCommitInfo commit {.PartitionId = partitionNode->Id, .Offset = commitRequest->offset(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false};
-
- if (!commitRequest->read_session_id().empty()) {
- commit.ReadSessionId = commitRequest->read_session_id();
- }
+ TDistributedCommitHelper::TCommitInfo commit {
+ .PartitionId = partitionNode->Id,
+ .Offset = commitRequest->offset(),
+ .KillReadSession = killReadSession,
+ .OnlyCheckCommitedToFinish = false,
+ .ReadSessionId = readSessionId
+ };
commits.push_back(commit);
Kqp = std::make_unique<TDistributedCommitHelper>(Request().GetDatabaseName().GetOrElse(TString()), ClientId, topic, commits);
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp
index 611befb9196..16352cc0407 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp
@@ -1278,7 +1278,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit
}
std::unordered_set<ui64> notCommitedToFinishParents;
- for (auto& parent: topic.PartitionGraph->GetPartition(record.GetPartition())->DirectParents) {
+ for (auto& parent: partitionNode->DirectParents) {
for (auto& [_, actorInfo]: Partitions) { // TODO: map
if (actorInfo.Partition.Partition == parent->Id && !actorInfo.IsLastOffsetCommitted()) {
notCommitedToFinishParents.emplace(actorInfo.Partition.Partition);
@@ -1286,7 +1286,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit
}
}
- const auto& parentPartitions = topic.PartitionGraph->GetPartition(partitionId.Partition)->AllParents;
+ const auto& parentPartitions = partitionNode->AllParents;
const auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetDatabase());
const TActorId actorId = ctx.Register(new TPartitionActor(
ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(),