diff options
author | hrustyashko <hrustyashko@yandex-team.ru> | 2022-04-29 13:43:23 +0300 |
---|---|---|
committer | hrustyashko <hrustyashko@yandex-team.ru> | 2022-04-29 13:43:23 +0300 |
commit | fd5635f15404f5918d4c5f144d68f3ff29c2710d (patch) | |
tree | ba87130969cd82e30c55cc1e552b6e9e8271f4c1 | |
parent | de8610d1f1d1b9af38c38abd4540416c51abfc00 (diff) | |
download | ydb-fd5635f15404f5918d4c5f144d68f3ff29c2710d.tar.gz |
new callable DqTransform; explicitly describe stage's outputs
ref:abc5e03bfc6c96549bc291fda72eced93387d11c
17 files changed, 195 insertions, 146 deletions
diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h index 48cfcfeb79a..61094133195 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h @@ -38,6 +38,32 @@ public: } }; +class TDqOutputAnnotationBase : public NGenerated::TDqOutputAnnotationBaseStub<TExprBase, TCallable, TCoAtom> { +public: + explicit TDqOutputAnnotationBase(const TExprNode* node) + : TDqOutputAnnotationBaseStub(node) {} + + explicit TDqOutputAnnotationBase(const TExprNode::TPtr& node) + : TDqOutputAnnotationBaseStub(node) {} + + static bool Match(const TExprNode* node) { + if (!node) { + return false; + } + + if (!node->IsCallable()) { + return false; + } + + if (node->ChildrenSize() < 2) { + return false; + } + + return TCoAtom::Match(node->Child(0)) + && TCallable::Match(node->Child(1)); + } +}; + #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.defs.inl.h> } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index 0041a497154..46d880b3a50 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -66,31 +66,39 @@ ] }, { - "Name": "TDqSink", + "Name": "TDqOutputAnnotationBase", "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "DqSink"}, + "Definition": "Custom", + "Builder": {"Generate": "None"}, "Children": [ - {"Index": 0, "Name": "DataSink", "Type": "TCallable"}, - {"Index": 1, "Name": "Settings", "Type": "TExprBase"}, - {"Index": 2, "Name": "Index", "Type": "TCoAtom"} + {"Index": 0, "Name": "Index", "Type": "TCoAtom"}, + {"Index": 1, "Name": "DataSink", "Type": "TCallable"} ] }, { - "Name": "TDqSinksList", - "ListBase": "TDqSink" + "Name": "TDqTransform", + "Base": "TDqOutputAnnotationBase", + "Match": {"Type": "Callable", "Name": "DqTransform"}, + "Children": [ + {"Index": 2, "Name": "Type", "Type": "TCoAtom"}, + {"Index": 3, "Name": "InputType", "Type": "TExprBase"}, + {"Index": 4, "Name": "OutputType", "Type": "TExprBase"}, + {"Index": 5, "Name": "Settings", "Type": "TCallable"} + ] }, { - "Name": "TTransformSettings", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "TransformSettings"}, + "Name": "TDqSink", + "Base": "TDqOutputAnnotationBase", + "Match": {"Type": "Callable", "Name": "DqSink"}, "Children": [ - {"Index": 0, "Name": "Type", "Type": "TCoAtom"}, - {"Index": 1, "Name": "InputType", "Type": "TExprBase"}, - {"Index": 2, "Name": "OutputType", "Type": "TExprBase"}, - {"Index": 3, "Name": "Other", "Type": "TCoNameValueTupleList"} + {"Index": 2, "Name": "Settings", "Type": "TCallable"} ] }, { + "Name": "TDqStageOutputsList", + "ListBase": "TDqOutputAnnotationBase" + }, + { "Name": "TDqStageBase", "Base": "TCallable", "Match": {"Type": "CallableBase"}, @@ -99,7 +107,7 @@ {"Index": 0, "Name": "Inputs", "Type": "TExprList"}, {"Index": 1, "Name": "Program", "Type": "TCoLambda"}, {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"}, - {"Index": 3, "Name": "Sinks", "Type": "TDqSinksList", "Optional": true} + {"Index": 3, "Name": "Outputs", "Type": "TDqStageOutputsList", "Optional": true} ] }, { diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index 175b58cc208..600dd21b7d9 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -145,7 +145,7 @@ void MakeConsumerReplaces( .Body(ctx.ReplaceNodes(stageResult.Cast().Ptr(), argsMap)) .Build() .Settings(TDqStageSettings::New(dqStage).BuildNode(ctx, dqStage.Pos())) - .Sinks(dqStage.Sinks()) + .Outputs(dqStage.Outputs()) .Done(); for (ui32 i = 0; i < consumers.size(); ++i) { @@ -319,7 +319,7 @@ public: .Body(newBody) .Build() .Settings(TDqStageSettings::New(stage).BuildNode(ctx, stage.Pos())) - .Sinks(stage.Sinks()) + .Outputs(stage.Outputs()) .Done(); replaces.emplace(stage.Raw(), newStage.Ptr()); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp b/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp index 13f8016398e..a4204becfe9 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp @@ -392,8 +392,8 @@ bool GatherConsumersImpl(const TExprNode& node, TNodeMap<TNodeMultiSet>& consume return false; } - if (stage.Sinks()) { - if (!GatherConsumersImpl(stage.Sinks().Ref(), consumers, visited)) { + if (stage.Outputs()) { + if (!GatherConsumersImpl(stage.Outputs().Ref(), consumers, visited)) { return false; } } diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index cefe4c7fa3c..78db19b894d 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -163,51 +163,51 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { } TVector<const TTypeAnnotationNode*> stageResultTypes; - if (TDqStageBase::idx_Sinks < stage->ChildrenSize()) { - YQL_ENSURE(stage->Child(TDqStageBase::idx_Sinks)->ChildrenSize() != 0, "Sink list exists but empty, stage: " << stage->Dump()); + if (TDqStageBase::idx_Outputs < stage->ChildrenSize()) { + YQL_ENSURE(stage->Child(TDqStageBase::idx_Outputs)->ChildrenSize() != 0, "Stage.Outputs list exists but empty, stage: " << stage->Dump()); auto outputsNumber = programResultTypesTuple.size(); - TVector<TExprNode::TPtr> transformSinks; - TVector<TExprNode::TPtr> pureSinks; - transformSinks.reserve(outputsNumber); - pureSinks.reserve(outputsNumber); - for (const auto& sink: stage->Child(TDqStageBase::idx_Sinks)->Children()) { - const ui64 index = FromString(sink->Child(TDqSink::idx_Index)->Content()); + TVector<TExprNode::TPtr> transforms; + TVector<TExprNode::TPtr> sinks; + transforms.reserve(outputsNumber); + sinks.reserve(outputsNumber); + for (const auto& output: stage->Child(TDqStageBase::idx_Outputs)->Children()) { + const ui64 index = FromString(output->Child(TDqOutputAnnotationBase::idx_Index)->Content()); if (index >= outputsNumber) { ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder() << "Sink/Transform try to process un-existing lambda's output")); return TStatus::Error; } - auto transformSettings = sink->Child(TDqSink::idx_Settings)->IsCallable(TTransformSettings::CallableName()); - if (transformSettings) { - transformSinks.push_back(sink); + if (output->IsCallable(TDqSink::CallableName())) { + sinks.push_back(output); + } else if (output->IsCallable(TDqTransform::CallableName())) { + transforms.push_back(output); } else { - pureSinks.push_back(sink); + YQL_ENSURE(false, "Unknown stage output type " << output->Content()); } } - if (!transformSinks.empty() && !pureSinks.empty() - && transformSinks.size() != pureSinks.size()) { + if (!transforms.empty() && !sinks.empty() + && transforms.size() != sinks.size()) { ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder() << "Not every transform has a corresponding sink")); return TStatus::Error; } - if (!pureSinks.empty()) { - for (auto sink : pureSinks) { + if (!sinks.empty()) { + for (auto sink : sinks) { sink->SetTypeAnn(resultType); } stageResultTypes.assign(programResultTypesTuple.begin(), programResultTypesTuple.end()); } else { - for (auto sink : transformSinks) { - auto* sinkType = sink->GetTypeAnn(); - if (sinkType->GetKind() != ETypeAnnotationKind::List - && sinkType->GetKind() != ETypeAnnotationKind::Void) { + for (auto transform : transforms) { + auto* type = transform->GetTypeAnn(); + if (type->GetKind() != ETypeAnnotationKind::List) { - ctx.AddError(TIssue(ctx.GetPosition(sink->Pos()), TStringBuilder() - << "Expected List or Void type, but got: " << *sinkType)); + ctx.AddError(TIssue(ctx.GetPosition(transform->Pos()), TStringBuilder() + << "Expected List type, but got: " << *type)); return TStatus::Error; } /* auto* itemType = sinkType->Cast<TListExprType>()->GetItemType(); @@ -216,7 +216,7 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { << "Expected List<Struct<...>> type, but got: List<" << *itemType << ">")); return TStatus::Error; } */ - stageResultTypes.emplace_back(sinkType); + stageResultTypes.emplace_back(type); } } } else { @@ -810,18 +810,18 @@ TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx) { return TStatus::Ok; } -TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 4U, ctx)) { +TStatus AnnotateDqTransform(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 6U, ctx)) { return TStatus::Error; } - const TExprNode* outputArg = input->Child(TTransformSettings::idx_OutputType); + const TExprNode* outputArg = input->Child(TDqTransform::idx_OutputType); if (!EnsureTypeWithStructType(*outputArg, ctx)) { return IGraphTransformer::TStatus::Error; } const TTypeAnnotationNode* outputType = outputArg->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - const TExprNode* inputType = input->Child(TTransformSettings::idx_InputType); + const TExprNode* inputType = input->Child(TDqTransform::idx_InputType); if (!EnsureTypeWithStructType(*inputType, ctx)) { return IGraphTransformer::TStatus::Error; } @@ -933,8 +933,8 @@ THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationCont return AnnotateDqSink(input, ctx); } - if (TTransformSettings::Match(input.Get())) { - return AnnotateTransformSettings (input, ctx); + if (TDqTransform::Match(input.Get())) { + return AnnotateDqTransform(input, ctx); } if (TDqQuery::Match(input.Get())) { @@ -1025,4 +1025,4 @@ TString PrintDqStageOnly(const TDqStageBase& stage, TExprContext& ctx) { return NCommon::ExprToPrettyString(ctx, *newStage); } -} // namespace NYql::NDq +} // namespace NYql::NDq
\ No newline at end of file diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h index c870ce7c7fc..afcc29eb88e 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.h +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h @@ -22,9 +22,9 @@ IGraphTransformer::TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TEx IGraphTransformer::TStatus AnnotateDqSource(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqSink(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqQuery(const TExprNode::TPtr& input, TExprContext& ctx); -IGraphTransformer::TStatus AnnotateTransformSettings(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqPrecompute(const TExprNode::TPtr& node, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqPhyPrecompute(const TExprNode::TPtr& node, TExprContext& ctx); +IGraphTransformer::TStatus AnnotateDqTransform(const TExprNode::TPtr& input, TExprContext& ctx); THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(NYql::TTypeAnnotationContext& typesCtx); diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index afbadeeda0c..9f52f3a2ea6 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -156,57 +156,56 @@ namespace NYql::NDqs { } // Sinks - if (auto maybeDqSinksList = stage.Sinks()) { - auto dqSinksList = maybeDqSinksList.Cast(); - for (const TDqSink& sink : dqSinksList) { - const ui64 index = FromString(sink.Index().Value()); + if (auto maybeDqOutputsList = stage.Outputs()) { + auto dqOutputsList = maybeDqOutputsList.Cast(); + for (const auto& output : dqOutputsList) { + const ui64 index = FromString(output.Ptr()->Child(TDqOutputAnnotationBase::idx_Index)->Content()); auto& stageInfo = TasksGraph.GetStageInfo(stage); YQL_ENSURE(index < stageInfo.OutputsCount); - auto dataSinkName = sink.Ptr()->Child(TDqSink::idx_DataSink)->Child(0)->Content(); + auto dataSinkName = output.Ptr()->Child(TDqOutputAnnotationBase::idx_DataSink)->Child(0)->Content(); auto datasink = TypeContext->DataSinkMap.FindPtr(dataSinkName); YQL_ENSURE(datasink); auto dqIntegration = (*datasink)->GetDqIntegration(); YQL_ENSURE(dqIntegration, "DqSink assumes that datasink has a dq integration impl"); - TTransform stageTransform; + NDq::TTransform outputTransform; TString sinkType; ::google::protobuf::Any sinkSettings; - - auto transformSettings = sink.Settings().Maybe<TTransformSettings>(); - if (transformSettings) { - auto settings = transformSettings.Cast(); - - stageTransform.Type = settings.Type(); - const auto inputType = settings.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - stageTransform.InputType = NCommon::WriteTypeToYson(inputType); - const auto outputType = settings.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - stageTransform.OutputType = NCommon::WriteTypeToYson(outputType); - - dqIntegration->FillTransformSettings(sink.Ref(), stageTransform.Settings); - } else { + if (output.Maybe<TDqSink>()) { + auto sink = output.Cast<TDqSink>(); dqIntegration->FillSinkSettings(sink.Ref(), sinkSettings, sinkType); YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node"); YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node"); + } else if (output.Maybe<NNodes::TDqTransform>()) { + auto transform = output.Cast<NNodes::TDqTransform>(); + outputTransform.Type = transform.Type(); + const auto inputType = transform.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + outputTransform.InputType = NCommon::WriteTypeToYson(inputType); + const auto outputType = transform.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + outputTransform.OutputType = NCommon::WriteTypeToYson(outputType); + dqIntegration->FillTransformSettings(transform.Ref(), outputTransform.Settings); + } else { + YQL_ENSURE(false, "Unknown stage output type"); } for (ui64 taskId : stageInfo.Tasks) { auto& task = TasksGraph.GetTask(taskId); YQL_ENSURE(index < task.Outputs.size()); - auto& output = task.Outputs[index]; - if (transformSettings) { - - output.Transform.ConstructInPlace(); - - auto& transform = output.Transform; - transform->Type = stageTransform.Type; - transform->InputType = stageTransform.InputType; - transform->OutputType = stageTransform.OutputType; - //transform->Settings = stageTransform.Settings; - } else { - output.SinkType = sinkType; - output.SinkSettings = sinkSettings; - output.Type = NDq::TTaskOutputType::Sink; + auto& taskOutput = task.Outputs[index]; + + if (output.Maybe<TDqSink>()) { + taskOutput.SinkType = sinkType; + taskOutput.SinkSettings = sinkSettings; + taskOutput.Type = NDq::TTaskOutputType::Sink; + } else if (output.Maybe<NNodes::TDqTransform>()) { + taskOutput.Transform.ConstructInPlace(); + + auto& transform = taskOutput.Transform; + transform->Type = outputTransform.Type; + transform->InputType = outputTransform.InputType; + transform->OutputType = outputTransform.OutputType; + //transform->Settings = outputTransform.Settings; } } } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index 2763be33c2b..1c1af7bb077 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -34,9 +34,9 @@ public: AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink)); AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite)); AddHandler({TDqQuery::CallableName()}, Hndl(&NDq::AnnotateDqQuery)); - AddHandler({TTransformSettings::CallableName()}, Hndl(&NDq::AnnotateTransformSettings)); AddHandler({TDqPrecompute::CallableName()}, Hndl(&NDq::AnnotateDqPrecompute)); AddHandler({TDqPhyPrecompute::CallableName()}, Hndl(&NDq::AnnotateDqPhyPrecompute)); + AddHandler({TDqTransform::CallableName()}, Hndl(&NDq::AnnotateDqTransform)); } private: @@ -78,4 +78,4 @@ THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTyp return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx)); } -} // NYql +} // NYql
\ No newline at end of file diff --git a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json index c70a72fe51e..3bb385c1428 100644 --- a/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json +++ b/ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.json @@ -26,6 +26,14 @@ {"Index": 1, "Name": "Type", "Type": "TCoAtom"}, {"Index": 2, "Name": "Connection", "Type": "TCoAtom"} ] + }, + { + "Name": "TFunctionTransformSettings", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "FunctionTransformSettings"}, + "Children": [ + {"Index": 0, "Name": "InvokeUrl", "Type": "TCoAtom"} + ] } ] }
\ No newline at end of file diff --git a/ydb/library/yql/providers/function/provider/CMakeLists.txt b/ydb/library/yql/providers/function/provider/CMakeLists.txt index ab4c8e6f4d8..62be94ae640 100644 --- a/ydb/library/yql/providers/function/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/function/provider/CMakeLists.txt @@ -33,6 +33,7 @@ target_sources(providers-function-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasource.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp ) diff --git a/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp index 0e1335ddf97..c83408c96fc 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_datasink.cpp @@ -21,6 +21,7 @@ public: , LoadMetaDataTransformer(CreateDqFunctionMetaLoader(State)) , IntentDeterminationTransformer(CreateDqFunctionIntentTransformer(State)) , DqIntegration(CreateDqFunctionDqIntegration(State)) + , TypeAnnotationTransformer(CreateDqFunctionTypeAnnotation(State)) {} TStringBuf GetName() const override { @@ -50,7 +51,8 @@ public: } bool CanParse(const TExprNode& node) override { - return node.IsCallable(TDqSqlExternalFunction::CallableName()); + return node.IsCallable(TDqSqlExternalFunction::CallableName()) + || TypeAnnotationTransformer->CanParse(node); } IGraphTransformer& GetPhysicalOptProposalTransformer() override { @@ -69,12 +71,18 @@ public: return DqIntegration.Get(); } + IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override { + Y_UNUSED(instantOnly); + return *TypeAnnotationTransformer; + } + private: const TDqFunctionState::TPtr State; const THolder<IGraphTransformer> PhysicalOptTransformer; const THolder<IGraphTransformer> LoadMetaDataTransformer; const THolder<TVisitorTransformerBase> IntentDeterminationTransformer; const THolder<IDqIntegration> DqIntegration; + const THolder<TVisitorTransformerBase> TypeAnnotationTransformer; }; } // namespace diff --git a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp index 56db4e968d0..95e0b2d6585 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp @@ -22,43 +22,22 @@ public: } void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& transformSettings) override { - auto maybeDqSink = TMaybeNode<TDqSink>(&node); - if (!maybeDqSink) + auto maybeDqTransform = TMaybeNode<TDqTransform>(&node); + if (!maybeDqTransform) return; - auto dqSink = maybeDqSink.Cast(); - auto maybeSettings = TMaybeNode<TTransformSettings>(dqSink.Settings().Raw()); + auto maybeSettings = TMaybeNode<TFunctionTransformSettings>(maybeDqTransform.Cast().Settings().Raw()); if (!maybeSettings) { return; } auto functionSettings = maybeSettings.Cast(); NYql::NProto::TFunctionTransform transform; - for (size_t i = 0; i < functionSettings.Other().Size(); i++) { - TCoNameValueTuple setting = functionSettings.Other().Item(i); - const TStringBuf name = Name(setting); - if (name == "invoke_url") { - transform.SetInvokeUrl(TString(Value(setting))); - } - } + transform.SetInvokeUrl(TString{functionSettings.InvokeUrl().Value()}); transformSettings.PackFrom(transform); } - static TStringBuf Name(const TCoNameValueTuple& nameValue) { - return nameValue.Name().Value(); - } - - static TStringBuf Value(const TCoNameValueTuple& nameValue) { - if (TMaybeNode<TExprBase> maybeValue = nameValue.Value()) { - const TExprNode& value = maybeValue.Cast().Ref(); - YQL_ENSURE(value.IsAtom()); - return value.Content(); - } - - return {}; - } - private: TDqFunctionState::TPtr State; }; diff --git a/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp index 4bf88da9885..569a80f000c 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp +++ b/ydb/library/yql/providers/function/provider/dq_function_physical_optimize.cpp @@ -77,7 +77,6 @@ public: TExprNode::TPtr inputType; TExprNode::TPtr outputType; - TVector<TCoNameValueTuple> settings; for (const auto &tuple: callable.Settings().Ref().Children()) { const auto paramName = tuple->Head().Content(); if (paramName == "connection") { @@ -87,11 +86,6 @@ public: } else if (paramName == "output_type") { outputType = tuple->TailPtr(); } else { - auto setting = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(paramName) - .Value(tuple->TailPtr()) - .Done(); - settings.push_back(setting); } } @@ -102,16 +96,6 @@ public: }); YQL_ENSURE(description != State->FunctionsDescription.end(), "External function meta doesn't found " << transformName); - settings.push_back( - Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build("invoke_url") - .Value<TCoAtom>().Build(description->InvokeUrl) - .Done() - ); - - auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos()) - .Add(settings) - .Done(); auto stage = nodeInput.Output().Stage().Cast<TDqStage>(); auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx); @@ -123,17 +107,17 @@ public: .Connection().Build(connectionName) .Done(); - auto sinkSettings = Build<TTransformSettings>(ctx, node.Pos()) - .Type<TCoAtom>().Build(transformType) - .InputType(inputType) - .OutputType(outputType) - .Other(settingsBuilder) + auto settings = Build<TFunctionTransformSettings>(ctx, node.Pos()) + .InvokeUrl<TCoAtom>().Build(description->InvokeUrl) .Done(); - auto dqSink = Build<TDqSink>(ctx, node.Pos()) - .DataSink(transformSink) - .Settings(sinkSettings) + auto dqTransform = Build<TDqTransform>(ctx, node.Pos()) .Index().Build("0") + .DataSink(transformSink) + .Type<TCoAtom>().Build(transformType) + .InputType(inputType) + .OutputType(outputType) + .Settings(settings) .Done(); auto transformStage = Build<TDqStage>(ctx, node.Pos()) @@ -156,7 +140,7 @@ public: .Build() .Build() .Settings().Build() - .Sinks().Add(dqSink).Build() + .Outputs().Add(dqTransform).Build() .Done(); auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos()) diff --git a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h index 90991e711db..0995a84bad7 100644 --- a/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h +++ b/ydb/library/yql/providers/function/provider/dq_function_provider_impl.h @@ -12,5 +12,6 @@ THolder<TVisitorTransformerBase> CreateDqFunctionIntentTransformer(TDqFunctionSt THolder<IGraphTransformer> CreateDqFunctionMetaLoader(TDqFunctionState::TPtr state); THolder<IGraphTransformer> CreateDqFunctionPhysicalOptTransformer(TDqFunctionState::TPtr state); THolder<IDqIntegration> CreateDqFunctionDqIntegration(TDqFunctionState::TPtr state); +THolder<TVisitorTransformerBase> CreateDqFunctionTypeAnnotation(TDqFunctionState::TPtr state); }
\ No newline at end of file diff --git a/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp b/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp new file mode 100644 index 00000000000..8b27309c724 --- /dev/null +++ b/ydb/library/yql/providers/function/provider/dq_function_type_ann.cpp @@ -0,0 +1,35 @@ +#include "dq_function_provider_impl.h" + +#include <ydb/library/yql/providers/function/expr_nodes/dq_function_expr_nodes.h> + +namespace NYql::NDqFunction { +namespace { + +using namespace NNodes; + +class TDqFunctionTypeAnnotationTransformer: public TVisitorTransformerBase { +public: + TDqFunctionTypeAnnotationTransformer(TDqFunctionState::TPtr state) + : TVisitorTransformerBase(true) + , State(state) + { + + using TSelf = TDqFunctionTypeAnnotationTransformer; + AddHandler({TFunctionTransformSettings::CallableName()}, Hndl(&TSelf::HandleSettings)); + } + + TStatus HandleSettings(const TExprNode::TPtr& input, TExprContext& ctx) { + input->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))); + return TStatus::Ok; + } + +private: + TDqFunctionState::TPtr State; +}; +} // namespace + +THolder<TVisitorTransformerBase> CreateDqFunctionTypeAnnotation(TDqFunctionState::TPtr state) { + return MakeHolder<TDqFunctionTypeAnnotationTransformer>(state); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp index a9351eb654f..2f57e3999de 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_physical_optimize.cpp @@ -92,15 +92,15 @@ public: TDqStage inputStage = dqUnion.Output().Stage().Cast<TDqStage>(); - auto sinksBuilder = Build<TDqSinksList>(ctx, topicNode.Pos()); - if (inputStage.Sinks()) { - sinksBuilder.InitFrom(inputStage.Sinks().Cast()); + auto outputsBuilder = Build<TDqStageOutputsList>(ctx, topicNode.Pos()); + if (inputStage.Outputs()) { + outputsBuilder.InitFrom(inputStage.Outputs().Cast()); } - sinksBuilder.Add(dqSink); + outputsBuilder.Add(dqSink); auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos()) .InitFrom(inputStage) - .Sinks(sinksBuilder.Done()) + .Outputs(outputsBuilder.Done()) .Done(); auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos()); diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp index 226205cce20..ece1e1c666a 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp @@ -98,15 +98,15 @@ public: TDqStage inputStage = dqUnion.Output().Stage().Cast<TDqStage>(); - auto sinksBuilder = Build<TDqSinksList>(ctx, inputStage.Pos()); - if (inputStage.Sinks()) { - sinksBuilder.InitFrom(inputStage.Sinks().Cast()); + auto sinksBuilder = Build<TDqStageOutputsList>(ctx, inputStage.Pos()); + if (inputStage.Outputs()) { + sinksBuilder.InitFrom(inputStage.Outputs().Cast()); } sinksBuilder.Add(dqSink); auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos()) .InitFrom(inputStage) - .Sinks(sinksBuilder.Done()) + .Outputs(sinksBuilder.Done()) .Done(); auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos()); @@ -119,7 +119,7 @@ public: } private: - TExprBase BuildSolomonShard(TCoAtom shardNode, TExprContext& ctx, TString solomonCluster) const { + TCallable BuildSolomonShard(TCoAtom shardNode, TExprContext& ctx, TString solomonCluster) const { const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(solomonCluster); YQL_ENSURE(clusterDesc, "Unknown cluster " << solomonCluster); |