aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@yandex-team.ru>2022-04-08 20:02:27 +0300
committerwhcrc <whcrc@yandex-team.ru>2022-04-08 20:02:27 +0300
commitff5413a5bb8ed7ae510a8b11d167b5ee85d7efbe (patch)
tree5b42420fea0902e4907c8c59fca57f153f33ece0
parentcf0b73376c22e41e4a5721cbe3ec18725f82252c (diff)
downloadydb-ff5413a5bb8ed7ae510a8b11d167b5ee85d7efbe.tar.gz
YQ-833: add test for minikql recover by checkpoint
ref:10b110386eb40e7ccea7cd6622963edf29b79256
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h11
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp7
3 files changed, 15 insertions, 5 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index bea98282ae4..fe420756a6e 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -35,7 +35,7 @@ public:
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits, /* passExceptions = */ true)
, ComputeCtx(settings.StatsMode)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index a99a7e7d464..5163aed60f2 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -119,7 +119,7 @@ public:
protected:
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool passExceptions = false)
: ExecuterId(executerId)
, TxId(txId)
, Task(std::move(task))
@@ -131,6 +131,7 @@ protected:
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, Running(!Task.GetCreateSuspended())
+ , PassExceptions(passExceptions)
{
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
BasicStats = std::make_unique<TBasicStats>();
@@ -214,6 +215,9 @@ protected:
<< ", host: " << HostName()
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
} catch (const std::exception& e) {
+ if (PassExceptions) {
+ throw;
+ }
InternalError(TIssuesIds::UNEXPECTED, e.what());
}
@@ -833,7 +837,9 @@ protected:
YQL_ENSURE(inputChannel || outputChannel, "Unknown channelId: " << channelUpdate.GetId() << ", task: " << Task.GetId());
}
- DoExecute();
+ if (Running) { // waiting for TEvRun to start
+ DoExecute();
+ }
}
void HandleExecuteBase(NActors::TEvents::TEvWakeup::TPtr& ev) {
@@ -1514,6 +1520,7 @@ protected:
private:
bool Running = true;
TInstant LastSendStatsTime;
+ bool PassExceptions = false;
};
} // namespace NYql
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 10900352c3b..c2ba27b5dc1 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -433,8 +433,6 @@ public:
DqBuildInputValue(inputDesc, ProgramParsed.InputItemTypes[i], std::move(inputs), holderFactory));
}
- ResultStream = ProgramParsed.CompGraph->GetValue();
-
TVector<IDqOutputConsumer::TPtr> outputConsumers(task.OutputsSize());
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
auto& outputDesc = task.GetOutputs(i);
@@ -513,6 +511,10 @@ public:
ERunStatus Run() final {
LOG(TStringBuilder() << "Run task: " << TaskId);
+ if (!ResultStream) {
+ TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
+ ResultStream = ProgramParsed.CompGraph->GetValue();
+ }
RunComputeTime = TDuration::Zero();
@@ -625,6 +627,7 @@ public:
}
void Load(TStringBuf in) override {
+ Y_VERIFY(!ResultStream);
ProgramParsed.CompGraph->LoadGraphState(in);
}