aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-06-30 03:40:54 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-06-30 03:40:54 +0300
commit735d9c3127b2fd3472830948c1f4a0d9be1fdab3 (patch)
treeef87697504ee803fda9215e4e4dc97e71be61939
parent2864f412d9b41800da7fd36455e8350b14760c7b (diff)
downloadydb-735d9c3127b2fd3472830948c1f4a0d9be1fdab3.tar.gz
YQ-1203 Fix race while loading task runner and its state simultaneously
Fix race while loading task runner and its state simultaneously ref:0336aa22c7223a0aadc11de74c45dd543fa0ad08
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp42
1 files changed, 39 insertions, 3 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 8c6517e070..6ba070e1cf 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -20,7 +20,7 @@ bool IsDebugLogEnabled(const TActorSystem* actorSystem) {
} // anonymous namespace
class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
- , public NTaskRunnerActor::ITaskRunnerActor::ICallbacks
+ , public NTaskRunnerActor::ITaskRunnerActor::ICallbacks
{
using TBase = TDqComputeActorBase<TDqAsyncComputeActor>;
@@ -90,8 +90,10 @@ private:
hFunc(TEvDqCompute::TEvStateRequest, OnStateRequest);
hFunc(NTaskRunnerActor::TEvStatistics, OnStatisticsResponse);
hFunc(NTaskRunnerActor::TEvLoadTaskRunnerFromStateDone, OnTaskRunnerLoaded);
+ hFunc(TEvDqCompute::TEvInjectCheckpoint, OnInjectCheckpoint);
+ hFunc(TEvDqCompute::TEvRestoreFromCheckpoint, OnRestoreFromCheckpoint);
default:
- TDqComputeActorBase<TDqAsyncComputeActor>::StateFuncBase(ev, ctx);
+ TBase::StateFuncBase(ev, ctx);
};
ReportEventElapsedTime();
@@ -262,7 +264,7 @@ private:
if (TaskRunnerActor) {
TaskRunnerActor->PassAway();
}
- NActors::IActor::PassAway();
+ TBase::PassAway();
}
TMaybe<NTaskRunnerActor::TCheckpointRequest> GetCheckpointRequest() {
@@ -328,6 +330,13 @@ private:
Send(ExecuterId, ev.Release(), NActors::IEventHandle::FlagTrackDelivery);
}
+ if (DeferredInjectCheckpointEvent) {
+ ForwardToCheckpoints(std::move(DeferredInjectCheckpointEvent));
+ }
+ if (DeferredRestoreFromCheckpointEvent) {
+ ForwardToCheckpoints(std::move(DeferredRestoreFromCheckpointEvent));
+ }
+
ContinueExecute();
}
@@ -547,10 +556,37 @@ private:
Checkpoints->AfterStateLoading(std::move(ev->Get()->Error));
}
+ template <class TEvPtr>
+ void ForwardToCheckpoints(TEvPtr&& ev) {
+ auto* x = reinterpret_cast<TAutoPtr<NActors::IEventHandle>*>(&ev);
+ Checkpoints->Receive(*x, TActivationContext::AsActorContext());
+ ev = nullptr;
+ }
+
+ void OnInjectCheckpoint(TEvDqCompute::TEvInjectCheckpoint::TPtr& ev) {
+ if (TypeEnv) {
+ ForwardToCheckpoints(std::move(ev));
+ } else {
+ Y_VERIFY(!DeferredInjectCheckpointEvent);
+ DeferredInjectCheckpointEvent = std::move(ev);
+ }
+ }
+
+ void OnRestoreFromCheckpoint(TEvDqCompute::TEvRestoreFromCheckpoint::TPtr& ev) {
+ if (TypeEnv) {
+ ForwardToCheckpoints(std::move(ev));
+ } else {
+ Y_VERIFY(!DeferredRestoreFromCheckpointEvent);
+ DeferredRestoreFromCheckpointEvent = std::move(ev);
+ }
+ }
+
NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr;
NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor = nullptr;
NActors::TActorId TaskRunnerActorId;
NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
+ TEvDqCompute::TEvInjectCheckpoint::TPtr DeferredInjectCheckpointEvent;
+ TEvDqCompute::TEvRestoreFromCheckpoint::TPtr DeferredRestoreFromCheckpointEvent;
THashSet<ui64> FinishedOutputChannels;
THashSet<ui64> FinishedSinks;