diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-06-30 03:40:54 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-06-30 03:40:54 +0300 |
commit | 735d9c3127b2fd3472830948c1f4a0d9be1fdab3 (patch) | |
tree | ef87697504ee803fda9215e4e4dc97e71be61939 | |
parent | 2864f412d9b41800da7fd36455e8350b14760c7b (diff) | |
download | ydb-735d9c3127b2fd3472830948c1f4a0d9be1fdab3.tar.gz |
YQ-1203 Fix race while loading task runner and its state simultaneously
Fix race while loading task runner and its state simultaneously
ref:0336aa22c7223a0aadc11de74c45dd543fa0ad08
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp | 42 |
1 files changed, 39 insertions, 3 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 8c6517e070..6ba070e1cf 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -20,7 +20,7 @@ bool IsDebugLogEnabled(const TActorSystem* actorSystem) { } // anonymous namespace class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor> - , public NTaskRunnerActor::ITaskRunnerActor::ICallbacks + , public NTaskRunnerActor::ITaskRunnerActor::ICallbacks { using TBase = TDqComputeActorBase<TDqAsyncComputeActor>; @@ -90,8 +90,10 @@ private: hFunc(TEvDqCompute::TEvStateRequest, OnStateRequest); hFunc(NTaskRunnerActor::TEvStatistics, OnStatisticsResponse); hFunc(NTaskRunnerActor::TEvLoadTaskRunnerFromStateDone, OnTaskRunnerLoaded); + hFunc(TEvDqCompute::TEvInjectCheckpoint, OnInjectCheckpoint); + hFunc(TEvDqCompute::TEvRestoreFromCheckpoint, OnRestoreFromCheckpoint); default: - TDqComputeActorBase<TDqAsyncComputeActor>::StateFuncBase(ev, ctx); + TBase::StateFuncBase(ev, ctx); }; ReportEventElapsedTime(); @@ -262,7 +264,7 @@ private: if (TaskRunnerActor) { TaskRunnerActor->PassAway(); } - NActors::IActor::PassAway(); + TBase::PassAway(); } TMaybe<NTaskRunnerActor::TCheckpointRequest> GetCheckpointRequest() { @@ -328,6 +330,13 @@ private: Send(ExecuterId, ev.Release(), NActors::IEventHandle::FlagTrackDelivery); } + if (DeferredInjectCheckpointEvent) { + ForwardToCheckpoints(std::move(DeferredInjectCheckpointEvent)); + } + if (DeferredRestoreFromCheckpointEvent) { + ForwardToCheckpoints(std::move(DeferredRestoreFromCheckpointEvent)); + } + ContinueExecute(); } @@ -547,10 +556,37 @@ private: Checkpoints->AfterStateLoading(std::move(ev->Get()->Error)); } + template <class TEvPtr> + void ForwardToCheckpoints(TEvPtr&& ev) { + auto* x = reinterpret_cast<TAutoPtr<NActors::IEventHandle>*>(&ev); + Checkpoints->Receive(*x, TActivationContext::AsActorContext()); + ev = nullptr; + } + + void OnInjectCheckpoint(TEvDqCompute::TEvInjectCheckpoint::TPtr& ev) { + if (TypeEnv) { + ForwardToCheckpoints(std::move(ev)); + } else { + Y_VERIFY(!DeferredInjectCheckpointEvent); + DeferredInjectCheckpointEvent = std::move(ev); + } + } + + void OnRestoreFromCheckpoint(TEvDqCompute::TEvRestoreFromCheckpoint::TPtr& ev) { + if (TypeEnv) { + ForwardToCheckpoints(std::move(ev)); + } else { + Y_VERIFY(!DeferredRestoreFromCheckpointEvent); + DeferredRestoreFromCheckpointEvent = std::move(ev); + } + } + NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr; NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor = nullptr; NActors::TActorId TaskRunnerActorId; NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; + TEvDqCompute::TEvInjectCheckpoint::TPtr DeferredInjectCheckpointEvent; + TEvDqCompute::TEvRestoreFromCheckpoint::TPtr DeferredRestoreFromCheckpointEvent; THashSet<ui64> FinishedOutputChannels; THashSet<ui64> FinishedSinks; |