diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2025-02-03 08:33:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-03 08:33:02 +0300 |
commit | 9d9a33e9739e8254c38751b9855ca342829e0740 (patch) | |
tree | 868d2c6b81d8fd4619c68f6c3ae0f1a19e7d8e5a | |
parent | e122885d71d6c413432e4e7d74312860a440358d (diff) | |
download | ydb-9d9a33e9739e8254c38751b9855ca342829e0740.tar.gz |
YQ: Added force refresh by reconnects in topic_session (#14058)
3 files changed, 16 insertions, 0 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 85cf11c99ee..270589f0f83 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -457,6 +457,12 @@ public: return statistics; } + void ForceRefresh() override { + if (Parser) { + Parser->Refresh(true); + } + } + protected: NActors::TActorId GetSelfId() const override { return SelfId(); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index 6e2fbb82d1a..8e0efb13dc9 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -55,6 +55,7 @@ public: virtual bool HasClients() const = 0; virtual TFormatHandlerStatistic GetStatistics() = 0; + virtual void ForceRefresh() = 0; protected: virtual NActors::TActorId GetSelfId() const = 0; diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 684f54ed5d1..81b45ee550b 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -311,6 +311,7 @@ private: TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings); void SendSessionError(TActorId readActorId, TStatus status); void RestartSessionIfOldestClient(const TClientsInfo& info); + void RefreshParsers(); private: @@ -501,6 +502,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) { LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, " << TopicPathPartition << ", StartingMessageTimestamp " << minTime << ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer()); + RefreshParsers(); StopReadSession(); CreateTopicSession(); Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession()); @@ -777,6 +779,7 @@ void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) { Metrics.RestartSessionByOffsets->Inc(); ++RestartSessionByOffsets; info.RestartSessionByOffsetsByQuery->Inc(); + RefreshParsers(); StopReadSession(); if (!ReadSession) { @@ -910,6 +913,12 @@ TMaybe<ui64> TTopicSession::GetOffset(const NFq::NRowDispatcherProto::TEvStartSe return Nothing(); } +void TTopicSession::RefreshParsers() { + for (const auto& [_, formatHandler] : FormatHandlers) { + formatHandler->ForceRefresh(); + } +} + } // anonymous namespace //////////////////////////////////////////////////////////////////////////////// |