diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-11-20 20:37:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 17:37:01 +0000 |
commit | 753ecb8d410a4cb459c26f3a0082fb2d1724fe63 (patch) | |
tree | 05195665cec1f2638b3dd6daec8ceb99b564488f | |
parent | 872bb4d25b2454e82c5bbc045690c97cb64846ab (diff) | |
download | ydb-753ecb8d410a4cb459c26f3a0082fb2d1724fe63.tar.gz |
YQ-3889 Add MaxHandledEventsSize (#11791)
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index c793ba7f7e..7f1070d21b 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -96,7 +96,8 @@ struct TEvPrivate { ui64 SendStatisticPeriodSec = 2; ui64 MaxBatchSizeBytes = 10000000; -ui64 MaxHandledEvents = 1000; +ui64 MaxHandledEventsCount = 1000; +ui64 MaxHandledEventsSize = 1000000; TVector<TString> GetVector(const google::protobuf::RepeatedPtrField<TString>& value) { return {value.begin(), value.end()}; @@ -157,7 +158,8 @@ private: void operator()(NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent&) { } TTopicSession& Self; - const TString& LogPrefix; + const TString& LogPrefix; + ui64& dataReceivedEventSize; }; struct TParserSchema { @@ -531,7 +533,9 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { } void TTopicSession::HandleNewEvents() { - for (ui64 i = 0; i < MaxHandledEvents; ++i) { + ui64 handledEventsSize = 0; + + for (ui64 i = 0; i < MaxHandledEventsCount; ++i) { if (!ReadSession) { return; } @@ -543,7 +547,11 @@ void TTopicSession::HandleNewEvents() { if (!event) { break; } - std::visit(TTopicEventProcessor{*this, LogPrefix}, *event); + + std::visit(TTopicEventProcessor{*this, LogPrefix, handledEventsSize}, *event); + if (handledEventsSize >= MaxHandledEventsSize) { + break; + } } } @@ -568,6 +576,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE Self.ClientsStats.Add(dataSize, event.GetMessages().size()); Self.Metrics.SessionDataRate->Add(dataSize); Self.Metrics.AllSessionsDataRate->Add(dataSize); + dataReceivedEventSize += dataSize; Self.SendToParsing(event.GetMessages()); } |