diff options
author | hrustyashko <hrustyashko@yandex-team.ru> | 2022-04-21 02:44:42 +0300 |
---|---|---|
committer | hrustyashko <hrustyashko@yandex-team.ru> | 2022-04-21 02:44:42 +0300 |
commit | d48331e0a9ea5992b4abb35a1ce23c741c98e5e5 (patch) | |
tree | 6c99bb3b59d27d0a28af257f54de2e7ef2a84769 | |
parent | 29486e56aab0f6709271b5f3f61035d8224f5bb0 (diff) | |
download | ydb-d48331e0a9ea5992b4abb35a1ce23c741c98e5e5.tar.gz |
Resolve function's provider as Sink with output
``` lisp
(let $4 (DataSink '"function" '"CloudFunction" '"my_sa"))
(let $5 (TransformSettings '"CloudFunction" (StructType '('"key" $1)) (StructType '('"b" (DataType 'Int32))) '('('"invoke_url" '"https://functions.yandexcloud.net/d4em1sgldfc6p4bcmtp2"))))
(let $6 (DqPhyStage '((DqCnHashShuffle (TDqOutput $3 '0) '($2))) (lambda '($16) (FromFlow (Map (ToFlow $16) (lambda '($17) (AsStruct '('"key" (Member $17 '"key"))))))) '('('"_logical_id" '173953) '('"_id" '"683fd09-90df104f-be026786-b3e950c3")) '((DqSink $4 $5 '0))))
```
ref:1aa1e7696c2217a5e1d7caf883e054fee7466bbf
39 files changed, 667 insertions, 233 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index fea6b301ca7..91c2c24eec5 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1205,10 +1205,11 @@ add_subdirectory(ydb/library/yql/parser/pg_catalog/ut) add_subdirectory(ydb/library/yql/parser/lexer_common/ut) add_subdirectory(ydb/library/yql/providers/common/schema) add_subdirectory(ydb/library/yql/providers/common/schema/skiff) -add_subdirectory(ydb/library/yql/providers/function/provider) -add_subdirectory(ydb/library/yql/providers/function/expr_nodes) add_subdirectory(ydb/library/yql/providers/function/common) +add_subdirectory(ydb/library/yql/providers/function/expr_nodes) add_subdirectory(ydb/library/yql/providers/function/gateway) +add_subdirectory(ydb/library/yql/providers/function/proto) +add_subdirectory(ydb/library/yql/providers/function/provider) add_subdirectory(ydb/library/yql/public/decimal/ut) add_subdirectory(ydb/library/yql/public/issue/ut) add_subdirectory(ydb/library/yql/public/udf/ut) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 708a6f310a4..fd5ab1928c9 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -1300,10 +1300,11 @@ add_subdirectory(ydb/library/yql/parser/pg_catalog/ut) add_subdirectory(ydb/library/yql/parser/lexer_common/ut) add_subdirectory(ydb/library/yql/providers/common/schema) add_subdirectory(ydb/library/yql/providers/common/schema/skiff) -add_subdirectory(ydb/library/yql/providers/function/provider) -add_subdirectory(ydb/library/yql/providers/function/expr_nodes) add_subdirectory(ydb/library/yql/providers/function/common) +add_subdirectory(ydb/library/yql/providers/function/expr_nodes) add_subdirectory(ydb/library/yql/providers/function/gateway) +add_subdirectory(ydb/library/yql/providers/function/proto) +add_subdirectory(ydb/library/yql/providers/function/provider) add_subdirectory(ydb/library/yql/public/decimal/ut) add_subdirectory(ydb/library/yql/public/issue/ut) add_subdirectory(ydb/library/yql/public/udf/ut) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 665d5b43dc1..3cea1fe0241 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1341,6 +1341,7 @@ private: const auto& outputDesc = Task.GetOutputs(i); Y_VERIFY(!outputDesc.HasSink() || outputDesc.ChannelsSize() == 0); // HasSink => no channels Y_VERIFY(outputDesc.HasSink() || outputDesc.ChannelsSize() > 0); + if (outputDesc.HasSink()) { auto result = SinksMap.emplace(i, TSinkInfo()); YQL_ENSURE(result.second); diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index 67627b1b8bd..0041a497154 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -80,6 +80,17 @@ "ListBase": "TDqSink" }, { + "Name": "TTransformSettings", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "TransformSettings"}, + "Children": [ + {"Index": 0, "Name": "Type", "Type": "TCoAtom"}, + {"Index": 1, "Name": "InputType", "Type": "TExprBase"}, + {"Index": 2, "Name": "OutputType", "Type": "TExprBase"}, + {"Index": 3, "Name": "Other", "Type": "TCoNameValueTupleList"} + ] + }, + { "Name": "TDqStageBase", "Base": "TCallable", "Match": {"Type": "CallableBase"}, @@ -224,7 +235,7 @@ "Children": [ {"Index": 0, "Name": "TransformType", "Type": "TExprBase"}, {"Index": 1, "Name": "TransformName", "Type": "TExprBase"}, - {"Index": 2, "Name": "Settings", "Type": "TExprBase"} + {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"} ] } ] diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index 418f3fe117f..d936a0939c6 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -144,15 +144,11 @@ bool IsSingleConsumerConnection(const TDqConnection& node, const TParentsMap& pa && (allowStageMultiUsage || IsSingleConsumer(node.Output().Stage(), parentsMap)); } -ui32 GetStageOutputsCount(const TDqStageBase& stage, bool includingSinks) { +ui32 GetStageOutputsCount(const TDqStageBase& stage) { auto stageType = stage.Ref().GetTypeAnn(); YQL_ENSURE(stageType); auto resultsTypeTuple = stageType->Cast<TTupleExprType>(); - ui32 result = resultsTypeTuple->GetSize(); - if (!includingSinks && stage.Sinks()) { - result -= stage.Sinks().Cast().Size(); - } - return result; + return resultsTypeTuple->GetSize(); } TVector<TDqConnection> FindDqConnections(const TExprBase& node) { diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h index 125a0a5bd9e..3d9a14e1f8a 100644 --- a/ydb/library/yql/dq/opt/dq_opt.h +++ b/ydb/library/yql/dq/opt/dq_opt.h @@ -54,7 +54,7 @@ bool IsSingleConsumer(const NNodes::TExprBase& node, const TParentsMap& parentsM bool IsSingleConsumerConnection(const NNodes::TDqConnection& node, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage, bool includingSinks); +ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage); TVector<NNodes::TDqConnection> FindDqConnections(const NNodes::TExprBase& node); bool IsDqPureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true); diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index e57b85a9113..175b58cc208 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -235,8 +235,9 @@ public: if (output.Stage().Maybe<TDqStage>()) { auto& info = consumersMap[output.Stage().Raw()]; + if (info.Consumers.empty()) { - info.Consumers.resize(GetStageOutputsCount(output.Stage(), false)); + info.Consumers.resize(GetStageOutputsCount(output.Stage())); } YQL_ENSURE(index <= info.Consumers.size()); diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index ab440d1d5ed..7ce13c9b83d 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -617,7 +617,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& buildNewStage = IsDqDependsOnStage(join.RightInput(), leftCn.Output().Stage()); if (!buildNewStage) { // NOTE: Do not push join to stage with multiple outputs, reduce memory footprint. - buildNewStage = GetStageOutputsCount(leftCn.Output().Stage(), true) > 1; + buildNewStage = GetStageOutputsCount(leftCn.Output().Stage()) > 1; } } diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 9df0e144c5e..71cf0dcff82 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -201,7 +201,7 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o auto program = stage.Program(); ui32 index = FromString<ui32>(outputIndex.Value()); - ui32 branchesCount = GetStageOutputsCount(stage, true); + ui32 branchesCount = GetStageOutputsCount(stage); TExprNode::TPtr newProgram; if (branchesCount == 1) { @@ -427,123 +427,6 @@ TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationCont return DqPushBaseLMapToStage<TCoLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage); } -TExprBase DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ - Y_UNUSED(optCtx); - Y_UNUSED(allowStageMultiUsage); - Y_UNUSED(parentsMap); - - auto apply = node.Cast<TCoApply>(); - auto callable = apply.Callable().Maybe<TDqSqlExternalFunction>(); - if (!callable - || apply.Args().Count() != 2 - || !apply.Arg(1).Maybe<TDqCnUnionAll>()) { - - return node; - } - callable = callable.Cast(); - TDqCnUnionAll nodeInput {apply.Arg(1).Cast<TDqCnUnionAll>()}; - - if (!IsSingleConsumerConnection(nodeInput, parentsMap, allowStageMultiUsage)) { - return node; - } - - const auto shuffleColumn = Build<TCoAtom>(ctx, node.Pos()) - .Value("_yql_transform_shuffle") - .Done(); - auto addShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) - .Args({"stream"}) - .Body<TCoMap>() - .Input("stream") - .Lambda() - .Args({"row"}) - .Body<TCoAddMember>() - .Struct("row") - .Name(shuffleColumn) - .Item<TCoRandom>().Add<TCoDependsOn>().Input("row").Build().Build() - .Build() - .Build() - .Build() - .Done(); - auto removeShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) - .Args({"row"}) - .Body<TCoForceRemoveMember>() - .Struct("row") - .Name(shuffleColumn) - .Build() - .Done(); - - TVector<TCoNameValueTuple> settings; - auto isExtFunction = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(TDqStageSettings::IsExternalSetting) - .Value<TCoBool>().Literal().Build("true").Build() - .Done(); - settings.push_back(isExtFunction); - - auto transformType = callable.TransformType().Cast<TCoString>().Literal().StringValue(); - settings.push_back( - Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(TDqStageSettings::TransformTypeSetting) - .Value<TCoAtom>().Build(transformType) - .Done()); - - auto transformName = callable.TransformName().Cast<TCoString>().Literal().StringValue(); - settings.push_back( - Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(TDqStageSettings::TransformNameSetting) - .Value<TCoAtom>().Build(transformName) - .Done()); - - for (const auto &tuple: callable.Settings().Ref().Children()) { - const auto paramName = tuple->Head().Content(); - auto setting = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(paramName) - .Value(tuple->TailPtr()) - .Done(); - settings.push_back(setting); - } - - auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos()) - .Add(settings) - .Done(); - - auto stage = nodeInput.Output().Stage().Cast<TDqStage>(); - auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx); - YQL_ENSURE(dutyColumn); - - auto transformStage = Build<TDqStage>(ctx, node.Pos()) - .Inputs() - .Add<TDqCnHashShuffle>() - .KeyColumns() - .Add({shuffleColumn}) - .Build() - .Output() - .Stage(dutyColumn.Cast()) - .Index(nodeInput.Output().Index()) - .Build() - .Build() - .Build() - .Program() - .Args({"row"}) - .Body<TCoMap>() - .Lambda(removeShuffleColumn) - .Input("row") - .Build() - .Build() - .Settings(settingsBuilder) - .Done(); - - auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos()) - .Output() - .Stage(transformStage) - .Index().Build("0") - .Build() - .Done(); - - return externalStage; -} - TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 7013aa72a47..693d993fdfb 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -29,9 +29,6 @@ NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext& NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqBuildExtFunctionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage = true); - NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 67005a599b1..ab8a5eb190c 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -9,10 +9,13 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> + #include <ydb/library/yql/public/udf/udf_value.h> #include <ydb/library/yql/core/user_data/yql_user_data.h> #include <ydb/library/yql/minikql/mkql_node_serialization.h> #include <ydb/library/yql/minikql/mkql_node_visitor.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> #include <util/generic/scope.h> @@ -404,6 +407,7 @@ public: TBindTerminator term(ProgramParsed.CompGraph->GetTerminator()); auto& typeEnv = TypeEnv(); + const auto pb = std::make_unique<NKikimr::NMiniKQL::TProgramBuilder>(typeEnv, *(holderFactory.GetFunctionRegistry())); for (ui32 i = 0; i < task.InputsSize(); ++i) { auto& inputDesc = task.GetInputs(i); @@ -441,6 +445,15 @@ public: TaskHasEffects = true; } + if (outputDesc.HasTransform()) { + auto transform = outputDesc.GetTransform(); + auto outputType = NCommon::ParseTypeFromYson(TStringBuf{transform.GetOutputType()}, *pb, Cerr); + auto inputType = NCommon::ParseTypeFromYson(TStringBuf{transform.GetInputType()}, *pb, Cerr); + LOG(TStringBuilder() << "Task: " << TaskId << " has transform by " + << transform.GetType() << " with input type: " << *inputType + << " , output type: " << *outputType); + } + TVector<IDqOutput::TPtr> outputs{Reserve(std::max<ui64>(outputDesc.ChannelsSize(), 1))}; if (outputDesc.HasSink()) { auto sink = CreateDqAsyncOutputBuffer(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize, diff --git a/ydb/library/yql/dq/tasks/CMakeLists.txt b/ydb/library/yql/dq/tasks/CMakeLists.txt index 726d1950eb2..ed676758698 100644 --- a/ydb/library/yql/dq/tasks/CMakeLists.txt +++ b/ydb/library/yql/dq/tasks/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(yql-dq-tasks PUBLIC library-yql-core yql-dq-expr_nodes yql-dq-proto + library-yql-ast ) target_sources(yql-dq-tasks PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/tasks/dq_task_program.cpp diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index d5bd5a4cf55..ee7f73726ee 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -11,7 +11,6 @@ template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutput void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) { ui32 partitionsCount = 1; - const auto stageSettings = NDq::TDqStageSettings::Parse(stage); auto& stageInfo = graph.GetStageInfo(stage); for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { const auto& input = stage.Inputs().Item(inputIndex); @@ -32,12 +31,8 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) { auto shuffle = maybeCnShuffle.Cast(); auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage()); - if (stageSettings.IsExternalFunction) { - partitionsCount = stageSettings.MaxTransformConcurrency(); - } else { - partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2); - partitionsCount = std::min(partitionsCount, 24u); - } + partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2); + partitionsCount = std::min(partitionsCount, 24u); } else if (auto maybeCnMap = input.Maybe<NNodes::TDqCnMap>()) { auto cnMap = maybeCnMap.Cast(); auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage()); @@ -46,12 +41,7 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp } for (ui32 i = 0; i < partitionsCount; ++i) { - auto& task = graph.AddTask(stageInfo); - if (stageSettings.IsExternalFunction) { - auto& transform = task.OutputTransform; - transform.Type = stageSettings.TransformType; - //transform.FunctionName = stageSettings.TransformName; - } + graph.AddTask(stageInfo); } } @@ -60,7 +50,7 @@ void BuildUnionAllChannels(TGraph& graph, const typename TGraph::TStageInfoType& const typename TGraph::TStageInfoType& inputStageInfo, ui32 outputIndex, bool enableSpilling, const TChannelLogFunc& logFunc) { - YQL_ENSURE(stageInfo.Tasks.size() == 1, "Multiple tasks on union all input. StageId: " << stageInfo.Id); + YQL_ENSURE(stageInfo.Tasks.size() == 1, "Multiple tasks on union all input. StageId: " << stageInfo.Id << " " << stageInfo.Tasks.size()); auto& targetTask = graph.GetTask(stageInfo.Tasks[0]); for (auto& originTaskId : inputStageInfo.Tasks) { diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index ea2bc4030de..ee7cba6b71a 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -2,6 +2,7 @@ #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/proto/dq_tasks.pb.h> +#include <ydb/library/yql/ast/yql_expr.h> #include <library/cpp/actors/core/actorid.h> @@ -43,23 +44,8 @@ struct TStageInfo : private TMoveOnly { , InputsCount(stage.Inputs().Size()) , Meta(std::move(meta)) { - auto result = stage.Program().Body(); - auto resultType = result.Ref().GetTypeAnn(); - - if (resultType->GetKind() == ETypeAnnotationKind::Stream) { - auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType(); - if (resultItemType->GetKind() == ETypeAnnotationKind::Variant) { - auto underlyingType = resultItemType->Cast<TVariantExprType>()->GetUnderlyingType(); - YQL_ENSURE(underlyingType->GetKind() == ETypeAnnotationKind::Tuple); - OutputsCount = underlyingType->Cast<TTupleExprType>()->GetSize(); - YQL_ENSURE(OutputsCount > 1); - } else { - OutputsCount = 1; - } - } else { - YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Void, "got " << *resultType); - OutputsCount = 0; - } + auto stageResultTuple = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>(); + OutputsCount = stageResultTuple->GetSize(); } TStageInfo(const NNodes::TDqPhyStage& stage, TStageInfoMeta&& meta) @@ -141,6 +127,15 @@ struct TTaskOutputType { }; }; +struct TTransform { + TString Type; + + TString InputType; + TString OutputType; + + ::google::protobuf::Any Settings; +}; + template <class TOutputMeta> struct TTaskOutput { ui32 Type = TTaskOutputType::Undefined; @@ -150,11 +145,7 @@ struct TTaskOutput { TMaybe<::google::protobuf::Any> SinkSettings; TString SinkType; TOutputMeta Meta; -}; - -struct TTransform { - TString Type; - ::google::protobuf::Any TransformSettings; + TMaybe<TTransform> Transform; }; template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> @@ -175,7 +166,6 @@ struct TTask { NActors::TActorId ComputeActorId; TTaskMeta Meta; NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; - TTransform OutputTransform; }; template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 31dca5102e2..db912c530ea 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -134,8 +134,7 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { return TStatus::Repeat; } - TVector<const TTypeAnnotationNode*> resultTypesTuple; - + TVector<const TTypeAnnotationNode*> programResultTypesTuple; if (resultType->GetKind() == ETypeAnnotationKind::Void) { // do nothing, return empty tuple as program result } else { @@ -150,26 +149,81 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { auto variantType = itemType->Cast<TVariantExprType>()->GetUnderlyingType(); YQL_ENSURE(variantType->GetKind() == ETypeAnnotationKind::Tuple); const auto& items = variantType->Cast<TTupleExprType>()->GetItems(); - resultTypesTuple.reserve(items.size()); + programResultTypesTuple.reserve(items.size()); for (const auto* branchType : items) { - resultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(branchType)); + programResultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(branchType)); } } else { - resultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(itemType)); + programResultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(itemType)); } } else { YQL_ENSURE(resultType->GetKind() != ETypeAnnotationKind::List, "stage: " << stage->Dump()); - resultTypesTuple.emplace_back(resultType); + programResultTypesTuple.emplace_back(resultType); } } + TVector<const TTypeAnnotationNode*> stageResultTypes; if (TDqStageBase::idx_Sinks < stage->ChildrenSize()) { + YQL_ENSURE(stage->Child(TDqStageBase::idx_Sinks)->ChildrenSize() != 0, "Sink list exists but empty, stage: " << stage->Dump()); + + auto outputsNumber = programResultTypesTuple.size(); + TVector<TExprNode::TPtr> transformSinks; + TVector<TExprNode::TPtr> pureSinks; + transformSinks.reserve(outputsNumber); + pureSinks.reserve(outputsNumber); for (const auto& sink: stage->Child(TDqStageBase::idx_Sinks)->Children()) { - sink->SetTypeAnn(resultType); + const ui64 index = FromString(sink->Child(TDqSink::idx_Index)->Content()); + if (index >= outputsNumber) { + ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder() + << "Sink/Transform try to process un-existing lambda's output")); + return TStatus::Error; + } + + auto transformSettings = sink->Child(TDqSink::idx_Settings)->IsCallable(TTransformSettings::CallableName()); + if (transformSettings) { + transformSinks.push_back(sink); + } else { + pureSinks.push_back(sink); + } } + + if (!transformSinks.empty() && !pureSinks.empty() + && transformSinks.size() != pureSinks.size()) { + + ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder() + << "Not every transform has a corresponding sink")); + return TStatus::Error; + } + + if (!pureSinks.empty()) { + for (auto sink : pureSinks) { + sink->SetTypeAnn(resultType); + } + stageResultTypes.assign(programResultTypesTuple.begin(), programResultTypesTuple.end()); + } else { + for (auto sink : transformSinks) { + auto* sinkType = sink->GetTypeAnn(); + if (sinkType->GetKind() != ETypeAnnotationKind::List + && sinkType->GetKind() != ETypeAnnotationKind::Void) { + + ctx.AddError(TIssue(ctx.GetPosition(sink->Pos()), TStringBuilder() + << "Expected List or Void type, but got: " << *sinkType)); + return TStatus::Error; + } + /* auto* itemType = sinkType->Cast<TListExprType>()->GetItemType(); + if (itemType->GetKind() != ETypeAnnotationKind::Struct) { + ctx.AddError(TIssue(ctx.GetPosition(sink->Pos()), TStringBuilder() + << "Expected List<Struct<...>> type, but got: List<" << *itemType << ">")); + return TStatus::Error; + } */ + stageResultTypes.emplace_back(sinkType); + } + } + } else { + stageResultTypes.assign(programResultTypesTuple.begin(), programResultTypesTuple.end()); } - stage->SetTypeAnn(ctx.MakeType<TTupleExprType>(resultTypesTuple)); + stage->SetTypeAnn(ctx.MakeType<TTupleExprType>(stageResultTypes)); return TStatus::Ok; } @@ -783,6 +837,26 @@ TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx) { return TStatus::Ok; } +TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 4U, ctx)) { + return TStatus::Error; + } + + const TExprNode* outputArg = input->Child(TTransformSettings::idx_OutputType); + if (!EnsureTypeWithStructType(*outputArg, ctx)) { + return IGraphTransformer::TStatus::Error; + } + const TTypeAnnotationNode* outputType = outputArg->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + + const TExprNode* inputType = input->Child(TTransformSettings::idx_InputType); + if (!EnsureTypeWithStructType(*inputType, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + input->SetTypeAnn(ctx.MakeType<TListExprType>(outputType)); + return TStatus::Ok; +} + THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationContext& typesCtx) { auto coreTransformer = CreateExtCallableTypeAnnotationTransformer(typesCtx); @@ -862,6 +936,10 @@ THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationCont return AnnotateDqSink(input, ctx); } + if (TTransformSettings::Match(input.Get())) { + return AnnotateTransformSettings (input, ctx); + } + if (TDqQuery::Match(input.Get())) { return AnnotateDqQuery(input, ctx); } diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h index 03aefe701ff..84f8afe289e 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.h +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h @@ -22,6 +22,7 @@ IGraphTransformer::TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TEx IGraphTransformer::TStatus AnnotateDqSource(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqSink(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx); +IGraphTransformer::TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx); THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(NYql::TTypeAnnotationContext& typesCtx); diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 4da4d13d034..d2de27ec495 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -54,6 +54,11 @@ void TDqIntegrationBase::FillSinkSettings(const TExprNode& node, ::google::proto Y_UNUSED(sinkType); } +void TDqIntegrationBase::FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) { + Y_UNUSED(node); + Y_UNUSED(settings); +} + void TDqIntegrationBase::Annotate(const TExprNode& node, THashMap<TString, TString>& params) { Y_UNUSED(node); Y_UNUSED(params); diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h index cd1c150ebfc..60638f811a5 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h @@ -15,6 +15,7 @@ public: bool CanFallback() override; void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override; void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override; + void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override; void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override; bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) override; void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) override; diff --git a/ydb/library/yql/providers/dq/interface/CMakeLists.txt b/ydb/library/yql/providers/dq/interface/CMakeLists.txt index 69c5b9be066..1d60ad149d1 100644 --- a/ydb/library/yql/providers/dq/interface/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/interface/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(providers-dq-interface PUBLIC library-cpp-yson library-yql-ast library-yql-core + yql-dq-tasks ) target_sources(providers-dq-interface PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/interface/yql_dq_integration.cpp diff --git a/ydb/library/yql/providers/dq/interface/yql_dq_integration.h b/ydb/library/yql/providers/dq/interface/yql_dq_integration.h index f55fdb03e2a..d9980511cca 100644 --- a/ydb/library/yql/providers/dq/interface/yql_dq_integration.h +++ b/ydb/library/yql/providers/dq/interface/yql_dq_integration.h @@ -2,6 +2,7 @@ #include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/dq/tasks/dq_tasks_graph.h> #include <library/cpp/yson/writer.h> @@ -36,6 +37,7 @@ public: virtual bool CanFallback() = 0; virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0; virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0; + virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0; virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0; virtual bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) = 0; virtual void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) = 0; diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index f574c17433f..c9171de68c1 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -41,8 +41,6 @@ public: AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>)); AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>)); AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); - // (Apply (SqlExternalFunction ..) ..) to stage - AddHandler(0, &TCoApply::Match, HNDL(BuildExtFunctionStage<false>)); #if 0 AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute)); @@ -60,7 +58,6 @@ public: AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); - AddHandler(1, &TCoApply::Match, HNDL(BuildExtFunctionStage<true>)); #undef HNDL SetGlobal(1u); @@ -232,11 +229,6 @@ protected: } template <bool IsGlobal> - TMaybeNode<TExprBase> BuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - return DqBuildExtFunctionStage(node, ctx, optCtx, *getParents(), IsGlobal); - } - - template <bool IsGlobal> TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal); } diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index f6de66104a6..23b8191cacd 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -10,6 +10,8 @@ #include <ydb/library/yql/providers/dq/common/yql_dq_common.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> +#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/type_ann/type_ann_expr.h> @@ -166,19 +168,46 @@ namespace NYql::NDqs { YQL_ENSURE(datasink); auto dqIntegration = (*datasink)->GetDqIntegration(); YQL_ENSURE(dqIntegration, "DqSink assumes that datasink has a dq integration impl"); + + TTransform stageTransform; TString sinkType; ::google::protobuf::Any sinkSettings; - dqIntegration->FillSinkSettings(sink.Ref(), sinkSettings, sinkType); - YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node"); - YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node"); + + auto transformSettings = sink.Settings().Maybe<TTransformSettings>(); + if (transformSettings) { + auto settings = transformSettings.Cast(); + + stageTransform.Type = settings.Type(); + const auto inputType = settings.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + stageTransform.InputType = NCommon::WriteTypeToYson(inputType); + const auto outputType = settings.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + stageTransform.OutputType = NCommon::WriteTypeToYson(outputType); + + dqIntegration->FillTransformSettings(sink.Ref(), stageTransform.Settings); + } else { + dqIntegration->FillSinkSettings(sink.Ref(), sinkSettings, sinkType); + YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node"); + YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node"); + } for (ui64 taskId : stageInfo.Tasks) { auto& task = TasksGraph.GetTask(taskId); YQL_ENSURE(index < task.Outputs.size()); auto& output = task.Outputs[index]; - output.SinkType = sinkType; - output.SinkSettings = sinkSettings; - output.Type = NDq::TTaskOutputType::Sink; + if (transformSettings) { + + output.Transform.ConstructInPlace(); + + auto& transform = output.Transform; + transform->Type = stageTransform.Type; + transform->InputType = stageTransform.InputType; + transform->OutputType = stageTransform.OutputType; + //transform->Settings = stageTransform.Settings; + } else { + output.SinkType = sinkType; + output.SinkSettings = sinkSettings; + output.Type = NDq::TTaskOutputType::Sink; + } } } } @@ -475,11 +504,6 @@ namespace NYql::NDqs { task.Inputs[dqSourceInputIndex].SourceSettings = sourceSettings; task.Inputs[dqSourceInputIndex].SourceType = sourceType; } - if (stageSettings.IsExternalFunction) { - auto& transform = task.OutputTransform; - transform.Type = stageSettings.TransformType; - //transform.FunctionName = stageSettings.TransformName; - } } } return !parts.empty(); @@ -642,7 +666,7 @@ namespace NYql::NDqs { } case TTaskOutputType::Undefined: { - YQL_ENSURE(false, "Unexpected task output type `TTaskOutputType::Undefined`"); + YQL_ENSURE(output.Transform, "Unexpected task output type `TTaskOutputType::Undefined`"); } } @@ -650,6 +674,16 @@ namespace NYql::NDqs { auto& channelDesc = *outputDesc.AddChannels(); FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel)); } + + if (output.Transform) { + auto* transformDesc = outputDesc.MutableTransform(); + auto& transform = output.Transform; + + transformDesc->SetType(transform->Type); + transformDesc->SetInputType(transform->InputType); + transformDesc->SetOutputType(transform->OutputType); + *transformDesc->MutableSettings() = transform->Settings; + } } diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index e4c29bef408..7fcc73768b8 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -1051,7 +1051,6 @@ private: executionPlanner.Destroy(); int level = 0; - // TODO: remove copy-paste return WrapFutureCallback(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { YQL_LOG(DEBUG) << state->SessionId << " WrapFutureCallback"; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index c8d9786a7a0..7d7f7b8fee7 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -34,6 +34,7 @@ public: AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink)); AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite)); AddHandler({TDqQuery::CallableName()}, Hndl(&NDq::AnnotateDqQuery)); + AddHandler({TTransformSettings::CallableName()}, Hndl(&NDq::AnnotateTransformSettings)); } private: diff --git a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h index 1d3b8923c0b..8d018f438b3 100644 --- a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h +++ b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h @@ -34,6 +34,32 @@ public: } }; +class TFunctionDataSink: public NGenerated::TFunctionDataSinkStub<TExprBase, TCallable, TCoAtom> { +public: + explicit TFunctionDataSink(const TExprNode* node) + : TFunctionDataSinkStub(node) + { + } + + explicit TFunctionDataSink(const TExprNode::TPtr& node) + : TFunctionDataSinkStub(node) + { + } + + static bool Match(const TExprNode* node) { + if (!TFunctionDataSinkStub::Match(node)) { + return false; + } + + if (node->Child(0)->Content() != FunctionProviderName) { + return false; + } + + return true; + } +}; + + #include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.defs.inl.h> } // namespace NNodes diff --git a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json index 16084c87559..c70a72fe51e 100644 --- a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json +++ b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json @@ -13,8 +13,18 @@ "Children": [ {"Index": 0, "Name": "Category", "Type": "TCoAtom"}, {"Index": 1, "Name": "Type", "Type": "TCoAtom"}, - {"Index": 2, "Name": "FunctionName", "Type": "TCoAtom"}, - {"Index": 3, "Name": "Connection", "Type": "TCoAtom"} + {"Index": 2, "Name": "Connection", "Type": "TCoAtom"} + ] + }, + { + "Name": "TFunctionDataSink", + "Base": "TCallable", + "Definition": "Custom", + "Match": {"Type": "Callable", "Name": "DataSink"}, + "Children": [ + {"Index": 0, "Name": "Category", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Type", "Type": "TCoAtom"}, + {"Index": 2, "Name": "Connection", "Type": "TCoAtom"} ] } ] diff --git a/ydb/library/yql/providers/function/gateway/dq_function_gateway.h b/ydb/library/yql/providers/function/gateway/dq_function_gateway.h index 1404b7709c3..eb75d7d1bf1 100644 --- a/ydb/library/yql/providers/function/gateway/dq_function_gateway.h +++ b/ydb/library/yql/providers/function/gateway/dq_function_gateway.h @@ -14,7 +14,7 @@ class IDqFunctionGateway { public: using TPtr = std::shared_ptr<IDqFunctionGateway>; - virtual NThreading::TFuture<TDqFunctionDescription> ResolveFunction(const TString& folderId, const TString& functionName); + virtual NThreading::TFuture<TDqFunctionDescription> ResolveFunction(const TString& folderId, const TString& functionName) = 0; virtual ~IDqFunctionGateway() = default; }; diff --git a/ydb/library/yql/providers/function/proto/CMakeLists.txt b/ydb/library/yql/providers/function/proto/CMakeLists.txt new file mode 100644 index 00000000000..93df0adeaab --- /dev/null +++ b/ydb/library/yql/providers/function/proto/CMakeLists.txt @@ -0,0 +1,31 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-function-proto) +target_link_libraries(providers-function-proto PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(providers-function-proto PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/proto/dq_function.proto +) +target_proto_addincls(providers-function-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(providers-function-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/library/yql/providers/function/proto/dq_function.proto b/ydb/library/yql/providers/function/proto/dq_function.proto new file mode 100644 index 00000000000..494fcb882c8 --- /dev/null +++ b/ydb/library/yql/providers/function/proto/dq_function.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package NYql.NProto; + +message TFunctionTransform { + string InvokeUrl = 1; +} diff --git a/ydb/library/yql/providers/function/provider/CMakeLists.txt b/ydb/library/yql/providers/function/provider/CMakeLists.txt index 6daea338182..ab4c8e6f4d8 100644 --- a/ydb/library/yql/providers/function/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/function/provider/CMakeLists.txt @@ -15,16 +15,24 @@ target_link_libraries(providers-function-provider PUBLIC contrib-libs-cxxsupp yutil common-token_accessor-client + providers-common-dq providers-common-provider + common-schema-mkql providers-function-expr_nodes providers-function-common providers-function-gateway + providers-function-proto library-yql-core yql-core-expr_nodes + yql-dq-expr_nodes + yql-dq-opt ) target_sources(providers-function-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp ) diff --git a/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp new file mode 100644 index 00000000000..0e1335ddf97 --- /dev/null +++ b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp @@ -0,0 +1,85 @@ +#include "dq_function_provider_impl.h" + +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h> + +namespace NYql::NDqFunction { + +namespace { + +using namespace NNodes; + +class TDqFunctionDataSink : public TDataProviderBase { +public: + TDqFunctionDataSink(TDqFunctionState::TPtr state) + : State(std::move(state)) + , PhysicalOptTransformer(CreateDqFunctionPhysicalOptTransformer(State)) + , LoadMetaDataTransformer(CreateDqFunctionMetaLoader(State)) + , IntentDeterminationTransformer(CreateDqFunctionIntentTransformer(State)) + , DqIntegration(CreateDqFunctionDqIntegration(State)) + {} + + TStringBuf GetName() const override { + return FunctionProviderName; + } + + bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) override { + if (node.IsCallable(TCoDataSink::CallableName())) { + // (DataSink 'Function 'CloudFunction 'connection_name) + if (!EnsureArgsCount(node, 3, ctx)) { + return false; + } + if (node.Head().Content() == FunctionProviderName) { + TDqFunctionType functionType{node.Child(1)->Content()}; + if (!State->GatewayFactory->IsKnownFunctionType(functionType)) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << + "Unknown EXTERNAL FUNCTION type '" << functionType << "'")); + return false; + } + cluster = Nothing(); + return true; + } + } + + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid ExternalFunction DataSink parameters")); + return false; + } + + bool CanParse(const TExprNode& node) override { + return node.IsCallable(TDqSqlExternalFunction::CallableName()); + } + + IGraphTransformer& GetPhysicalOptProposalTransformer() override { + return *PhysicalOptTransformer; + } + + IGraphTransformer& GetLoadTableMetadataTransformer() override { + return *LoadMetaDataTransformer; + } + + IGraphTransformer& GetIntentDeterminationTransformer() override { + return *IntentDeterminationTransformer; + } + + IDqIntegration* GetDqIntegration() override { + return DqIntegration.Get(); + } + +private: + const TDqFunctionState::TPtr State; + const THolder<IGraphTransformer> PhysicalOptTransformer; + const THolder<IGraphTransformer> LoadMetaDataTransformer; + const THolder<TVisitorTransformerBase> IntentDeterminationTransformer; + const THolder<IDqIntegration> DqIntegration; +}; +} // namespace + +TIntrusivePtr<IDataProvider> CreateDqFunctionDataSink(TDqFunctionState::TPtr state) { + return new TDqFunctionDataSink(std::move(state)); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp b/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp index 0889cdb7112..7519e7a6a28 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp @@ -16,7 +16,6 @@ public: TDqFunctionDataSource(TDqFunctionState::TPtr state) : State(std::move(state)) , LoadMetaDataTransformer(CreateDqFunctionMetaLoader(State)) - , IntentDeterminationTransformer(CreateDqFunctionIntentTransformer(State)) {} TStringBuf GetName() const override { @@ -25,8 +24,7 @@ public: bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) override { if (node.IsCallable(TCoDataSource::CallableName())) { - //(DataSource 'ExternalFunction 'CloudFunction 'my_function_name 'connection_name) - if (!EnsureArgsCount(node, 4, ctx)) { + if (!EnsureArgsCount(node, 3, ctx)) { return false; } if (node.Head().Content() == FunctionProviderName) { @@ -49,14 +47,9 @@ public: return *LoadMetaDataTransformer; } - IGraphTransformer& GetIntentDeterminationTransformer() override { - return *IntentDeterminationTransformer; - } - private: const TDqFunctionState::TPtr State; const THolder<IGraphTransformer> LoadMetaDataTransformer; - const THolder<TVisitorTransformerBase> IntentDeterminationTransformer; }; } diff --git a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp new file mode 100644 index 00000000000..56db4e968d0 --- /dev/null +++ b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp @@ -0,0 +1,71 @@ +#include "dq_function_provider_impl.h" + +#include <ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h> +#include <ydb/library/yql/providers/function/proto/dq_function.pb.h> +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> +#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> + +#include <util/generic/ptr.h> + +namespace NYql::NDqFunction { +namespace { + +using namespace NNodes; + +class TDqFunctionDqIntegration: public TDqIntegrationBase { +public: + TDqFunctionDqIntegration(TDqFunctionState::TPtr state) + : State(state) + { + } + + void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& transformSettings) override { + auto maybeDqSink = TMaybeNode<TDqSink>(&node); + if (!maybeDqSink) + return; + + auto dqSink = maybeDqSink.Cast(); + auto maybeSettings = TMaybeNode<TTransformSettings>(dqSink.Settings().Raw()); + if (!maybeSettings) { + return; + } + + auto functionSettings = maybeSettings.Cast(); + NYql::NProto::TFunctionTransform transform; + for (size_t i = 0; i < functionSettings.Other().Size(); i++) { + TCoNameValueTuple setting = functionSettings.Other().Item(i); + const TStringBuf name = Name(setting); + if (name == "invoke_url") { + transform.SetInvokeUrl(TString(Value(setting))); + } + } + + transformSettings.PackFrom(transform); + } + + static TStringBuf Name(const TCoNameValueTuple& nameValue) { + return nameValue.Name().Value(); + } + + static TStringBuf Value(const TCoNameValueTuple& nameValue) { + if (TMaybeNode<TExprBase> maybeValue = nameValue.Value()) { + const TExprNode& value = maybeValue.Cast().Ref(); + YQL_ENSURE(value.IsAtom()); + return value.Content(); + } + + return {}; + } + +private: + TDqFunctionState::TPtr State; +}; +} // namespace + +THolder<IDqIntegration> CreateDqFunctionDqIntegration(TDqFunctionState::TPtr state) { + return MakeHolder<TDqFunctionDqIntegration>(state); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp b/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp index d978ffde027..1b0ceaffc27 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp @@ -2,6 +2,7 @@ #include <ydb/library/yql/providers/common/transform/yql_visit.h> #include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> namespace NYql::NDqFunction { namespace { @@ -14,21 +15,28 @@ public: : TVisitorTransformerBase(false) , State(state) { - AddHandler({TFunctionDataSource::CallableName()}, Hndl(&TDqFunctionIntentTransformer::HandleDataSource)); + AddHandler({TDqSqlExternalFunction::CallableName()}, Hndl(&TDqFunctionIntentTransformer::ExtractFunctionsName)); } - TStatus HandleDataSource(TExprBase input, TExprContext& ctx) { - Y_UNUSED(ctx); + TStatus ExtractFunctionsName(TExprBase input, TExprContext& ctx) { + auto sqlFunction = input.Cast<TDqSqlExternalFunction>(); + auto functionType = TString{sqlFunction.TransformType().Ref().Tail().Content()}; + auto functionName = TString{sqlFunction.TransformName().Ref().Tail().Content()}; + TString connection; + for (const auto &tuple: sqlFunction.Settings().Ref().Children()) { + const auto paramName = tuple->Head().Content(); + if (paramName == "connection") { + connection = TString{tuple->Tail().Tail().Content()}; + } + } + + if (connection.empty()) { + ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() + << "Empty CONNECTION name for EXTERNAL FUNCTION '" << functionName << "'")); + return TStatus::Error; + } - if (!TFunctionDataSource::Match(input.Raw())) - return TStatus::Ok; - - auto source = input.Cast<TFunctionDataSource>(); - TDqFunctionType functionType{source.Type().Value()}; - TString functionName{source.FunctionName().Value()}; - TString connection{source.Connection().Value()}; State->FunctionsResolver->AddFunction(functionType, functionName, connection); - return TStatus::Ok; } diff --git a/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp b/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp index 9f6b9beca12..1dcab11d1fc 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp @@ -35,16 +35,18 @@ public: std::vector<NThreading::TFuture<void>> resolverHandles; resolverHandles.reserve(functions.size()); auto resolverContext = ResolverContext; + const auto position = ctx.GetPosition(input->Pos()); for (auto functionDesc : functions) { auto gateway = State->GatewayFactory->CreateDqFunctionGateway( - functionDesc.Type, {}, functionDesc.Connection); + functionDesc.Type, State->SecureParams, functionDesc.Connection); + auto future = gateway->ResolveFunction(State->ScopeFolderId, functionDesc.FunctionName); - resolverHandles.push_back(future.Apply([resolverContext] + resolverHandles.push_back(future.Apply([resolverContext, position] (const NThreading::TFuture<TDqFunctionDescription>& future) { try { resolverContext->FunctionsDescription.emplace(future.GetValue()); } catch (const std::exception& e) { - resolverContext->ResolveIssues.push_back(ExceptionToIssue(e)); + resolverContext->ResolveIssues.push_back(ExceptionToIssue(e, position)); } })); } @@ -63,6 +65,7 @@ public: } TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + Y_UNUSED(ctx); YQL_ENSURE(AllFutures.HasValue()); output = input; diff --git a/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp new file mode 100644 index 00000000000..4bf88da9885 --- /dev/null +++ b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp @@ -0,0 +1,182 @@ +#include "dq_function_provider_impl.h" + +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/library/yql/providers/common/transform/yql_optimize.h> +#include <ydb/library/yql/dq/opt/dq_opt_phy.h> +#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> + +namespace NYql::NDqFunction { +namespace { + +using namespace NNodes; +using namespace NDq; + +class TDqFunctionPhysicalOptTransformer : public TOptimizeTransformerBase { +public: + TDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state) + : TOptimizeTransformerBase(state->Types, NLog::EComponent::ProviderDq, {}) + , State(state) + { +#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TDqFunctionPhysicalOptTransformer::name) + // (Apply (SqlExternalFunction ..) ..) to stage + AddHandler(0, &TCoApply::Match, HNDL(DqBuildExtFunctionStage)); +#undef HNDL + + SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes. + } + + TMaybeNode<TExprBase> DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const { + Y_UNUSED(optCtx); + + auto apply = node.Cast<TCoApply>(); + auto callable = apply.Callable().Maybe<TDqSqlExternalFunction>(); + if (!callable + || apply.Args().Count() != 2 + || !apply.Arg(1).Maybe<TDqCnUnionAll>()) { + + return node; + } + callable = callable.Cast(); + TDqCnUnionAll nodeInput {apply.Arg(1).Cast<TDqCnUnionAll>()}; + + const TParentsMap* parentsMap = getParents(); + if (!IsSingleConsumerConnection(nodeInput, *parentsMap, false)) { + YQL_ENSURE(false, "Allow only single external function stage usage"); + } + + const auto shuffleColumn = Build<TCoAtom>(ctx, node.Pos()) + .Value("_yql_transform_shuffle") + .Done(); + auto addShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) + .Args({"stream"}) + .Body<TCoMap>() + .Input("stream") + .Lambda() + .Args({"row"}) + .Body<TCoAddMember>() + .Struct("row") + .Name(shuffleColumn) + .Item<TCoRandom>().Add<TCoDependsOn>().Input("row").Build().Build() + .Build() + .Build() + .Build() + .Done(); + auto removeShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) + .Args({"row"}) + .Body<TCoForceRemoveMember>() + .Struct("row") + .Name(shuffleColumn) + .Build() + .Done(); + + auto transformType = callable.TransformType().Cast<TCoString>().Literal().StringValue(); + auto transformName = callable.TransformName().Cast<TCoString>().Literal().StringValue(); + + TString connectionName; + TExprNode::TPtr inputType; + TExprNode::TPtr outputType; + + TVector<TCoNameValueTuple> settings; + for (const auto &tuple: callable.Settings().Ref().Children()) { + const auto paramName = tuple->Head().Content(); + if (paramName == "connection") { + connectionName = TString{tuple->Tail().Tail().Content()}; + } else if (paramName == "input_type") { + inputType = tuple->TailPtr(); + } else if (paramName == "output_type") { + outputType = tuple->TailPtr(); + } else { + auto setting = Build<TCoNameValueTuple>(ctx, node.Pos()) + .Name().Build(paramName) + .Value(tuple->TailPtr()) + .Done(); + settings.push_back(setting); + } + } + + const auto description = State->FunctionsDescription.find(TDqFunctionDescription{ + .Type = transformType, + .FunctionName = transformName, + .Connection = connectionName + }); + + YQL_ENSURE(description != State->FunctionsDescription.end(), "External function meta doesn't found " << transformName); + settings.push_back( + Build<TCoNameValueTuple>(ctx, node.Pos()) + .Name().Build("invoke_url") + .Value<TCoAtom>().Build(description->InvokeUrl) + .Done() + ); + + auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos()) + .Add(settings) + .Done(); + + auto stage = nodeInput.Output().Stage().Cast<TDqStage>(); + auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx); + YQL_ENSURE(dutyColumn); + + auto transformSink = Build<TFunctionDataSink>(ctx, node.Pos()) + .Category().Build(FunctionProviderName) + .Type<TCoAtom>().Build(transformType) + .Connection().Build(connectionName) + .Done(); + + auto sinkSettings = Build<TTransformSettings>(ctx, node.Pos()) + .Type<TCoAtom>().Build(transformType) + .InputType(inputType) + .OutputType(outputType) + .Other(settingsBuilder) + .Done(); + + auto dqSink = Build<TDqSink>(ctx, node.Pos()) + .DataSink(transformSink) + .Settings(sinkSettings) + .Index().Build("0") + .Done(); + + auto transformStage = Build<TDqStage>(ctx, node.Pos()) + .Inputs() + .Add<TDqCnHashShuffle>() + .KeyColumns() + .Add({shuffleColumn}) + .Build() + .Output() + .Stage(dutyColumn.Cast()) + .Index(nodeInput.Output().Index()) + .Build() + .Build() + .Build() + .Program() + .Args({"row"}) + .Body<TCoMap>() + .Lambda(removeShuffleColumn) + .Input("row") + .Build() + .Build() + .Settings().Build() + .Sinks().Add(dqSink).Build() + .Done(); + + auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos()) + .Output() + .Stage(transformStage) + .Index().Build("0") + .Build() + .Done(); + + return externalStage; + } + +private: + TDqFunctionState::TPtr State; +}; + +} // namespace + +THolder<IGraphTransformer> CreateDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state) { + return MakeHolder<TDqFunctionPhysicalOptTransformer>(state); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider.cpp b/ydb/library/yql/providers/function/provider/dq_function_provider.cpp index fbcf9ce7cd8..08bb1dc0805 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_provider.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_provider.cpp @@ -12,9 +12,10 @@ TDataProviderInitializer GetDqFunctionDataProviderInitializer( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, TDqFunctionGatewayFactory::TPtr gatewayFactory, // TRunActorParams.TScope - const TString& scopeFolderId) { + const TString& scopeFolderId, + const THashMap<TString, TString>& secureParams) { - return [credentialsFactory, gatewayFactory, scopeFolderId] ( + return [credentialsFactory, gatewayFactory, scopeFolderId, secureParams] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -29,19 +30,20 @@ TDataProviderInitializer GetDqFunctionDataProviderInitializer( Y_UNUSED(gatewaysConfig); Y_UNUSED(functionRegistry); Y_UNUSED(randomProvider); - Y_UNUSED(typeCtx); Y_UNUSED(progressWriter); Y_UNUSED(operationOptions); auto state = MakeIntrusive<TDqFunctionState>(); state->SessionId = sessionId; + state->Types = typeCtx.Get(); state->GatewayFactory = gatewayFactory; state->ScopeFolderId = scopeFolderId; + state->SecureParams = secureParams; TDataProviderInfo provider; provider.Names.insert({TString{FunctionProviderName}}); provider.Source = CreateDqFunctionDataSource(state); - // TODO Sink + provider.Sink = CreateDqFunctionDataSink(state); return provider; }; } diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider.h b/ydb/library/yql/providers/function/provider/dq_function_provider.h index 2bcfdc076cc..116b0d161af 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_provider.h +++ b/ydb/library/yql/providers/function/provider/dq_function_provider.h @@ -16,13 +16,17 @@ struct TDqFunctionState : public TThrRefBase { TString SessionId; TString ScopeFolderId; + TTypeAnnotationContext* Types = nullptr; + TDqFunctionResolver::TPtr FunctionsResolver = MakeIntrusive<TDqFunctionResolver>(); TDqFunctionGatewayFactory::TPtr GatewayFactory; TDqFunctionsSet FunctionsDescription; + + THashMap<TString, TString> SecureParams; }; TIntrusivePtr<IDataProvider> CreateDqFunctionDataSource(TDqFunctionState::TPtr state); -//TIntrusivePtr<IDataProvider> CreateDqFunctionDataSink(TDqFunctionState::TPtr state); +TIntrusivePtr<IDataProvider> CreateDqFunctionDataSink(TDqFunctionState::TPtr state); } namespace NYql { @@ -30,6 +34,7 @@ namespace NYql { TDataProviderInitializer GetDqFunctionDataProviderInitializer( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, TDqFunctionGatewayFactory::TPtr gatewayFactory, - const TString& scopeFolderId = {}); + const TString& scopeFolderId = {}, + const THashMap<TString, TString>& secureParams = {}); }
\ No newline at end of file diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h index 6c37e858eb6..90991e711db 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h +++ b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h @@ -4,10 +4,13 @@ #include <ydb/library/yql/providers/common/transform/yql_visit.h> #include <ydb/library/yql/core/yql_graph_transformer.h> +#include <ydb/library/yql/providers/dq/interface/yql_dq_integration.h> -namespace NYql { +namespace NYql::NDqFunction { THolder<TVisitorTransformerBase> CreateDqFunctionIntentTransformer(TDqFunctionState::TPtr state); THolder<IGraphTransformer> CreateDqFunctionMetaLoader(TDqFunctionState::TPtr state); +THolder<IGraphTransformer> CreateDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state); +THolder<IDqIntegration> CreateDqFunctionDqIntegration(TDqFunctionState::TPtr state); }
\ No newline at end of file |