aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-11-20 12:17:32 +0300
committerGitHub <noreply@github.com>2024-11-20 12:17:32 +0300
commit912022f5004fc384fd1996cae3f848bd1d44a28d (patch)
treef975b5b6582ab0561d01167af9052b4877d2e197
parent5876bb106ea873c752ca2814604f5aa3da860222 (diff)
downloadydb-912022f5004fc384fd1996cae3f848bd1d44a28d.tar.gz
YQ-3838 Skip filtering if already processed (#11734)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp6
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp8
2 files changed, 9 insertions, 5 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 56ccbc8d8a..c793ba7f7e 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -672,9 +672,13 @@ void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector<
Y_ENSURE(rowsOffset < offsets.size(), "Invalid first row ofset");
Y_ENSURE(numberRows, "Expected non empty parsed batch");
Y_ENSURE(parsedValues, "Expected non empty schema");
- LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << offsets[rowsOffset + numberRows - 1]);
+ auto lastOffset = offsets[rowsOffset + numberRows - 1];
+ LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << lastOffset);
for (auto& [actorId, info] : Clients) {
+ if (info.NextMessageOffset && lastOffset < info.NextMessageOffset) { // the batch has already been processed
+ continue;
+ }
try {
if (info.Filter) {
info.Filter->Push(offsets, RebuildJson(info, parsedValues), rowsOffset, numberRows);
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
index d7efafa851..39bb46b6c2 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
@@ -332,7 +332,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
auto source = BuildSource(topicName);
StartSession(ReadActorId1, source);
- const std::vector<TString> data = { Json1, Json2 }; // offset 0, 1
+ const std::vector<TString> data = { Json1, Json2, Json3 }; // offset 0, 1, 2
PQWrite(data, topicName);
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, data);
@@ -341,11 +341,11 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
StartSession(ReadActorId2, source, 1);
ExpectNewDataArrived({ReadActorId2});
- PQWrite({ Json3 }, topicName);
+ PQWrite({ Json4 }, topicName);
ExpectNewDataArrived({ReadActorId1});
- ExpectMessageBatch(ReadActorId1, { Json3 });
- ExpectMessageBatch(ReadActorId2, { Json2, Json3 });
+ ExpectMessageBatch(ReadActorId1, { Json4 });
+ ExpectMessageBatch(ReadActorId2, { Json2, Json3, Json4 });
StopSession(ReadActorId1, source);
StopSession(ReadActorId2, source);