aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhrustyashko <hrustyashko@yandex-team.ru>2022-04-29 13:43:23 +0300
committerhrustyashko <hrustyashko@yandex-team.ru>2022-04-29 13:43:23 +0300
commitfd5635f15404f5918d4c5f144d68f3ff29c2710d (patch)
treeba87130969cd82e30c55cc1e552b6e9e8271f4c1
parentde8610d1f1d1b9af38c38abd4540416c51abfc00 (diff)
downloadydb-fd5635f15404f5918d4c5f144d68f3ff29c2710d.tar.gz
new callable DqTransform; explicitly describe stage's outputs
ref:abc5e03bfc6c96549bc291fda72eced93387d11c
-rw-r--r--ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h26
-rw-r--r--ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json38
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_build.cpp4
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp4
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp60
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.h2
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp65
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp4
-rw-r--r--ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json8
-rw-r--r--ydb/library/yql/providers/function/provider/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_datasink.cpp10
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp29
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp34
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_provider_impl.h1
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp35
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp10
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp10
17 files changed, 195 insertions, 146 deletions
diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h
index 48cfcfeb79a..61094133195 100644
--- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h
+++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h
@@ -38,6 +38,32 @@ public:
}
};
+class TDqOutputAnnotationBase : public NGenerated::TDqOutputAnnotationBaseStub<TExprBase, TCallable, TCoAtom> {
+public:
+ explicit TDqOutputAnnotationBase(const TExprNode* node)
+ : TDqOutputAnnotationBaseStub(node) {}
+
+ explicit TDqOutputAnnotationBase(const TExprNode::TPtr& node)
+ : TDqOutputAnnotationBaseStub(node) {}
+
+ static bool Match(const TExprNode* node) {
+ if (!node) {
+ return false;
+ }
+
+ if (!node->IsCallable()) {
+ return false;
+ }
+
+ if (node->ChildrenSize() < 2) {
+ return false;
+ }
+
+ return TCoAtom::Match(node->Child(0))
+ && TCallable::Match(node->Child(1));
+ }
+};
+
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.defs.inl.h>
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
index 0041a497154..46d880b3a50 100644
--- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
+++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
@@ -66,31 +66,39 @@
]
},
{
- "Name": "TDqSink",
+ "Name": "TDqOutputAnnotationBase",
"Base": "TCallable",
- "Match": {"Type": "Callable", "Name": "DqSink"},
+ "Definition": "Custom",
+ "Builder": {"Generate": "None"},
"Children": [
- {"Index": 0, "Name": "DataSink", "Type": "TCallable"},
- {"Index": 1, "Name": "Settings", "Type": "TExprBase"},
- {"Index": 2, "Name": "Index", "Type": "TCoAtom"}
+ {"Index": 0, "Name": "Index", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "DataSink", "Type": "TCallable"}
]
},
{
- "Name": "TDqSinksList",
- "ListBase": "TDqSink"
+ "Name": "TDqTransform",
+ "Base": "TDqOutputAnnotationBase",
+ "Match": {"Type": "Callable", "Name": "DqTransform"},
+ "Children": [
+ {"Index": 2, "Name": "Type", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "InputType", "Type": "TExprBase"},
+ {"Index": 4, "Name": "OutputType", "Type": "TExprBase"},
+ {"Index": 5, "Name": "Settings", "Type": "TCallable"}
+ ]
},
{
- "Name": "TTransformSettings",
- "Base": "TCallable",
- "Match": {"Type": "Callable", "Name": "TransformSettings"},
+ "Name": "TDqSink",
+ "Base": "TDqOutputAnnotationBase",
+ "Match": {"Type": "Callable", "Name": "DqSink"},
"Children": [
- {"Index": 0, "Name": "Type", "Type": "TCoAtom"},
- {"Index": 1, "Name": "InputType", "Type": "TExprBase"},
- {"Index": 2, "Name": "OutputType", "Type": "TExprBase"},
- {"Index": 3, "Name": "Other", "Type": "TCoNameValueTupleList"}
+ {"Index": 2, "Name": "Settings", "Type": "TCallable"}
]
},
{
+ "Name": "TDqStageOutputsList",
+ "ListBase": "TDqOutputAnnotationBase"
+ },
+ {
"Name": "TDqStageBase",
"Base": "TCallable",
"Match": {"Type": "CallableBase"},
@@ -99,7 +107,7 @@
{"Index": 0, "Name": "Inputs", "Type": "TExprList"},
{"Index": 1, "Name": "Program", "Type": "TCoLambda"},
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
- {"Index": 3, "Name": "Sinks", "Type": "TDqSinksList", "Optional": true}
+ {"Index": 3, "Name": "Outputs", "Type": "TDqStageOutputsList", "Optional": true}
]
},
{
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp
index 175b58cc208..600dd21b7d9 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp
@@ -145,7 +145,7 @@ void MakeConsumerReplaces(
.Body(ctx.ReplaceNodes(stageResult.Cast().Ptr(), argsMap))
.Build()
.Settings(TDqStageSettings::New(dqStage).BuildNode(ctx, dqStage.Pos()))
- .Sinks(dqStage.Sinks())
+ .Outputs(dqStage.Outputs())
.Done();
for (ui32 i = 0; i < consumers.size(); ++i) {
@@ -319,7 +319,7 @@ public:
.Body(newBody)
.Build()
.Settings(TDqStageSettings::New(stage).BuildNode(ctx, stage.Pos()))
- .Sinks(stage.Sinks())
+ .Outputs(stage.Outputs())
.Done();
replaces.emplace(stage.Raw(), newStage.Ptr());
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp b/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp
index 13f8016398e..a4204becfe9 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp
@@ -392,8 +392,8 @@ bool GatherConsumersImpl(const TExprNode& node, TNodeMap<TNodeMultiSet>& consume
return false;
}
- if (stage.Sinks()) {
- if (!GatherConsumersImpl(stage.Sinks().Ref(), consumers, visited)) {
+ if (stage.Outputs()) {
+ if (!GatherConsumersImpl(stage.Outputs().Ref(), consumers, visited)) {
return false;
}
}
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index cefe4c7fa3c..78db19b894d 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -163,51 +163,51 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
}
TVector<const TTypeAnnotationNode*> stageResultTypes;
- if (TDqStageBase::idx_Sinks < stage->ChildrenSize()) {
- YQL_ENSURE(stage->Child(TDqStageBase::idx_Sinks)->ChildrenSize() != 0, "Sink list exists but empty, stage: " << stage->Dump());
+ if (TDqStageBase::idx_Outputs < stage->ChildrenSize()) {
+ YQL_ENSURE(stage->Child(TDqStageBase::idx_Outputs)->ChildrenSize() != 0, "Stage.Outputs list exists but empty, stage: " << stage->Dump());
auto outputsNumber = programResultTypesTuple.size();
- TVector<TExprNode::TPtr> transformSinks;
- TVector<TExprNode::TPtr> pureSinks;
- transformSinks.reserve(outputsNumber);
- pureSinks.reserve(outputsNumber);
- for (const auto& sink: stage->Child(TDqStageBase::idx_Sinks)->Children()) {
- const ui64 index = FromString(sink->Child(TDqSink::idx_Index)->Content());
+ TVector<TExprNode::TPtr> transforms;
+ TVector<TExprNode::TPtr> sinks;
+ transforms.reserve(outputsNumber);
+ sinks.reserve(outputsNumber);
+ for (const auto& output: stage->Child(TDqStageBase::idx_Outputs)->Children()) {
+ const ui64 index = FromString(output->Child(TDqOutputAnnotationBase::idx_Index)->Content());
if (index >= outputsNumber) {
ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder()
<< "Sink/Transform try to process un-existing lambda's output"));
return TStatus::Error;
}
- auto transformSettings = sink->Child(TDqSink::idx_Settings)->IsCallable(TTransformSettings::CallableName());
- if (transformSettings) {
- transformSinks.push_back(sink);
+ if (output->IsCallable(TDqSink::CallableName())) {
+ sinks.push_back(output);
+ } else if (output->IsCallable(TDqTransform::CallableName())) {
+ transforms.push_back(output);
} else {
- pureSinks.push_back(sink);
+ YQL_ENSURE(false, "Unknown stage output type " << output->Content());
}
}
- if (!transformSinks.empty() && !pureSinks.empty()
- && transformSinks.size() != pureSinks.size()) {
+ if (!transforms.empty() && !sinks.empty()
+ && transforms.size() != sinks.size()) {
ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder()
<< "Not every transform has a corresponding sink"));
return TStatus::Error;
}
- if (!pureSinks.empty()) {
- for (auto sink : pureSinks) {
+ if (!sinks.empty()) {
+ for (auto sink : sinks) {
sink->SetTypeAnn(resultType);
}
stageResultTypes.assign(programResultTypesTuple.begin(), programResultTypesTuple.end());
} else {
- for (auto sink : transformSinks) {
- auto* sinkType = sink->GetTypeAnn();
- if (sinkType->GetKind() != ETypeAnnotationKind::List
- && sinkType->GetKind() != ETypeAnnotationKind::Void) {
+ for (auto transform : transforms) {
+ auto* type = transform->GetTypeAnn();
+ if (type->GetKind() != ETypeAnnotationKind::List) {
- ctx.AddError(TIssue(ctx.GetPosition(sink->Pos()), TStringBuilder()
- << "Expected List or Void type, but got: " << *sinkType));
+ ctx.AddError(TIssue(ctx.GetPosition(transform->Pos()), TStringBuilder()
+ << "Expected List type, but got: " << *type));
return TStatus::Error;
}
/* auto* itemType = sinkType->Cast<TListExprType>()->GetItemType();
@@ -216,7 +216,7 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
<< "Expected List<Struct<...>> type, but got: List<" << *itemType << ">"));
return TStatus::Error;
} */
- stageResultTypes.emplace_back(sinkType);
+ stageResultTypes.emplace_back(type);
}
}
} else {
@@ -810,18 +810,18 @@ TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx) {
return TStatus::Ok;
}
-TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureArgsCount(*input, 4U, ctx)) {
+TStatus AnnotateDqTransform(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (!EnsureArgsCount(*input, 6U, ctx)) {
return TStatus::Error;
}
- const TExprNode* outputArg = input->Child(TTransformSettings::idx_OutputType);
+ const TExprNode* outputArg = input->Child(TDqTransform::idx_OutputType);
if (!EnsureTypeWithStructType(*outputArg, ctx)) {
return IGraphTransformer::TStatus::Error;
}
const TTypeAnnotationNode* outputType = outputArg->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- const TExprNode* inputType = input->Child(TTransformSettings::idx_InputType);
+ const TExprNode* inputType = input->Child(TDqTransform::idx_InputType);
if (!EnsureTypeWithStructType(*inputType, ctx)) {
return IGraphTransformer::TStatus::Error;
}
@@ -933,8 +933,8 @@ THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationCont
return AnnotateDqSink(input, ctx);
}
- if (TTransformSettings::Match(input.Get())) {
- return AnnotateTransformSettings (input, ctx);
+ if (TDqTransform::Match(input.Get())) {
+ return AnnotateDqTransform(input, ctx);
}
if (TDqQuery::Match(input.Get())) {
@@ -1025,4 +1025,4 @@ TString PrintDqStageOnly(const TDqStageBase& stage, TExprContext& ctx) {
return NCommon::ExprToPrettyString(ctx, *newStage);
}
-} // namespace NYql::NDq
+} // namespace NYql::NDq \ No newline at end of file
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h
index c870ce7c7fc..afcc29eb88e 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.h
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h
@@ -22,9 +22,9 @@ IGraphTransformer::TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TEx
IGraphTransformer::TStatus AnnotateDqSource(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqSink(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx);
-IGraphTransformer::TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqPrecompute(const TExprNode::TPtr& node, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqPhyPrecompute(const TExprNode::TPtr& node, TExprContext& ctx);
+IGraphTransformer::TStatus AnnotateDqTransform(const TExprNode::TPtr& input, TExprContext& ctx);
THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(NYql::TTypeAnnotationContext& typesCtx);
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index afbadeeda0c..9f52f3a2ea6 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -156,57 +156,56 @@ namespace NYql::NDqs {
}
// Sinks
- if (auto maybeDqSinksList = stage.Sinks()) {
- auto dqSinksList = maybeDqSinksList.Cast();
- for (const TDqSink& sink : dqSinksList) {
- const ui64 index = FromString(sink.Index().Value());
+ if (auto maybeDqOutputsList = stage.Outputs()) {
+ auto dqOutputsList = maybeDqOutputsList.Cast();
+ for (const auto& output : dqOutputsList) {
+ const ui64 index = FromString(output.Ptr()->Child(TDqOutputAnnotationBase::idx_Index)->Content());
auto& stageInfo = TasksGraph.GetStageInfo(stage);
YQL_ENSURE(index < stageInfo.OutputsCount);
- auto dataSinkName = sink.Ptr()->Child(TDqSink::idx_DataSink)->Child(0)->Content();
+ auto dataSinkName = output.Ptr()->Child(TDqOutputAnnotationBase::idx_DataSink)->Child(0)->Content();
auto datasink = TypeContext->DataSinkMap.FindPtr(dataSinkName);
YQL_ENSURE(datasink);
auto dqIntegration = (*datasink)->GetDqIntegration();
YQL_ENSURE(dqIntegration, "DqSink assumes that datasink has a dq integration impl");
- TTransform stageTransform;
+ NDq::TTransform outputTransform;
TString sinkType;
::google::protobuf::Any sinkSettings;
-
- auto transformSettings = sink.Settings().Maybe<TTransformSettings>();
- if (transformSettings) {
- auto settings = transformSettings.Cast();
-
- stageTransform.Type = settings.Type();
- const auto inputType = settings.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- stageTransform.InputType = NCommon::WriteTypeToYson(inputType);
- const auto outputType = settings.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- stageTransform.OutputType = NCommon::WriteTypeToYson(outputType);
-
- dqIntegration->FillTransformSettings(sink.Ref(), stageTransform.Settings);
- } else {
+ if (output.Maybe<TDqSink>()) {
+ auto sink = output.Cast<TDqSink>();
dqIntegration->FillSinkSettings(sink.Ref(), sinkSettings, sinkType);
YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node");
YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node");
+ } else if (output.Maybe<NNodes::TDqTransform>()) {
+ auto transform = output.Cast<NNodes::TDqTransform>();
+ outputTransform.Type = transform.Type();
+ const auto inputType = transform.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ outputTransform.InputType = NCommon::WriteTypeToYson(inputType);
+ const auto outputType = transform.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ outputTransform.OutputType = NCommon::WriteTypeToYson(outputType);
+ dqIntegration->FillTransformSettings(transform.Ref(), outputTransform.Settings);
+ } else {
+ YQL_ENSURE(false, "Unknown stage output type");
}
for (ui64 taskId : stageInfo.Tasks) {
auto& task = TasksGraph.GetTask(taskId);
YQL_ENSURE(index < task.Outputs.size());
- auto& output = task.Outputs[index];
- if (transformSettings) {
-
- output.Transform.ConstructInPlace();
-
- auto& transform = output.Transform;
- transform->Type = stageTransform.Type;
- transform->InputType = stageTransform.InputType;
- transform->OutputType = stageTransform.OutputType;
- //transform->Settings = stageTransform.Settings;
- } else {
- output.SinkType = sinkType;
- output.SinkSettings = sinkSettings;
- output.Type = NDq::TTaskOutputType::Sink;
+ auto& taskOutput = task.Outputs[index];
+
+ if (output.Maybe<TDqSink>()) {
+ taskOutput.SinkType = sinkType;
+ taskOutput.SinkSettings = sinkSettings;
+ taskOutput.Type = NDq::TTaskOutputType::Sink;
+ } else if (output.Maybe<NNodes::TDqTransform>()) {
+ taskOutput.Transform.ConstructInPlace();
+
+ auto& transform = taskOutput.Transform;
+ transform->Type = outputTransform.Type;
+ transform->InputType = outputTransform.InputType;
+ transform->OutputType = outputTransform.OutputType;
+ //transform->Settings = outputTransform.Settings;
}
}
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
index 2763be33c2b..1c1af7bb077 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
@@ -34,9 +34,9 @@ public:
AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink));
AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite));
AddHandler({TDqQuery::CallableName()}, Hndl(&NDq::AnnotateDqQuery));
- AddHandler({TTransformSettings::CallableName()}, Hndl(&NDq::AnnotateTransformSettings));
AddHandler({TDqPrecompute::CallableName()}, Hndl(&NDq::AnnotateDqPrecompute));
AddHandler({TDqPhyPrecompute::CallableName()}, Hndl(&NDq::AnnotateDqPhyPrecompute));
+ AddHandler({TDqTransform::CallableName()}, Hndl(&NDq::AnnotateDqTransform));
}
private:
@@ -78,4 +78,4 @@ THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTyp
return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx));
}
-} // NYql
+} // NYql \ No newline at end of file
diff --git a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json
index c70a72fe51e..3bb385c1428 100644
--- a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json
+++ b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json
@@ -26,6 +26,14 @@
{"Index": 1, "Name": "Type", "Type": "TCoAtom"},
{"Index": 2, "Name": "Connection", "Type": "TCoAtom"}
]
+ },
+ {
+ "Name": "TFunctionTransformSettings",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "FunctionTransformSettings"},
+ "Children": [
+ {"Index": 0, "Name": "InvokeUrl", "Type": "TCoAtom"}
+ ]
}
]
} \ No newline at end of file
diff --git a/ydb/library/yql/providers/function/provider/CMakeLists.txt b/ydb/library/yql/providers/function/provider/CMakeLists.txt
index ab4c8e6f4d8..62be94ae640 100644
--- a/ydb/library/yql/providers/function/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/function/provider/CMakeLists.txt
@@ -33,6 +33,7 @@ target_sources(providers-function-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
)
diff --git a/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp
index 0e1335ddf97..c83408c96fc 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp
@@ -21,6 +21,7 @@ public:
, LoadMetaDataTransformer(CreateDqFunctionMetaLoader(State))
, IntentDeterminationTransformer(CreateDqFunctionIntentTransformer(State))
, DqIntegration(CreateDqFunctionDqIntegration(State))
+ , TypeAnnotationTransformer(CreateDqFunctionTypeAnnotation(State))
{}
TStringBuf GetName() const override {
@@ -50,7 +51,8 @@ public:
}
bool CanParse(const TExprNode& node) override {
- return node.IsCallable(TDqSqlExternalFunction::CallableName());
+ return node.IsCallable(TDqSqlExternalFunction::CallableName())
+ || TypeAnnotationTransformer->CanParse(node);
}
IGraphTransformer& GetPhysicalOptProposalTransformer() override {
@@ -69,12 +71,18 @@ public:
return DqIntegration.Get();
}
+ IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override {
+ Y_UNUSED(instantOnly);
+ return *TypeAnnotationTransformer;
+ }
+
private:
const TDqFunctionState::TPtr State;
const THolder<IGraphTransformer> PhysicalOptTransformer;
const THolder<IGraphTransformer> LoadMetaDataTransformer;
const THolder<TVisitorTransformerBase> IntentDeterminationTransformer;
const THolder<IDqIntegration> DqIntegration;
+ const THolder<TVisitorTransformerBase> TypeAnnotationTransformer;
};
} // namespace
diff --git a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
index 56db4e968d0..95e0b2d6585 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
@@ -22,43 +22,22 @@ public:
}
void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& transformSettings) override {
- auto maybeDqSink = TMaybeNode<TDqSink>(&node);
- if (!maybeDqSink)
+ auto maybeDqTransform = TMaybeNode<TDqTransform>(&node);
+ if (!maybeDqTransform)
return;
- auto dqSink = maybeDqSink.Cast();
- auto maybeSettings = TMaybeNode<TTransformSettings>(dqSink.Settings().Raw());
+ auto maybeSettings = TMaybeNode<TFunctionTransformSettings>(maybeDqTransform.Cast().Settings().Raw());
if (!maybeSettings) {
return;
}
auto functionSettings = maybeSettings.Cast();
NYql::NProto::TFunctionTransform transform;
- for (size_t i = 0; i < functionSettings.Other().Size(); i++) {
- TCoNameValueTuple setting = functionSettings.Other().Item(i);
- const TStringBuf name = Name(setting);
- if (name == "invoke_url") {
- transform.SetInvokeUrl(TString(Value(setting)));
- }
- }
+ transform.SetInvokeUrl(TString{functionSettings.InvokeUrl().Value()});
transformSettings.PackFrom(transform);
}
- static TStringBuf Name(const TCoNameValueTuple& nameValue) {
- return nameValue.Name().Value();
- }
-
- static TStringBuf Value(const TCoNameValueTuple& nameValue) {
- if (TMaybeNode<TExprBase> maybeValue = nameValue.Value()) {
- const TExprNode& value = maybeValue.Cast().Ref();
- YQL_ENSURE(value.IsAtom());
- return value.Content();
- }
-
- return {};
- }
-
private:
TDqFunctionState::TPtr State;
};
diff --git a/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp
index 4bf88da9885..569a80f000c 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp
@@ -77,7 +77,6 @@ public:
TExprNode::TPtr inputType;
TExprNode::TPtr outputType;
- TVector<TCoNameValueTuple> settings;
for (const auto &tuple: callable.Settings().Ref().Children()) {
const auto paramName = tuple->Head().Content();
if (paramName == "connection") {
@@ -87,11 +86,6 @@ public:
} else if (paramName == "output_type") {
outputType = tuple->TailPtr();
} else {
- auto setting = Build<TCoNameValueTuple>(ctx, node.Pos())
- .Name().Build(paramName)
- .Value(tuple->TailPtr())
- .Done();
- settings.push_back(setting);
}
}
@@ -102,16 +96,6 @@ public:
});
YQL_ENSURE(description != State->FunctionsDescription.end(), "External function meta doesn't found " << transformName);
- settings.push_back(
- Build<TCoNameValueTuple>(ctx, node.Pos())
- .Name().Build("invoke_url")
- .Value<TCoAtom>().Build(description->InvokeUrl)
- .Done()
- );
-
- auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos())
- .Add(settings)
- .Done();
auto stage = nodeInput.Output().Stage().Cast<TDqStage>();
auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx);
@@ -123,17 +107,17 @@ public:
.Connection().Build(connectionName)
.Done();
- auto sinkSettings = Build<TTransformSettings>(ctx, node.Pos())
- .Type<TCoAtom>().Build(transformType)
- .InputType(inputType)
- .OutputType(outputType)
- .Other(settingsBuilder)
+ auto settings = Build<TFunctionTransformSettings>(ctx, node.Pos())
+ .InvokeUrl<TCoAtom>().Build(description->InvokeUrl)
.Done();
- auto dqSink = Build<TDqSink>(ctx, node.Pos())
- .DataSink(transformSink)
- .Settings(sinkSettings)
+ auto dqTransform = Build<TDqTransform>(ctx, node.Pos())
.Index().Build("0")
+ .DataSink(transformSink)
+ .Type<TCoAtom>().Build(transformType)
+ .InputType(inputType)
+ .OutputType(outputType)
+ .Settings(settings)
.Done();
auto transformStage = Build<TDqStage>(ctx, node.Pos())
@@ -156,7 +140,7 @@ public:
.Build()
.Build()
.Settings().Build()
- .Sinks().Add(dqSink).Build()
+ .Outputs().Add(dqTransform).Build()
.Done();
auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos())
diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h
index 90991e711db..0995a84bad7 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h
+++ b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h
@@ -12,5 +12,6 @@ THolder<TVisitorTransformerBase> CreateDqFunctionIntentTransformer(TDqFunctionSt
THolder<IGraphTransformer> CreateDqFunctionMetaLoader(TDqFunctionState::TPtr state);
THolder<IGraphTransformer> CreateDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state);
THolder<IDqIntegration> CreateDqFunctionDqIntegration(TDqFunctionState::TPtr state);
+THolder<TVisitorTransformerBase> CreateDqFunctionTypeAnnotation(TDqFunctionState::TPtr state);
} \ No newline at end of file
diff --git a/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp b/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp
new file mode 100644
index 00000000000..8b27309c724
--- /dev/null
+++ b/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp
@@ -0,0 +1,35 @@
+#include "dq_function_provider_impl.h"
+
+#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h>
+
+namespace NYql::NDqFunction {
+namespace {
+
+using namespace NNodes;
+
+class TDqFunctionTypeAnnotationTransformer: public TVisitorTransformerBase {
+public:
+ TDqFunctionTypeAnnotationTransformer(TDqFunctionState::TPtr state)
+ : TVisitorTransformerBase(true)
+ , State(state)
+ {
+
+ using TSelf = TDqFunctionTypeAnnotationTransformer;
+ AddHandler({TFunctionTransformSettings::CallableName()}, Hndl(&TSelf::HandleSettings));
+ }
+
+ TStatus HandleSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
+ input->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+ return TStatus::Ok;
+ }
+
+private:
+ TDqFunctionState::TPtr State;
+};
+} // namespace
+
+THolder<TVisitorTransformerBase> CreateDqFunctionTypeAnnotation(TDqFunctionState::TPtr state) {
+ return MakeHolder<TDqFunctionTypeAnnotationTransformer>(state);
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp
index a9351eb654f..2f57e3999de 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp
@@ -92,15 +92,15 @@ public:
TDqStage inputStage = dqUnion.Output().Stage().Cast<TDqStage>();
- auto sinksBuilder = Build<TDqSinksList>(ctx, topicNode.Pos());
- if (inputStage.Sinks()) {
- sinksBuilder.InitFrom(inputStage.Sinks().Cast());
+ auto outputsBuilder = Build<TDqStageOutputsList>(ctx, topicNode.Pos());
+ if (inputStage.Outputs()) {
+ outputsBuilder.InitFrom(inputStage.Outputs().Cast());
}
- sinksBuilder.Add(dqSink);
+ outputsBuilder.Add(dqSink);
auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos())
.InitFrom(inputStage)
- .Sinks(sinksBuilder.Done())
+ .Outputs(outputsBuilder.Done())
.Done();
auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos());
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp
index 226205cce20..ece1e1c666a 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp
@@ -98,15 +98,15 @@ public:
TDqStage inputStage = dqUnion.Output().Stage().Cast<TDqStage>();
- auto sinksBuilder = Build<TDqSinksList>(ctx, inputStage.Pos());
- if (inputStage.Sinks()) {
- sinksBuilder.InitFrom(inputStage.Sinks().Cast());
+ auto sinksBuilder = Build<TDqStageOutputsList>(ctx, inputStage.Pos());
+ if (inputStage.Outputs()) {
+ sinksBuilder.InitFrom(inputStage.Outputs().Cast());
}
sinksBuilder.Add(dqSink);
auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos())
.InitFrom(inputStage)
- .Sinks(sinksBuilder.Done())
+ .Outputs(sinksBuilder.Done())
.Done();
auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos());
@@ -119,7 +119,7 @@ public:
}
private:
- TExprBase BuildSolomonShard(TCoAtom shardNode, TExprContext& ctx, TString solomonCluster) const {
+ TCallable BuildSolomonShard(TCoAtom shardNode, TExprContext& ctx, TString solomonCluster) const {
const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(solomonCluster);
YQL_ENSURE(clusterDesc, "Unknown cluster " << solomonCluster);