aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-11-20 20:37:01 +0300
committerGitHub <noreply@github.com>2024-11-20 17:37:01 +0000
commit753ecb8d410a4cb459c26f3a0082fb2d1724fe63 (patch)
tree05195665cec1f2638b3dd6daec8ceb99b564488f
parent872bb4d25b2454e82c5bbc045690c97cb64846ab (diff)
downloadydb-753ecb8d410a4cb459c26f3a0082fb2d1724fe63.tar.gz
YQ-3889 Add MaxHandledEventsSize (#11791)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp17
1 files changed, 13 insertions, 4 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index c793ba7f7e..7f1070d21b 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -96,7 +96,8 @@ struct TEvPrivate {
ui64 SendStatisticPeriodSec = 2;
ui64 MaxBatchSizeBytes = 10000000;
-ui64 MaxHandledEvents = 1000;
+ui64 MaxHandledEventsCount = 1000;
+ui64 MaxHandledEventsSize = 1000000;
TVector<TString> GetVector(const google::protobuf::RepeatedPtrField<TString>& value) {
return {value.begin(), value.end()};
@@ -157,7 +158,8 @@ private:
void operator()(NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent&) { }
TTopicSession& Self;
- const TString& LogPrefix;
+ const TString& LogPrefix;
+ ui64& dataReceivedEventSize;
};
struct TParserSchema {
@@ -531,7 +533,9 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
}
void TTopicSession::HandleNewEvents() {
- for (ui64 i = 0; i < MaxHandledEvents; ++i) {
+ ui64 handledEventsSize = 0;
+
+ for (ui64 i = 0; i < MaxHandledEventsCount; ++i) {
if (!ReadSession) {
return;
}
@@ -543,7 +547,11 @@ void TTopicSession::HandleNewEvents() {
if (!event) {
break;
}
- std::visit(TTopicEventProcessor{*this, LogPrefix}, *event);
+
+ std::visit(TTopicEventProcessor{*this, LogPrefix, handledEventsSize}, *event);
+ if (handledEventsSize >= MaxHandledEventsSize) {
+ break;
+ }
}
}
@@ -568,6 +576,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE
Self.ClientsStats.Add(dataSize, event.GetMessages().size());
Self.Metrics.SessionDataRate->Add(dataSize);
Self.Metrics.AllSessionsDataRate->Add(dataSize);
+ dataReceivedEventSize += dataSize;
Self.SendToParsing(event.GetMessages());
}