aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Uzhakov <uzhastik@gmail.com>2022-04-24 01:02:13 +0300
committerSergey Uzhakov <uzhastik@gmail.com>2022-04-24 01:02:13 +0300
commit3f6e9cab8cfaa0d12f631edef218864c61c193d6 (patch)
treede6e657fbc4ebc116ef60ea61a67ed20c7be6de9
parent6c3af841aa6189e444cf951edaf72dbd66d596c8 (diff)
downloadydb-3f6e9cab8cfaa0d12f631edef218864c61c193d6.tar.gz
YQ-1043: temporary extend tasks packer logic to support different programs in task from single stage
ref:28b5d78ef1b2fe9e207df6975681720a8e1dc9d1
-rw-r--r--ydb/core/yq/libs/tasks_packer/tasks_packer.cpp17
1 files changed, 14 insertions, 3 deletions
diff --git a/ydb/core/yq/libs/tasks_packer/tasks_packer.cpp b/ydb/core/yq/libs/tasks_packer/tasks_packer.cpp
index f7bbd04c06..57865f69e8 100644
--- a/ydb/core/yq/libs/tasks_packer/tasks_packer.cpp
+++ b/ydb/core/yq/libs/tasks_packer/tasks_packer.cpp
@@ -9,9 +9,13 @@ namespace NTasksPacker {
void Pack(TVector<NYql::NDqProto::TDqTask>& tasks, THashMap<i64, TString>& stagePrograms) {
for (auto& task : tasks) {
auto stageId = task.GetStageId();
- auto it = stagePrograms.find(stageId);
- if (it == stagePrograms.end()) {
- stagePrograms[stageId] = std::move(*task.MutableProgram()->MutableRaw());
+ auto& p = stagePrograms[stageId];
+ if (!p) {
+ p = std::move(*task.MutableProgram()->MutableRaw());
+ task.MutableProgram()->MutableRaw()->clear();
+ continue;
+ }
+ if (p == task.GetProgram().GetRaw()) {
task.MutableProgram()->MutableRaw()->clear();
}
}
@@ -19,6 +23,9 @@ void Pack(TVector<NYql::NDqProto::TDqTask>& tasks, THashMap<i64, TString>& stage
void UnPack(TVector<NYql::NDqProto::TDqTask>& tasks, const THashMap<i64, TString>& stagePrograms) {
for (auto& task : tasks) {
+ if (task.GetProgram().GetRaw()) {
+ continue;
+ }
auto stageId = task.GetStageId();
auto it = stagePrograms.find(stageId);
YQL_ENSURE(it != stagePrograms.end());
@@ -38,6 +45,10 @@ void UnPack(
TVector<NYql::NDqProto::TDqTask> tasks;
tasks.reserve(src.size());
for (const auto& srcTask : src) {
+ if (srcTask.GetProgram().GetRaw()) {
+ tasks.emplace_back(srcTask);
+ continue;
+ }
auto stageId = srcTask.GetStageId();
auto it = stagePrograms.find(stageId);
YQL_ENSURE(it != stagePrograms.end());