diff options
author | ulya-sidorina <yulia@ydb.tech> | 2022-08-23 22:08:40 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2022-08-23 22:08:40 +0300 |
commit | f6f06f8d3d3fa32513405736aeceb52a801d9a19 (patch) | |
tree | 9f2b4495cdf0ec1ddcef0dd3a50368c595e2c478 | |
parent | 05b694dd4ef965ddcb61b55ea4e3d6d84ab873da (diff) | |
download | ydb-f6f06f8d3d3fa32513405736aeceb52a801d9a19.tar.gz |
check stage inputs in AreAllStagesKqpPure()
fix(kqp_opt): check inputs in AreAllStagesKqpPure()
-rw-r--r-- | ydb/core/kqp/executer/kqp_planner.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_planner.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_tasks_graph.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_tasks_graph.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 2 |
9 files changed, 48 insertions, 10 deletions
diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp index 385dd1fe685..313c8b0779b 100644 --- a/ydb/core/kqp/executer/kqp_planner.cpp +++ b/ydb/core/kqp/executer/kqp_planner.cpp @@ -246,6 +246,7 @@ THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest( if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } + AddSnapshotInfoToTaskInputs(taskDesc); ev->Record.AddTasks()->Swap(&taskDesc); } } @@ -255,6 +256,7 @@ THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest( if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } + AddSnapshotInfoToTaskInputs(taskDesc); ev->Record.AddTasks()->Swap(&taskDesc); } } @@ -301,6 +303,7 @@ void TKqpPlanner::AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksR if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } + AddSnapshotInfoToTaskInputs(task); ev->Record.AddTasks()->Swap(&task); } ScanTasks.erase(nodeId); @@ -319,6 +322,31 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { return flags; } +void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) { + YQL_ENSURE(Snapshot.IsValid()); + + for (auto& input : *task.MutableInputs()) { + if (input.HasTransform()) { + auto transform = input.MutableTransform(); + YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer", + "Unexpected input transform type: " << transform->GetType()); + + const google::protobuf::Any& settingsAny = transform->GetSettings(); + YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: " + << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name() + << " , but got: " << settingsAny.type_url()); + + NKikimrKqp::TKqpStreamLookupSettings settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + + settings.MutableSnapshot()->SetStep(Snapshot.Step); + settings.MutableSnapshot()->SetTxId(Snapshot.TxId); + + transform->MutableSettings()->PackFrom(settings); + } + } +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, diff --git a/ydb/core/kqp/executer/kqp_planner.h b/ydb/core/kqp/executer/kqp_planner.h index b2d2549c394..ae1cb852c13 100644 --- a/ydb/core/kqp/executer/kqp_planner.h +++ b/ydb/core/kqp/executer/kqp_planner.h @@ -52,6 +52,7 @@ private: THolder<TEvKqpNode::TEvStartKqpTasksRequest> PrepareKqpNodeRequest(const TVector<ui64>& taskIds); void AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksRequest>& ev, ui64 nodeId); + void AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task); ui32 CalcSendMessageFlagsForNode(ui32 nodeId); diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 9b7de40a8c2..228b36899c3 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -658,7 +658,7 @@ private: YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind); } - BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling, Request.Snapshot); + BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling); } BuildKqpExecuterResults(*tx.Body, Results); diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp index fd0efe74ec7..08dd6b985f7 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp @@ -197,15 +197,11 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, const IKqpGateway::TKqpSnapshot& snapshot, - bool enableSpilling, const TChannelLogFunc& logFunc) { + const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, bool enableSpilling, const TChannelLogFunc& logFunc) { YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size()); - YQL_ENSURE(snapshot.IsValid()); NKikimrKqp::TKqpStreamLookupSettings settings; settings.MutableTable()->CopyFrom(streamLookup.GetTable()); - settings.MutableSnapshot()->SetStep(snapshot.Step); - settings.MutableSnapshot()->SetTxId(snapshot.TxId); auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable())); for (const auto& keyColumn : streamLookup.GetKeyColumns()) { @@ -250,7 +246,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf } void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, - ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot) + ui64 txId, bool enableSpilling) { auto& stage = GetStage(stageInfo); @@ -312,7 +308,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl } case NKqpProto::TKqpPhyConnection::kStreamLookup: { BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys, - input.GetStreamLookup(), snapshot, enableSpilling, log); + input.GetStreamLookup(), enableSpilling, log); break; } diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h index 486815f4000..986cd5ae7b3 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer/kqp_tasks_graph.h @@ -154,7 +154,7 @@ using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTask void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs); void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const NKqpProto::TKqpPhyTx& tx, ui64 txIdx); void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, - ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot = {}); + ui64 txId, bool enableSpilling); TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table); struct TKqpTaskOutputType { diff --git a/ydb/core/kqp/opt/kqp_opt.cpp b/ydb/core/kqp/opt/kqp_opt.cpp index 1bd771ece1f..519fc65e93a 100644 --- a/ydb/core/kqp/opt/kqp_opt.cpp +++ b/ydb/core/kqp/opt/kqp_opt.cpp @@ -62,6 +62,16 @@ bool IsKqpPureLambda(const TCoLambda& lambda) { }); } +bool IsKqpPureInputs(const TExprList& inputs) { + return !FindNode(inputs.Ptr(), [](const TExprNode::TPtr& node) { + if (TMaybeNode<TKqpCnStreamLookup>(node)) { + return true; + } + + return false; + }); +} + bool IsKqpEffectsStage(const TDqStageBase& stage) { return stage.Program().Body().Maybe<TKqpEffects>().IsValid(); } diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index ac4c6b02e41..711788e3678 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -156,7 +156,7 @@ private: static bool AreAllStagesKqpPure(const TVector<TDqPhyStage>& stages) { // TODO: Avoid lambda analysis here, use sources/sinks for table interaction. - return std::all_of(stages.begin(), stages.end(), [](const auto& x) { return IsKqpPureLambda(x.Program()); }); + return std::all_of(stages.begin(), stages.end(), [](const auto& x) { return IsKqpPureLambda(x.Program()) && IsKqpPureInputs(x.Inputs()); }); } static TMaybeNode<TExprList> BuildTxResults(const TKqlQueryResultList& results, TVector<TDqPhyStage>& stages, diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h index da0bae03c80..88e49dc8a17 100644 --- a/ydb/core/kqp/opt/kqp_opt_impl.h +++ b/ydb/core/kqp/opt/kqp_opt_impl.h @@ -31,6 +31,7 @@ static inline void DumpAppliedRule(const TString& name, const NYql::TExprNode::T } bool IsKqpPureLambda(const NYql::NNodes::TCoLambda& lambda); +bool IsKqpPureInputs(const NYql::NNodes::TExprList& inputs); const NYql::TKikimrTableDescription& GetTableData(const NYql::TKikimrTablesData& tablesData, TStringBuf cluster, TStringBuf table); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 0ff947a52ff..40ea4e6dac6 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -429,6 +429,8 @@ private: THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); auto& record = request->Record; + YQL_ENSURE(Snapshot.IsValid()); + record.MutableSnapshot()->SetStep(Snapshot.Step); record.MutableSnapshot()->SetTxId(Snapshot.TxId); |