aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhrustyashko <hrustyashko@yandex-team.ru>2022-04-21 02:44:42 +0300
committerhrustyashko <hrustyashko@yandex-team.ru>2022-04-21 02:44:42 +0300
commitd48331e0a9ea5992b4abb35a1ce23c741c98e5e5 (patch)
tree6c99bb3b59d27d0a28af257f54de2e7ef2a84769
parent29486e56aab0f6709271b5f3f61035d8224f5bb0 (diff)
downloadydb-d48331e0a9ea5992b4abb35a1ce23c741c98e5e5.tar.gz
Resolve function's provider as Sink with output
```lisp
(let $4 (DataSink '"function" '"CloudFunction" '"my_sa"))
(let $5 (TransformSettings '"CloudFunction" (StructType '('"key" $1)) (StructType '('"b" (DataType 'Int32))) '('('"invoke_url" '"https://functions.yandexcloud.net/d4em1sgldfc6p4bcmtp2"))))
(let $6 (DqPhyStage '((DqCnHashShuffle (TDqOutput $3 '0) '($2))) (lambda '($16) (FromFlow (Map (ToFlow $16) (lambda '($17) (AsStruct '('"key" (Member $17 '"key"))))))) '('('"_logical_id" '173953) '('"_id" '"683fd09-90df104f-be026786-b3e950c3")) '((DqSink $4 $5 '0))))
```

