diff options
author | komels <komels@ydb.tech> | 2023-08-09 07:37:44 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2023-08-09 08:02:42 +0300 |
commit | 0d19892e44c1e2c43f86a9380e1e83af79245155 (patch) | |
tree | e9d94099aee4b26964854da82e6b0990f431e003 | |
parent | c4c5c6d2bbec0deeffe1b3d8e387a6039d05c1d7 (diff) | |
download | ydb-0d19892e44c1e2c43f86a9380e1e83af79245155.tar.gz |
Verify commit offset only when provided in topic API
-rw-r--r-- | ydb/core/persqueue/ut/mirrorer_ut.cpp | 71 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/events.h | 9 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/partition_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/partition_actor.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 16 |
5 files changed, 92 insertions, 14 deletions
diff --git a/ydb/core/persqueue/ut/mirrorer_ut.cpp b/ydb/core/persqueue/ut/mirrorer_ut.cpp index ed2818716ac..bfc59d125e0 100644 --- a/ydb/core/persqueue/ut/mirrorer_ut.cpp +++ b/ydb/core/persqueue/ut/mirrorer_ut.cpp @@ -251,5 +251,76 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) { } } + + Y_UNIT_TEST(ValidStartStream) { + using namespace NYdb::NTopic; + + NPersQueue::TTestServer server; + TString topic = "topic1"; + TString topicFullName = "rt3.dc1--" + topic; + + server.AnnoyingClient->CreateTopic(topicFullName, 1); + + auto driver = server.AnnoyingClient->GetDriver(); + auto writer = CreateSimpleWriter(*driver, topic, "src-id-test"); + for (auto i = 0u; i < 5; i++) { + auto res = writer->Write(TString(10, 'a')); + UNIT_ASSERT(res); + } + + auto createTopicReader = [&](const TString& topic) { + auto settings = TReadSessionSettings() + .AppendTopics(TTopicReadSettings(topic)) + .ConsumerName("shared/user") + .Decompress(false); + + return TTopicClient(*driver).CreateReadSession(settings); + }; + auto reader = createTopicReader(topic); + ui64 messagesGot = 0; + while(true) { + auto event = reader->GetEvent(true); + UNIT_ASSERT(event); + if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) { + for (auto msg : dataEvent->GetCompressedMessages()) { + msg.Commit(); + messagesGot++; + } + if (messagesGot == 5) { + reader->Close(); + } + } else if (auto* lockEv = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { + lockEv->Confirm(); + } else if (auto* releaseEv = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) { + releaseEv->Confirm(); + } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) { + UNIT_ASSERT_VALUES_EQUAL(messagesGot, 5); + break; + } + } + + for (auto i = 0u; i < 5; i++) { + auto res = writer->Write(TString(10, 'b')); + UNIT_ASSERT(res); + } + + auto res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + + reader = createTopicReader(topic); + bool gotData = false; + while(!gotData) { + auto event = reader->GetEvent(true); + UNIT_ASSERT(event); + if (auto dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) { + gotData = true; + } else if (auto* lockEv = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { + lockEv->Confirm(5); + } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) { + UNIT_FAIL(closeSessionEvent->DebugString()); + break; + } + } + } } } // NKikimr::NPersQueueTests diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index ba7e6d0ec24..ee3b7b7dfa6 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -325,7 +325,7 @@ struct TEvPQProxy { }; struct TEvStartRead : public NActors::TEventLocal<TEvStartRead, EvStartRead> { - TEvStartRead(ui64 id, ui64 readOffset, ui64 commitOffset, bool verifyReadOffset) + TEvStartRead(ui64 id, ui64 readOffset, const TMaybe<ui64>& commitOffset, bool verifyReadOffset) : AssignId(id) , ReadOffset(readOffset) , CommitOffset(commitOffset) @@ -335,7 +335,7 @@ struct TEvPQProxy { const ui64 AssignId; ui64 ReadOffset; - ui64 CommitOffset; + TMaybe<ui64> CommitOffset; bool VerifyReadOffset; ui64 Generation; }; @@ -377,7 +377,8 @@ struct TEvPQProxy { }; struct TEvLockPartition : public NActors::TEventLocal<TEvLockPartition, EvLockPartition> { - explicit TEvLockPartition(const ui64 readOffset, const ui64 commitOffset, bool verifyReadOffset, bool startReading) + explicit TEvLockPartition(const ui64 readOffset, const TMaybe<ui64>& commitOffset, bool verifyReadOffset, + bool startReading) : ReadOffset(readOffset) , CommitOffset(commitOffset) , VerifyReadOffset(verifyReadOffset) @@ -385,7 +386,7 @@ struct TEvPQProxy { { } ui64 ReadOffset; - ui64 CommitOffset; + TMaybe<ui64> CommitOffset; bool VerifyReadOffset; bool StartReading; }; diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index 9e01d199a6e..7895934a13a 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -715,7 +715,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) { return; } - if (ClientCommitOffset < CommittedOffset) { + if (ClientCommitOffset.Defined() && *ClientCommitOffset < CommittedOffset) { ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession(TStringBuilder() << "trying to commit to position that is less than committed: read " << ClientCommitOffset @@ -733,7 +733,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) { } } - if (ClientCommitOffset > CommittedOffset) { + if (ClientCommitOffset.GetOrElse(0) > CommittedOffset) { if (ClientCommitOffset > ReadOffset) { ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession(TStringBuilder() @@ -742,7 +742,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) { PersQueue::ErrorCode::BAD_REQUEST)); return; } - if (ClientCommitOffset > EndOffset) { + if (ClientCommitOffset.GetOrElse(0) > EndOffset) { ctx.Send(ParentId, new TEvPQProxy::TEvCloseSession(TStringBuilder() << "trying to commit to future: commit " << ClientCommitOffset << " endOffset " << EndOffset, @@ -750,7 +750,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) { return; } Y_VERIFY(CommitsInfly.empty()); - CommitsInfly.push_back(std::pair<ui64, TCommitInfo>(Max<ui64>(), {Max<ui64>(), ClientCommitOffset, ctx.Now()})); + CommitsInfly.push_back(std::pair<ui64, TCommitInfo>(Max<ui64>(), {Max<ui64>(), ClientCommitOffset.GetOrElse(0), ctx.Now()})); Counters.SLITotal.Inc(); if (PipeClient) //pipe will be recreated soon SendCommit(CommitsInfly.back().first, CommitsInfly.back().second.Offset, ctx); diff --git a/ydb/services/persqueue_v1/actors/partition_actor.h b/ydb/services/persqueue_v1/actors/partition_actor.h index ef1f00c2e4b..4399f3ca5ca 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.h +++ b/ydb/services/persqueue_v1/actors/partition_actor.h @@ -150,7 +150,7 @@ private: ui64 ReadOffset; ui64 ClientReadOffset; - ui64 ClientCommitOffset; + TMaybe<ui64> ClientCommitOffset; bool ClientVerifyReadOffset; ui64 CommittedOffset; ui64 WriteTimestampEstimateMs; diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 4dc966dbb89..5dde8abc52b 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -148,10 +148,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF const auto& req = request.start_read(); const ui64 readOffset = req.read_offset(); - const ui64 commitOffset = req.commit_offset(); const bool verifyReadOffset = req.verify_read_offset(); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset)); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead( + getAssignId(request.start_read()), readOffset, req.commit_offset(), verifyReadOffset + )); return (void)ReadFromStreamOrDie(ctx); } @@ -218,7 +219,10 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF const ui64 readOffset = req.read_offset(); const ui64 commitOffset = req.commit_offset(); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(req), readOffset, commitOffset, req.has_read_offset())); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead( + getAssignId(req), readOffset, req.has_commit_offset() ? commitOffset : TMaybe<ui64>{}, + req.has_read_offset() + )); return (void)ReadFromStreamOrDie(ctx); } @@ -446,7 +450,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvStartRead::T // proxy request to partition - allow initing // TODO: add here VerifyReadOffset too and check it againts Committed position - ctx.Send(it->second.Actor, new TEvPQProxy::TEvLockPartition(ev->Get()->ReadOffset, ev->Get()->CommitOffset, ev->Get()->VerifyReadOffset, true)); + ctx.Send(it->second.Actor, new TEvPQProxy::TEvLockPartition( + ev->Get()->ReadOffset, ev->Get()->CommitOffset, ev->Get()->VerifyReadOffset, true + )); } template <bool UseMigrationProtocol> @@ -1044,7 +1050,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign" << ": record# " << record); - ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, 0, false, false)); + ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, {}, false, false)); } template <bool UseMigrationProtocol> |