diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-11-20 12:17:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 12:17:32 +0300 |
commit | 912022f5004fc384fd1996cae3f848bd1d44a28d (patch) | |
tree | f975b5b6582ab0561d01167af9052b4877d2e197 | |
parent | 5876bb106ea873c752ca2814604f5aa3da860222 (diff) | |
download | ydb-912022f5004fc384fd1996cae3f848bd1d44a28d.tar.gz |
YQ-3838 Skip filtering if already processed (#11734)
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 6 | ||||
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp | 8 |
2 files changed, 9 insertions, 5 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 56ccbc8d8a..c793ba7f7e 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -672,9 +672,13 @@ void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector< Y_ENSURE(rowsOffset < offsets.size(), "Invalid first row ofset"); Y_ENSURE(numberRows, "Expected non empty parsed batch"); Y_ENSURE(parsedValues, "Expected non empty schema"); - LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << offsets[rowsOffset + numberRows - 1]); + auto lastOffset = offsets[rowsOffset + numberRows - 1]; + LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << lastOffset); for (auto& [actorId, info] : Clients) { + if (info.NextMessageOffset && lastOffset < info.NextMessageOffset) { // the batch has already been processed + continue; + } try { if (info.Filter) { info.Filter->Push(offsets, RebuildJson(info, parsedValues), rowsOffset, numberRows); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index d7efafa851..39bb46b6c2 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -332,7 +332,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { auto source = BuildSource(topicName); StartSession(ReadActorId1, source); - const std::vector<TString> data = { Json1, Json2 }; // offset 0, 1 + const std::vector<TString> data = { Json1, Json2, Json3 }; // offset 0, 1, 2 PQWrite(data, topicName); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, data); @@ -341,11 +341,11 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StartSession(ReadActorId2, source, 1); ExpectNewDataArrived({ReadActorId2}); - PQWrite({ Json3 }, topicName); + PQWrite({ Json4 }, topicName); ExpectNewDataArrived({ReadActorId1}); - ExpectMessageBatch(ReadActorId1, { Json3 }); - ExpectMessageBatch(ReadActorId2, { Json2, Json3 }); + ExpectMessageBatch(ReadActorId1, { Json4 }); + ExpectMessageBatch(ReadActorId2, { Json2, Json3, Json4 }); StopSession(ReadActorId1, source); StopSession(ReadActorId2, source); |