aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2025-02-03 12:14:27 +0300
committerGitHub <noreply@github.com>2025-02-03 12:14:27 +0300
commit9657f160f6281b7c0b96ee9d37e8048a83c73ea3 (patch)
treec8be66a25b7731eca7d7a0cadfc3c94bdc359c8e
parentaaddd9ba0de5e56698acb7a0242f109ecb2e00e0 (diff)
downloadydb-9657f160f6281b7c0b96ee9d37e8048a83c73ea3.tar.gz
YQ-3924 Add CA notify to update SourceCpu (#14113)
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp11
1 files changed, 11 insertions, 0 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
index 37d74e4bb2c..3e2b2a507c9 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -103,11 +103,13 @@ struct TEvPrivate {
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvPrintState = EvBegin + 20,
EvProcessState = EvBegin + 21,
+ EvNotifyCA = EvBegin + 22,
EvEnd
};
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {};
struct TEvProcessState : public NActors::TEventLocal<TEvProcessState, EvProcessState> {};
+ struct TEvNotifyCA : public NActors::TEventLocal<TEvNotifyCA, EvNotifyCA> {};
};
class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {
@@ -115,6 +117,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
const ui64 PrintStatePeriodSec = 300;
const ui64 ProcessStatePeriodSec = 1;
const ui64 PrintStateToLogSplitSize = 64000;
+ const ui64 NotifyCAPeriodSec = 10;
struct TReadyBatch {
public:
@@ -263,6 +266,7 @@ public:
void Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr&);
void Handle(TEvPrivate::TEvPrintState::TPtr&);
void Handle(TEvPrivate::TEvProcessState::TPtr&);
+ void Handle(TEvPrivate::TEvNotifyCA::TPtr&);
STRICT_STFUNC(StateFunc, {
hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle);
@@ -283,6 +287,7 @@ public:
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
hFunc(TEvPrivate::TEvPrintState, Handle);
hFunc(TEvPrivate::TEvProcessState, Handle);
+ hFunc(TEvPrivate::TEvNotifyCA, Handle);
})
static constexpr char ActorName[] = "DQ_PQ_READ_ACTOR";
@@ -379,6 +384,7 @@ void TDqPqRdReadActor::Init() {
Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
+ Schedule(TDuration::Seconds(NotifyCAPeriodSec), new TEvPrivate::TEvNotifyCA());
Inited = true;
}
@@ -1004,6 +1010,11 @@ void TDqPqRdReadActor::UpdateSessions() {
LastUsedPartitionDistribution = LastReceivedPartitionDistribution;
}
+void TDqPqRdReadActor::Handle(TEvPrivate::TEvNotifyCA::TPtr&) {
+ Schedule(TDuration::Seconds(NotifyCAPeriodSec), new TEvPrivate::TEvNotifyCA());
+ NotifyCA();
+}
+
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
const TTypeEnvironment& typeEnv,
NPq::NProto::TDqPqTopicSource&& settings,