aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2025-02-03 08:33:02 +0300
committerGitHub <noreply@github.com>2025-02-03 08:33:02 +0300
commit9d9a33e9739e8254c38751b9855ca342829e0740 (patch)
tree868d2c6b81d8fd4619c68f6c3ae0f1a19e7d8e5a
parente122885d71d6c413432e4e7d74312860a440358d (diff)
downloadydb-9d9a33e9739e8254c38751b9855ca342829e0740.tar.gz
YQ: Added force refresh by reconnects in topic_session (#14058)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp6
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp9
3 files changed, 16 insertions, 0 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
index 85cf11c99ee..270589f0f83 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
@@ -457,6 +457,12 @@ public:
return statistics;
}
+ void ForceRefresh() override {
+ if (Parser) {
+ Parser->Refresh(true);
+ }
+ }
+
protected:
NActors::TActorId GetSelfId() const override {
return SelfId();
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h
index 6e2fbb82d1a..8e0efb13dc9 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h
@@ -55,6 +55,7 @@ public:
virtual bool HasClients() const = 0;
virtual TFormatHandlerStatistic GetStatistics() = 0;
+ virtual void ForceRefresh() = 0;
protected:
virtual NActors::TActorId GetSelfId() const = 0;
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 684f54ed5d1..81b45ee550b 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -311,6 +311,7 @@ private:
TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings);
void SendSessionError(TActorId readActorId, TStatus status);
void RestartSessionIfOldestClient(const TClientsInfo& info);
+ void RefreshParsers();
private:
@@ -501,6 +502,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) {
LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, " << TopicPathPartition
<< ", StartingMessageTimestamp " << minTime
<< ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer());
+ RefreshParsers();
StopReadSession();
CreateTopicSession();
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
@@ -777,6 +779,7 @@ void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) {
Metrics.RestartSessionByOffsets->Inc();
++RestartSessionByOffsets;
info.RestartSessionByOffsetsByQuery->Inc();
+ RefreshParsers();
StopReadSession();
if (!ReadSession) {
@@ -910,6 +913,12 @@ TMaybe<ui64> TTopicSession::GetOffset(const NFq::NRowDispatcherProto::TEvStartSe
return Nothing();
}
+void TTopicSession::RefreshParsers() {
+ for (const auto& [_, formatHandler] : FormatHandlers) {
+ formatHandler->ForceRefresh();
+ }
+}
+
} // anonymous namespace
////////////////////////////////////////////////////////////////////////////////