aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2022-08-23 22:08:40 +0300
committerulya-sidorina <yulia@ydb.tech>2022-08-23 22:08:40 +0300
commitf6f06f8d3d3fa32513405736aeceb52a801d9a19 (patch)
tree9f2b4495cdf0ec1ddcef0dd3a50368c595e2c478
parent05b694dd4ef965ddcb61b55ea4e3d6d84ab873da (diff)
downloadydb-f6f06f8d3d3fa32513405736aeceb52a801d9a19.tar.gz
check stage inputs in AreAllStagesKqpPure()
fix(kqp_opt): check inputs in AreAllStagesKqpPure()
-rw-r--r--ydb/core/kqp/executer/kqp_planner.cpp28
-rw-r--r--ydb/core/kqp/executer/kqp_planner.h1
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp2
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.cpp10
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.h2
-rw-r--r--ydb/core/kqp/opt/kqp_opt.cpp10
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp2
-rw-r--r--ydb/core/kqp/opt/kqp_opt_impl.h1
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp2
9 files changed, 48 insertions, 10 deletions
diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp
index 385dd1fe685..313c8b0779b 100644
--- a/ydb/core/kqp/executer/kqp_planner.cpp
+++ b/ydb/core/kqp/executer/kqp_planner.cpp
@@ -246,6 +246,7 @@ THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest(
if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
+ AddSnapshotInfoToTaskInputs(taskDesc);
ev->Record.AddTasks()->Swap(&taskDesc);
}
}
@@ -255,6 +256,7 @@ THolder<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::PrepareKqpNodeRequest(
if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
+ AddSnapshotInfoToTaskInputs(taskDesc);
ev->Record.AddTasks()->Swap(&taskDesc);
}
}
@@ -301,6 +303,7 @@ void TKqpPlanner::AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksR
if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
+ AddSnapshotInfoToTaskInputs(task);
ev->Record.AddTasks()->Swap(&task);
}
ScanTasks.erase(nodeId);
@@ -319,6 +322,31 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
return flags;
}
+void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) {
+ YQL_ENSURE(Snapshot.IsValid());
+
+ for (auto& input : *task.MutableInputs()) {
+ if (input.HasTransform()) {
+ auto transform = input.MutableTransform();
+ YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer",
+ "Unexpected input transform type: " << transform->GetType());
+
+ const google::protobuf::Any& settingsAny = transform->GetSettings();
+ YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: "
+ << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name()
+ << " , but got: " << settingsAny.type_url());
+
+ NKikimrKqp::TKqpStreamLookupSettings settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
+ settings.MutableSnapshot()->SetStep(Snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
+
+ transform->MutableSettings()->PackFrom(settings);
+ }
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& tasks,
THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
diff --git a/ydb/core/kqp/executer/kqp_planner.h b/ydb/core/kqp/executer/kqp_planner.h
index b2d2549c394..ae1cb852c13 100644
--- a/ydb/core/kqp/executer/kqp_planner.h
+++ b/ydb/core/kqp/executer/kqp_planner.h
@@ -52,6 +52,7 @@ private:
THolder<TEvKqpNode::TEvStartKqpTasksRequest> PrepareKqpNodeRequest(const TVector<ui64>& taskIds);
void AddScansToKqpNodeRequest(THolder<TEvKqpNode::TEvStartKqpTasksRequest>& ev, ui64 nodeId);
+ void AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task);
ui32 CalcSendMessageFlagsForNode(ui32 nodeId);
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 9b7de40a8c2..228b36899c3 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -658,7 +658,7 @@ private:
YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind);
}
- BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling, Request.Snapshot);
+ BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling);
}
BuildKqpExecuterResults(*tx.Body, Results);
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
index fd0efe74ec7..08dd6b985f7 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
@@ -197,15 +197,11 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex,
const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, const IKqpGateway::TKqpSnapshot& snapshot,
- bool enableSpilling, const TChannelLogFunc& logFunc) {
+ const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, bool enableSpilling, const TChannelLogFunc& logFunc) {
YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size());
- YQL_ENSURE(snapshot.IsValid());
NKikimrKqp::TKqpStreamLookupSettings settings;
settings.MutableTable()->CopyFrom(streamLookup.GetTable());
- settings.MutableSnapshot()->SetStep(snapshot.Step);
- settings.MutableSnapshot()->SetTxId(snapshot.TxId);
auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable()));
for (const auto& keyColumn : streamLookup.GetKeyColumns()) {
@@ -250,7 +246,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
}
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
- ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot)
+ ui64 txId, bool enableSpilling)
{
auto& stage = GetStage(stageInfo);
@@ -312,7 +308,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl
}
case NKqpProto::TKqpPhyConnection::kStreamLookup: {
BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys,
- input.GetStreamLookup(), snapshot, enableSpilling, log);
+ input.GetStreamLookup(), enableSpilling, log);
break;
}
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h
index 486815f4000..986cd5ae7b3 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.h
@@ -154,7 +154,7 @@ using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTask
void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs);
void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const NKqpProto::TKqpPhyTx& tx, ui64 txIdx);
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
- ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot = {});
+ ui64 txId, bool enableSpilling);
TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table);
struct TKqpTaskOutputType {
diff --git a/ydb/core/kqp/opt/kqp_opt.cpp b/ydb/core/kqp/opt/kqp_opt.cpp
index 1bd771ece1f..519fc65e93a 100644
--- a/ydb/core/kqp/opt/kqp_opt.cpp
+++ b/ydb/core/kqp/opt/kqp_opt.cpp
@@ -62,6 +62,16 @@ bool IsKqpPureLambda(const TCoLambda& lambda) {
});
}
+bool IsKqpPureInputs(const TExprList& inputs) {
+ return !FindNode(inputs.Ptr(), [](const TExprNode::TPtr& node) {
+ if (TMaybeNode<TKqpCnStreamLookup>(node)) {
+ return true;
+ }
+
+ return false;
+ });
+}
+
bool IsKqpEffectsStage(const TDqStageBase& stage) {
return stage.Program().Body().Maybe<TKqpEffects>().IsValid();
}
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index ac4c6b02e41..711788e3678 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -156,7 +156,7 @@ private:
static bool AreAllStagesKqpPure(const TVector<TDqPhyStage>& stages) {
// TODO: Avoid lambda analysis here, use sources/sinks for table interaction.
- return std::all_of(stages.begin(), stages.end(), [](const auto& x) { return IsKqpPureLambda(x.Program()); });
+ return std::all_of(stages.begin(), stages.end(), [](const auto& x) { return IsKqpPureLambda(x.Program()) && IsKqpPureInputs(x.Inputs()); });
}
static TMaybeNode<TExprList> BuildTxResults(const TKqlQueryResultList& results, TVector<TDqPhyStage>& stages,
diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h
index da0bae03c80..88e49dc8a17 100644
--- a/ydb/core/kqp/opt/kqp_opt_impl.h
+++ b/ydb/core/kqp/opt/kqp_opt_impl.h
@@ -31,6 +31,7 @@ static inline void DumpAppliedRule(const TString& name, const NYql::TExprNode::T
}
bool IsKqpPureLambda(const NYql::NNodes::TCoLambda& lambda);
+bool IsKqpPureInputs(const NYql::NNodes::TExprList& inputs);
const NYql::TKikimrTableDescription& GetTableData(const NYql::TKikimrTablesData& tablesData,
TStringBuf cluster, TStringBuf table);
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 0ff947a52ff..40ea4e6dac6 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -429,6 +429,8 @@ private:
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
auto& record = request->Record;
+ YQL_ENSURE(Snapshot.IsValid());
+
record.MutableSnapshot()->SetStep(Snapshot.Step);
record.MutableSnapshot()->SetTxId(Snapshot.TxId);