diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-08-04 21:25:10 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-08-04 21:58:33 +0300 |
commit | 803265b1cbfeebb85765030cc317e24938d787e8 (patch) | |
tree | e6563c70f64eaf3a01868b741d6f6b026d232f0c | |
parent | 0f496fc8a55e9d2dfd4fe9da109ef9abc58c68f4 (diff) | |
download | ydb-803265b1cbfeebb85765030cc317e24938d787e8.tar.gz |
Support write to external sources in KQP
47 files changed, 1127 insertions, 203 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index b247f4a77f..4bc8e8ba9e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1338,6 +1338,7 @@ private: void BuildDatashardTasks(TStageInfo& stageInfo) { THashMap<ui64, ui64> shardTasks; // shardId -> taskId + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); auto getShardTask = [&](ui64 shardId) -> TTask& { auto it = shardTasks.find(shardId); @@ -1349,11 +1350,12 @@ private: task.Meta.ExecuterId = SelfId(); task.Meta.ShardId = shardId; shardTasks.emplace(shardId, task.Id); + + BuildSinks(stage, task); + return task; }; - auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes;; @@ -1531,6 +1533,9 @@ private: auto& task = TasksGraph.AddTask(stageInfo); task.Meta.ExecuterId = SelfId(); task.Meta.Type = TTaskMeta::TTaskType::Compute; + + BuildSinks(stage, task); + LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 8d9c9595b2..1014b280cc 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -732,15 +732,32 @@ protected: task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse(); task.Meta.Type = TTaskMeta::TTaskType::Compute; + BuildSinks(stage, task); + LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id); } } + void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) { + if (stage.SinksSize() > 0) { + YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported"); + const auto& sink = stage.GetSinks(0); + YQL_ENSURE(sink.HasExternalSink(), "only external sinks are supported"); + const auto& extSink = sink.GetExternalSink(); + YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size()); + auto& output = task.Outputs[sink.GetOutputIndex()]; + output.Type = TTaskOutputType::Sink; + output.SinkType = extSink.GetType(); + output.SinkSettings = extSink.GetSettings(); + } + } + void BuildReadTasksFromSource(TStageInfo& stageInfo, TMap<TString, TString> secureParams) { const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasExternalSource()); - YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections"); + YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, + "multiple sources or sources mixed with connections"); const auto& stageSource = stage.GetSources(0); const auto& externalSource = stageSource.GetExternalSource(); @@ -762,6 +779,7 @@ protected: task.Meta.Type = TTaskMeta::TTaskType::Compute; + BuildSinks(stage, task); } } @@ -873,6 +891,8 @@ protected: settings->SetLockTxId(*lockTxId); settings->SetLockNodeId(self.NodeId()); } + + BuildSinks(stage, task); }; if (source.GetSequentialInFlightShards()) { diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index e0ba573bbf..c52c248672 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -945,6 +945,14 @@ void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutpu break; } + case TTaskOutputType::Sink: { + auto* sink = outputDesc.MutableSink(); + sink->SetType(output.SinkType); + YQL_ENSURE(output.SinkSettings); + sink->MutableSettings()->CopyFrom(*output.SinkSettings); + break; + } + default: { YQL_ENSURE(false, "Unexpected task output type " << output.Type); } diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 622675b009..d208142f54 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -205,9 +205,15 @@ "Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"} }, { - "Name": "TKqlTableEffect", + "Name": "TKqlEffectBase", "Base": "TExprBase", "Match": {"Type": "CallableBase"}, + "Builder": {"Generate": "None"} + }, + { + "Name": "TKqlTableEffect", + "Base": "TKqlEffectBase", + "Match": {"Type": "CallableBase"}, "Builder": {"Generate": "None"}, "Children": [ {"Index": 0, "Name": "Table", "Type": "TKqpTable"} @@ -289,6 +295,14 @@ "Match": {"Type": "Callable", "Name": "TKqlInsertRowsIndex"} }, { + "Name": "TKqlExternalEffect", + "Base": "TKqlEffectBase", + "Match": {"Type": "Callable", "Name": "KqlExternalEffect"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"} + ] + }, + { "Name": "TKqpParamBinding", "Base": "TExprBase", "Match": {"Type": "Tuple"}, @@ -449,6 +463,15 @@ "Match": {"Type": "Callable", "Name": "KqpDeleteRows"} }, { + "Name": "TKqpSinkEffect", + "Base": "TKqlEffectBase", + "Match": {"Type": "Callable", "Name": "KqpSinkEffect"}, + "Children": [ + {"Index": 0, "Name": "Stage", "Type": "TExprBase"}, + {"Index": 1, "Name": "SinkIndex", "Type": "TCoAtom"} + ] + }, + { "Name": "TKqpOlapOperationBase", "Base": "TCallable", "Match": {"Type": "CallableBase"}, diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 6be649aa8a..1ce9e72f6a 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1527,7 +1527,7 @@ private: auto queryExecutor = MakeIntrusive<TKqpQueryExecutor>(Gateway, Cluster, SessionCtx, KqpRunner); auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, ExternalSourceFactory, IsInternalCall); - auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, queryExecutor); + auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, ExternalSourceFactory, queryExecutor); FillSettings.AllResultsBytesLimit = Nothing(); FillSettings.RowsLimitPerWrite = SessionCtx->Config()._ResultRowsLimit.Get().GetRef(); diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h index 4cc155552d..b2972b0176 100644 --- a/ydb/core/kqp/host/kqp_host_impl.h +++ b/ydb/core/kqp/host/kqp_host_impl.h @@ -243,7 +243,7 @@ public: }; TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<NYql::TTypeAnnotationContext> typesCtx, TIntrusivePtr<NYql::TKikimrSessionContext> sessionCtx, + const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, TIntrusivePtr<NYql::TKikimrSessionContext> sessionCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider); diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index d9f7c0fe34..ecbffe376e 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -14,6 +14,7 @@ #include <ydb/library/yql/core/type_ann/type_ann_expr.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> +#include <ydb/library/yql/core/yql_opt_proposed_by_data.h> #include <util/generic/is_in.h> @@ -58,7 +59,7 @@ private: class TKqpRunner : public IKqpRunner { public: TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<TTypeAnnotationContext> typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider) : Gateway(gateway) @@ -82,13 +83,16 @@ public: PhysicalOptimizeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx) .AddServiceTransformers() .Add(TLogExprTransformer::Sync("PhysicalOptimizeTransformer", logComp, logLevel), "LogPhysicalOptimize") + .AddExpressionEvaluation(FuncRegistry) .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) .Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery") .AddPostTypeAnnotation(/* forSubgraph */ true) .AddCommonOptimization() .Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize") - .Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "PhysicalOptimize") + .Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize") + .Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "KqpPhysicalOptimize") + .Add(CreatePhysicalDataProposalsInspector(*typesCtx), "ProvidersPhysicalOptimize") .Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize") .Add(CreateKqpQueryPhasesTransformer(), "QueryPhases") .Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects") @@ -347,7 +351,7 @@ private: } // namespace TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<TTypeAnnotationContext> typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, TIntrusivePtr<TKikimrSessionContext> sessionCtx, const NMiniKQL::IFunctionRegistry& funcRegistry, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider) { diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 7211e6dbf3..d00d3e6da7 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -1410,7 +1410,7 @@ TStatus AnnotateSequencerConnection(const TExprNode::TPtr& node, TExprContext& c node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType)); return TStatus::Ok; -} +} TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, const TKikimrTablesData& tablesData, bool withSystemColumns) { @@ -1471,6 +1471,32 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext return TStatus::Ok; } +TStatus AnnotateExternalEffect(const TExprNode::TPtr& node, TExprContext& ctx) { + if (!EnsureArgsCount(*node, 1, ctx)) { + return TStatus::Error; + } + + node->SetTypeAnn(node->Child(TKqlExternalEffect::idx_Input)->GetTypeAnn()); + return TStatus::Ok; +} + +TStatus AnnotateKqpSinkEffect(const TExprNode::TPtr& node, TExprContext& ctx) { + if (!EnsureArgsCount(*node, 2, ctx)) { + return TStatus::Error; + } + + if (!TDqStageBase::Match(node->Child(TKqpSinkEffect::idx_Stage))) { + return TStatus::Error; + } + + if (!EnsureAtom(*node->Child(TKqpSinkEffect::idx_SinkIndex), ctx)) { + return TStatus::Error; + } + + node->SetTypeAnn(node->Child(TKqpSinkEffect::idx_Stage)->GetTypeAnn()); + return TStatus::Ok; +} + } // namespace TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cluster, @@ -1609,6 +1635,13 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateKqpSourceSettings(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled()); } + if (TKqlExternalEffect::Match(input.Get())) { + return AnnotateExternalEffect(input, ctx); + } + + if (TKqpSinkEffect::Match(input.Get())) { + return AnnotateKqpSinkEffect(input, ctx); + } return dqTransformer->Transform(input, output, ctx); }); diff --git a/ydb/core/kqp/opt/kqp_opt.cpp b/ydb/core/kqp/opt/kqp_opt.cpp index 8bea802beb..50ece581fd 100644 --- a/ydb/core/kqp/opt/kqp_opt.cpp +++ b/ydb/core/kqp/opt/kqp_opt.cpp @@ -104,4 +104,18 @@ TKqpTable BuildTableMeta(const TKikimrTableDescription& tableDesc, const TPositi return BuildTableMeta(*tableDesc.Metadata, pos, ctx); } +bool IsBuiltEffect(const TExprBase& effect) { + // Stage with effect output + if (effect.Maybe<TDqOutput>()) { + return true; + } + + // Stage with sink effect + if (effect.Maybe<TKqpSinkEffect>()) { + return true; + } + + return false; +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 4997f007f9..3054575846 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -390,6 +390,7 @@ private: .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap)) .Build() .Settings(stage.Settings()) + .Outputs(stage.Outputs()) .Done(); } diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index 69db639676..74b880ac3d 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -1,6 +1,7 @@ #include "kqp_opt_impl.h" #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/dq/integration/yql_dq_integration.h> namespace NKikimr::NKqp::NOpt { @@ -363,63 +364,102 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const return true; } -bool BuildEffects(TPositionHandle pos, const TVector<TKqlTableEffect>& effects, - TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, TVector<TExprBase>& builtEffects) +bool BuildEffects(TPositionHandle pos, const TVector<TExprBase>& effects, + TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, + TVector<TExprBase>& builtEffects) { TVector<TCoArgument> inputArgs; TVector<TExprBase> inputs; TVector<TExprBase> newEffects; + TVector<TExprBase> newSinkEffects; newEffects.reserve(effects.size()); + newSinkEffects.reserve(effects.size()); for (const auto& effect : effects) { - TCoArgument inputArg = Build<TCoArgument>(ctx, pos) - .Name("inputArg") - .Done(); - - TMaybeNode<TExprBase> input; TMaybeNode<TExprBase> newEffect; + bool sinkEffect = false; + YQL_ENSURE(effect.Maybe<TKqlEffectBase>()); + if (effect.Maybe<TKqlTableEffect>()) { + TMaybeNode<TExprBase> input; + TCoArgument inputArg = Build<TCoArgument>(ctx, pos) + .Name("inputArg") + .Done(); + + if (auto maybeUpsertRows = effect.Maybe<TKqlUpsertRows>()) { + if (!BuildUpsertRowsEffect(maybeUpsertRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) { + return false; + } + } - if (auto maybeUpsertRows = effect.Maybe<TKqlUpsertRows>()) { - if (!BuildUpsertRowsEffect(maybeUpsertRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) { - return false; + if (auto maybeDeleteRows = effect.Maybe<TKqlDeleteRows>()) { + if (!BuildDeleteRowsEffect(maybeDeleteRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) { + return false; + } } - } - if (auto maybeDeleteRows = effect.Maybe<TKqlDeleteRows>()) { - if (!BuildDeleteRowsEffect(maybeDeleteRows.Cast(), ctx, kqpCtx, inputArg, input, newEffect)) { + if (input) { + inputArgs.push_back(inputArg); + inputs.push_back(input.Cast()); + } + } else if (auto maybeExt = effect.Maybe<TKqlExternalEffect>()) { + sinkEffect = true; + TKqlExternalEffect externalEffect = maybeExt.Cast(); + TExprBase input = externalEffect.Input(); + auto maybeStage = input.Maybe<TDqStageBase>(); + if (!maybeStage) { return false; } + auto stage = maybeStage.Cast(); + const auto outputsList = stage.Outputs(); + if (!outputsList) { + return false; + } + TDqStageOutputsList outputs = outputsList.Cast(); + YQL_ENSURE(outputs.Size() == 1, "Multiple sinks are not supported yet"); + TDqOutputAnnotationBase output = outputs.Item(0); + if (!output.Maybe<TDqSink>()) { + return false; + } + newEffect = Build<TKqpSinkEffect>(ctx, effect.Pos()) + .Stage(maybeStage.Cast().Ptr()) + .SinkIndex().Build("0") + .Done(); } YQL_ENSURE(newEffect); - newEffects.push_back(newEffect.Cast()); - - if (input) { - inputArgs.push_back(inputArg); - inputs.push_back(input.Cast()); + if (sinkEffect) { + newSinkEffects.push_back(newEffect.Cast()); + } else { + newEffects.push_back(newEffect.Cast()); } } - auto stage = Build<TDqStage>(ctx, pos) - .Inputs() - .Add(inputs) - .Build() - .Program() - .Args(inputArgs) - .Body<TKqpEffects>() - .Add(newEffects) + if (!newEffects.empty()) { + auto stage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(inputs) .Build() - .Build() - .Settings().Build() - .Done(); - - for (ui32 i = 0; i < newEffects.size(); ++i) { - auto effect = Build<TDqOutput>(ctx, pos) - .Stage(stage) - .Index().Build(ToString(0)) + .Program() + .Args(inputArgs) + .Body<TKqpEffects>() + .Add(newEffects) + .Build() + .Build() + .Settings().Build() .Done(); - builtEffects.push_back(effect); + for (ui32 i = 0; i < newEffects.size(); ++i) { + auto effect = Build<TDqOutput>(ctx, pos) + .Stage(stage) + .Index().Build(ToString(0)) + .Done(); + + builtEffects.push_back(effect); + } + } + + if (!newSinkEffects.empty()) { + builtEffects.insert(builtEffects.end(), newSinkEffects.begin(), newSinkEffects.end()); } return true; @@ -432,20 +472,20 @@ TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx, TVector<TExprBase> builtEffects; if constexpr (GroupEffectsByTable) { - TMap<TStringBuf, TVector<TKqlTableEffect>> tableEffectsMap; + TMap<TStringBuf, TVector<TExprBase>> tableEffectsMap; for (const auto& maybeEffect : query.Effects()) { if (const auto maybeList = maybeEffect.Maybe<TExprList>()) { for (const auto effect : maybeList.Cast()) { YQL_ENSURE(effect.Maybe<TKqlTableEffect>()); auto tableEffect = effect.Cast<TKqlTableEffect>(); - tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect); + tableEffectsMap[tableEffect.Table().Path()].push_back(effect); } } else { YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>()); auto tableEffect = maybeEffect.Cast<TKqlTableEffect>(); - tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect); + tableEffectsMap[tableEffect.Table().Path()].push_back(maybeEffect); } } @@ -460,18 +500,12 @@ TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx, for (const auto& maybeEffect : query.Effects()) { if (const auto maybeList = maybeEffect.Maybe<TExprList>()) { for (const auto effect : maybeList.Cast()) { - YQL_ENSURE(effect.Maybe<TKqlTableEffect>()); - auto tableEffect = effect.Cast<TKqlTableEffect>(); - - if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) { + if (!BuildEffects(query.Pos(), {effect}, ctx, kqpCtx, builtEffects)) { return {}; } } } else { - YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>()); - auto tableEffect = maybeEffect.Cast<TKqlTableEffect>(); - - if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) { + if (!BuildEffects(query.Pos(), {maybeEffect}, ctx, kqpCtx, builtEffects)) { return {}; } } @@ -502,7 +536,7 @@ TAutoPtr<IGraphTransformer> CreateKqpQueryEffectsTransformer(const TIntrusivePtr bool requireBuild = false; bool hasBuilt = false; for (const auto& effect : query.Effects()) { - if (!effect.Maybe<TDqOutput>()) { + if (!IsBuiltEffect(effect)) { requireBuild = true; } else { hasBuilt = true; diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h index 1b6c528e9d..8370b0994b 100644 --- a/ydb/core/kqp/opt/kqp_opt_impl.h +++ b/ydb/core/kqp/opt/kqp_opt_impl.h @@ -60,4 +60,6 @@ TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>> BuildS const std::function<NYql::NNodes::TExprBase (const NYql::TKikimrTableMetadata&, NYql::TPositionHandle, NYql::TExprContext&)>& tableBuilder); +bool IsBuiltEffect(const NYql::NNodes::TExprBase& effect); + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 2ac4bc8ae8..1c25cc2ad4 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -235,7 +235,7 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns, for(const auto& item: inputColumns) { result.push_back(item.Ptr()); } - + for(const auto& item: autoincrement) { result.push_back(item.Ptr()); } @@ -259,7 +259,7 @@ std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, co } if (isWriteReplace) { - std::tie(input, inputCols) = CreateRowsToReplace(input, inputColumns, table, write.Pos(), ctx); + std::tie(input, inputCols) = CreateRowsToReplace(input, inputColumns, table, write.Pos(), ctx); } auto baseInput = Build<TKqpWriteConstraint>(ctx, pos) @@ -816,6 +816,32 @@ TVector<TExprBase> HandleDeleteTable(const TKiDeleteTable& del, TExprContext& ct } } +TExprNode::TPtr HandleExternalWrite(const TCallable& effect, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + if (effect.Ref().ChildrenSize() <= 1) { + return {}; + } + // As a rule data sink is a second child for all write callables + TExprBase dataSinkArg(effect.Ref().Child(1)); + if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) { + TStringBuf dataSinkCategory = maybeDataSink.Cast().Category(); + auto dataSinkProviderIt = typesCtx.DataSinkMap.find(dataSinkCategory); + if (dataSinkProviderIt != typesCtx.DataSinkMap.end()) { + if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) { + if (auto canWrite = dqIntegration->CanWrite(*effect.Raw(), ctx)) { + YQL_ENSURE(*canWrite, "Erros handling write"); + if (auto result = dqIntegration->WrapWrite(effect.Ptr(), ctx)) { + return Build<TKqlExternalEffect>(ctx, effect.Pos()) + .Input(result) + .Done() + .Ptr(); + } + } + } + } + } + return {}; +} + } // namespace const TKikimrTableDescription& GetTableData(const TKikimrTablesData& tablesData, @@ -836,12 +862,12 @@ TIntrusivePtr<TKikimrTableMetadata> GetIndexMetadata(const TKqlReadTableIndex& r } TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TKikimrTablesData& tablesData, - TExprContext& ctx, bool withSystemColumns, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, NYql::TTypeAnnotationContext& typesCtx) + TExprContext& ctx, bool withSystemColumns, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typesCtx) { TVector<TKqlQuery> queryBlocks; queryBlocks.reserve(dataQueryBlocks.ArgCount()); for (const auto& block : dataQueryBlocks) { - TVector <TExprBase> kqlEffects; + TVector<TExprBase> kqlEffects; for (const auto& effect : block.Effects()) { if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) { auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData, kqpCtx); @@ -861,9 +887,13 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK auto results = HandleDeleteTable(maybeDelete.Cast(), ctx, tablesData, withSystemColumns, kqpCtx); kqlEffects.insert(kqlEffects.end(), results.begin(), results.end()); } + + if (TExprNode::TPtr result = HandleExternalWrite(effect, ctx, typesCtx)) { + kqlEffects.emplace_back(result); + } } - TVector <TKqlQueryResult> kqlResults; + TVector<TKqlQueryResult> kqlResults; kqlResults.reserve(block.Results().Size()); for (const auto& kiResult : block.Results()) { kqlResults.emplace_back( @@ -888,7 +918,7 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK TOptimizeExprSettings optSettings(nullptr); optSettings.VisitChanges = true; auto status = OptimizeExpr(queryBlock.Ptr(), optResult, - [&tablesData, withSystemColumns, &kqpCtx, &typesCtx](const TExprNode::TPtr& input, TExprContext &ctx) { + [&tablesData, withSystemColumns, &kqpCtx, &typesCtx](const TExprNode::TPtr& input, TExprContext& ctx) { auto node = TExprBase(input); TExprNode::TPtr effect; diff --git a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp index 90ab8da99a..6353167edc 100644 --- a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp +++ b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp @@ -35,7 +35,7 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() { } for (const auto& effect : query.Effects()) { - if (!effect.Maybe<TDqOutput>()) { + if (!IsBuiltEffect(effect)) { ctx.AddError(TIssue(ctx.GetPosition(effect.Pos()), "Failed to build query effects.")); return TStatus::Error; } @@ -86,10 +86,12 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() { YQL_ENSURE(stageType); auto stageResultType = stageType->Cast<TTupleExprType>(); const auto& stageConsumers = GetConsumers(stage, parentsMap); + bool stageWithResult = false; TDynBitMap usedOutputs; for (auto consumer : stageConsumers) { if (auto maybeOutput = TExprBase(consumer).Maybe<TDqOutput>()) { + stageWithResult = true; auto output = maybeOutput.Cast(); auto outputIndex = FromString<ui32>(output.Index().Value()); if (usedOutputs.Test(outputIndex)) { @@ -100,18 +102,38 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() { } usedOutputs.Set(outputIndex); } else { - YQL_ENSURE(false, "Stage #" << PrintKqpStageOnly(stage, ctx) << " has unexpected consumer: " - << consumer->Content()); + // There can be also an effect with stage that has dq sinks + // Check the following structure: + // TKqlQuery (tuple with 2 elems) - results and effects + auto stageParentsIt = parentsMap.find(stage.Raw()); + YQL_ENSURE(stageParentsIt != parentsMap.end()); + if (stageParentsIt->second.size() != 1) { + hasMultipleConsumers = true; + } else { + const TExprNode* effectNode = *stageParentsIt->second.begin(); + auto effectParentIt = parentsMap.find(effectNode); + YQL_ENSURE(effectParentIt != parentsMap.end()); + if (effectParentIt->second.size() != 1) { + hasMultipleConsumers = true; + } else { + const TExprNode* queryNode = *effectParentIt->second.begin(); + YQL_ENSURE(queryNode->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Tuple, + "Stage #" << PrintKqpStageOnly(stage, ctx) << " has unexpected consumer: " + << consumer->Content()); + } + } } } - for (size_t i = 0; i < stageResultType->GetSize(); ++i) { - if (!usedOutputs.Test(i)) { - hasBrokenStage = true; - YQL_CLOG(ERROR, ProviderKqp) << "Stage #" << PrintKqpStageOnly(stage, ctx) - << ", output " << i << " (" << FormatType(stageResultType->GetItems()[i]) << ")" - << " not used"; - return false; + if (stageWithResult) { + for (size_t i = 0; i < stageResultType->GetSize(); ++i) { + if (!usedOutputs.Test(i)) { + hasBrokenStage = true; + YQL_CLOG(ERROR, ProviderKqp) << "Stage #" << PrintKqpStageOnly(stage, ctx) + << ", output " << i << " (" << FormatType(stageResultType->GetItems()[i]) << ")" + << " not used"; + return false; + } } } } diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index b29c09596a..543047c883 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -190,10 +190,15 @@ public: } for (const auto& stage: Tx.Stages()) { - if (stage.Cast<TDqStageBase>().Program().Body().Maybe<TKqpEffects>()) { - auto &planNode = AddPlanNode(phaseNode); + TDqStageBase stageBase = stage.Cast<TDqStageBase>(); + if (stageBase.Program().Body().Maybe<TKqpEffects>()) { + auto& planNode = AddPlanNode(phaseNode); planNode.TypeName = "Effect"; Visit(TExprBase(stage), planNode); + } else if (stageBase.Outputs()) { // Sink + auto& planNode = AddPlanNode(phaseNode); + planNode.TypeName = "Sink"; + Visit(TExprBase(stage), planNode); } } @@ -884,7 +889,7 @@ private: Visit(settings.Cast(), stagePlanNode); } else { TOperator op; - op.Properties["Name"] = TString(source.Cast().DataSource().Cast<TCoDataSource>().Category().Value()); + op.Properties["Name"] = source.Cast().DataSource().Cast<TCoDataSource>().Category().StringValue(); AddOperator(stagePlanNode, "Source", op); } } else { @@ -896,6 +901,16 @@ private: Visit(inputCn.Output().Stage(), inputPlanNode); } } + + if (auto outputs = expr.Cast<TDqStageBase>().Outputs()) { + for (auto output : outputs.Cast()) { + if (output.Maybe<TDqSink>()) { + TOperator op; + op.Properties["Name"] = output.DataSink().Cast<TCoDataSink>().Category().StringValue(); + AddOperator(stagePlanNode, "Sink", op); + } + } + } } else { Visit(expr.Ptr(), planNode); diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp index 1a1068757d..16b322b860 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp @@ -250,6 +250,7 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte .Inputs(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesMap)) .Program(ctx.DeepCopyLambda(TKqpProgram(newProgram).Lambda().Ref())) .Settings(stage.Settings()) + .Outputs(stage.Outputs()) .Done(); stages.emplace_back(newStage); diff --git a/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt index 4b843449f5..324ba1b41a 100644 --- a/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/provider/CMakeLists.darwin-x86_64.txt @@ -36,6 +36,7 @@ target_link_libraries(core-kqp-provider PUBLIC public-lib-scheme_types cpp-client-ydb_topic yql-core-expr_nodes + yql-core-peephole_opt parser-pg_wrapper-interface providers-common-codec providers-common-config diff --git a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt index 46d277a850..125cf43faa 100644 --- a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt @@ -37,6 +37,7 @@ target_link_libraries(core-kqp-provider PUBLIC public-lib-scheme_types cpp-client-ydb_topic yql-core-expr_nodes + yql-core-peephole_opt parser-pg_wrapper-interface providers-common-codec providers-common-config diff --git a/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt index 46d277a850..125cf43faa 100644 --- a/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/provider/CMakeLists.linux-x86_64.txt @@ -37,6 +37,7 @@ target_link_libraries(core-kqp-provider PUBLIC public-lib-scheme_types cpp-client-ydb_topic yql-core-expr_nodes + yql-core-peephole_opt parser-pg_wrapper-interface providers-common-codec providers-common-config diff --git a/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt index 4b843449f5..324ba1b41a 100644 --- a/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/provider/CMakeLists.windows-x86_64.txt @@ -36,6 +36,7 @@ target_link_libraries(core-kqp-provider PUBLIC public-lib-scheme_types cpp-client-ydb_topic yql-core-expr_nodes + yql-core-peephole_opt parser-pg_wrapper-interface providers-common-codec providers-common-config diff --git a/ydb/core/kqp/provider/ya.make b/ydb/core/kqp/provider/ya.make index c25d4d5aa8..e49a6415fc 100644 --- a/ydb/core/kqp/provider/ya.make +++ b/ydb/core/kqp/provider/ya.make @@ -35,6 +35,7 @@ PEERDIR( ydb/public/lib/scheme_types ydb/public/sdk/cpp/client/ydb_topic ydb/library/yql/core/expr_nodes + ydb/library/yql/core/peephole_opt ydb/library/yql/parser/pg_wrapper/interface ydb/library/yql/providers/common/codec ydb/library/yql/providers/common/config diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index 8d64d840cd..3bf5acc332 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -5,6 +5,8 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> +#include <ydb/library/yql/utils/log/log.h> + namespace NYql { namespace { @@ -340,13 +342,15 @@ public: TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory, TIntrusivePtr<IKikimrQueryExecutor> queryExecutor) : FunctionRegistry(functionRegistry) , Types(types) , Gateway(gateway) , SessionCtx(sessionCtx) + , ExternalSourceFactory(externalSourceFactory) , IntentDeterminationTransformer(CreateKiSinkIntentDeterminationTransformer(sessionCtx)) - , TypeAnnotationTransformer(CreateKiSinkTypeAnnotationTransformer(gateway, sessionCtx)) + , TypeAnnotationTransformer(CreateKiSinkTypeAnnotationTransformer(gateway, sessionCtx, types)) , LogicalOptProposalTransformer(CreateKiLogicalOptProposalTransformer(sessionCtx, types)) , PhysicalOptProposalTransformer(CreateKiPhysicalOptProposalTransformer(sessionCtx)) , CallableExecutionTransformer(CreateKiSinkCallableExecutionTransformer(gateway, sessionCtx, queryExecutor)) @@ -496,6 +500,57 @@ public: .Ptr(); } + TExprNode::TPtr RewriteIOExternal(const TKikimrKey& key, const TExprNode::TPtr& node, TExprContext& ctx) { + TKiDataSink dataSink(node->ChildPtr(1)); + auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath()); + if (!tableDesc.Metadata || tableDesc.Metadata->Kind != EKikimrTableKind::External) { + return nullptr; + } + + if (tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalDataSource && tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalTable) { + YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType; + return nullptr; + } + + ctx.Step.Repeat(TExprStep::DiscoveryIO) + .Repeat(TExprStep::Epochs) + .Repeat(TExprStep::Intents) + .Repeat(TExprStep::LoadTablesMetadata) + .Repeat(TExprStep::RewriteIO); + + const auto& externalSource = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type); + if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) { + auto writeArgs = node->ChildrenList(); + writeArgs[1] = Build<TCoDataSink>(ctx, node->Pos()) + .Category(ctx.NewAtom(node->Pos(), externalSource->GetName())) + .FreeArgs() + .Add(writeArgs[1]->ChildrenList()[1]) + .Build() + .Done().Ptr(); + return ctx.ChangeChildren(*node, std::move(writeArgs)); + } else { // tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalTable + TExprNode::TPtr path = ctx.NewCallable(node->Pos(), "String", { ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.TableLocation) }); + auto table = ctx.NewList(node->Pos(), {ctx.NewAtom(node->Pos(), "table"), path}); + auto keyNode = ctx.NewCallable(node->Pos(), "Key", {table}); + auto r = Build<TCoWrite>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink() + .Category(ctx.NewAtom(node->Pos(), externalSource->GetName())) + .FreeArgs() + .Add(ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.DataSourcePath)) + .Build() + .Build() + .FreeArgs() + .Add(keyNode) + .Add(node->Child(3)) + .Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, externalSource, tableDesc.Metadata->ExternalSource.TableContent)) + .Build() + .Done().Ptr(); + return r; + } + return nullptr; + } + TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override { YQL_ENSURE(node->IsCallable(WriteName), "Expected Write!, got: " << node->Content()); @@ -504,6 +559,10 @@ public: switch (key.GetKeyType()) { case TKikimrKey::Type::Table: { + if (TExprNode::TPtr resultNode = RewriteIOExternal(key, node, ctx)) { + return resultNode; + } + NCommon::TWriteTableSettings settings = NCommon::ParseWriteTableSettings(TExprList(node->Child(4)), ctx); YQL_ENSURE(settings.Mode); auto mode = settings.Mode.Cast(); @@ -837,6 +896,7 @@ private: const TTypeAnnotationContext& Types; TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; + NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory; TAutoPtr<IGraphTransformer> IntentDeterminationTransformer; TAutoPtr<IGraphTransformer> TypeAnnotationTransformer; @@ -970,9 +1030,10 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSink( TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory, TIntrusivePtr<IKikimrQueryExecutor> queryExecutor) { - return new TKikimrDataSink(functionRegistry, types, gateway, sessionCtx, queryExecutor); + return new TKikimrDataSink(functionRegistry, types, gateway, sessionCtx, externalSourceFactory, queryExecutor); } TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer( diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index dbe344fce2..d18ac31c98 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -16,6 +16,48 @@ #include <util/generic/is_in.h> namespace NYql { + +static Ydb::Type CreateYdbType(const NKikimr::NScheme::TTypeInfo& typeInfo, bool notNull) { + Ydb::Type ydbType; + if (typeInfo.GetTypeId() == NKikimr::NScheme::NTypeIds::Pg) { + auto* typeDesc = typeInfo.GetTypeDesc(); + auto* pg = ydbType.mutable_pg_type(); + pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc)); + pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); + } else { + auto& item = notNull + ? ydbType + : *ydbType.mutable_optional_type()->mutable_item(); + item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); + } + return ydbType; +} + +TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content) { + TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>> typedColumns; + typedColumns.reserve(columns.size()); + for (const auto& [n, c] : columns) { + NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull))); + auto type = NFq::MakeType(parser, ctx); + typedColumns.emplace_back(n, type); + } + + const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text); + TExprNode::TListType items; + auto schema = ctx.NewAtom(pos, ysonSchema); + auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); + auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); + auto userSchema = ctx.NewAtom(pos, "userschema"sv); + items.emplace_back(ctx.NewList(pos, {userSchema, type, order})); + + for (const auto& [key, value]: source->GetParameters(content)) { + auto keyAtom = ctx.NewAtom(pos, NormalizeName(key)); + auto valueAtom = ctx.NewAtom(pos, value); + items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom})); + } + return ctx.NewList(pos, std::move(items)); +} + namespace { using namespace NKikimr; @@ -368,7 +410,7 @@ public: , IntentDeterminationTransformer(new TKiSourceIntentDeterminationTransformer(sessionCtx)) , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory, isInternalCall)) , TypeAnnotationTransformer(CreateKiSourceTypeAnnotationTransformer(sessionCtx, types)) - , CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx)) + , CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx, types)) { Y_UNUSED(FunctionRegistry); @@ -551,47 +593,6 @@ public: return false; } - static Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) { - Ydb::Type ydbType; - if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { - auto* typeDesc = typeInfo.GetTypeDesc(); - auto* pg = ydbType.mutable_pg_type(); - pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc)); - pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); - } else { - auto& item = notNull - ? ydbType - : *ydbType.mutable_optional_type()->mutable_item(); - item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); - } - return ydbType; - } - - TExprNode::TPtr BuildSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NExternalSource::IExternalSource::TPtr& source, const TString& content) { - TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>> typedColumns; - typedColumns.reserve(columns.size()); - for (const auto& [n, c] : columns) { - NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull))); - auto type = NFq::MakeType(parser, ctx); - typedColumns.emplace_back(n, type); - } - - const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text); - TExprNode::TListType items; - auto schema = ctx.NewAtom(pos, ysonSchema); - auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); - auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); - auto userSchema = ctx.NewAtom(pos, "userschema"sv); - items.emplace_back(ctx.NewList(pos, {userSchema, type, order})); - - for (const auto& [key, value]: source->GetParameters(content)) { - auto keyAtom = ctx.NewAtom(pos, NormalizeName(key)); - auto valueAtom = ctx.NewAtom(pos, value); - items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom})); - } - return ctx.NewList(pos, std::move(items)); - } - TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override { auto read = node->Child(0); if (!read->IsCallable(ReadName)) { @@ -664,7 +665,7 @@ public: .FreeArgs() .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {key})) .Add(ctx.NewCallable(node->Pos(), "Void", {})) - .Add(BuildSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent)) + .Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent)) .Build() .Done().Ptr(); auto retChildren = node->ChildrenList(); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index ec0792a718..1557ede0b7 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -499,19 +499,32 @@ private: }; class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<TKiSourceCallableExecutionTransformer> { - private: + IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const { + TPeepholeSettings peepholeSettings; + bool hasNonDeterministicFunctions; + auto status = PeepHoleOptimizeNode(input, output, ctx, TypesCtx, nullptr, hasNonDeterministicFunctions, peepholeSettings); + if (status.Level != TStatus::Ok) { + ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TString("Peephole optimization failed for Dq stage"))); + return status; + } + return status; + } - TString GetLambdaBody(TExprNode::TPtr resInput, NKikimrMiniKQL::TType& resultType, TExprContext& ctx) { - + IGraphTransformer::TStatus GetLambdaBody(TExprNode::TPtr resInput, NKikimrMiniKQL::TType& resultType, TExprContext& ctx, TString& lambda) { auto pos = resInput->Pos(); auto typeAnn = resInput->GetTypeAnn(); const auto kind = resInput->GetTypeAnn()->GetKind(); - const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::List && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional; + const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional; auto node = ctx.WrapByCallableIf(kind != ETypeAnnotationKind::Stream, "ToStream", ctx.WrapByCallableIf(data, "Just", std::move(resInput))); + auto peepHoleStatus = PeepHole(node, node, ctx); + if (peepHoleStatus.Level != IGraphTransformer::TStatus::Ok) { + return peepHoleStatus; + } + auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc); auto input = Build<TDqPhyStage>(ctx, pos) @@ -530,7 +543,7 @@ private: TVector<TExprBase> fakeReads; auto paramsType = NDq::CollectParameters(programLambda, ctx); - auto lambda = NDq::BuildProgram( + lambda = NDq::BuildProgram( programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv, *SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(), ctx, fakeReads); @@ -542,7 +555,7 @@ private: auto type = NYql::NCommon::BuildType(*typeAnn, programBuilder, errorStream); ExportTypeToProto(type, resultType); - return lambda; + return IGraphTransformer::TStatus::Ok; } TString EncodeResultToYson(const NKikimrMiniKQL::TResult& result, bool& truncated) { @@ -563,9 +576,11 @@ private: public: TKiSourceCallableExecutionTransformer(TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types) : Gateway(gateway) - , SessionCtx(sessionCtx) {} + , SessionCtx(sessionCtx) + , TypesCtx(types) {} std::pair<TStatus, TAsyncTransformCallbackFuture> CallbackTransform(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) @@ -588,7 +603,11 @@ public: if (input->Content() == "Result") { auto result = TMaybeNode<TResult>(input).Cast(); NKikimrMiniKQL::TType resultType; - auto program = GetLambdaBody(result.Input().Ptr(), resultType, ctx); + TString program; + TStatus status = GetLambdaBody(result.Input().Ptr(), resultType, ctx, program); + if (status.Level != TStatus::Ok) { + return SyncStatus(status); + } auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState()); return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply( @@ -745,6 +764,7 @@ private: private: TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; + TTypeAnnotationContext& TypesCtx; }; template <class TKiObject, class TSettings> @@ -1975,9 +1995,10 @@ private: TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer( TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types) { - return new TKiSourceCallableExecutionTransformer(gateway, sessionCtx); + return new TKiSourceCallableExecutionTransformer(gateway, sessionCtx, types); } TAutoPtr<IGraphTransformer> CreateKiSinkCallableExecutionTransformer( diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index c50913c001..c2f5a31f4d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -328,25 +328,33 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T return result; } - for (const auto& dataSource : types.DataSources) { - if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration - && dqIntegration->CanRead(*node.Ptr(), ctx) - && dqIntegration->EstimateReadSize(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) { - txRes.Ops.insert(node.Raw()); - for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) { - if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) { - bool isWorldChild = false; - if (child->IsWorld()) { - isWorldChild = true; - } else if (auto* typeAnn = child->GetTypeAnn(); typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) { - isWorldChild = true; - } - if (isWorldChild) { - return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types); + if (node.Ref().ChildrenSize() > 1) { + TExprBase dataSourceArg(node.Ref().Child(1)); + if (auto maybeDataSource = dataSourceArg.Maybe<TCoDataSource>()) { + TStringBuf dataSourceCategory = maybeDataSource.Cast().Category(); + auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory); + if (dataSourceProviderIt != types.DataSourceMap.end()) { + if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) { + if (dqIntegration->CanRead(*node.Ptr(), ctx) + && dqIntegration->EstimateReadSize( + TDqSettings::TDefault::DataSizePerJob, + TDqSettings::TDefault::MaxTasksPerStage, + *node.Ptr(), + ctx)) + { + txRes.Ops.insert(node.Raw()); + for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) { + if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) { + auto* typeAnn = child->GetTypeAnn(); + if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) { + return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types); + } + } + } + YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child"); } } } - return true; } } @@ -382,6 +390,32 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T return result; } + if (node.Ref().ChildrenSize() > 1) { + TExprBase dataSinkArg(node.Ref().Child(1)); + if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) { + TStringBuf dataSinkCategory = maybeDataSink.Cast().Category(); + auto dataSinkProviderIt = types.DataSinkMap.find(dataSinkCategory); + if (dataSinkProviderIt != types.DataSinkMap.end()) { + if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) { + if (auto canWrite = dqIntegration->CanWrite(node.Ref(), ctx)) { + YQL_ENSURE(*canWrite, "Errors handling write"); + txRes.Ops.insert(node.Raw()); + txRes.AddEffect(node, THashMap<TString, TPrimitiveYdbOperations>{}); + for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) { + if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) { + auto* typeAnn = child->GetTypeAnn(); + if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) { + return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types); + } + } + } + YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child"); + } + } + } + } + } + if (auto maybeUpdate = node.Maybe<TKiUpdateTable>()) { auto update = maybeUpdate.Cast(); if (!checkDataSink(update.DataSink())) { diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index a6345a57fd..a743c6ceb4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -484,6 +484,7 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSink( TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory, TIntrusivePtr<IKikimrQueryExecutor> queryExecutor); } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index 4adb83d704..414c128e65 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -162,7 +162,7 @@ struct TKiExecDataQuerySettings { TAutoPtr<IGraphTransformer> CreateKiSourceTypeAnnotationTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types); TAutoPtr<IGraphTransformer> CreateKiSinkTypeAnnotationTransformer(TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx); + TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types); TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types); TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx); @@ -175,7 +175,8 @@ TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer(TIntrusiv TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer( TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx); + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types); TAutoPtr<IGraphTransformer> CreateKiSinkCallableExecutionTransformer( TIntrusivePtr<IKikimrGateway> gateway, @@ -219,4 +220,6 @@ bool IsKikimrSystemColumn(const TStringBuf columnName); bool ValidateTableHasIndex(TKikimrTableMetadataPtr metadata, TExprContext& ctx, const TPositionHandle& pos); +TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content); + } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 10c58733c7..f07c2a564c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/dq/integration/yql_dq_integration.h> #include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.h> @@ -292,9 +293,10 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer { public: TKiSinkTypeAnnotationTransformer(TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types) : Gateway(gateway) - , SessionCtx(sessionCtx) {} + , SessionCtx(sessionCtx) + , Types(types) {} private: virtual TStatus HandleWriteTable(TKiWriteTable node, TExprContext& ctx) override { @@ -382,7 +384,7 @@ private: if (rowType->FindItem(keyColumnName)) { continue; } - + if (!columnInfo.IsAutoIncrement()) { ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() << "Missing key column in input: " << keyColumnName @@ -410,7 +412,7 @@ private: //no type-level notnull check for pg types. continue; } - + if (itemType && itemType->HasOptionalOrNull()) { ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_COLUMN_TYPE, TStringBuilder() << "Can't set NULL or optional value to not null column: " << name @@ -1501,9 +1503,28 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over } if (!KikimrSupportedEffects().contains(effect.CallableName())) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() - << "Unsupported Kikimr data query effect: " << effect.CallableName())); - return TStatus::Error; + bool supported = false; + if (effect.Ref().ChildrenSize() > 1) { + TExprBase dataSinkArg(effect.Ref().Child(1)); + if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) { + TStringBuf dataSinkCategory = maybeDataSink.Cast().Category(); + auto dataSinkProviderIt = Types.DataSinkMap.find(dataSinkCategory); + if (dataSinkProviderIt != Types.DataSinkMap.end()) { + if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) { + auto canWrite = dqIntegration->CanWrite(*effect.Raw(), ctx); + if (canWrite) { + supported = *canWrite; // if false, we will exit this function a few lines later + } + } + } + } + } + + if (!supported) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() + << "Unsupported Kikimr data query effect: " << effect.CallableName())); + return TStatus::Error; + } } } @@ -1599,6 +1620,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over private: TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; + TTypeAnnotationContext& Types; }; } // namespace @@ -1610,9 +1632,9 @@ TAutoPtr<IGraphTransformer> CreateKiSourceTypeAnnotationTransformer(TIntrusivePt } TAutoPtr<IGraphTransformer> CreateKiSinkTypeAnnotationTransformer(TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, TTypeAnnotationContext& types) { - return new TKiSinkTypeAnnotationTransformer(gateway, sessionCtx); + return new TKiSinkTypeAnnotationTransformer(gateway, sessionCtx, types); } const TTypeAnnotationNode* GetReadTableRowType(TExprContext& ctx, const TKikimrTablesData& tablesData, diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 841d819b49..6f7122105f 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -708,6 +708,20 @@ private: stageProto.SetOutputsCount(outputsCount); + // Dq sinks + if (auto maybeOutputsNode = stage.Outputs()) { + auto outputsNode = maybeOutputsNode.Cast(); + for (size_t i = 0; i < outputsNode.Size(); ++i) { + auto outputNode = outputsNode.Item(i); + auto maybeSinkNode = outputNode.Maybe<TDqSink>(); + YQL_ENSURE(maybeSinkNode); + auto sinkNode = maybeSinkNode.Cast(); + auto* sinkProto = stageProto.AddSinks(); + FillSink(sinkNode, sinkProto, ctx); + sinkProto->SetOutputIndex(FromString(TStringBuf(sinkNode.Index()))); + } + } + auto paramsType = CollectParameters(stage, ctx); auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry, ctx, {}); @@ -751,10 +765,7 @@ private: i.MutableProgram()->MutableSettings()->SetLevelDataPrediction(rPredictor.GetLevelDataVolume(i.GetProgram().GetSettings().GetStageLevel())); } - - YQL_ENSURE(hasEffectStage == txSettings.WithEffects); - - txProto.SetHasEffects(txSettings.WithEffects); + txProto.SetHasEffects(hasEffectStage); for (const auto& paramBinding : tx.ParamBindings()) { TString paramName(paramBinding.Name().Value()); @@ -911,7 +922,7 @@ private: void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, TExprContext& ctx) { - const TStringBuf dataSourceCategory = source.DataSource().Cast<TCoDataSource>().Category().Value(); + const TStringBuf dataSourceCategory = source.DataSource().Cast<TCoDataSource>().Category(); if (dataSourceCategory == NYql::KikimrProviderName || dataSourceCategory == NYql::YdbProviderName || dataSourceCategory == NYql::KqpReadRangesSourceName) { FillKqpSource(source, protoSource, allowSystemColumns, tablesMap); } else { @@ -949,6 +960,22 @@ private: } } + void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, TExprContext& ctx) { + Y_UNUSED(ctx); + const TStringBuf dataSinkCategory = sink.DataSink().Cast<TCoDataSink>().Category(); + // Delegate sink filling to dq integration of specific provider + const auto provider = TypesCtx.DataSinkMap.find(dataSinkCategory); + YQL_ENSURE(provider != TypesCtx.DataSinkMap.end(), "Unsupported data sink category: \"" << dataSinkCategory << "\""); + NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration(); + YQL_ENSURE(dqIntegration, "Unsupported dq sink for provider: \"" << dataSinkCategory << "\""); + auto& externalSink = *protoSink->MutableExternalSink(); + google::protobuf::Any& settings = *externalSink.MutableSettings(); + TString& sinkType = *externalSink.MutableType(); + dqIntegration->FillSinkSettings(sink.Ref(), settings, sinkType); + YQL_ENSURE(!settings.type_url().empty(), "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings for its dq sink node"); + YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings type for its dq sink node"); + } + void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap, NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index 1d7a7400da..18e0b3766b 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -5,6 +5,8 @@ #include <aws/core/auth/AWSCredentialsProvider.h> #include <aws/core/Aws.h> #include <aws/s3/model/CreateBucketRequest.h> +#include <aws/s3/model/GetObjectRequest.h> +#include <aws/s3/model/ListObjectsRequest.h> #include <aws/s3/model/PutObjectRequest.h> #include <aws/s3/S3Client.h> @@ -39,21 +41,31 @@ bool EnsureAwsApiInited() { return inited; } -void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) { +Aws::S3::S3Client MakeS3Client() { EnsureAwsApiInited(); Aws::Client::ClientConfiguration s3ClientConfig; s3ClientConfig.endpointOverride = GetEnv("S3_ENDPOINT"); s3ClientConfig.scheme = Aws::Http::Scheme::HTTP; - Aws::S3::S3Client s3Client(std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3ClientConfig); + return Aws::S3::S3Client(std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3ClientConfig); +} - { - Aws::S3::Model::CreateBucketRequest req; - req.SetBucket(bucket); - req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write); - const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req); - UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage()); - } +void CreateBucket(const TString& bucket, Aws::S3::S3Client& s3Client) { + Aws::S3::Model::CreateBucketRequest req; + req.SetBucket(bucket); + req.SetACL(Aws::S3::Model::BucketCannedACL::public_read_write); + const Aws::S3::Model::CreateBucketOutcome result = s3Client.CreateBucket(req); + UNIT_ASSERT_C(result.IsSuccess(), "Error creating bucket \"" << bucket << "\": " << result.GetError().GetExceptionName() << ": " << result.GetError().GetMessage()); +} + +void CreateBucket(const TString& bucket) { + Aws::S3::S3Client s3Client = MakeS3Client(); + + CreateBucket(bucket, s3Client); +} + +void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content, Aws::S3::S3Client& s3Client) { + CreateBucket(bucket, s3Client); { Aws::S3::Model::PutObjectRequest req; @@ -67,11 +79,81 @@ void CreateBucketWithObject(const TString& bucket, const TString& object, const } } +void CreateBucketWithObject(const TString& bucket, const TString& object, const TStringBuf& content) { + Aws::S3::S3Client s3Client = MakeS3Client(); + + CreateBucketWithObject(bucket, object, content, s3Client); +} + +TString GetObject(const TString& bucket, const TString& object, Aws::S3::S3Client& s3Client) { + Aws::S3::Model::GetObjectRequest req; + req.WithBucket(bucket).WithKey(object); + + Aws::S3::Model::GetObjectOutcome outcome = s3Client.GetObject(req); + UNIT_ASSERT(outcome.IsSuccess()); + Aws::S3::Model::GetObjectResult& result = outcome.GetResult(); + std::istreambuf_iterator<char> eos; + std::string objContent(std::istreambuf_iterator<char>(result.GetBody()), eos); + Cerr << "Got object content from \"" << bucket << "." << object << "\"\n" << objContent << Endl; + return objContent; +} + +TString GetObject(const TString& bucket, const TString& object) { + Aws::S3::S3Client s3Client = MakeS3Client(); + + return GetObject(bucket, object, s3Client); +} + +std::vector<TString> GetObjectKeys(const TString& bucket, Aws::S3::S3Client& s3Client) { + Aws::S3::Model::ListObjectsRequest listReq; + listReq.WithBucket(bucket); + + Aws::S3::Model::ListObjectsOutcome outcome = s3Client.ListObjects(listReq); + UNIT_ASSERT(outcome.IsSuccess()); + + std::vector<TString> keys; + for (auto& obj : outcome.GetResult().GetContents()) { + keys.push_back(TString(obj.GetKey())); + Cerr << "Found S3 object: \"" << obj.GetKey() << "\"" << Endl; + } + return keys; +} + +std::vector<TString> GetObjectKeys(const TString& bucket) { + Aws::S3::S3Client s3Client = MakeS3Client(); + + return GetObjectKeys(bucket, s3Client); +} + +TString GetAllObjects(const TString& bucket, TStringBuf separator, Aws::S3::S3Client& s3Client) { + std::vector<TString> keys = GetObjectKeys(bucket, s3Client); + TString result; + bool firstObject = true; + for (const TString& key : keys) { + result += GetObject(bucket, key, s3Client); + if (!firstObject) { + result += separator; + } + firstObject = false; + } + return result; +} + +TString GetAllObjects(const TString& bucket, TStringBuf separator = "") { + Aws::S3::S3Client s3Client = MakeS3Client(); + + return GetAllObjects(bucket, separator, s3Client); +} + +TString GetBucketLocation(const TStringBuf bucket) { + return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/'; +} + NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) { NYdb::NOperation::TOperationClient client(ydbDriver); NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op; do { - if (!op.Initialized()) { + if (op.Initialized()) { Sleep(TDuration::MilliSeconds(10)); } op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId); @@ -111,7 +193,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { );)", "external_source"_a = externalDataSourceName, "external_table"_a = externalTableName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "location"_a = GetBucketLocation(bucket), "object"_a = object ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -175,7 +257,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { );)", "external_source"_a = externalDataSourceName, "external_table"_a = externalTableName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "location"_a = GetBucketLocation(bucket), "object"_a = object ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -235,7 +317,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { AUTH_METHOD="NONE" );)", "external_source"_a = externalDataSourceName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/" + "location"_a = GetBucketLocation(bucket) ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); @@ -294,7 +376,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { ); )", "external_source"_a = externalDataSourceName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "location"_a = GetBucketLocation(bucket), "ydb_table"_a = ydbTable ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -374,7 +456,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { );)", "external_source"_a = externalDataSourceName, "external_table"_a = externalTableName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "location"_a = GetBucketLocation(bucket), "object"_a = object ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -434,7 +516,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { ); )", "external_source"_a = externalDataSourceName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "location"_a = GetBucketLocation(bucket), "ydb_table"_a = ydbTable ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -653,6 +735,86 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { } } + Y_UNIT_TEST(InsertIntoBucket) { + using namespace fmt::literals; + const TString readDataSourceName = "/Root/read_data_source"; + const TString readTableName = "/Root/read_binding"; + const TString readBucket = "test_bucket_read"; + const TString readObject = "test_object_read"; + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_binding"; + const TString writeBucket = "test_bucket_write"; + const TString writeObject = "test_object_write/"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{read_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{read_source}", + LOCATION="{read_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="tsv_with_names" + ); + )", + "read_source"_a = readDataSourceName, + "read_table"_a = readTableName, + "read_location"_a = GetBucketLocation(readBucket), + "read_object"_a = readObject, + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket), + "write_object"_a = writeObject + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT * FROM `{read_table}` + )", + "read_table"_a=readTableName, + "write_table"_a = writeTableName); + + auto db = kikimr.GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + + TString content = GetAllObjects(writeBucket); + UNIT_ASSERT_STRING_CONTAINS(content, "key\tvalue\n"); // tsv header + UNIT_ASSERT_STRING_CONTAINS(content, "1\ttrololo\n"); + UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n"); + } } } // namespace NKqp diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 936680f952..2fe9c89f07 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -319,7 +319,20 @@ message TKqpSource { TKqpReadRangesSource ReadRangesSource = 3; TKqpExternalSource ExternalSource = 4; } -}; +} + +message TKqpExternalSink { + string Type = 1; + google.protobuf.Any Settings = 2; +} + +message TKqpSink { + uint32 OutputIndex = 1; + + oneof Type { + TKqpExternalSink ExternalSink = 2; + } +} message TKqpPhyStage { NYql.NDqProto.TProgram Program = 1; @@ -332,6 +345,7 @@ message TKqpPhyStage { string StageGuid = 8; repeated TKqpSource Sources = 9; bool IsSinglePartition = 10; + repeated TKqpSink Sinks = 11; } message TKqpPhyResult { diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp index b4b494de29..719d35a306 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp +++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp @@ -56,11 +56,11 @@ TTransformationPipeline& TTransformationPipeline::AddParametersEvaluation(const TTransformationPipeline& TTransformationPipeline::AddExpressionEvaluation(const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, IGraphTransformer* calcTransfomer, EYqlIssueCode issueCode) { - auto typeCtx = TypeAnnotationContext_; - auto funcReg = &functionRegistry; + auto& typeCtx = *TypeAnnotationContext_; + auto& funcReg = functionRegistry; Transformers_.push_back(TTransformStage(CreateFunctorTransformer( - [=](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - return EvaluateExpression(input, output, *typeCtx, ctx, *funcReg, calcTransfomer); + [&typeCtx, &funcReg, calcTransfomer](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + return EvaluateExpression(input, output, typeCtx, ctx, funcReg, calcTransfomer); }), "EvaluateExpression", issueCode)); return *this; diff --git a/ydb/library/yql/core/yql_graph_transformer.cpp b/ydb/library/yql/core/yql_graph_transformer.cpp index 886fddbfd1..dd3aa951df 100644 --- a/ydb/library/yql/core/yql_graph_transformer.cpp +++ b/ydb/library/yql/core/yql_graph_transformer.cpp @@ -142,7 +142,7 @@ protected: void AddTooManyTransformationsError(TPositionHandle pos, const TStringBuf& where, TExprContext& ctx) { ctx.AddError(TIssue(ctx.GetPosition(pos), - TStringBuilder() << "YQL: Internal core error! " << where << " take too much iteration: " + TStringBuilder() << "YQL: Internal core error! " << where << " takes too much iterations: " << ctx.RepeatTransformLimit << ". You may set RepeatTransformLimit as flags for config provider.")); } diff --git a/ydb/library/yql/dq/integration/yql_dq_integration.h b/ydb/library/yql/dq/integration/yql_dq_integration.h index 860b69b4c3..b528faaa82 100644 --- a/ydb/library/yql/dq/integration/yql_dq_integration.h +++ b/ydb/library/yql/dq/integration/yql_dq_integration.h @@ -44,7 +44,13 @@ public: virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0; virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) = 0; virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0; - virtual TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) = 0; + + // Nothing if callable is not for writing, + // false if callable is for writing and there are some errors (they are added to ctx), + // true if callable is for writing and no issues occured. + virtual TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) = 0; + + virtual TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0; virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0; virtual bool CanFallback() = 0; virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0; diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index 10abdb7bcf..9caff49017 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -116,7 +116,7 @@ void MakeConsumerReplaces( TNodeOnNodeOwnedMap argsMap; CollectArgsReplaces(dqStage, newArgs, argsMap, ctx); auto newStage = Build<TDqStage>(ctx, dqStage.Pos()) - .InitFrom(dqStage) + .InitFrom(dqStage) .Program() .Args(newArgs) .Body<TCoFlatMap>() @@ -481,7 +481,7 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, bool useChannelBl .Callable(0, "NarrowMap") .Callable(0, "ToFlow") .Add(0, newArgNode) - .Seal() + .Seal() .Lambda(1) .Params("fields", itemType->GetSize()) .Callable("AsStruct") @@ -568,6 +568,7 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr .Body(resultStream) .Build() .Settings(TDqStageSettings::New(stage).SetWideChannels(outputItemType).BuildNode(ctx, stage.Pos())) + .Outputs(stage.Outputs()) .Done(); } @@ -630,7 +631,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TP .Stage(newStage) .Build() .SortColumns(builder.Build().Value()) - .Done().Ptr(); + .Done().Ptr(); } else { auto newOutput = Build<TDqOutput>(ctx, conn.Output().Pos()) .InitFrom(conn.Output()) diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 4f9da234bd..9ba72f7a86 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -19,10 +19,14 @@ TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode return read; } -TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) { +TMaybe<bool> TDqIntegrationBase::CanWrite(const TExprNode&, TExprContext&) { return Nothing(); } +TExprNode::TPtr TDqIntegrationBase::WrapWrite(const TExprNode::TPtr& write, TExprContext&) { + return write; +} + void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase&) { } diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h index de26d81671..abbc412d08 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h @@ -12,7 +12,8 @@ public: TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override; TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override; void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override; - TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) override; + TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override; + TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override; bool CanFallback() override; void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override; void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index e568efb416..0cc5f15570 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -180,7 +180,7 @@ private: auto dataSink = State_->TypeCtx->DataSinkMap.FindPtr(dataSinkName); YQL_ENSURE(dataSink); if (auto dqIntegration = dataSink->Get()->GetDqIntegration()) { - if (auto canWrite = dqIntegration->CanWrite(*State_->Settings, node, ctx)) { + if (auto canWrite = dqIntegration->CanWrite(node, ctx)) { if (!canWrite.GetRef()) { good = false; } else if (!State_->Settings->EnableInsert.Get().GetOrElse(false)) { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index a9cfab9cee..e47fd3b2b8 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -145,7 +145,7 @@ public: return read; } - TMaybe<bool> CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) override { + TMaybe<bool> CanWrite(const TExprNode&, TExprContext&) override { YQL_ENSURE(false, "Unimplemented"); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 40a1d4f494..951aee6dd0 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -193,7 +193,7 @@ struct TEvPrivate { }; struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> { - TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode) + TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode) : CurlResponseCode(curlResponseCode), HttpResponseCode(httpResponseCode) {} const CURLcode CurlResponseCode; const long HttpResponseCode; @@ -1287,7 +1287,7 @@ struct TReadBufferCounter { DecodedBytes += deltaDecodedBytes; DecodedRows += deltaDecodedRows; } - + ui64 Value = 0; const ui64 Limit; ui64 CoroCount = 0; @@ -2280,8 +2280,8 @@ public: void Bootstrap() { LOG_D("TS3StreamReadActor", "Bootstrap"); QueueBufferCounter = std::make_shared<TReadBufferCounter>( - ReadActorFactoryCfg.DataInflight, - TActivationContext::ActorSystem(), + ReadActorFactoryCfg.DataInflight, + TActivationContext::ActorSystem(), QueueDataSize, TaskQueueDataSize, DownloadPaused, diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index b87bab16ce..91d585082e 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -140,6 +140,16 @@ ] }, { + "Name": "TS3Insert", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "S3Insert"}, + "Children": [ + {"Index": 0, "Name": "DataSink", "Type": "TS3DataSink"}, + {"Index": 1, "Name": "Target", "Type": "TS3Target"}, + {"Index": 2, "Name": "Input", "Type": "TExprBase"} + ] + }, + { "Name": "TS3SinkSettings", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "S3SinkSettings"}, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index cf94e8f102..9000aaef8e 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -29,6 +29,7 @@ public: AddHandler({TS3Target::CallableName()}, Hndl(&TSelf::HandleTarget)); AddHandler({TS3SinkSettings::CallableName()}, Hndl(&TSelf::HandleSink)); AddHandler({TS3SinkOutput::CallableName()}, Hndl(&TSelf::HandleOutput)); + AddHandler({TS3Insert::CallableName()}, Hndl(&TSelf::HandleInsert)); } private: TStatus HandleCommit(TExprBase input, TExprContext&) { @@ -84,6 +85,59 @@ private: return TStatus::Ok; } + TStatus HandleInsert(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 3U, ctx)) { + return TStatus::Error; + } + + if (!EnsureSpecificDataSink(*input->Child(TS3Insert::idx_DataSink), S3ProviderName, ctx)) { + return TStatus::Error; + } + + auto source = input->Child(TS3Insert::idx_Input); + if (!EnsureListType(*source, ctx)) { + return TStatus::Error; + } + + const TTypeAnnotationNode* sourceType = source->GetTypeAnn()->Cast<TListExprType>()->GetItemType(); + if (!EnsureStructType(source->Pos(), *sourceType, ctx)) { + return TStatus::Error; + } + + auto target = input->Child(TS3Insert::idx_Target); + if (!TS3Target::Match(target)) { + ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target.")); + return TStatus::Error; + } + + TS3Target tgt(target); + if (auto settings = tgt.Settings()) { + if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) { + const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!IsSameAnnotation(*targetType, *sourceType)) { + ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), + TStringBuilder() << "Type mismatch between schema type: " << *targetType + << " and actual data type: " << *sourceType << ", diff is: " + << GetTypeDiff(*targetType, *sourceType))); + return TStatus::Error; + } + } + } + + input->SetTypeAnn( + ctx.MakeType<TTupleExprType>( + TTypeAnnotationNode::TListType{ + ctx.MakeType<TListExprType>( + ctx.MakeType<TOptionalExprType>( + ctx.MakeType<TDataExprType>(EDataSlot::String) + ) + ) + } + ) + ); + return TStatus::Ok; + } + TStatus HandleTarget(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) { return TStatus::Error; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 31628ec902..d98ab61154 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -340,6 +340,21 @@ public: } } + TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override { + Y_UNUSED(ctx); + return TS3WriteObject::Match(&write); + } + + TExprNode::TPtr WrapWrite(const TExprNode::TPtr& writeNode, TExprContext& ctx) override { + TExprBase writeExpr(writeNode); + const auto write = writeExpr.Cast<TS3WriteObject>(); + return Build<TS3Insert>(ctx, write.Pos()) + .DataSink(write.DataSink()) + .Target(write.Target()) + .Input(write.Input()) + .Done().Ptr(); + } + void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override { const TDqSink sink(&node); if (const auto maybeSettings = sink.Settings().Maybe<TS3SinkSettings>()) { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 9feecb6722..61700c459a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -587,7 +587,7 @@ private: if (!s3ParseSettingsBase.Paths().Empty()) { resultSetLimitPerPath /= s3ParseSettingsBase.Paths().Size(); } - + for (auto path : s3ParseSettingsBase.Paths()) { NS3Details::TPathList directories; NS3Details::UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), directories); @@ -613,7 +613,7 @@ private: State_->Configuration->UseConcurrentDirectoryLister.Get().GetOrElse( State_->Configuration->AllowConcurrentListings), .MaxResultSet = resultSetLimitPerPath}); - + RequestsByNode_[source.Raw()].push_back(req); PendingRequests_[req] = future; @@ -755,7 +755,7 @@ private: .S3Request{.Url = url, .Token = tokenStr}, .FilePattern = effectiveFilePattern, .Options{ - .IsConcurrentListing = isConcurrentListingEnabled, + .IsConcurrentListing = isConcurrentListingEnabled, .MaxResultSet = std::max(State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxDirectoriesAndFilesPerQuery) }}; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index 7bfad7c4e6..2035ee5936 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -104,6 +104,7 @@ public: #define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TS3PhysicalOptProposalTransformer::name) AddHandler(0, &TCoLeft::Match, HNDL(TrimReadWorld)); AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject)); + AddHandler(0, &TS3Insert::Match, HNDL(S3Insert)); #undef HNDL } @@ -118,7 +119,276 @@ public: return TExprBase(maybeRead.Cast().World().Ptr()); } + TMaybe<TDqStage> BuildSinkStage(TPositionHandle writePos, TS3DataSink dataSink, TS3Target target, TExprBase input, TExprContext& ctx, const TGetParents& getParents) const { + const auto& cluster = dataSink.Cluster().StringValue(); + const auto token = "cluster:default_" + cluster; + const auto& settings = target.Settings().Ref(); + auto partBy = GetPartitionBy(settings); + auto keys = GetPartitionKeys(partBy); + + auto sinkSettingsBuilder = Build<TExprList>(ctx, target.Pos()); + if (partBy) + sinkSettingsBuilder.Add(std::move(partBy)); + + auto compression = GetCompression(settings); + const auto& extension = GetExtension(target.Format().Value(), compression ? compression->Tail().Content() : ""sv); + if (compression) + sinkSettingsBuilder.Add(std::move(compression)); + + auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, target.Pos()); + if (auto csvDelimiter = GetCsvDelimiter(settings)) { + sinkOutputSettingsBuilder.Add(std::move(csvDelimiter)); + } + + bool hasDateTimeFormat = false; + bool hasDateTimeFormatName = false; + bool hasTimestampFormat = false; + bool hasTimestampFormatName = false; + if (auto dateTimeFormatName = GetDateTimeFormatName(settings)) { + sinkOutputSettingsBuilder.Add(std::move(dateTimeFormatName)); + hasDateTimeFormatName = true; + } + + if (auto dateTimeFormat = GetDateTimeFormat(settings)) { + sinkOutputSettingsBuilder.Add(std::move(dateTimeFormat)); + hasDateTimeFormat = true; + } + + if (auto timestampFormatName = GetTimestampFormatName(settings)) { + sinkOutputSettingsBuilder.Add(std::move(timestampFormatName)); + hasTimestampFormatName = true; + } + + if (auto timestampFormat = GetTimestampFormat(settings)) { + sinkOutputSettingsBuilder.Add(std::move(timestampFormat)); + hasTimestampFormat = true; + } + + if (!hasDateTimeFormat && !hasDateTimeFormatName) { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(target.Pos(), "data.datetime.formatname")); + pair.push_back(ctx.NewAtom(target.Pos(), "POSIX")); + sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair))); + } + + if (!hasTimestampFormat && !hasTimestampFormatName) { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(target.Pos(), "data.timestamp.formatname")); + pair.push_back(ctx.NewAtom(target.Pos(), "POSIX")); + sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair))); + } + + const TStringBuf format = target.Format(); + if (format != "raw" && format != "json_list") { // multipart + { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(target.Pos(), "multipart")); + pair.push_back(ctx.NewAtom(target.Pos(), "true")); + sinkSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair))); + } + { + TExprNode::TListType pair; + pair.push_back(ctx.NewAtom(target.Pos(), "file_size_limit")); + size_t fileSize = 50_MB; + if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get()) { + fileSize = *maxObjectSize; + } + pair.push_back(ctx.NewAtom(target.Pos(), ToString(fileSize))); + sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair))); + } + } + + if (!FindNode(input.Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) { + YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as stage with sink."; + return keys.empty() ? + Build<TDqStage>(ctx, writePos) + .Inputs().Build() + .Program<TCoLambda>() + .Args({}) + .Body<TS3SinkOutput>() + .Input<TCoToFlow>() + .Input(input) + .Build() + .Format(target.Format()) + .KeyColumns().Build() + .Settings(sinkOutputSettingsBuilder.Done()) + .Build() + .Build() + .Outputs<TDqStageOutputsList>() + .Add<TDqSink>() + .DataSink(dataSink) + .Index().Value("0").Build() + .Settings<TS3SinkSettings>() + .Path(target.Path()) + .Settings(sinkSettingsBuilder.Done()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Extension().Value(extension).Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done() + : + Build<TDqStage>(ctx, writePos) + .Inputs() + .Add<TDqCnHashShuffle>() + .Output<TDqOutput>() + .Stage<TDqStage>() + .Inputs().Build() + .Program<TCoLambda>() + .Args({}) + .Body<TCoToFlow>() + .Input(input) + .Build() + .Build() + .Settings().Build() + .Build() + .Index().Value("0", TNodeFlags::Default).Build() + .Build() + .KeyColumns().Add(keys).Build() + .Build() + .Build() + .Program<TCoLambda>() + .Args({"in"}) + .Body<TS3SinkOutput>() + .Input("in") + .Format(target.Format()) + .KeyColumns().Add(keys).Build() + .Settings(sinkOutputSettingsBuilder.Done()) + .Build() + .Build() + .Outputs<TDqStageOutputsList>() + .Add<TDqSink>() + .DataSink(dataSink) + .Index().Value("0", TNodeFlags::Default).Build() + .Settings<TS3SinkSettings>() + .Path(target.Path()) + .Settings(sinkSettingsBuilder.Done()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Extension().Value(extension).Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); + } + + if (!TDqCnUnionAll::Match(input.Raw())) { + return Nothing(); + } + + const TParentsMap* parentsMap = getParents(); + const auto dqUnion = input.Cast<TDqCnUnionAll>(); + if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) { + return Nothing(); + } + + YQL_CLOG(INFO, ProviderS3) << "Rewrite S3WriteObject `" << cluster << "`.`" << target.Path().StringValue() << "` as sink."; + + const auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>(); + + const auto sink = Build<TDqSink>(ctx, writePos) + .DataSink(dataSink) + .Index(dqUnion.Output().Index()) + .Settings<TS3SinkSettings>() + .Path(target.Path()) + .Settings(sinkSettingsBuilder.Done()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Extension().Value(extension).Build() + .Build() + .Done(); + + auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos()); + if (inputStage.Outputs() && keys.empty()) { + outputsBuilder.InitFrom(inputStage.Outputs().Cast()); + } + outputsBuilder.Add(sink); + + if (keys.empty()) { + const auto outputBuilder = Build<TS3SinkOutput>(ctx, target.Pos()) + .Input(inputStage.Program().Body().Ptr()) + .Format(target.Format()) + .KeyColumns().Add(std::move(keys)).Build() + .Settings(sinkOutputSettingsBuilder.Done()) + .Done(); + + return Build<TDqStage>(ctx, writePos) + .InitFrom(inputStage) + .Program(ctx.DeepCopyLambda(inputStage.Program().Ref(), outputBuilder.Ptr())) + .Outputs(outputsBuilder.Done()) + .Done(); + } else { + return Build<TDqStage>(ctx, writePos) + .Inputs() + .Add<TDqCnHashShuffle>() + .Output<TDqOutput>() + .Stage(inputStage) + .Index(dqUnion.Output().Index()) + .Build() + .KeyColumns().Add(keys).Build() + .Build() + .Build() + .Program<TCoLambda>() + .Args({"in"}) + .Body<TS3SinkOutput>() + .Input("in") + .Format(target.Format()) + .KeyColumns().Add(std::move(keys)).Build() + .Settings(sinkOutputSettingsBuilder.Done()) + .Build() + .Build() + .Settings().Build() + .Outputs(outputsBuilder.Done()) + .Done(); + } + } + + TMaybeNode<TExprBase> S3Insert(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + auto insert = node.Cast<TS3Insert>(); + TMaybe<TDqStage> stage + = BuildSinkStage(node.Pos(), + insert.DataSink(), + insert.Target(), + insert.Input(), + ctx, + getParents); + + if (stage) { + return *stage; + } else { + return node; + } + } + TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + auto write = node.Cast<TS3WriteObject>(); + TMaybe<TDqStage> stage + = BuildSinkStage(node.Pos(), + write.DataSink(), + write.Target(), + write.Input(), + ctx, + getParents); + + if (stage) { + return Build<TDqQuery>(ctx, write.Pos()) + .World(write.World()) + .SinkStages() + .Add(*stage) + .Build() + .Done(); + } else { + return node; + } + } + + TMaybeNode<TExprBase> S3WriteObject1(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { const auto& write = node.Cast<TS3WriteObject>(); const auto& targetNode = write.Target(); const auto& cluster = write.DataSink().Cluster().StringValue(); diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index 33be2f8fdd..cb82aec4f5 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -87,7 +87,7 @@ public: YQL_ENSURE(false, "Unimplemented"); } - TMaybe<bool> CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) override { + TMaybe<bool> CanWrite(const TExprNode&, TExprContext&) override { YQL_ENSURE(false, "Unimplemented"); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 1c21299802..78247608b4 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -410,7 +410,7 @@ public: return read; } - TMaybe<bool> CanWrite(const TDqSettings&, const TExprNode& node, TExprContext& ctx) override { + TMaybe<bool> CanWrite(const TExprNode& node, TExprContext& ctx) override { if (auto maybeWrite = TMaybeNode<TYtWriteTable>(&node)) { auto cluster = TString{maybeWrite.Cast().DataSink().Cluster().Value()}; auto tableName = TString{TYtTableInfo::GetTableLabel(maybeWrite.Cast().Table())}; |