aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2024-02-01 11:06:12 +0300
committerGitHub <noreply@github.com>2024-02-01 11:06:12 +0300
commit9cddf4b950e79f87332a27be97ef6f40a44e12dd (patch)
tree2c8e0888786dcd25e6ff2acefdf0736f22ae2ae7
parent9c7d88b7cf8205bbc4122f47933cb6b8dd71f34c (diff)
downloadydb-9cddf4b950e79f87332a27be97ef6f40a44e12dd.tar.gz
YQL-17542 move SaveState LoadState (#1474)
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h26
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h26
2 files changed, 27 insertions, 25 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index e0d3d3fa297..27852630bf3 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -741,22 +741,6 @@ protected:
return true;
}
- void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
- CA_LOG_D("Save state");
- NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
- mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
- NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
- data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
- data.SetBlob(TaskRunner->Save());
-
- for (auto& [inputIndex, source] : SourcesMap) {
- YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
- NDqProto::TSourceState& sourceState = *state.AddSources();
- source.AsyncInput->SaveState(checkpoint, sourceState);
- sourceState.SetInputIndex(inputIndex);
- }
- }
-
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
CA_LOG_D("Commit state");
for (auto& [inputIndex, source] : SourcesMap) {
@@ -810,15 +794,7 @@ protected:
}
}
- virtual void DoLoadRunnerState(TString&& blob) {
- TMaybe<TString> error = Nothing();
- try {
- TaskRunner->Load(blob);
- } catch (const std::exception& e) {
- error = e.what();
- }
- Checkpoints->AfterStateLoading(error);
- }
+ virtual void DoLoadRunnerState(TString&& blob) = 0;
void LoadState(NDqProto::TComputeActorState&& state) override {
CA_LOG_D("Load state");
diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
index eb69e10eea8..efb0b3c0590 100644
--- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
+++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
@@ -44,6 +44,32 @@ public:
return inputTransformInfo.Buffer.Get();
}
protected:
+ void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
+ CA_LOG_D("Save state");
+ NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
+ mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
+ NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
+ data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
+ data.SetBlob(this->TaskRunner->Save());
+
+ for (auto& [inputIndex, source] : this->SourcesMap) {
+ YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
+ NDqProto::TSourceState& sourceState = *state.AddSources();
+ source.AsyncInput->SaveState(checkpoint, sourceState);
+ sourceState.SetInputIndex(inputIndex);
+ }
+ }
+
+ void DoLoadRunnerState(TString&& blob) override {
+ TMaybe<TString> error = Nothing();
+ try {
+ this->TaskRunner->Load(blob);
+ } catch (const std::exception& e) {
+ error = e.what();
+ }
+ this->Checkpoints->AfterStateLoading(error);
+ }
+
void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) {
this->TaskRunner = taskRunner;
}