diff options
author | whcrc <whcrc@yandex-team.ru> | 2022-04-08 20:02:27 +0300 |
---|---|---|
committer | whcrc <whcrc@yandex-team.ru> | 2022-04-08 20:02:27 +0300 |
commit | ff5413a5bb8ed7ae510a8b11d167b5ee85d7efbe (patch) | |
tree | 5b42420fea0902e4907c8c59fca57f153f33ece0 | |
parent | cf0b73376c22e41e4a5721cbe3ec18725f82252c (diff) | |
download | ydb-ff5413a5bb8ed7ae510a8b11d167b5ee85d7efbe.tar.gz |
YQ-833: add test for minikql recover by checkpoint
ref:10b110386eb40e7ccea7cd6622963edf29b79256
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 11 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 7 |
3 files changed, 15 insertions, 5 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index bea98282ae4..fe420756a6e 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -35,7 +35,7 @@ public: TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits) + : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits, /* passExceptions = */ true) , ComputeCtx(settings.StatsMode) { if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index a99a7e7d464..5163aed60f2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -119,7 +119,7 @@ public: protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool passExceptions = false) : ExecuterId(executerId) , TxId(txId) , Task(std::move(task)) @@ -131,6 +131,7 @@ protected: , CheckpointingMode(GetTaskCheckpointingMode(Task)) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , Running(!Task.GetCreateSuspended()) + , PassExceptions(passExceptions) { if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); @@ -214,6 +215,9 @@ protected: << ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory); } catch (const std::exception& e) { + if (PassExceptions) { + throw; + } InternalError(TIssuesIds::UNEXPECTED, e.what()); } @@ -833,7 +837,9 @@ protected: YQL_ENSURE(inputChannel || outputChannel, "Unknown channelId: " << channelUpdate.GetId() << ", task: " << Task.GetId()); } - DoExecute(); + if (Running) { // waiting for TEvRun to start + DoExecute(); + } } void HandleExecuteBase(NActors::TEvents::TEvWakeup::TPtr& ev) { @@ -1514,6 +1520,7 @@ protected: private: bool Running = true; TInstant LastSendStatsTime; + bool PassExceptions = false; }; } // namespace NYql diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 10900352c3b..c2ba27b5dc1 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -433,8 +433,6 @@ public: DqBuildInputValue(inputDesc, ProgramParsed.InputItemTypes[i], std::move(inputs), holderFactory)); } - ResultStream = ProgramParsed.CompGraph->GetValue(); - TVector<IDqOutputConsumer::TPtr> outputConsumers(task.OutputsSize()); for (ui32 i = 0; i < task.OutputsSize(); ++i) { auto& outputDesc = task.GetOutputs(i); @@ -513,6 +511,10 @@ public: ERunStatus Run() final { LOG(TStringBuilder() << "Run task: " << TaskId); + if (!ResultStream) { + TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); + ResultStream = ProgramParsed.CompGraph->GetValue(); + } RunComputeTime = TDuration::Zero(); @@ -625,6 +627,7 @@ public: } void Load(TStringBuf in) override { + Y_VERIFY(!ResultStream); ProgramParsed.CompGraph->LoadGraphState(in); } |