aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-12-10 15:09:48 +0300
committerGitHub <noreply@github.com>2024-12-10 15:09:48 +0300
commit1172da4fac201a3fa4fdf6aff6b729daf509db0d (patch)
tree55b133b5c6c14cee4b4a09e0c8288679347d40f1
parentd8dfdbf4d73848a69f6448c54c40772d32768ce0 (diff)
downloadydb-1172da4fac201a3fa4fdf6aff6b729daf509db0d.tar.gz
YQ-3893 Fix next message offset in topic session (#12284)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp18
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp15
2 files changed, 23 insertions, 10 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 5c755e71a75..4fec6e7af0d 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -148,8 +148,8 @@ private:
TQueue<std::pair<ui64, TString>> Buffer;
ui64 UnreadBytes = 0;
bool DataArrivedSent = false;
- TMaybe<ui64> NextMessageOffset;
- ui64 LastSendedNextMessageOffset = 0;
+ TMaybe<ui64> NextMessageOffset; // offset to restart topic session
+ TMaybe<ui64> ProcessedNextMessageOffset; // offset of fully processed data (to save to checkpoint)
TVector<ui64> FieldsIds;
TDuration ReconnectPeriod;
TStats Stat; // Send (filtered) to read_actor
@@ -509,16 +509,15 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&)
auto readBytes = ClientsStats.Bytes;
for (auto& [actorId, info] : Clients) {
- if (!info.NextMessageOffset) {
+ if (!info.ProcessedNextMessageOffset) {
continue;
}
auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>();
event->Record.SetPartitionId(PartitionId);
- event->Record.SetNextMessageOffset(*info.NextMessageOffset);
+ event->Record.SetNextMessageOffset(*info.ProcessedNextMessageOffset);
event->Record.SetReadBytes(readBytes);
- info.LastSendedNextMessageOffset = *info.NextMessageOffset;
event->ReadActorId = info.ReadActorId;
- LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << *info.NextMessageOffset);
+ LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << info.ProcessedNextMessageOffset);
Send(RowDispatcherActorId, event.release());
}
ClientsStats.Clear();
@@ -541,6 +540,11 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) {
if (!info.NextMessageOffset || *info.NextMessageOffset < ev->Get()->Offset + 1) {
info.NextMessageOffset = ev->Get()->Offset + 1;
}
+ if (info.Buffer.empty()) {
+ if (!info.ProcessedNextMessageOffset || *info.ProcessedNextMessageOffset < ev->Get()->Offset + 1) {
+ info.ProcessedNextMessageOffset = ev->Get()->Offset + 1;
+ }
+ }
}
}
@@ -776,7 +780,7 @@ void TTopicSession::SendData(TClientsInfo& info) {
} while(!info.Buffer.empty());
info.Stat.Add(dataSize, eventsSize);
info.FilteredDataRate->Add(dataSize);
- info.LastSendedNextMessageOffset = *info.NextMessageOffset;
+ info.ProcessedNextMessageOffset = *info.NextMessageOffset;
}
void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
index 6a3ae942e81..84c8c425b51 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
@@ -206,13 +206,14 @@ public:
return eventHolder->Get()->Record.MessagesSize();
}
- void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds) {
+ void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds, ui64 expectedNextMessageOffset) {
size_t count = readActorIds.size();
for (size_t i = 0; i < count; ++i) {
auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvStatistics>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
UNIT_ASSERT(eventHolder.Get() != nullptr);
UNIT_ASSERT(readActorIds.contains(eventHolder->Get()->ReadActorId));
readActorIds.erase(eventHolder->Get()->ReadActorId);
+ UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->Record.GetNextMessageOffset(), expectedNextMessageOffset);
}
}
@@ -244,12 +245,20 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
StartSession(ReadActorId1, source);
StartSession(ReadActorId2, source);
- const std::vector<TString> data = { Json1 };
+ std::vector<TString> data = { Json1 };
PQWrite(data, topicName);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
ExpectMessageBatch(ReadActorId1, { Json1 });
ExpectMessageBatch(ReadActorId2, { Json1 });
- ExpectStatisticToReadActor({ReadActorId1, ReadActorId2});
+ ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1);
+
+ data = { Json2 };
+ PQWrite(data, topicName);
+ ExpectNewDataArrived({ReadActorId1, ReadActorId2});
+ ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1);
+ ExpectMessageBatch(ReadActorId1, data);
+ ExpectMessageBatch(ReadActorId2, data);
+ ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 2);
auto source2 = BuildSource(topicName, false, "OtherConsumer");
StartSession(ReadActorId3, source2, Nothing(), true);