ref:1aa1e7696c2217a5e1d7caf883e054fee7466bbf
-rw-r--r--CMakeLists.darwin.txt5
-rw-r--r--CMakeLists.linux.txt5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h1
-rw-r--r--ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json13
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.cpp8
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.h2
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_build.cpp3
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join.cpp2
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp119
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h3
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp13
-rw-r--r--ydb/library/yql/dq/tasks/CMakeLists.txt1
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h18
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h36
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp94
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.h1
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp5
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h1
-rw-r--r--ydb/library/yql/providers/dq/interface/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/dq/interface/yql_dq_integration.h2
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp8
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp58
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp1
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp1
-rw-r--r--ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h26
-rw-r--r--ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json14
-rw-r--r--ydb/library/yql/providers/function/gateway/dq_function_gateway.h2
-rw-r--r--ydb/library/yql/providers/function/proto/CMakeLists.txt31
-rw-r--r--ydb/library/yql/providers/function/proto/dq_function.proto8
-rw-r--r--ydb/library/yql/providers/function/provider/CMakeLists.txt8
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_datasink.cpp85
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_datasource.cpp9
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp71
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp30
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp9
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp182
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_provider.cpp10
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_provider.h9
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_provider_impl.h5
39 files changed, 667 insertions, 233 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index fea6b301ca7..91c2c24eec5 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -1205,10 +1205,11 @@ add_subdirectory(ydb/library/yql/parser/pg_catalog/ut)
add_subdirectory(ydb/library/yql/parser/lexer_common/ut)
add_subdirectory(ydb/library/yql/providers/common/schema)
add_subdirectory(ydb/library/yql/providers/common/schema/skiff)
-add_subdirectory(ydb/library/yql/providers/function/provider)
-add_subdirectory(ydb/library/yql/providers/function/expr_nodes)
add_subdirectory(ydb/library/yql/providers/function/common)
+add_subdirectory(ydb/library/yql/providers/function/expr_nodes)
add_subdirectory(ydb/library/yql/providers/function/gateway)
+add_subdirectory(ydb/library/yql/providers/function/proto)
+add_subdirectory(ydb/library/yql/providers/function/provider)
add_subdirectory(ydb/library/yql/public/decimal/ut)
add_subdirectory(ydb/library/yql/public/issue/ut)
add_subdirectory(ydb/library/yql/public/udf/ut)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 708a6f310a4..fd5ab1928c9 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -1300,10 +1300,11 @@ add_subdirectory(ydb/library/yql/parser/pg_catalog/ut)
add_subdirectory(ydb/library/yql/parser/lexer_common/ut)
add_subdirectory(ydb/library/yql/providers/common/schema)
add_subdirectory(ydb/library/yql/providers/common/schema/skiff)
-add_subdirectory(ydb/library/yql/providers/function/provider)
-add_subdirectory(ydb/library/yql/providers/function/expr_nodes)
add_subdirectory(ydb/library/yql/providers/function/common)
+add_subdirectory(ydb/library/yql/providers/function/expr_nodes)
add_subdirectory(ydb/library/yql/providers/function/gateway)
+add_subdirectory(ydb/library/yql/providers/function/proto)
+add_subdirectory(ydb/library/yql/providers/function/provider)
add_subdirectory(ydb/library/yql/public/decimal/ut)
add_subdirectory(ydb/library/yql/public/issue/ut)
add_subdirectory(ydb/library/yql/public/udf/ut)
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 665d5b43dc1..3cea1fe0241 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1341,6 +1341,7 @@ private:
const auto& outputDesc = Task.GetOutputs(i);
Y_VERIFY(!outputDesc.HasSink() || outputDesc.ChannelsSize() == 0); // HasSink => no channels
Y_VERIFY(outputDesc.HasSink() || outputDesc.ChannelsSize() > 0);
+
if (outputDesc.HasSink()) {
auto result = SinksMap.emplace(i, TSinkInfo());
YQL_ENSURE(result.second);
diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
index 67627b1b8bd..0041a497154 100644
--- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
+++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
@@ -80,6 +80,17 @@
"ListBase": "TDqSink"
},
{
+ "Name": "TTransformSettings",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "TransformSettings"},
+ "Children": [
+ {"Index": 0, "Name": "Type", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "InputType", "Type": "TExprBase"},
+ {"Index": 2, "Name": "OutputType", "Type": "TExprBase"},
+ {"Index": 3, "Name": "Other", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
"Name": "TDqStageBase",
"Base": "TCallable",
"Match": {"Type": "CallableBase"},
@@ -224,7 +235,7 @@
"Children": [
{"Index": 0, "Name": "TransformType", "Type": "TExprBase"},
{"Index": 1, "Name": "TransformName", "Type": "TExprBase"},
- {"Index": 2, "Name": "Settings", "Type": "TExprBase"}
+ {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
}
]
diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp
index 418f3fe117f..d936a0939c6 100644
--- a/ydb/library/yql/dq/opt/dq_opt.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt.cpp
@@ -144,15 +144,11 @@ bool IsSingleConsumerConnection(const TDqConnection& node, const TParentsMap& pa
&& (allowStageMultiUsage || IsSingleConsumer(node.Output().Stage(), parentsMap));
}
-ui32 GetStageOutputsCount(const TDqStageBase& stage, bool includingSinks) {
+ui32 GetStageOutputsCount(const TDqStageBase& stage) {
auto stageType = stage.Ref().GetTypeAnn();
YQL_ENSURE(stageType);
auto resultsTypeTuple = stageType->Cast<TTupleExprType>();
- ui32 result = resultsTypeTuple->GetSize();
- if (!includingSinks && stage.Sinks()) {
- result -= stage.Sinks().Cast().Size();
- }
- return result;
+ return resultsTypeTuple->GetSize();
}
TVector<TDqConnection> FindDqConnections(const TExprBase& node) {
diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h
index 125a0a5bd9e..3d9a14e1f8a 100644
--- a/ydb/library/yql/dq/opt/dq_opt.h
+++ b/ydb/library/yql/dq/opt/dq_opt.h
@@ -54,7 +54,7 @@ bool IsSingleConsumer(const NNodes::TExprBase& node, const TParentsMap& parentsM
bool IsSingleConsumerConnection(const NNodes::TDqConnection& node, const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage, bool includingSinks);
+ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage);
TVector<NNodes::TDqConnection> FindDqConnections(const NNodes::TExprBase& node);
bool IsDqPureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true);
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp
index e57b85a9113..175b58cc208 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp
@@ -235,8 +235,9 @@ public:
if (output.Stage().Maybe<TDqStage>()) {
auto& info = consumersMap[output.Stage().Raw()];
+
if (info.Consumers.empty()) {
- info.Consumers.resize(GetStageOutputsCount(output.Stage(), false));
+ info.Consumers.resize(GetStageOutputsCount(output.Stage()));
}
YQL_ENSURE(index <= info.Consumers.size());
diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp
index ab440d1d5ed..7ce13c9b83d 100644
--- a/ydb/library/yql/dq/opt/dq_opt_join.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp
@@ -617,7 +617,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
buildNewStage = IsDqDependsOnStage(join.RightInput(), leftCn.Output().Stage());
if (!buildNewStage) {
// NOTE: Do not push join to stage with multiple outputs, reduce memory footprint.
- buildNewStage = GetStageOutputsCount(leftCn.Output().Stage(), true) > 1;
+ buildNewStage = GetStageOutputsCount(leftCn.Output().Stage()) > 1;
}
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 9df0e144c5e..71cf0dcff82 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -201,7 +201,7 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o
auto program = stage.Program();
ui32 index = FromString<ui32>(outputIndex.Value());
- ui32 branchesCount = GetStageOutputsCount(stage, true);
+ ui32 branchesCount = GetStageOutputsCount(stage);
TExprNode::TPtr newProgram;
if (branchesCount == 1) {
@@ -427,123 +427,6 @@ TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationCont
return DqPushBaseLMapToStage<TCoLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
}
-TExprBase DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
- const TParentsMap& parentsMap, bool allowStageMultiUsage)
-{
- Y_UNUSED(optCtx);
- Y_UNUSED(allowStageMultiUsage);
- Y_UNUSED(parentsMap);
-
- auto apply = node.Cast<TCoApply>();
- auto callable = apply.Callable().Maybe<TDqSqlExternalFunction>();
- if (!callable
- || apply.Args().Count() != 2
- || !apply.Arg(1).Maybe<TDqCnUnionAll>()) {
-
- return node;
- }
- callable = callable.Cast();
- TDqCnUnionAll nodeInput {apply.Arg(1).Cast<TDqCnUnionAll>()};
-
- if (!IsSingleConsumerConnection(nodeInput, parentsMap, allowStageMultiUsage)) {
- return node;
- }
-
- const auto shuffleColumn = Build<TCoAtom>(ctx, node.Pos())
- .Value("_yql_transform_shuffle")
- .Done();
- auto addShuffleColumn = Build<TCoLambda>(ctx, node.Pos())
- .Args({"stream"})
- .Body<TCoMap>()
- .Input("stream")
- .Lambda()
- .Args({"row"})
- .Body<TCoAddMember>()
- .Struct("row")
- .Name(shuffleColumn)
- .Item<TCoRandom>().Add<TCoDependsOn>().Input("row").Build().Build()
- .Build()
- .Build()
- .Build()
- .Done();
- auto removeShuffleColumn = Build<TCoLambda>(ctx, node.Pos())
- .Args({"row"})
- .Body<TCoForceRemoveMember>()
- .Struct("row")
- .Name(shuffleColumn)
- .Build()
- .Done();
-
- TVector<TCoNameValueTuple> settings;
- auto isExtFunction = Build<TCoNameValueTuple>(ctx, node.Pos())
- .Name().Build(TDqStageSettings::IsExternalSetting)
- .Value<TCoBool>().Literal().Build("true").Build()
- .Done();
- settings.push_back(isExtFunction);
-
- auto transformType = callable.TransformType().Cast<TCoString>().Literal().StringValue();
- settings.push_back(
- Build<TCoNameValueTuple>(ctx, node.Pos())
- .Name().Build(TDqStageSettings::TransformTypeSetting)
- .Value<TCoAtom>().Build(transformType)
- .Done());
-
- auto transformName = callable.TransformName().Cast<TCoString>().Literal().StringValue();
- settings.push_back(
- Build<TCoNameValueTuple>(ctx, node.Pos())
- .Name().Build(TDqStageSettings::TransformNameSetting)
- .Value<TCoAtom>().Build(transformName)
- .Done());
-
- for (const auto &tuple: callable.Settings().Ref().Children()) {
- const auto paramName = tuple->Head().Content();
- auto setting = Build<TCoNameValueTuple>(ctx, node.Pos())
- .Name().Build(paramName)
- .Value(tuple->TailPtr())
- .Done();
- settings.push_back(setting);
- }
-
- auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos())
- .Add(settings)
- .Done();
-
- auto stage = nodeInput.Output().Stage().Cast<TDqStage>();
- auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx);
- YQL_ENSURE(dutyColumn);
-
- auto transformStage = Build<TDqStage>(ctx, node.Pos())
- .Inputs()
- .Add<TDqCnHashShuffle>()
- .KeyColumns()
- .Add({shuffleColumn})
- .Build()
- .Output()
- .Stage(dutyColumn.Cast())
- .Index(nodeInput.Output().Index())
- .Build()
- .Build()
- .Build()
- .Program()
- .Args({"row"})
- .Body<TCoMap>()
- .Lambda(removeShuffleColumn)
- .Input("row")
- .Build()
- .Build()
- .Settings(settingsBuilder)
- .Done();
-
- auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos())
- .Output()
- .Stage(transformStage)
- .Index().Build("0")
- .Build()
- .Done();
-
- return externalStage;
-}
-
TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 7013aa72a47..693d993fdfb 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -29,9 +29,6 @@ NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext&
NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-NNodes::TExprBase DqBuildExtFunctionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
- const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-
NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 67005a599b1..ab8a5eb190c 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -9,10 +9,13 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
+
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/core/user_data/yql_user_data.h>
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/library/yql/minikql/mkql_node_visitor.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <util/generic/scope.h>
@@ -404,6 +407,7 @@ public:
TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
auto& typeEnv = TypeEnv();
+ const auto pb = std::make_unique<NKikimr::NMiniKQL::TProgramBuilder>(typeEnv, *(holderFactory.GetFunctionRegistry()));
for (ui32 i = 0; i < task.InputsSize(); ++i) {
auto& inputDesc = task.GetInputs(i);
@@ -441,6 +445,15 @@ public:
TaskHasEffects = true;
}
+ if (outputDesc.HasTransform()) {
+ auto transform = outputDesc.GetTransform();
+ auto outputType = NCommon::ParseTypeFromYson(TStringBuf{transform.GetOutputType()}, *pb, Cerr);
+ auto inputType = NCommon::ParseTypeFromYson(TStringBuf{transform.GetInputType()}, *pb, Cerr);
+ LOG(TStringBuilder() << "Task: " << TaskId << " has transform by "
+ << transform.GetType() << " with input type: " << *inputType
+ << " , output type: " << *outputType);
+ }
+
TVector<IDqOutput::TPtr> outputs{Reserve(std::max<ui64>(outputDesc.ChannelsSize(), 1))};
if (outputDesc.HasSink()) {
auto sink = CreateDqAsyncOutputBuffer(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize,
diff --git a/ydb/library/yql/dq/tasks/CMakeLists.txt b/ydb/library/yql/dq/tasks/CMakeLists.txt
index 726d1950eb2..ed676758698 100644
--- a/ydb/library/yql/dq/tasks/CMakeLists.txt
+++ b/ydb/library/yql/dq/tasks/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(yql-dq-tasks PUBLIC
library-yql-core
yql-dq-expr_nodes
yql-dq-proto
+ library-yql-ast
)
target_sources(yql-dq-tasks PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/tasks/dq_task_program.cpp
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index d5bd5a4cf55..ee7f73726ee 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -11,7 +11,6 @@ template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutput
void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) {
ui32 partitionsCount = 1;
- const auto stageSettings = NDq::TDqStageSettings::Parse(stage);
auto& stageInfo = graph.GetStageInfo(stage);
for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {
const auto& input = stage.Inputs().Item(inputIndex);
@@ -32,12 +31,8 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp
if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) {
auto shuffle = maybeCnShuffle.Cast();
auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage());
- if (stageSettings.IsExternalFunction) {
- partitionsCount = stageSettings.MaxTransformConcurrency();
- } else {
- partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2);
- partitionsCount = std::min(partitionsCount, 24u);
- }
+ partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2);
+ partitionsCount = std::min(partitionsCount, 24u);
} else if (auto maybeCnMap = input.Maybe<NNodes::TDqCnMap>()) {
auto cnMap = maybeCnMap.Cast();
auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage());
@@ -46,12 +41,7 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp
}
for (ui32 i = 0; i < partitionsCount; ++i) {
- auto& task = graph.AddTask(stageInfo);
- if (stageSettings.IsExternalFunction) {
- auto& transform = task.OutputTransform;
- transform.Type = stageSettings.TransformType;
- //transform.FunctionName = stageSettings.TransformName;
- }
+ graph.AddTask(stageInfo);
}
}
@@ -60,7 +50,7 @@ void BuildUnionAllChannels(TGraph& graph, const typename TGraph::TStageInfoType&
const typename TGraph::TStageInfoType& inputStageInfo, ui32 outputIndex, bool enableSpilling,
const TChannelLogFunc& logFunc)
{
- YQL_ENSURE(stageInfo.Tasks.size() == 1, "Multiple tasks on union all input. StageId: " << stageInfo.Id);
+ YQL_ENSURE(stageInfo.Tasks.size() == 1, "Multiple tasks on union all input. StageId: " << stageInfo.Id << " " << stageInfo.Tasks.size());
auto& targetTask = graph.GetTask(stageInfo.Tasks[0]);
for (auto& originTaskId : inputStageInfo.Tasks) {
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index ea2bc4030de..ee7cba6b71a 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -2,6 +2,7 @@
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
+#include <ydb/library/yql/ast/yql_expr.h>
#include <library/cpp/actors/core/actorid.h>
@@ -43,23 +44,8 @@ struct TStageInfo : private TMoveOnly {
, InputsCount(stage.Inputs().Size())
, Meta(std::move(meta))
{
- auto result = stage.Program().Body();
- auto resultType = result.Ref().GetTypeAnn();
-
- if (resultType->GetKind() == ETypeAnnotationKind::Stream) {
- auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType();
- if (resultItemType->GetKind() == ETypeAnnotationKind::Variant) {
- auto underlyingType = resultItemType->Cast<TVariantExprType>()->GetUnderlyingType();
- YQL_ENSURE(underlyingType->GetKind() == ETypeAnnotationKind::Tuple);
- OutputsCount = underlyingType->Cast<TTupleExprType>()->GetSize();
- YQL_ENSURE(OutputsCount > 1);
- } else {
- OutputsCount = 1;
- }
- } else {
- YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Void, "got " << *resultType);
- OutputsCount = 0;
- }
+ auto stageResultTuple = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
+ OutputsCount = stageResultTuple->GetSize();
}
TStageInfo(const NNodes::TDqPhyStage& stage, TStageInfoMeta&& meta)
@@ -141,6 +127,15 @@ struct TTaskOutputType {
};
};
+struct TTransform { // Describes a transform attached to a task output; stored as TTaskOutput::Transform.
+ TString Type; // Transform kind identifier. NOTE(review): presumably the "Type" atom of TransformSettings (e.g. "CloudFunction") - confirm against the planner.
+
+ TString InputType; // NOTE(review): looks like a serialized (YSON?) description of the rows fed into the transform - confirm.
+ TString OutputType; // NOTE(review): looks like a serialized (YSON?) description of the rows the transform produces - confirm.
+
+ ::google::protobuf::Any Settings; // Opaque settings payload. NOTE(review): presumably filled by IDqIntegration::FillTransformSettings - confirm.
+};
+
template <class TOutputMeta>
struct TTaskOutput {
ui32 Type = TTaskOutputType::Undefined;
@@ -150,11 +145,7 @@ struct TTaskOutput {
TMaybe<::google::protobuf::Any> SinkSettings;
TString SinkType;
TOutputMeta Meta;
-};
-
-struct TTransform {
- TString Type;
- ::google::protobuf::Any TransformSettings;
+ TMaybe<TTransform> Transform;
};
template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
@@ -175,7 +166,6 @@ struct TTask {
NActors::TActorId ComputeActorId;
TTaskMeta Meta;
NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT;
- TTransform OutputTransform;
};
template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index 31dca5102e2..db912c530ea 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -134,8 +134,7 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
return TStatus::Repeat;
}
- TVector<const TTypeAnnotationNode*> resultTypesTuple;
-
+ TVector<const TTypeAnnotationNode*> programResultTypesTuple;
if (resultType->GetKind() == ETypeAnnotationKind::Void) {
// do nothing, return empty tuple as program result
} else {
@@ -150,26 +149,81 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
auto variantType = itemType->Cast<TVariantExprType>()->GetUnderlyingType();
YQL_ENSURE(variantType->GetKind() == ETypeAnnotationKind::Tuple);
const auto& items = variantType->Cast<TTupleExprType>()->GetItems();
- resultTypesTuple.reserve(items.size());
+ programResultTypesTuple.reserve(items.size());
for (const auto* branchType : items) {
- resultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(branchType));
+ programResultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(branchType));
}
} else {
- resultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(itemType));
+ programResultTypesTuple.emplace_back(ctx.MakeType<TListExprType>(itemType));
}
} else {
YQL_ENSURE(resultType->GetKind() != ETypeAnnotationKind::List, "stage: " << stage->Dump());
- resultTypesTuple.emplace_back(resultType);
+ programResultTypesTuple.emplace_back(resultType);
}
}
+ TVector<const TTypeAnnotationNode*> stageResultTypes;
if (TDqStageBase::idx_Sinks < stage->ChildrenSize()) {
+ YQL_ENSURE(stage->Child(TDqStageBase::idx_Sinks)->ChildrenSize() != 0, "Sink list exists but empty, stage: " << stage->Dump());
+
+ auto outputsNumber = programResultTypesTuple.size();
+ TVector<TExprNode::TPtr> transformSinks;
+ TVector<TExprNode::TPtr> pureSinks;
+ transformSinks.reserve(outputsNumber);
+ pureSinks.reserve(outputsNumber);
for (const auto& sink: stage->Child(TDqStageBase::idx_Sinks)->Children()) {
- sink->SetTypeAnn(resultType);
+ const ui64 index = FromString(sink->Child(TDqSink::idx_Index)->Content());
+ if (index >= outputsNumber) {
+ ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder()
+ << "Sink/Transform try to process un-existing lambda's output"));
+ return TStatus::Error;
+ }
+
+ auto transformSettings = sink->Child(TDqSink::idx_Settings)->IsCallable(TTransformSettings::CallableName());
+ if (transformSettings) {
+ transformSinks.push_back(sink);
+ } else {
+ pureSinks.push_back(sink);
+ }
}
+
+ if (!transformSinks.empty() && !pureSinks.empty()
+ && transformSinks.size() != pureSinks.size()) {
+
+ ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder()
+ << "Not every transform has a corresponding sink"));
+ return TStatus::Error;
+ }
+
+ if (!pureSinks.empty()) {
+ for (auto sink : pureSinks) {
+ sink->SetTypeAnn(resultType);
+ }
+ stageResultTypes.assign(programResultTypesTuple.begin(), programResultTypesTuple.end());
+ } else {
+ for (auto sink : transformSinks) {
+ auto* sinkType = sink->GetTypeAnn();
+ if (sinkType->GetKind() != ETypeAnnotationKind::List
+ && sinkType->GetKind() != ETypeAnnotationKind::Void) {
+
+ ctx.AddError(TIssue(ctx.GetPosition(sink->Pos()), TStringBuilder()
+ << "Expected List or Void type, but got: " << *sinkType));
+ return TStatus::Error;
+ }
+ /* auto* itemType = sinkType->Cast<TListExprType>()->GetItemType();
+ if (itemType->GetKind() != ETypeAnnotationKind::Struct) {
+ ctx.AddError(TIssue(ctx.GetPosition(sink->Pos()), TStringBuilder()
+ << "Expected List<Struct<...>> type, but got: List<" << *itemType << ">"));
+ return TStatus::Error;
+ } */
+ stageResultTypes.emplace_back(sinkType);
+ }
+ }
+ } else {
+ stageResultTypes.assign(programResultTypesTuple.begin(), programResultTypesTuple.end());
}
- stage->SetTypeAnn(ctx.MakeType<TTupleExprType>(resultTypesTuple));
+ stage->SetTypeAnn(ctx.MakeType<TTupleExprType>(stageResultTypes));
return TStatus::Ok;
}
@@ -783,6 +837,26 @@ TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx) {
return TStatus::Ok;
}
+// Type-annotates a TransformSettings callable.
+//
+// The node must have exactly four children. Its OutputType child and then its
+// InputType child must each pass EnsureTypeWithStructType (in that order, so
+// output diagnostics are reported first). On success the node is annotated as a
+// list whose item type is the type carried by the OutputType child.
+//
+// Returns TStatus::Error when any check fails (the Ensure* helpers receive `ctx`
+// for reporting), TStatus::Ok otherwise.
+TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
+    if (!EnsureArgsCount(*input, 4U, ctx)) {
+        return TStatus::Error;
+    }
+
+    const TExprNode& outputTypeNode = *input->Child(TTransformSettings::idx_OutputType);
+    const bool outputIsValid = EnsureTypeWithStructType(outputTypeNode, ctx);
+    if (!outputIsValid) {
+        return TStatus::Error;
+    }
+
+    const TExprNode& inputTypeNode = *input->Child(TTransformSettings::idx_InputType);
+    const bool inputIsValid = EnsureTypeWithStructType(inputTypeNode, ctx);
+    if (!inputIsValid) {
+        return TStatus::Error;
+    }
+
+    // Both type arguments are valid: unwrap the type expression of the output argument.
+    const TTypeAnnotationNode* rowType = outputTypeNode.GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+    input->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
+    return TStatus::Ok;
+}
+
THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationContext& typesCtx) {
auto coreTransformer = CreateExtCallableTypeAnnotationTransformer(typesCtx);
@@ -862,6 +936,10 @@ THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationCont
return AnnotateDqSink(input, ctx);
}
+ if (TTransformSettings::Match(input.Get())) {
+ return AnnotateTransformSettings (input, ctx);
+ }
+
if (TDqQuery::Match(input.Get())) {
return AnnotateDqQuery(input, ctx);
}
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h
index 03aefe701ff..84f8afe289e 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.h
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h
@@ -22,6 +22,7 @@ IGraphTransformer::TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TEx
IGraphTransformer::TStatus AnnotateDqSource(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqSink(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx);
+IGraphTransformer::TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx);
THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(NYql::TTypeAnnotationContext& typesCtx);
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
index 4da4d13d034..d2de27ec495 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
@@ -54,6 +54,11 @@ void TDqIntegrationBase::FillSinkSettings(const TExprNode& node, ::google::proto
Y_UNUSED(sinkType);
}
+// Base implementation of FillTransformSettings: a deliberate no-op.
+// Both arguments are ignored and `settings` is left untouched.
+void TDqIntegrationBase::FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) {
+    Y_UNUSED(settings);
+    Y_UNUSED(node);
+}
+
void TDqIntegrationBase::Annotate(const TExprNode& node, THashMap<TString, TString>& params) {
Y_UNUSED(node);
Y_UNUSED(params);
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
index cd1c150ebfc..60638f811a5 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
@@ -15,6 +15,7 @@ public:
bool CanFallback() override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override;
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override;
+ void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override;
void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override;
bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) override;
void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) override;
diff --git a/ydb/library/yql/providers/dq/interface/CMakeLists.txt b/ydb/library/yql/providers/dq/interface/CMakeLists.txt
index 69c5b9be066..1d60ad149d1 100644
--- a/ydb/library/yql/providers/dq/interface/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/interface/CMakeLists.txt
@@ -18,6 +18,7 @@ target_link_libraries(providers-dq-interface PUBLIC
library-cpp-yson
library-yql-ast
library-yql-core
+ yql-dq-tasks
)
target_sources(providers-dq-interface PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/interface/yql_dq_integration.cpp
diff --git a/ydb/library/yql/providers/dq/interface/yql_dq_integration.h b/ydb/library/yql/providers/dq/interface/yql_dq_integration.h
index f55fdb03e2a..d9980511cca 100644
--- a/ydb/library/yql/providers/dq/interface/yql_dq_integration.h
+++ b/ydb/library/yql/providers/dq/interface/yql_dq_integration.h
@@ -2,6 +2,7 @@
#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/dq/tasks/dq_tasks_graph.h>
#include <library/cpp/yson/writer.h>
@@ -36,6 +37,7 @@ public:
virtual bool CanFallback() = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0;
+ virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0;
virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0;
virtual bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) = 0;
virtual void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) = 0;
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index f574c17433f..c9171de68c1 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -41,8 +41,6 @@ public:
AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>));
AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>));
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
- // (Apply (SqlExternalFunction ..) ..) to stage
- AddHandler(0, &TCoApply::Match, HNDL(BuildExtFunctionStage<false>));
#if 0
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute));
@@ -60,7 +58,6 @@ public:
AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
- AddHandler(1, &TCoApply::Match, HNDL(BuildExtFunctionStage<true>));
#undef HNDL
SetGlobal(1u);
@@ -232,11 +229,6 @@ protected:
}
template <bool IsGlobal>
- TMaybeNode<TExprBase> BuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
- return DqBuildExtFunctionStage(node, ctx, optCtx, *getParents(), IsGlobal);
- }
-
- template <bool IsGlobal>
TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal);
}
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index f6de66104a6..23b8191cacd 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -10,6 +10,8 @@
#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
+#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/type_ann/type_ann_expr.h>
@@ -166,19 +168,46 @@ namespace NYql::NDqs {
YQL_ENSURE(datasink);
auto dqIntegration = (*datasink)->GetDqIntegration();
YQL_ENSURE(dqIntegration, "DqSink assumes that datasink has a dq integration impl");
+
+ TTransform stageTransform;
TString sinkType;
::google::protobuf::Any sinkSettings;
- dqIntegration->FillSinkSettings(sink.Ref(), sinkSettings, sinkType);
- YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node");
- YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node");
+
+ auto transformSettings = sink.Settings().Maybe<TTransformSettings>();
+ if (transformSettings) {
+ auto settings = transformSettings.Cast();
+
+ stageTransform.Type = settings.Type();
+ const auto inputType = settings.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ stageTransform.InputType = NCommon::WriteTypeToYson(inputType);
+ const auto outputType = settings.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ stageTransform.OutputType = NCommon::WriteTypeToYson(outputType);
+
+ dqIntegration->FillTransformSettings(sink.Ref(), stageTransform.Settings);
+ } else {
+ dqIntegration->FillSinkSettings(sink.Ref(), sinkSettings, sinkType);
+ YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node");
+ YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node");
+ }
for (ui64 taskId : stageInfo.Tasks) {
auto& task = TasksGraph.GetTask(taskId);
YQL_ENSURE(index < task.Outputs.size());
auto& output = task.Outputs[index];
- output.SinkType = sinkType;
- output.SinkSettings = sinkSettings;
- output.Type = NDq::TTaskOutputType::Sink;
+ if (transformSettings) {
+
+ output.Transform.ConstructInPlace();
+
+ auto& transform = output.Transform;
+ transform->Type = stageTransform.Type;
+ transform->InputType = stageTransform.InputType;
+ transform->OutputType = stageTransform.OutputType;
+ //transform->Settings = stageTransform.Settings;
+ } else {
+ output.SinkType = sinkType;
+ output.SinkSettings = sinkSettings;
+ output.Type = NDq::TTaskOutputType::Sink;
+ }
}
}
}
@@ -475,11 +504,6 @@ namespace NYql::NDqs {
task.Inputs[dqSourceInputIndex].SourceSettings = sourceSettings;
task.Inputs[dqSourceInputIndex].SourceType = sourceType;
}
- if (stageSettings.IsExternalFunction) {
- auto& transform = task.OutputTransform;
- transform.Type = stageSettings.TransformType;
- //transform.FunctionName = stageSettings.TransformName;
- }
}
}
return !parts.empty();
@@ -642,7 +666,7 @@ namespace NYql::NDqs {
}
case TTaskOutputType::Undefined: {
- YQL_ENSURE(false, "Unexpected task output type `TTaskOutputType::Undefined`");
+ YQL_ENSURE(output.Transform, "Unexpected task output type `TTaskOutputType::Undefined`");
}
}
@@ -650,6 +674,16 @@ namespace NYql::NDqs {
auto& channelDesc = *outputDesc.AddChannels();
FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel));
}
+
+ if (output.Transform) {
+ auto* transformDesc = outputDesc.MutableTransform();
+ auto& transform = output.Transform;
+
+ transformDesc->SetType(transform->Type);
+ transformDesc->SetInputType(transform->InputType);
+ transformDesc->SetOutputType(transform->OutputType);
+ *transformDesc->MutableSettings() = transform->Settings;
+ }
}
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index e4c29bef408..7fcc73768b8 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -1051,7 +1051,6 @@ private:
executionPlanner.Destroy();
int level = 0;
-
// TODO: remove copy-paste
return WrapFutureCallback(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
YQL_LOG(DEBUG) << state->SessionId << " WrapFutureCallback";
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
index c8d9786a7a0..7d7f7b8fee7 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
@@ -34,6 +34,7 @@ public:
AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink));
AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite));
AddHandler({TDqQuery::CallableName()}, Hndl(&NDq::AnnotateDqQuery));
+ AddHandler({TTransformSettings::CallableName()}, Hndl(&NDq::AnnotateTransformSettings));
}
private:
diff --git a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h
index 1d3b8923c0b..8d018f438b3 100644
--- a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h
+++ b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h
@@ -34,6 +34,32 @@ public:
}
};
+// Typed wrapper over a `DataSink` callable that belongs to the function provider.
+// Children, as declared for the generated stub: Category, Type, Connection.
+class TFunctionDataSink: public NGenerated::TFunctionDataSinkStub<TExprBase, TCallable, TCoAtom> {
+public:
+    explicit TFunctionDataSink(const TExprNode* node)
+        : TFunctionDataSinkStub(node)
+    {
+    }
+
+    explicit TFunctionDataSink(const TExprNode::TPtr& node)
+        : TFunctionDataSinkStub(node)
+    {
+    }
+
+    // A node matches only when the generated stub accepts it AND its first
+    // child (the category atom) names the function provider.
+    static bool Match(const TExprNode* node) {
+        return TFunctionDataSinkStub::Match(node)
+            && node->Child(0)->Content() == FunctionProviderName;
+    }
+};
+
+
#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.defs.inl.h>
} // namespace NNodes
diff --git a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json
index 16084c87559..c70a72fe51e 100644
--- a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json
+++ b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json
@@ -13,8 +13,18 @@
"Children": [
{"Index": 0, "Name": "Category", "Type": "TCoAtom"},
{"Index": 1, "Name": "Type", "Type": "TCoAtom"},
- {"Index": 2, "Name": "FunctionName", "Type": "TCoAtom"},
- {"Index": 3, "Name": "Connection", "Type": "TCoAtom"}
+ {"Index": 2, "Name": "Connection", "Type": "TCoAtom"}
+ ]
+ },
+ {
+ "Name": "TFunctionDataSink",
+ "Base": "TCallable",
+ "Definition": "Custom",
+ "Match": {"Type": "Callable", "Name": "DataSink"},
+ "Children": [
+ {"Index": 0, "Name": "Category", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Type", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "Connection", "Type": "TCoAtom"}
]
}
]
diff --git a/ydb/library/yql/providers/function/gateway/dq_function_gateway.h b/ydb/library/yql/providers/function/gateway/dq_function_gateway.h
index 1404b7709c3..eb75d7d1bf1 100644
--- a/ydb/library/yql/providers/function/gateway/dq_function_gateway.h
+++ b/ydb/library/yql/providers/function/gateway/dq_function_gateway.h
@@ -14,7 +14,7 @@ class IDqFunctionGateway {
public:
using TPtr = std::shared_ptr<IDqFunctionGateway>;
- virtual NThreading::TFuture<TDqFunctionDescription> ResolveFunction(const TString& folderId, const TString& functionName);
+ virtual NThreading::TFuture<TDqFunctionDescription> ResolveFunction(const TString& folderId, const TString& functionName) = 0;
virtual ~IDqFunctionGateway() = default;
};
diff --git a/ydb/library/yql/providers/function/proto/CMakeLists.txt b/ydb/library/yql/providers/function/proto/CMakeLists.txt
new file mode 100644
index 00000000000..93df0adeaab
--- /dev/null
+++ b/ydb/library/yql/providers/function/proto/CMakeLists.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-function-proto)
+target_link_libraries(providers-function-proto PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(providers-function-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/proto/dq_function.proto
+)
+target_proto_addincls(providers-function-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(providers-function-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/library/yql/providers/function/proto/dq_function.proto b/ydb/library/yql/providers/function/proto/dq_function.proto
new file mode 100644
index 00000000000..494fcb882c8
--- /dev/null
+++ b/ydb/library/yql/providers/function/proto/dq_function.proto
@@ -0,0 +1,8 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package NYql.NProto;
+
+message TFunctionTransform {
+ string InvokeUrl = 1;
+}
diff --git a/ydb/library/yql/providers/function/provider/CMakeLists.txt b/ydb/library/yql/providers/function/provider/CMakeLists.txt
index 6daea338182..ab4c8e6f4d8 100644
--- a/ydb/library/yql/providers/function/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/function/provider/CMakeLists.txt
@@ -15,16 +15,24 @@ target_link_libraries(providers-function-provider PUBLIC
contrib-libs-cxxsupp
yutil
common-token_accessor-client
+ providers-common-dq
providers-common-provider
+ common-schema-mkql
providers-function-expr_nodes
providers-function-common
providers-function-gateway
+ providers-function-proto
library-yql-core
yql-core-expr_nodes
+ yql-dq-expr_nodes
+ yql-dq-opt
)
target_sources(providers-function-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
)
diff --git a/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp
new file mode 100644
index 00000000000..0e1335ddf97
--- /dev/null
+++ b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp
@@ -0,0 +1,85 @@
+#include "dq_function_provider_impl.h"
+
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h>
+
+namespace NYql::NDqFunction {
+
+namespace {
+
+using namespace NNodes;
+
+// Data sink half of the function provider. It validates `DataSink` nodes that
+// belong to this provider and hands out the provider's transformers and its
+// DQ integration object.
+class TDqFunctionDataSink : public TDataProviderBase {
+public:
+    TDqFunctionDataSink(TDqFunctionState::TPtr state)
+        : State(std::move(state))
+        , PhysicalOptTransformer(CreateDqFunctionPhysicalOptTransformer(State))
+        , LoadMetaDataTransformer(CreateDqFunctionMetaLoader(State))
+        , IntentDeterminationTransformer(CreateDqFunctionIntentTransformer(State))
+        , DqIntegration(CreateDqFunctionDqIntegration(State))
+    {}
+
+    TStringBuf GetName() const override {
+        return FunctionProviderName;
+    }
+
+    // Expected node shape: (DataSink 'Function 'CloudFunction 'connection_name).
+    // Returns true and clears `cluster` when the node is a three-argument
+    // DataSink of this provider whose function type is known to the gateway
+    // factory; otherwise reports an issue through `ctx` and returns false.
+    bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) override {
+        const bool isDataSink = node.IsCallable(TCoDataSink::CallableName());
+
+        // EnsureArgsCount reports its own issue, so no generic error is added here.
+        if (isDataSink && !EnsureArgsCount(node, 3, ctx)) {
+            return false;
+        }
+
+        if (!isDataSink || node.Head().Content() != FunctionProviderName) {
+            ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid ExternalFunction DataSink parameters"));
+            return false;
+        }
+
+        TDqFunctionType functionType{node.Child(1)->Content()};
+        if (State->GatewayFactory->IsKnownFunctionType(functionType)) {
+            cluster = Nothing();
+            return true;
+        }
+
+        ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() <<
+            "Unknown EXTERNAL FUNCTION type '" << functionType << "'"));
+        return false;
+    }
+
+    // This sink claims only TDqSqlExternalFunction callables.
+    bool CanParse(const TExprNode& node) override {
+        return node.IsCallable(TDqSqlExternalFunction::CallableName());
+    }
+
+    IGraphTransformer& GetPhysicalOptProposalTransformer() override {
+        return *PhysicalOptTransformer;
+    }
+
+    IGraphTransformer& GetLoadTableMetadataTransformer() override {
+        return *LoadMetaDataTransformer;
+    }
+
+    IGraphTransformer& GetIntentDeterminationTransformer() override {
+        return *IntentDeterminationTransformer;
+    }
+
+    IDqIntegration* GetDqIntegration() override {
+        return DqIntegration.Get();
+    }
+
+private:
+    // State is declared first: the transformers below are built from it in the
+    // constructor's initializer list.
+    const TDqFunctionState::TPtr State;
+    const THolder<IGraphTransformer> PhysicalOptTransformer;
+    const THolder<IGraphTransformer> LoadMetaDataTransformer;
+    const THolder<TVisitorTransformerBase> IntentDeterminationTransformer;
+    const THolder<IDqIntegration> DqIntegration;
+};
+} // namespace
+
+// Factory for the function provider's data sink; ownership of `state` is
+// moved into the created object.
+TIntrusivePtr<IDataProvider> CreateDqFunctionDataSink(TDqFunctionState::TPtr state) {
+    TIntrusivePtr<IDataProvider> sink = new TDqFunctionDataSink(std::move(state));
+    return sink;
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp b/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp
index 0889cdb7112..7519e7a6a28 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp
@@ -16,7 +16,6 @@ public:
TDqFunctionDataSource(TDqFunctionState::TPtr state)
: State(std::move(state))
, LoadMetaDataTransformer(CreateDqFunctionMetaLoader(State))
- , IntentDeterminationTransformer(CreateDqFunctionIntentTransformer(State))
{}
TStringBuf GetName() const override {
@@ -25,8 +24,7 @@ public:
bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) override {
if (node.IsCallable(TCoDataSource::CallableName())) {
- //(DataSource 'ExternalFunction 'CloudFunction 'my_function_name 'connection_name)
- if (!EnsureArgsCount(node, 4, ctx)) {
+ if (!EnsureArgsCount(node, 3, ctx)) {
return false;
}
if (node.Head().Content() == FunctionProviderName) {
@@ -49,14 +47,9 @@ public:
return *LoadMetaDataTransformer;
}
- IGraphTransformer& GetIntentDeterminationTransformer() override {
- return *IntentDeterminationTransformer;
- }
-
private:
const TDqFunctionState::TPtr State;
const THolder<IGraphTransformer> LoadMetaDataTransformer;
- const THolder<TVisitorTransformerBase> IntentDeterminationTransformer;
};
}
diff --git a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
new file mode 100644
index 00000000000..56db4e968d0
--- /dev/null
+++ b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
@@ -0,0 +1,71 @@
+#include "dq_function_provider_impl.h"
+
+#include <ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h>
+#include <ydb/library/yql/providers/function/proto/dq_function.pb.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
+#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
+
+#include <util/generic/ptr.h>
+
+namespace NYql::NDqFunction {
+namespace {
+
+using namespace NNodes;
+
+// DQ integration of the function provider: turns the TTransformSettings
+// attached to a DqSink into a packed NYql::NProto::TFunctionTransform message.
+class TDqFunctionDqIntegration: public TDqIntegrationBase {
+public:
+    TDqFunctionDqIntegration(TDqFunctionState::TPtr state)
+        : State(state)
+    {
+    }
+
+    // Packs the function transform description into `transformSettings`.
+    // Leaves `transformSettings` untouched when `node` is not a DqSink or when
+    // the sink's settings are not TTransformSettings.
+    void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& transformSettings) override {
+        auto sinkNode = TMaybeNode<TDqSink>(&node);
+        if (!sinkNode) {
+            return;
+        }
+
+        auto settingsNode = sinkNode.Cast().Settings().Maybe<TTransformSettings>();
+        if (!settingsNode) {
+            return;
+        }
+
+        NYql::NProto::TFunctionTransform packed;
+
+        // Only the "invoke_url" entry of the extra settings is copied over.
+        auto extraSettings = settingsNode.Cast().Other();
+        const size_t count = extraSettings.Size();
+        for (size_t idx = 0; idx != count; ++idx) {
+            TCoNameValueTuple entry = extraSettings.Item(idx);
+            if (Name(entry) == "invoke_url") {
+                packed.SetInvokeUrl(TString(Value(entry)));
+            }
+        }
+
+        transformSettings.PackFrom(packed);
+    }
+
+    // Name part of a name/value tuple.
+    static TStringBuf Name(const TCoNameValueTuple& nameValue) {
+        return nameValue.Name().Value();
+    }
+
+    // Value part of a name/value tuple; the value must be an atom when present.
+    // Returns an empty buffer when the tuple carries no value.
+    static TStringBuf Value(const TCoNameValueTuple& nameValue) {
+        TMaybeNode<TExprBase> maybeValue = nameValue.Value();
+        if (!maybeValue) {
+            return {};
+        }
+
+        const TExprNode& atom = maybeValue.Cast().Ref();
+        YQL_ENSURE(atom.IsAtom());
+        return atom.Content();
+    }
+
+private:
+    TDqFunctionState::TPtr State;
+};
+} // namespace
+
+// Factory for the function provider's DQ integration object.
+THolder<IDqIntegration> CreateDqFunctionDqIntegration(TDqFunctionState::TPtr state) {
+    THolder<IDqIntegration> integration = MakeHolder<TDqFunctionDqIntegration>(state);
+    return integration;
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp b/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp
index d978ffde027..1b0ceaffc27 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_intent_transformer.cpp
@@ -2,6 +2,7 @@
#include <ydb/library/yql/providers/common/transform/yql_visit.h>
#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
namespace NYql::NDqFunction {
namespace {
@@ -14,21 +15,28 @@ public:
: TVisitorTransformerBase(false)
, State(state)
{
- AddHandler({TFunctionDataSource::CallableName()}, Hndl(&TDqFunctionIntentTransformer::HandleDataSource));
+ AddHandler({TDqSqlExternalFunction::CallableName()}, Hndl(&TDqFunctionIntentTransformer::ExtractFunctionsName));
}
- TStatus HandleDataSource(TExprBase input, TExprContext& ctx) {
- Y_UNUSED(ctx);
+ TStatus ExtractFunctionsName(TExprBase input, TExprContext& ctx) {
+ auto sqlFunction = input.Cast<TDqSqlExternalFunction>();
+ auto functionType = TString{sqlFunction.TransformType().Ref().Tail().Content()};
+ auto functionName = TString{sqlFunction.TransformName().Ref().Tail().Content()};
+ TString connection;
+ for (const auto &tuple: sqlFunction.Settings().Ref().Children()) {
+ const auto paramName = tuple->Head().Content();
+ if (paramName == "connection") {
+ connection = TString{tuple->Tail().Tail().Content()};
+ }
+ }
+
+ if (connection.empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder()
+ << "Empty CONNECTION name for EXTERNAL FUNCTION '" << functionName << "'"));
+ return TStatus::Error;
+ }
- if (!TFunctionDataSource::Match(input.Raw()))
- return TStatus::Ok;
-
- auto source = input.Cast<TFunctionDataSource>();
- TDqFunctionType functionType{source.Type().Value()};
- TString functionName{source.FunctionName().Value()};
- TString connection{source.Connection().Value()};
State->FunctionsResolver->AddFunction(functionType, functionName, connection);
-
return TStatus::Ok;
}
diff --git a/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp b/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp
index 9f6b9beca12..1dcab11d1fc 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_load_meta.cpp
@@ -35,16 +35,18 @@ public:
std::vector<NThreading::TFuture<void>> resolverHandles;
resolverHandles.reserve(functions.size());
auto resolverContext = ResolverContext;
+ const auto position = ctx.GetPosition(input->Pos());
for (auto functionDesc : functions) {
auto gateway = State->GatewayFactory->CreateDqFunctionGateway(
- functionDesc.Type, {}, functionDesc.Connection);
+ functionDesc.Type, State->SecureParams, functionDesc.Connection);
+
auto future = gateway->ResolveFunction(State->ScopeFolderId, functionDesc.FunctionName);
- resolverHandles.push_back(future.Apply([resolverContext]
+ resolverHandles.push_back(future.Apply([resolverContext, position]
(const NThreading::TFuture<TDqFunctionDescription>& future) {
try {
resolverContext->FunctionsDescription.emplace(future.GetValue());
} catch (const std::exception& e) {
- resolverContext->ResolveIssues.push_back(ExceptionToIssue(e));
+ resolverContext->ResolveIssues.push_back(ExceptionToIssue(e, position));
}
}));
}
@@ -63,6 +65,7 @@ public:
}
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
+
Y_UNUSED(ctx);
YQL_ENSURE(AllFutures.HasValue());
output = input;
diff --git a/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp
new file mode 100644
index 00000000000..4bf88da9885
--- /dev/null
+++ b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp
@@ -0,0 +1,182 @@
+#include "dq_function_provider_impl.h"
+
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
+#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
+#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+
+namespace NYql::NDqFunction {
+namespace {
+
+using namespace NNodes;
+using namespace NDq;
+
+// Physical optimizer of the function provider. Its single rule rewrites
+//   (Apply (SqlExternalFunction ..) <DqCnUnionAll>)
+// into a dedicated DQ stage whose output goes to a DqSink carrying
+// TTransformSettings for the external function.
+class TDqFunctionPhysicalOptTransformer : public TOptimizeTransformerBase {
+public:
+    TDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state)
+        : TOptimizeTransformerBase(state->Types, NLog::EComponent::ProviderDq, {})
+        , State(state)
+    {
+#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TDqFunctionPhysicalOptTransformer::name)
+        // (Apply (SqlExternalFunction ..) ..) to stage
+        AddHandler(0, &TCoApply::Match, HNDL(DqBuildExtFunctionStage));
+#undef HNDL
+
+        SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes.
+    }
+
+    // Builds the external-function stage for an Apply node.
+    //
+    // Returns `node` unchanged unless it is an Apply of TDqSqlExternalFunction
+    // with exactly two arguments whose second argument is a TDqCnUnionAll.
+    // Otherwise returns a TDqCnUnionAll over output 0 of the new stage.
+    //
+    // Throws (YQL_ENSURE) when the input connection has more than one consumer,
+    // when the input_type / output_type settings are missing, when the function
+    // description is absent from State->FunctionsDescription, or when the
+    // shuffle-column lambda cannot be pushed into the producing stage.
+    TMaybeNode<TExprBase> DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const {
+        auto apply = node.Cast<TCoApply>();
+        auto callable = apply.Callable().Maybe<TDqSqlExternalFunction>();
+        if (!callable
+            || apply.Args().Count() != 2
+            || !apply.Arg(1).Maybe<TDqCnUnionAll>()) {
+
+            return node;
+        }
+        TDqCnUnionAll nodeInput {apply.Arg(1).Cast<TDqCnUnionAll>()};
+
+        const TParentsMap* parentsMap = getParents();
+        YQL_ENSURE(IsSingleConsumerConnection(nodeInput, *parentsMap, false),
+            "Allow only single external function stage usage");
+
+        // A synthetic random column is added in the producing stage, used as
+        // the hash-shuffle key, and removed again inside the transform stage.
+        const auto shuffleColumn = Build<TCoAtom>(ctx, node.Pos())
+            .Value("_yql_transform_shuffle")
+            .Done();
+        auto addShuffleColumn = Build<TCoLambda>(ctx, node.Pos())
+            .Args({"stream"})
+            .Body<TCoMap>()
+                .Input("stream")
+                .Lambda()
+                    .Args({"row"})
+                    .Body<TCoAddMember>()
+                        .Struct("row")
+                        .Name(shuffleColumn)
+                        .Item<TCoRandom>().Add<TCoDependsOn>().Input("row").Build().Build()
+                        .Build()
+                    .Build()
+                .Build()
+            .Done();
+        auto removeShuffleColumn = Build<TCoLambda>(ctx, node.Pos())
+            .Args({"row"})
+            .Body<TCoForceRemoveMember>()
+                .Struct("row")
+                .Name(shuffleColumn)
+                .Build()
+            .Done();
+
+        auto transformType = callable.TransformType().Cast<TCoString>().Literal().StringValue();
+        auto transformName = callable.TransformName().Cast<TCoString>().Literal().StringValue();
+
+        TString connectionName;
+        TExprNode::TPtr inputType;
+        TExprNode::TPtr outputType;
+
+        // Split the callable's settings: connection and the two type settings
+        // are consumed here, everything else is forwarded as-is to the sink.
+        TVector<TCoNameValueTuple> settings;
+        for (const auto &tuple: callable.Settings().Ref().Children()) {
+            const auto paramName = tuple->Head().Content();
+            if (paramName == "connection") {
+                connectionName = TString{tuple->Tail().Tail().Content()};
+            } else if (paramName == "input_type") {
+                inputType = tuple->TailPtr();
+            } else if (paramName == "output_type") {
+                outputType = tuple->TailPtr();
+            } else {
+                auto setting = Build<TCoNameValueTuple>(ctx, node.Pos())
+                    .Name().Build(paramName)
+                    .Value(tuple->TailPtr())
+                    .Done();
+                settings.push_back(setting);
+            }
+        }
+
+        // Both type nodes are handed to the TTransformSettings builder below;
+        // fail with a clear message instead of passing a null node there.
+        YQL_ENSURE(inputType, "External function " << transformName << " has no input_type setting");
+        YQL_ENSURE(outputType, "External function " << transformName << " has no output_type setting");
+
+        const auto description = State->FunctionsDescription.find(TDqFunctionDescription{
+            .Type = transformType,
+            .FunctionName = transformName,
+            .Connection = connectionName
+        });
+
+        YQL_ENSURE(description != State->FunctionsDescription.end(), "External function meta doesn't found " << transformName);
+        settings.push_back(
+            Build<TCoNameValueTuple>(ctx, node.Pos())
+                .Name().Build("invoke_url")
+                .Value<TCoAtom>().Build(description->InvokeUrl)
+                .Done()
+        );
+
+        auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos())
+            .Add(settings)
+            .Done();
+
+        auto stage = nodeInput.Output().Stage().Cast<TDqStage>();
+        auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx);
+        YQL_ENSURE(dutyColumn);
+
+        auto transformSink = Build<TFunctionDataSink>(ctx, node.Pos())
+            .Category().Build(FunctionProviderName)
+            .Type<TCoAtom>().Build(transformType)
+            .Connection().Build(connectionName)
+            .Done();
+
+        auto sinkSettings = Build<TTransformSettings>(ctx, node.Pos())
+            .Type<TCoAtom>().Build(transformType)
+            .InputType(inputType)
+            .OutputType(outputType)
+            .Other(settingsBuilder)
+            .Done();
+
+        auto dqSink = Build<TDqSink>(ctx, node.Pos())
+            .DataSink(transformSink)
+            .Settings(sinkSettings)
+            .Index().Build("0")
+            .Done();
+
+        auto transformStage = Build<TDqStage>(ctx, node.Pos())
+            .Inputs()
+                .Add<TDqCnHashShuffle>()
+                    .KeyColumns()
+                        .Add({shuffleColumn})
+                        .Build()
+                    .Output()
+                        .Stage(dutyColumn.Cast())
+                        .Index(nodeInput.Output().Index())
+                        .Build()
+                    .Build()
+                .Build()
+            .Program()
+                .Args({"row"})
+                .Body<TCoMap>()
+                    .Lambda(removeShuffleColumn)
+                    .Input("row")
+                    .Build()
+                .Build()
+            .Settings().Build()
+            .Sinks().Add(dqSink).Build()
+            .Done();
+
+        auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos())
+            .Output()
+                .Stage(transformStage)
+                .Index().Build("0")
+                .Build()
+            .Done();
+
+        return externalStage;
+    }
+
+private:
+    TDqFunctionState::TPtr State;
+};
+
+} // namespace
+
+// Factory for the function provider's physical optimization transformer.
+THolder<IGraphTransformer> CreateDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state) {
+    THolder<IGraphTransformer> transformer = MakeHolder<TDqFunctionPhysicalOptTransformer>(state);
+    return transformer;
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider.cpp b/ydb/library/yql/providers/function/provider/dq_function_provider.cpp
index fbcf9ce7cd8..08bb1dc0805 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_provider.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_provider.cpp
@@ -12,9 +12,10 @@ TDataProviderInitializer GetDqFunctionDataProviderInitializer(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
TDqFunctionGatewayFactory::TPtr gatewayFactory,
// TRunActorParams.TScope
- const TString& scopeFolderId) {
+ const TString& scopeFolderId,
+ const THashMap<TString, TString>& secureParams) {
- return [credentialsFactory, gatewayFactory, scopeFolderId] (
+ return [credentialsFactory, gatewayFactory, scopeFolderId, secureParams] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
@@ -29,19 +30,20 @@ TDataProviderInitializer GetDqFunctionDataProviderInitializer(
Y_UNUSED(gatewaysConfig);
Y_UNUSED(functionRegistry);
Y_UNUSED(randomProvider);
- Y_UNUSED(typeCtx);
Y_UNUSED(progressWriter);
Y_UNUSED(operationOptions);
auto state = MakeIntrusive<TDqFunctionState>();
state->SessionId = sessionId;
+ state->Types = typeCtx.Get();
state->GatewayFactory = gatewayFactory;
state->ScopeFolderId = scopeFolderId;
+ state->SecureParams = secureParams;
TDataProviderInfo provider;
provider.Names.insert({TString{FunctionProviderName}});
provider.Source = CreateDqFunctionDataSource(state);
- // TODO Sink
+ provider.Sink = CreateDqFunctionDataSink(state);
return provider;
};
}
diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider.h b/ydb/library/yql/providers/function/provider/dq_function_provider.h
index 2bcfdc076cc..116b0d161af 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_provider.h
+++ b/ydb/library/yql/providers/function/provider/dq_function_provider.h
@@ -16,13 +16,17 @@ struct TDqFunctionState : public TThrRefBase {
TString SessionId;
TString ScopeFolderId;
+ TTypeAnnotationContext* Types = nullptr;
+
TDqFunctionResolver::TPtr FunctionsResolver = MakeIntrusive<TDqFunctionResolver>();
TDqFunctionGatewayFactory::TPtr GatewayFactory;
TDqFunctionsSet FunctionsDescription;
+
+ THashMap<TString, TString> SecureParams;
};
TIntrusivePtr<IDataProvider> CreateDqFunctionDataSource(TDqFunctionState::TPtr state);
-//TIntrusivePtr<IDataProvider> CreateDqFunctionDataSink(TDqFunctionState::TPtr state);
+TIntrusivePtr<IDataProvider> CreateDqFunctionDataSink(TDqFunctionState::TPtr state);
}
namespace NYql {
@@ -30,6 +34,7 @@ namespace NYql {
TDataProviderInitializer GetDqFunctionDataProviderInitializer(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
TDqFunctionGatewayFactory::TPtr gatewayFactory,
- const TString& scopeFolderId = {});
+ const TString& scopeFolderId = {},
+ const THashMap<TString, TString>& secureParams = {});
} \ No newline at end of file
diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h
index 6c37e858eb6..90991e711db 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h
+++ b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h
@@ -4,10 +4,13 @@
#include <ydb/library/yql/providers/common/transform/yql_visit.h>
#include <ydb/library/yql/core/yql_graph_transformer.h>
+#include <ydb/library/yql/providers/dq/interface/yql_dq_integration.h>
-namespace NYql {
+namespace NYql::NDqFunction {
THolder<TVisitorTransformerBase> CreateDqFunctionIntentTransformer(TDqFunctionState::TPtr state);
THolder<IGraphTransformer> CreateDqFunctionMetaLoader(TDqFunctionState::TPtr state);
+THolder<IGraphTransformer> CreateDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state);
+THolder<IDqIntegration> CreateDqFunctionDqIntegration(TDqFunctionState::TPtr state);
} \ No newline at end of file