aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-08-09 07:37:44 +0300
committerkomels <komels@ydb.tech>2023-08-09 08:02:42 +0300
commit0d19892e44c1e2c43f86a9380e1e83af79245155 (patch)
treee9d94099aee4b26964854da82e6b0990f431e003
parentc4c5c6d2bbec0deeffe1b3d8e387a6039d05c1d7 (diff)
downloadydb-0d19892e44c1e2c43f86a9380e1e83af79245155.tar.gz
Verify commit offset only when provided in topic API
-rw-r--r--ydb/core/persqueue/ut/mirrorer_ut.cpp71
-rw-r--r--ydb/services/persqueue_v1/actors/events.h9
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp8
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.h2
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp16
5 files changed, 92 insertions, 14 deletions
diff --git a/ydb/core/persqueue/ut/mirrorer_ut.cpp b/ydb/core/persqueue/ut/mirrorer_ut.cpp
index ed2818716ac..bfc59d125e0 100644
--- a/ydb/core/persqueue/ut/mirrorer_ut.cpp
+++ b/ydb/core/persqueue/ut/mirrorer_ut.cpp
@@ -251,5 +251,76 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
}
}
+
+ Y_UNIT_TEST(ValidStartStream) {
+ using namespace NYdb::NTopic;
+
+ NPersQueue::TTestServer server;
+ TString topic = "topic1";
+ TString topicFullName = "rt3.dc1--" + topic;
+
+ server.AnnoyingClient->CreateTopic(topicFullName, 1);
+
+ auto driver = server.AnnoyingClient->GetDriver();
+ auto writer = CreateSimpleWriter(*driver, topic, "src-id-test");
+ for (auto i = 0u; i < 5; i++) {
+ auto res = writer->Write(TString(10, 'a'));
+ UNIT_ASSERT(res);
+ }
+
+ auto createTopicReader = [&](const TString& topic) {
+ auto settings = TReadSessionSettings()
+ .AppendTopics(TTopicReadSettings(topic))
+ .ConsumerName("shared/user")
+ .Decompress(false);
+
+ return TTopicClient(*driver).CreateReadSession(settings);
+ };
+ auto reader = createTopicReader(topic);
+ ui64 messagesGot = 0;
+ while(true) {
+ auto event = reader->GetEvent(true);
+ UNIT_ASSERT(event);
+ if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ for (auto msg : dataEvent->GetCompressedMessages()) {
+ msg.Commit();
+ messagesGot++;
+ }
+ if (messagesGot == 5) {
+ reader->Close();
+ }
+ } else if (auto* lockEv = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
+ lockEv->Confirm();
+ } else if (auto* releaseEv = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
+ releaseEv->Confirm();
+ } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) {
+ UNIT_ASSERT_VALUES_EQUAL(messagesGot, 5);
+ break;
+ }
+ }
+
+ for (auto i = 0u; i < 5; i++) {
+ auto res = writer->Write(TString(10, 'b'));
+ UNIT_ASSERT(res);
+ }
+
+ auto res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+
+ reader = createTopicReader(topic);
+ bool gotData = false;
+ while(!gotData) {
+ auto event = reader->GetEvent(true);
+ UNIT_ASSERT(event);
+ if (auto dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
+ gotData = true;
+ } else if (auto* lockEv = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
+ lockEv->Confirm(5);
+ } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) {
+ UNIT_FAIL(closeSessionEvent->DebugString());
+ break;
+ }
+ }
+ }
}
} // NKikimr::NPersQueueTests
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index ba7e6d0ec24..ee3b7b7dfa6 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -325,7 +325,7 @@ struct TEvPQProxy {
};
struct TEvStartRead : public NActors::TEventLocal<TEvStartRead, EvStartRead> {
- TEvStartRead(ui64 id, ui64 readOffset, ui64 commitOffset, bool verifyReadOffset)
+ TEvStartRead(ui64 id, ui64 readOffset, const TMaybe<ui64>& commitOffset, bool verifyReadOffset)
: AssignId(id)
, ReadOffset(readOffset)
, CommitOffset(commitOffset)
@@ -335,7 +335,7 @@ struct TEvPQProxy {
const ui64 AssignId;
ui64 ReadOffset;
- ui64 CommitOffset;
+ TMaybe<ui64> CommitOffset;
bool VerifyReadOffset;
ui64 Generation;
};
@@ -377,7 +377,8 @@ struct TEvPQProxy {
};
struct TEvLockPartition : public NActors::TEventLocal<TEvLockPartition, EvLockPartition> {
- explicit TEvLockPartition(const ui64 readOffset, const ui64 commitOffset, bool verifyReadOffset, bool startReading)
+ explicit TEvLockPartition(const ui64 readOffset, const TMaybe<ui64>& commitOffset, bool verifyReadOffset,
+ bool startReading)
: ReadOffset(readOffset)
, CommitOffset(commitOffset)
, VerifyReadOffset(verifyReadOffset)
@@ -385,7 +386,7 @@ struct TEvPQProxy {
{ }
ui64 ReadOffset;
- ui64 CommitOffset;
+ TMaybe<ui64> CommitOffset;
bool VerifyReadOffset;
bool StartReading;
};
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index 9e01d199a6e..7895934a13a 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -715,7 +715,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) {
return;
}
- if (ClientCommitOffset < CommittedOffset) {
+ if (ClientCommitOffset.Defined() && *ClientCommitOffset < CommittedOffset) {
ctx.Send(ParentId,
new TEvPQProxy::TEvCloseSession(TStringBuilder()
<< "trying to commit to position that is less than committed: read " << ClientCommitOffset
@@ -733,7 +733,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) {
}
}
- if (ClientCommitOffset > CommittedOffset) {
+ if (ClientCommitOffset.GetOrElse(0) > CommittedOffset) {
if (ClientCommitOffset > ReadOffset) {
ctx.Send(ParentId,
new TEvPQProxy::TEvCloseSession(TStringBuilder()
@@ -742,7 +742,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) {
PersQueue::ErrorCode::BAD_REQUEST));
return;
}
- if (ClientCommitOffset > EndOffset) {
+ if (ClientCommitOffset.GetOrElse(0) > EndOffset) {
ctx.Send(ParentId,
new TEvPQProxy::TEvCloseSession(TStringBuilder()
<< "trying to commit to future: commit " << ClientCommitOffset << " endOffset " << EndOffset,
@@ -750,7 +750,7 @@ void TPartitionActor::InitStartReading(const TActorContext& ctx) {
return;
}
Y_VERIFY(CommitsInfly.empty());
- CommitsInfly.push_back(std::pair<ui64, TCommitInfo>(Max<ui64>(), {Max<ui64>(), ClientCommitOffset, ctx.Now()}));
+ CommitsInfly.push_back(std::pair<ui64, TCommitInfo>(Max<ui64>(), {Max<ui64>(), ClientCommitOffset.GetOrElse(0), ctx.Now()}));
Counters.SLITotal.Inc();
if (PipeClient) //pipe will be recreated soon
SendCommit(CommitsInfly.back().first, CommitsInfly.back().second.Offset, ctx);
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.h b/ydb/services/persqueue_v1/actors/partition_actor.h
index ef1f00c2e4b..4399f3ca5ca 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.h
+++ b/ydb/services/persqueue_v1/actors/partition_actor.h
@@ -150,7 +150,7 @@ private:
ui64 ReadOffset;
ui64 ClientReadOffset;
- ui64 ClientCommitOffset;
+ TMaybe<ui64> ClientCommitOffset;
bool ClientVerifyReadOffset;
ui64 CommittedOffset;
ui64 WriteTimestampEstimateMs;
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 4dc966dbb89..5dde8abc52b 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -148,10 +148,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
const auto& req = request.start_read();
const ui64 readOffset = req.read_offset();
- const ui64 commitOffset = req.commit_offset();
const bool verifyReadOffset = req.verify_read_offset();
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(
+ getAssignId(request.start_read()), readOffset, req.commit_offset(), verifyReadOffset
+ ));
return (void)ReadFromStreamOrDie(ctx);
}
@@ -218,7 +219,10 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
const ui64 readOffset = req.read_offset();
const ui64 commitOffset = req.commit_offset();
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(req), readOffset, commitOffset, req.has_read_offset()));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(
+ getAssignId(req), readOffset, req.has_commit_offset() ? commitOffset : TMaybe<ui64>{},
+ req.has_read_offset()
+ ));
return (void)ReadFromStreamOrDie(ctx);
}
@@ -446,7 +450,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvStartRead::T
// proxy request to partition - allow initing
// TODO: add here VerifyReadOffset too and check it againts Committed position
- ctx.Send(it->second.Actor, new TEvPQProxy::TEvLockPartition(ev->Get()->ReadOffset, ev->Get()->CommitOffset, ev->Get()->VerifyReadOffset, true));
+ ctx.Send(it->second.Actor, new TEvPQProxy::TEvLockPartition(
+ ev->Get()->ReadOffset, ev->Get()->CommitOffset, ev->Get()->VerifyReadOffset, true
+ ));
}
template <bool UseMigrationProtocol>
@@ -1044,7 +1050,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign"
<< ": record# " << record);
- ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, 0, false, false));
+ ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, {}, false, false));
}
template <bool UseMigrationProtocol>