diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-12-10 15:09:48 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-10 15:09:48 +0300 |
commit | 1172da4fac201a3fa4fdf6aff6b729daf509db0d (patch) | |
tree | 55b133b5c6c14cee4b4a09e0c8288679347d40f1 | |
parent | d8dfdbf4d73848a69f6448c54c40772d32768ce0 (diff) | |
download | ydb-1172da4fac201a3fa4fdf6aff6b729daf509db0d.tar.gz |
YQ-3893 Fix next message offset in topic session (#12284)
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 18 | ||||
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp | 15 |
2 files changed, 23 insertions, 10 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 5c755e71a75..4fec6e7af0d 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -148,8 +148,8 @@ private: TQueue<std::pair<ui64, TString>> Buffer; ui64 UnreadBytes = 0; bool DataArrivedSent = false; - TMaybe<ui64> NextMessageOffset; - ui64 LastSendedNextMessageOffset = 0; + TMaybe<ui64> NextMessageOffset; // offset to restart topic session + TMaybe<ui64> ProcessedNextMessageOffset; // offset of fully processed data (to save to checkpoint) TVector<ui64> FieldsIds; TDuration ReconnectPeriod; TStats Stat; // Send (filtered) to read_actor @@ -509,16 +509,15 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&) auto readBytes = ClientsStats.Bytes; for (auto& [actorId, info] : Clients) { - if (!info.NextMessageOffset) { + if (!info.ProcessedNextMessageOffset) { continue; } auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>(); event->Record.SetPartitionId(PartitionId); - event->Record.SetNextMessageOffset(*info.NextMessageOffset); + event->Record.SetNextMessageOffset(*info.ProcessedNextMessageOffset); event->Record.SetReadBytes(readBytes); - info.LastSendedNextMessageOffset = *info.NextMessageOffset; event->ReadActorId = info.ReadActorId; - LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << *info.NextMessageOffset); + LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << info.ProcessedNextMessageOffset); Send(RowDispatcherActorId, event.release()); } ClientsStats.Clear(); @@ -541,6 +540,11 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) { if (!info.NextMessageOffset || *info.NextMessageOffset < ev->Get()->Offset + 1) { info.NextMessageOffset = ev->Get()->Offset + 1; } + if (info.Buffer.empty()) { + if (!info.ProcessedNextMessageOffset || *info.ProcessedNextMessageOffset < ev->Get()->Offset + 1) { + info.ProcessedNextMessageOffset = ev->Get()->Offset + 1; + } + } } } @@ -776,7 +780,7 @@ void TTopicSession::SendData(TClientsInfo& info) { } while(!info.Buffer.empty()); info.Stat.Add(dataSize, eventsSize); info.FilteredDataRate->Add(dataSize); - info.LastSendedNextMessageOffset = *info.NextMessageOffset; + info.ProcessedNextMessageOffset = *info.NextMessageOffset; } void TTopicSession::UpdateFieldsIds(TClientsInfo& info) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 6a3ae942e81..84c8c425b51 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -206,13 +206,14 @@ public: return eventHolder->Get()->Record.MessagesSize(); } - void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds) { + void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds, ui64 expectedNextMessageOffset) { size_t count = readActorIds.size(); for (size_t i = 0; i < count; ++i) { auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvStatistics>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); UNIT_ASSERT(readActorIds.contains(eventHolder->Get()->ReadActorId)); readActorIds.erase(eventHolder->Get()->ReadActorId); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->Record.GetNextMessageOffset(), expectedNextMessageOffset); } } @@ -244,12 +245,20 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StartSession(ReadActorId1, source); StartSession(ReadActorId2, source); - const std::vector<TString> data = { Json1 }; + std::vector<TString> data = { Json1 }; PQWrite(data, topicName); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectMessageBatch(ReadActorId1, { Json1 }); ExpectMessageBatch(ReadActorId2, { Json1 }); - ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}); + ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1); + + data = { Json2 }; + PQWrite(data, topicName); + ExpectNewDataArrived({ReadActorId1, ReadActorId2}); + ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1); + ExpectMessageBatch(ReadActorId1, data); + ExpectMessageBatch(ReadActorId2, data); + ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 2); auto source2 = BuildSource(topicName, false, "OtherConsumer"); StartSession(ReadActorId3, source2, Nothing(), true); |