diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-04-28 17:51:04 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@yandex-team.ru> | 2022-04-28 17:51:04 +0300 |
commit | 47c30cd16e2884561a0b247945b75e288fa5bf95 (patch) | |
tree | 937c7c71865e2fa401025640b49484958307c9f2 | |
parent | e4e932fb112cbc9df7800e1418ca5eeed0bf323f (diff) | |
download | ydb-47c30cd16e2884561a0b247945b75e288fa5bf95.tar.gz |
YQ-1066: stageId/publicId separation
ref:fa806b01ccf68085e8b525cc01be3e89967ad4f1
3 files changed, 14 insertions, 12 deletions
diff --git a/ydb/core/yq/libs/pretty_printers/graph_params_printer.cpp b/ydb/core/yq/libs/pretty_printers/graph_params_printer.cpp index dbdad960e8c..4c398f6c1ba 100644 --- a/ydb/core/yq/libs/pretty_printers/graph_params_printer.cpp +++ b/ydb/core/yq/libs/pretty_printers/graph_params_printer.cpp @@ -49,6 +49,11 @@ public: TString PrettyPrintGraphParams(const NProto::TGraphParams& sourceGraphParams, bool canonical) { NProto::TGraphParams patchedGraphParams = sourceGraphParams; + if (canonical) { + for (auto& task : *patchedGraphParams.MutableTasks()) { + task.ClearStageId(); + } + } for (auto& [secureKey, tokenValue] : *patchedGraphParams.MutableSecureParams()) { tokenValue = "== token_value =="; } diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 23b8191cacd..afbadeeda0c 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -350,7 +350,7 @@ namespace NYql::NDqs { tasks[i].ComputeActorId = workers[i]; } - THashMap<TStageId, std::tuple<TString, ui64>> stagePrograms = BuildAllPrograms(); + THashMap<TStageId, std::tuple<TString, ui64, ui64>> stagePrograms = BuildAllPrograms(); TVector<TDqTask> plan; THashSet<TString> clusterNameHints; for (const auto& task : tasks) { @@ -391,10 +391,10 @@ namespace NYql::NDqs { auto& program = *taskDesc.MutableProgram(); program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); TString programStr; - ui64 stageId; - std::tie(programStr, stageId) = stagePrograms[task.StageId]; + ui64 stageId, publicId; + std::tie(programStr, stageId, publicId) = stagePrograms[task.StageId]; program.SetRaw(programStr); - taskMeta.SetStageId(stageId); + taskMeta.SetStageId(publicId); taskDesc.MutableMeta()->PackFrom(taskMeta); taskDesc.SetStageId(stageId); @@ -535,10 +535,10 @@ namespace NYql::NDqs { #undef BUILD_CONNECTION - THashMap<TStageId, std::tuple<TString,ui64>> TDqsExecutionPlanner::BuildAllPrograms() { +THashMap<TStageId, std::tuple<TString,ui64,ui64>> TDqsExecutionPlanner::BuildAllPrograms() { using namespace NKikimr::NMiniKQL; - THashMap<TStageId, std::tuple<TString,ui64>> result; + THashMap<TStageId, std::tuple<TString,ui64,ui64>> result; TScopedAlloc alloc(NKikimr::TAlignedPagePoolCounters(), FunctionRegistry->SupportsSizedAllocators()); TTypeEnvironment typeEnv(alloc); TVector<NNodes::TExprBase> fakeReads; @@ -552,10 +552,7 @@ namespace NYql::NDqs { auto settings = NDq::TDqStageSettings::Parse(stage); ui64 stageId = stage.Ref().UniqueId(); - auto maybeStageId = PublicIds.find(settings.LogicalId); - if (maybeStageId != PublicIds.end()) { - stageId = maybeStageId->second; - } + ui64 publicId = PublicIds.Value(settings.LogicalId, stageId); /* TODO: ui64 stageId = stage.Ref().UniqueId(); @@ -576,7 +573,7 @@ namespace NYql::NDqs { NDq::BuildProgram( stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry, ExprContext, fakeReads), - stageId); + stageId, publicId); } return result; diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h index 280600c8f7b..9ad12ada58e 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.h +++ b/ydb/library/yql/providers/dq/planner/execution_planner.h @@ -52,7 +52,7 @@ namespace NYql::NDqs { private: bool BuildReadStage(const TDqSettings::TPtr& settings, const NNodes::TDqPhyStage& stage, bool dqSource, bool canFallback); void BuildConnections(const NNodes::TDqPhyStage& stage); - THashMap<NDq::TStageId, std::tuple<TString,ui64>> BuildAllPrograms(); + THashMap<NDq::TStageId, std::tuple<TString,ui64,ui64>> BuildAllPrograms(); void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel); void FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input); void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); |