diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2025-02-03 12:14:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-03 12:14:27 +0300 |
commit | 9657f160f6281b7c0b96ee9d37e8048a83c73ea3 (patch) | |
tree | c8be66a25b7731eca7d7a0cadfc3c94bdc359c8e | |
parent | aaddd9ba0de5e56698acb7a0242f109ecb2e00e0 (diff) | |
download | ydb-9657f160f6281b7c0b96ee9d37e8048a83c73ea3.tar.gz |
YQ-3924 Add CA notify to update SourceCpu (#14113)
-rw-r--r-- | ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | 11 |
1 files changed, 11 insertions, 0 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 37d74e4bb2c..3e2b2a507c9 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -103,11 +103,13 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvPrintState = EvBegin + 20, EvProcessState = EvBegin + 21, + EvNotifyCA = EvBegin + 22, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {}; struct TEvProcessState : public NActors::TEventLocal<TEvProcessState, EvProcessState> {}; + struct TEvNotifyCA : public NActors::TEventLocal<TEvNotifyCA, EvNotifyCA> {}; }; class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase { @@ -115,6 +117,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql:: const ui64 PrintStatePeriodSec = 300; const ui64 ProcessStatePeriodSec = 1; const ui64 PrintStateToLogSplitSize = 64000; + const ui64 NotifyCAPeriodSec = 10; struct TReadyBatch { public: @@ -263,6 +266,7 @@ public: void Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr&); void Handle(TEvPrivate::TEvPrintState::TPtr&); void Handle(TEvPrivate::TEvProcessState::TPtr&); + void Handle(TEvPrivate::TEvNotifyCA::TPtr&); STRICT_STFUNC(StateFunc, { hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle); @@ -283,6 +287,7 @@ public: hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle); hFunc(TEvPrivate::TEvPrintState, Handle); hFunc(TEvPrivate::TEvProcessState, Handle); + hFunc(TEvPrivate::TEvNotifyCA, Handle); }) static constexpr char ActorName[] = "DQ_PQ_READ_ACTOR"; @@ -379,6 +384,7 @@ void TDqPqRdReadActor::Init() { Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); + Schedule(TDuration::Seconds(NotifyCAPeriodSec), new TEvPrivate::TEvNotifyCA()); Inited = true; } @@ -1004,6 +1010,11 @@ void TDqPqRdReadActor::UpdateSessions() { LastUsedPartitionDistribution = LastReceivedPartitionDistribution; } +void TDqPqRdReadActor::Handle(TEvPrivate::TEvNotifyCA::TPtr&) { + Schedule(TDuration::Seconds(NotifyCAPeriodSec), new TEvPrivate::TEvNotifyCA()); + NotifyCA(); +} + std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor( const TTypeEnvironment& typeEnv, NPq::NProto::TDqPqTopicSource&& settings, |