diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2024-02-01 11:06:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-01 11:06:12 +0300 |
commit | 9cddf4b950e79f87332a27be97ef6f40a44e12dd (patch) | |
tree | 2c8e0888786dcd25e6ff2acefdf0736f22ae2ae7 | |
parent | 9c7d88b7cf8205bbc4122f47933cb6b8dd71f34c (diff) | |
download | ydb-9cddf4b950e79f87332a27be97ef6f40a44e12dd.tar.gz |
YQL-17542 move SaveState LoadState (#1474)
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 26 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h | 26 |
2 files changed, 27 insertions, 25 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e0d3d3fa297..27852630bf3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -741,22 +741,6 @@ protected: return true; } - void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override { - CA_LOG_D("Save state"); - NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram(); - mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); - NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData(); - data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); - data.SetBlob(TaskRunner->Save()); - - for (auto& [inputIndex, source] : SourcesMap) { - YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); - NDqProto::TSourceState& sourceState = *state.AddSources(); - source.AsyncInput->SaveState(checkpoint, sourceState); - sourceState.SetInputIndex(inputIndex); - } - } - void CommitState(const NDqProto::TCheckpoint& checkpoint) override { CA_LOG_D("Commit state"); for (auto& [inputIndex, source] : SourcesMap) { @@ -810,15 +794,7 @@ protected: } } - virtual void DoLoadRunnerState(TString&& blob) { - TMaybe<TString> error = Nothing(); - try { - TaskRunner->Load(blob); - } catch (const std::exception& e) { - error = e.what(); - } - Checkpoints->AfterStateLoading(error); - } + virtual void DoLoadRunnerState(TString&& blob) = 0; void LoadState(NDqProto::TComputeActorState&& state) override { CA_LOG_D("Load state"); diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index eb69e10eea8..efb0b3c0590 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -44,6 +44,32 @@ public: return inputTransformInfo.Buffer.Get(); } protected: + void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override { + CA_LOG_D("Save state"); + NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram(); + mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); + NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData(); + data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); + data.SetBlob(this->TaskRunner->Save()); + + for (auto& [inputIndex, source] : this->SourcesMap) { + YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); + NDqProto::TSourceState& sourceState = *state.AddSources(); + source.AsyncInput->SaveState(checkpoint, sourceState); + sourceState.SetInputIndex(inputIndex); + } + } + + void DoLoadRunnerState(TString&& blob) override { + TMaybe<TString> error = Nothing(); + try { + this->TaskRunner->Load(blob); + } catch (const std::exception& e) { + error = e.what(); + } + this->Checkpoints->AfterStateLoading(error); + } + void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) { this->TaskRunner = taskRunner; } |