diff options
author | hrustyashko <hrustyashko@yandex-team.ru> | 2022-02-10 16:52:16 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:16 +0300 |
commit | c087c7f1c33a8f94d2b087f70808e23be476ee4b (patch) | |
tree | ab7fbbf3253d4c0e2793218f09378908beb025fb | |
parent | 12b8470d69df1313abe9f4efe57a81d8cfdc538c (diff) | |
download | ydb-c087c7f1c33a8f94d2b087f70808e23be476ee4b.tar.gz |
Restoring authorship annotation for <hrustyashko@yandex-team.ru>. Commit 2 of 2.
39 files changed, 1606 insertions, 1606 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index e85af8f015d..5846e6cb108 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -8448,122 +8448,122 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return IGraphTransformer::TStatus::Repeat; } - // 0 - function kind - // 1 - function name - // 2 - list of pair, settings ("key", value) - IGraphTransformer::TStatus SqlExternalFunctionWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - Y_UNUSED(output); - if (!EnsureArgsCount(*input, 3, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureStringOrUtf8Type(*input->Child(0), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureStringOrUtf8Type(*input->Child(1), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureTupleMinSize(*input->Child(2), 1, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - const TTypeAnnotationNode* outputType = nullptr; - const TTypeAnnotationNode* inputType = nullptr; - TSet<TString> usedParams; - for (const auto &tuple: input->Child(2)->Children()) { - if (!EnsureTupleSize(*tuple, 2, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - if (!EnsureAtom(tuple->Head(), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - auto paramName = ToString(tuple->Head().Content()); - if (!usedParams.insert(paramName).second) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Pos()), - TStringBuilder() << "WITH " << to_upper(paramName).Quote() - << " clause should be specified only once")); - return IGraphTransformer::TStatus::Error; - } else if (paramName == "input_type" || paramName == "output_type") { - if (!EnsureTypeWithStructType(*tuple->Child(1), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - if (paramName == "output_type") { - outputType = tuple->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - } else if (paramName == "input_type") { - inputType = tuple->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - } - } else if (paramName == "concurrency" || paramName == "batch_size") { - if (!EnsureSpecificDataType(*tuple->Child(1), EDataSlot::Int32, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - /* - ui64 number = 0; - if (!TryFromString(tuple->Child(1)->Content(), number)) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Pos()), - TStringBuilder() << "Failed to convert to integer: " << tuple->Child(1)->Content())); - return IGraphTransformer::TStatus::Error; - }*/ - } else if (paramName == "optimize_for") { - if (!EnsureStringOrUtf8Type(*tuple->Child(1), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - /* - if (const auto optimize = tuple->Child(1)->Content(); optimize != "call" && optimize != "latency") { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Child(1)->Pos()), TStringBuilder() << - "Unknown OPTIMIZE_FOR value, expected call or latency, but got: " << optimize)); - return IGraphTransformer::TStatus::Error; - }*/ - } else if (paramName == "connection") { - // FindCredential - if (!EnsureStringOrUtf8Type(*tuple->Child(1), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - } else if (paramName == "init") { - if (!EnsureComputable(*tuple->Child(1), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - } else { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Pos()), TStringBuilder() << - "Unknown param name: " << paramName.Quote())); - return IGraphTransformer::TStatus::Error; - } - } - - if (inputType == nullptr && outputType == nullptr) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << - "EXTERNAL FUNCTION should have INPUT_TYPE/OUTPUT_TYPE parameter")); - return IGraphTransformer::TStatus::Error; - } - - const TTypeAnnotationNode* nodeType; - if (outputType != nullptr && inputType != nullptr) { - // as transformation - TCallableExprType::TArgumentInfo inputArgument; - inputArgument.Flags = NKikimr::NUdf::ICallablePayload::TArgumentFlags::AutoMap; - inputArgument.Name = "input"; - inputArgument.Type = ctx.Expr.MakeType<TListExprType>(inputType); - TVector<TCallableExprType::TArgumentInfo> args(1, inputArgument); - nodeType = ctx.Expr.MakeType<TCallableExprType>( - ctx.Expr.MakeType<TListExprType>(outputType), - args, 0, TStringBuf("")); - } else if (outputType != nullptr) { - // as source - TVector<TCallableExprType::TArgumentInfo> args; - nodeType = ctx.Expr.MakeType<TCallableExprType>( - ctx.Expr.MakeType<TListExprType>(outputType), - args, 0, TStringBuf("")); - } else { - // as writer - nodeType = ctx.Expr.MakeType<TListExprType>(inputType); - } - - input->SetTypeAnn(nodeType); - return IGraphTransformer::TStatus::Ok; - } - + // 0 - function kind + // 1 - function name + // 2 - list of pair, settings ("key", value) + IGraphTransformer::TStatus SqlExternalFunctionWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 3, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureStringOrUtf8Type(*input->Child(0), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureStringOrUtf8Type(*input->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureTupleMinSize(*input->Child(2), 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const TTypeAnnotationNode* outputType = nullptr; + const TTypeAnnotationNode* inputType = nullptr; + TSet<TString> usedParams; + for (const auto &tuple: input->Child(2)->Children()) { + if (!EnsureTupleSize(*tuple, 2, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + if (!EnsureAtom(tuple->Head(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto paramName = ToString(tuple->Head().Content()); + if (!usedParams.insert(paramName).second) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Pos()), + TStringBuilder() << "WITH " << to_upper(paramName).Quote() + << " clause should be specified only once")); + return IGraphTransformer::TStatus::Error; + } else if (paramName == "input_type" || paramName == "output_type") { + if (!EnsureTypeWithStructType(*tuple->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + if (paramName == "output_type") { + outputType = tuple->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + } else if (paramName == "input_type") { + inputType = tuple->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + } + } else if (paramName == "concurrency" || paramName == "batch_size") { + if (!EnsureSpecificDataType(*tuple->Child(1), EDataSlot::Int32, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + /* + ui64 number = 0; + if (!TryFromString(tuple->Child(1)->Content(), number)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Pos()), + TStringBuilder() << "Failed to convert to integer: " << tuple->Child(1)->Content())); + return IGraphTransformer::TStatus::Error; + }*/ + } else if (paramName == "optimize_for") { + if (!EnsureStringOrUtf8Type(*tuple->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + /* + if (const auto optimize = tuple->Child(1)->Content(); optimize != "call" && optimize != "latency") { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Child(1)->Pos()), TStringBuilder() << + "Unknown OPTIMIZE_FOR value, expected call or latency, but got: " << optimize)); + return IGraphTransformer::TStatus::Error; + }*/ + } else if (paramName == "connection") { + // FindCredential + if (!EnsureStringOrUtf8Type(*tuple->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + } else if (paramName == "init") { + if (!EnsureComputable(*tuple->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + } else { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(tuple->Pos()), TStringBuilder() << + "Unknown param name: " << paramName.Quote())); + return IGraphTransformer::TStatus::Error; + } + } + + if (inputType == nullptr && outputType == nullptr) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << + "EXTERNAL FUNCTION should have INPUT_TYPE/OUTPUT_TYPE parameter")); + return IGraphTransformer::TStatus::Error; + } + + const TTypeAnnotationNode* nodeType; + if (outputType != nullptr && inputType != nullptr) { + // as transformation + TCallableExprType::TArgumentInfo inputArgument; + inputArgument.Flags = NKikimr::NUdf::ICallablePayload::TArgumentFlags::AutoMap; + inputArgument.Name = "input"; + inputArgument.Type = ctx.Expr.MakeType<TListExprType>(inputType); + TVector<TCallableExprType::TArgumentInfo> args(1, inputArgument); + nodeType = ctx.Expr.MakeType<TCallableExprType>( + ctx.Expr.MakeType<TListExprType>(outputType), + args, 0, TStringBuf("")); + } else if (outputType != nullptr) { + // as source + TVector<TCallableExprType::TArgumentInfo> args; + nodeType = ctx.Expr.MakeType<TCallableExprType>( + ctx.Expr.MakeType<TListExprType>(outputType), + args, 0, TStringBuf("")); + } else { + // as writer + nodeType = ctx.Expr.MakeType<TListExprType>(inputType); + } + + input->SetTypeAnn(nodeType); + return IGraphTransformer::TStatus::Ok; + } + IGraphTransformer::TStatus SqlExtractKeyWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { if (!EnsureArgsCount(*input, 2, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -13054,7 +13054,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["SqlAccess"] = &SqlAccessWrapper; Functions["SqlProcess"] = &SqlProcessWrapper; Functions["SqlReduce"] = &SqlReduceWrapper; - Functions["SqlExternalFunction"] = &SqlExternalFunctionWrapper; + Functions["SqlExternalFunction"] = &SqlExternalFunctionWrapper; Functions["SqlExtractKey"] = &SqlExtractKeyWrapper; Functions["SqlReduceUdf"] = &SqlReduceUdfWrapper; Functions["SqlProject"] = &SqlProjectWrapper; diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index f8bcc3b43fa..f2b793af8d3 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -2245,18 +2245,18 @@ bool EnsureStructType(TPositionHandle position, const TTypeAnnotationNode& type, return true; } -bool EnsureTypeWithStructType(const TExprNode& node, TExprContext& ctx) { - if (!EnsureType(node, ctx)) { - return false; - } - auto nodeType = node.GetTypeAnn()->Cast<TTypeExprType>()->GetType(); +bool EnsureTypeWithStructType(const TExprNode& node, TExprContext& ctx) { + if (!EnsureType(node, ctx)) { + return false; + } + auto nodeType = node.GetTypeAnn()->Cast<TTypeExprType>()->GetType(); YQL_ENSURE(nodeType); if (!EnsureStructType(node.Pos(), *nodeType, ctx)) { - return false; - } - return true; -} - + return false; + } + return true; +} + bool EnsureComposable(const TExprNode& node, TExprContext& ctx) { if (!node.IsComposable()) { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Composable required. World, datasink, datasource and lambda are not composable")); diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h index 96395c5785e..a865aa27cd9 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.h +++ b/ydb/library/yql/core/yql_expr_type_annotation.h @@ -88,7 +88,7 @@ bool EnsureStringOrUtf8Type(const TExprNode& node, TExprContext& ctx); bool EnsureStringOrUtf8Type(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureStructType(const TExprNode& node, TExprContext& ctx); bool EnsureStructType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); -bool EnsureTypeWithStructType(const TExprNode& node, TExprContext& ctx); +bool EnsureTypeWithStructType(const TExprNode& node, TExprContext& ctx); bool EnsureComposable(const TExprNode& node, TExprContext& ctx); bool EnsureComposableType(const TExprNode& node, TExprContext& ctx); bool EnsureWorldType(const TExprNode& node, TExprContext& ctx); diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index 04ee85ba1b8..67627b1b8bd 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -216,16 +216,16 @@ "Children": [ {"Index": 0, "Name": "Connection", "Type": "TDqConnection"} ] - }, - { - "Name": "TDqSqlExternalFunction", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "SqlExternalFunction"}, - "Children": [ - {"Index": 0, "Name": "TransformType", "Type": "TExprBase"}, - {"Index": 1, "Name": "TransformName", "Type": "TExprBase"}, - {"Index": 2, "Name": "Settings", "Type": "TExprBase"} - ] + }, + { + "Name": "TDqSqlExternalFunction", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "SqlExternalFunction"}, + "Children": [ + {"Index": 0, "Name": "TransformType", "Type": "TExprBase"}, + {"Index": 1, "Name": "TransformName", "Type": "TExprBase"}, + {"Index": 2, "Name": "Settings", "Type": "TExprBase"} + ] } ] } diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index bd0925e7d38..f60f29dd460 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -21,20 +21,20 @@ TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) { settings.LogicalId = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value()); } else if (name == SinglePartitionSettingName) { settings.SinglePartition = true; - } else if (name == IsExternalSetting) { - settings.IsExternalFunction = true; - } else if (name == TransformNameSetting) { - YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); - settings.TransformName = tuple.Value().Cast<TCoAtom>().Value(); - } else if (name == TransformTypeSetting) { - YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); - if (const auto type = tuple.Value().Cast<TCoAtom>().Value(); type == "YANDEX-CLOUD") { - settings.TransformType = NDqProto::TRANSFORM_YANDEX_CLOUD; - } else { - YQL_ENSURE(false, "Unknown transform type: " << type); - } - } else if (name == TransformConcurrencySetting) { - settings.TransformConcurrency = FromString<ui32>(tuple.Value().Cast<TCoAtom>().Value()); + } else if (name == IsExternalSetting) { + settings.IsExternalFunction = true; + } else if (name == TransformNameSetting) { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + settings.TransformName = tuple.Value().Cast<TCoAtom>().Value(); + } else if (name == TransformTypeSetting) { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + if (const auto type = tuple.Value().Cast<TCoAtom>().Value(); type == "YANDEX-CLOUD") { + settings.TransformType = NDqProto::TRANSFORM_YANDEX_CLOUD; + } else { + YQL_ENSURE(false, "Unknown transform type: " << type); + } + } else if (name == TransformConcurrencySetting) { + settings.TransformConcurrency = FromString<ui32>(tuple.Value().Cast<TCoAtom>().Value()); } } @@ -81,10 +81,10 @@ NNodes::TCoNameValueTupleList TDqStageSettings::BuildNode(TExprContext& ctx, TPo .Done(); } -ui32 TDqStageSettings::MaxTransformConcurrency() const { - return TransformConcurrency > 0 ? TransformConcurrency : TDqSettings::TDefault::CloudFunctionConcurrency; -} - +ui32 TDqStageSettings::MaxTransformConcurrency() const { + return TransformConcurrency > 0 ? TransformConcurrency : TDqSettings::TDefault::CloudFunctionConcurrency; +} + TCoAtom BuildAtom(TStringBuf value, TPositionHandle pos, TExprContext& ctx) { return Build<TCoAtom>(ctx, pos) .Value(value) diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h index f06182bda94..5b3146d0f42 100644 --- a/ydb/library/yql/dq/opt/dq_opt.h +++ b/ydb/library/yql/dq/opt/dq_opt.h @@ -12,20 +12,20 @@ struct TDqStageSettings { static constexpr TStringBuf LogicalIdSettingName = "_logical_id"; static constexpr TStringBuf IdSettingName = "_id"; static constexpr TStringBuf SinglePartitionSettingName = "_single_partition"; - static constexpr TStringBuf IsExternalSetting = "is_external_function"; - static constexpr TStringBuf TransformNameSetting = "transform_name"; - static constexpr TStringBuf TransformTypeSetting = "transform_type"; - static constexpr TStringBuf TransformConcurrencySetting = "concurrency"; - + static constexpr TStringBuf IsExternalSetting = "is_external_function"; + static constexpr TStringBuf TransformNameSetting = "transform_name"; + static constexpr TStringBuf TransformTypeSetting = "transform_type"; + static constexpr TStringBuf TransformConcurrencySetting = "concurrency"; + ui64 LogicalId = 0; TString Id; bool SinglePartition = false; - bool IsExternalFunction = false; + bool IsExternalFunction = false; NDqProto::ETransformType TransformType = NDqProto::TRANSFORM_YANDEX_CLOUD; - TString TransformName; - ui32 TransformConcurrency = 0; - + TString TransformName; + ui32 TransformConcurrency = 0; + static TDqStageSettings Parse(const NNodes::TDqStageBase& node); static TDqStageSettings New(const NNodes::TDqStageBase& node); @@ -35,9 +35,9 @@ struct TDqStageSettings { s.Id = CreateGuidAsString(); return s; } - NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; + NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; - ui32 MaxTransformConcurrency() const; + ui32 MaxTransformConcurrency() const; }; NNodes::TCoAtom BuildAtom(TStringBuf value, TPositionHandle pos, TExprContext& ctx); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index f350c51b331..cbe871059d7 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -430,123 +430,123 @@ TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationCont return DqPushBaseLMapToStage<TCoLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage); } -TExprBase DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ - Y_UNUSED(optCtx); - Y_UNUSED(allowStageMultiUsage); - Y_UNUSED(parentsMap); - - auto apply = node.Cast<TCoApply>(); - auto callable = apply.Callable().Maybe<TDqSqlExternalFunction>(); - if (!callable - || apply.Args().Count() != 2 - || !apply.Arg(1).Maybe<TDqCnUnionAll>()) { - - return node; - } - callable = callable.Cast(); - TDqCnUnionAll nodeInput {apply.Arg(1).Cast<TDqCnUnionAll>()}; - - if (!IsSingleConsumerConnection(nodeInput, parentsMap, allowStageMultiUsage)) { - return node; - } - - const auto shuffleColumn = Build<TCoAtom>(ctx, node.Pos()) - .Value("_yql_transform_shuffle") - .Done(); - auto addShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) - .Args({"stream"}) - .Body<TCoMap>() - .Input("stream") - .Lambda() - .Args({"row"}) - .Body<TCoAddMember>() - .Struct("row") - .Name(shuffleColumn) - .Item<TCoRandom>().Add<TCoDependsOn>().Input("row").Build().Build() - .Build() - .Build() - .Build() - .Done(); - auto removeShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) - .Args({"row"}) - .Body<TCoForceRemoveMember>() - .Struct("row") - .Name(shuffleColumn) - .Build() - .Done(); - - TVector<TCoNameValueTuple> settings; - auto isExtFunction = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(TDqStageSettings::IsExternalSetting) - .Value<TCoBool>().Literal().Build("true").Build() - .Done(); - settings.push_back(isExtFunction); - - auto transformType = callable.TransformType().Cast<TCoString>().Literal().StringValue(); - settings.push_back( - Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(TDqStageSettings::TransformTypeSetting) - .Value<TCoAtom>().Build(transformType) - .Done()); - - auto transformName = callable.TransformName().Cast<TCoString>().Literal().StringValue(); - settings.push_back( - Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(TDqStageSettings::TransformNameSetting) - .Value<TCoAtom>().Build(transformName) - .Done()); - - for (const auto &tuple: callable.Settings().Ref().Children()) { - const auto paramName = tuple->Head().Content(); - auto setting = Build<TCoNameValueTuple>(ctx, node.Pos()) - .Name().Build(paramName) - .Value(tuple->TailPtr()) - .Done(); - settings.push_back(setting); - } - - auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos()) - .Add(settings) - .Done(); - - auto stage = nodeInput.Output().Stage().Cast<TDqStage>(); - auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx); - YQL_ENSURE(dutyColumn); - - auto transformStage = Build<TDqStage>(ctx, node.Pos()) - .Inputs() - .Add<TDqCnHashShuffle>() - .KeyColumns() - .Add({shuffleColumn}) - .Build() - .Output() - .Stage(dutyColumn.Cast()) - .Index(nodeInput.Output().Index()) - .Build() - .Build() - .Build() - .Program() - .Args({"row"}) - .Body<TCoMap>() - .Lambda(removeShuffleColumn) - .Input("row") - .Build() - .Build() - .Settings(settingsBuilder) - .Done(); - - auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos()) - .Output() - .Stage(transformStage) - .Index().Build("0") - .Build() - .Done(); - - return externalStage; -} - +TExprBase DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + Y_UNUSED(optCtx); + Y_UNUSED(allowStageMultiUsage); + Y_UNUSED(parentsMap); + + auto apply = node.Cast<TCoApply>(); + auto callable = apply.Callable().Maybe<TDqSqlExternalFunction>(); + if (!callable + || apply.Args().Count() != 2 + || !apply.Arg(1).Maybe<TDqCnUnionAll>()) { + + return node; + } + callable = callable.Cast(); + TDqCnUnionAll nodeInput {apply.Arg(1).Cast<TDqCnUnionAll>()}; + + if (!IsSingleConsumerConnection(nodeInput, parentsMap, allowStageMultiUsage)) { + return node; + } + + const auto shuffleColumn = Build<TCoAtom>(ctx, node.Pos()) + .Value("_yql_transform_shuffle") + .Done(); + auto addShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) + .Args({"stream"}) + .Body<TCoMap>() + .Input("stream") + .Lambda() + .Args({"row"}) + .Body<TCoAddMember>() + .Struct("row") + .Name(shuffleColumn) + .Item<TCoRandom>().Add<TCoDependsOn>().Input("row").Build().Build() + .Build() + .Build() + .Build() + .Done(); + auto removeShuffleColumn = Build<TCoLambda>(ctx, node.Pos()) + .Args({"row"}) + .Body<TCoForceRemoveMember>() + .Struct("row") + .Name(shuffleColumn) + .Build() + .Done(); + + TVector<TCoNameValueTuple> settings; + auto isExtFunction = Build<TCoNameValueTuple>(ctx, node.Pos()) + .Name().Build(TDqStageSettings::IsExternalSetting) + .Value<TCoBool>().Literal().Build("true").Build() + .Done(); + settings.push_back(isExtFunction); + + auto transformType = callable.TransformType().Cast<TCoString>().Literal().StringValue(); + settings.push_back( + Build<TCoNameValueTuple>(ctx, node.Pos()) + .Name().Build(TDqStageSettings::TransformTypeSetting) + .Value<TCoAtom>().Build(transformType) + .Done()); + + auto transformName = callable.TransformName().Cast<TCoString>().Literal().StringValue(); + settings.push_back( + Build<TCoNameValueTuple>(ctx, node.Pos()) + .Name().Build(TDqStageSettings::TransformNameSetting) + .Value<TCoAtom>().Build(transformName) + .Done()); + + for (const auto &tuple: callable.Settings().Ref().Children()) { + const auto paramName = tuple->Head().Content(); + auto setting = Build<TCoNameValueTuple>(ctx, node.Pos()) + .Name().Build(paramName) + .Value(tuple->TailPtr()) + .Done(); + settings.push_back(setting); + } + + auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos()) + .Add(settings) + .Done(); + + auto stage = nodeInput.Output().Stage().Cast<TDqStage>(); + auto dutyColumn = DqPushLambdaToStage(stage, nodeInput.Output().Index(), addShuffleColumn, {}, ctx, optCtx); + YQL_ENSURE(dutyColumn); + + auto transformStage = Build<TDqStage>(ctx, node.Pos()) + .Inputs() + .Add<TDqCnHashShuffle>() + .KeyColumns() + .Add({shuffleColumn}) + .Build() + .Output() + .Stage(dutyColumn.Cast()) + .Index(nodeInput.Output().Index()) + .Build() + .Build() + .Build() + .Program() + .Args({"row"}) + .Body<TCoMap>() + .Lambda(removeShuffleColumn) + .Input("row") + .Build() + .Build() + .Settings(settingsBuilder) + .Done(); + + auto externalStage = Build<TDqCnUnionAll>(ctx, node.Pos()) + .Output() + .Stage(transformStage) + .Index().Build("0") + .Build() + .Done(); + + return externalStage; +} + TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 0ffe68b8dc5..70a9d70082b 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -29,9 +29,9 @@ NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext& NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqBuildExtFunctionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage = true); - +NNodes::TExprBase DqBuildExtFunctionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage = true); + NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index 8f878460da3..e29e64ef2c5 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -129,16 +129,16 @@ message TTaskOutput { } } -enum ETransformType { - TRANSFORM_YANDEX_CLOUD = 0; -} - -message TDqTransform { - ETransformType Type = 1; - string FunctionName = 2; - string ConnectionName = 3; -} - +enum ETransformType { + TRANSFORM_YANDEX_CLOUD = 0; +} + +message TDqTransform { + ETransformType Type = 1; + string FunctionName = 2; + string ConnectionName = 3; +} + message TDqTask { uint64 Id = 1; uint32 StageId = 9; @@ -149,5 +149,5 @@ message TDqTask { repeated TTaskOutput Outputs = 6; google.protobuf.Any Meta = 7; bool CreateSuspended = 8; - optional TDqTransform OutputTransform = 12; + optional TDqTransform OutputTransform = 12; } diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index e611125bcc6..2c46061f60d 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -11,7 +11,7 @@ template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutput void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) { ui32 partitionsCount = 1; - const auto stageSettings = NDq::TDqStageSettings::Parse(stage); + const auto stageSettings = NDq::TDqStageSettings::Parse(stage); auto& stageInfo = graph.GetStageInfo(stage); for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { const auto& input = stage.Inputs().Item(inputIndex); @@ -32,12 +32,12 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) { auto shuffle = maybeCnShuffle.Cast(); auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage()); - if (stageSettings.IsExternalFunction) { - partitionsCount = stageSettings.MaxTransformConcurrency(); - } else { - partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2); - partitionsCount = std::min(partitionsCount, 24u); - } + if (stageSettings.IsExternalFunction) { + partitionsCount = stageSettings.MaxTransformConcurrency(); + } else { + partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2); + partitionsCount = std::min(partitionsCount, 24u); + } } else if (auto maybeCnMap = input.Maybe<NNodes::TDqCnMap>()) { auto cnMap = maybeCnMap.Cast(); auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage()); @@ -46,10 +46,10 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp } for (ui32 i = 0; i < partitionsCount; ++i) { - auto& task = graph.AddTask(stageInfo); - auto& transform = task.OutputTransform; - transform.Type = stageSettings.TransformType; - transform.FunctionName = stageSettings.TransformName; + auto& task = graph.AddTask(stageInfo); + auto& transform = task.OutputTransform; + transform.Type = stageSettings.TransformType; + transform.FunctionName = stageSettings.TransformName; } } diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index dcd0b488515..e5c772310bf 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -152,12 +152,12 @@ struct TTaskOutput { TOutputMeta Meta; }; -struct TTransform { - NDqProto::ETransformType Type; - TString FunctionName; - TString ConnectionName; -}; - +struct TTransform { + NDqProto::ETransformType Type; + TString FunctionName; + TString ConnectionName; +}; + template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> struct TTask { using TInputType = TTaskInput<TInputMeta>; @@ -176,7 +176,7 @@ struct TTask { NActors::TActorId ComputeActorId; TTaskMeta Meta; NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; - TTransform OutputTransform; + TTransform OutputTransform; }; template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 28a78b2a860..3c37e4d48fd 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -24,7 +24,7 @@ struct TDqSettings { static constexpr int MaxNetworkRetries = 5; static constexpr ui64 LiteralTimeout = 60000; // 1 minutes static constexpr ui64 TableTimeout = 600000; // 10 minutes - static constexpr ui32 CloudFunctionConcurrency = 10; + static constexpr ui32 CloudFunctionConcurrency = 10; }; using TPtr = std::shared_ptr<TDqSettings>; diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 3a71b56da21..858e3da45a0 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -42,13 +42,13 @@ public: AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>)); AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>)); AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); - // (Apply (SqlExternalFunction ..) ..) to stage - AddHandler(0, &TCoApply::Match, HNDL(BuildExtFunctionStage<false>)); + // (Apply (SqlExternalFunction ..) ..) to stage + AddHandler(0, &TCoApply::Match, HNDL(BuildExtFunctionStage<false>)); #if 0 AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute)); #endif - + AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>)); AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>)); AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>)); @@ -62,7 +62,7 @@ public: AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); - AddHandler(1, &TCoApply::Match, HNDL(BuildExtFunctionStage<true>)); + AddHandler(1, &TCoApply::Match, HNDL(BuildExtFunctionStage<true>)); #undef HNDL SetGlobal(1u); @@ -213,11 +213,11 @@ protected: } template <bool IsGlobal> - TMaybeNode<TExprBase> BuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - return DqBuildExtFunctionStage(node, ctx, optCtx, *getParents(), IsGlobal); - } - - template <bool IsGlobal> + TMaybeNode<TExprBase> BuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildExtFunctionStage(node, ctx, optCtx, *getParents(), IsGlobal); + } + + template <bool IsGlobal> TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal); } diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 21f8f90de18..ec404c0a6dc 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -359,10 +359,10 @@ namespace NYql::NDqs { FillOutputDesc(*taskDesc.AddOutputs(), output); } - auto& transform = *taskDesc.MutableOutputTransform(); - transform.SetType(task.OutputTransform.Type); - transform.SetFunctionName(task.OutputTransform.FunctionName); - + auto& transform = *taskDesc.MutableOutputTransform(); + transform.SetType(task.OutputTransform.Type); + transform.SetFunctionName(task.OutputTransform.FunctionName); + auto& program = *taskDesc.MutableProgram(); program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); TString programStr; @@ -454,11 +454,11 @@ namespace NYql::NDqs { auto datasource = TypeContext->DataSourceMap.FindPtr(dataSourceName); YQL_ENSURE(datasource); const auto stageSettings = TDqStageSettings::Parse(stage); - auto tasksPerStage = settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage); - if (stageSettings.IsExternalFunction) { - tasksPerStage = Min(tasksPerStage, stageSettings.MaxTransformConcurrency()); - } - const size_t maxPartitions = stageSettings.SinglePartition ? 1ULL : tasksPerStage; + auto tasksPerStage = settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage); + if (stageSettings.IsExternalFunction) { + tasksPerStage = Min(tasksPerStage, stageSettings.MaxTransformConcurrency()); + } + const size_t maxPartitions = stageSettings.SinglePartition ? 1ULL : tasksPerStage; TVector<TString> parts; if (auto dqIntegration = (*datasource)->GetDqIntegration()) { TString clusterName; @@ -479,9 +479,9 @@ namespace NYql::NDqs { task.Inputs[dqSourceInputIndex].SourceSettings = sourceSettings; task.Inputs[dqSourceInputIndex].SourceType = sourceType; } - auto& transform = task.OutputTransform; - transform.Type = stageSettings.TransformType; - transform.FunctionName = stageSettings.TransformName; + auto& transform = task.OutputTransform; + transform.Type = stageSettings.TransformType; + transform.FunctionName = stageSettings.TransformName; } } return !parts.empty(); diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 59ed72035fe..8759f442219 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -657,10 +657,10 @@ private: } THashMap<ui32, ui32> allPublicIds; - bool hasStageError = false; + bool hasStageError = false; VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) { - const TExprBase expr(node); - if (expr.Maybe<TResFill>()) { + const TExprBase expr(node); + if (expr.Maybe<TResFill>()) { if (auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { allPublicIds.emplace(*publicId, 0U); } @@ -668,10 +668,10 @@ private: return true; }); - if (hasStageError) { - return SyncError(); - } - + if (hasStageError) { + return SyncError(); + } + IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds); auto executionPlanner = THolder<IDqsExecutionPlanner>(new TDqsSingleExecutionPlanner(lambda, NActors::TActorId(), NActors::TActorId(1, 0, 1, 0), State->FunctionRegistry, result.Input().Ref().GetTypeAnn())); @@ -849,7 +849,7 @@ private: size_t graphsCount = 0; THashMap<ui32, ui32> allPublicIds; THashMap<ui64, ui32> stage2publicId; - bool hasStageError = false; + bool hasStageError = false; VisitExpr(pull.Ptr(), [&](const TExprNode::TPtr& node) { if (TResTransientBase::Match(node.Get())) return false; @@ -858,8 +858,8 @@ private: allPublicIds.emplace(*publicId, 0U); } } else if (const auto& maybeStage = expr.Maybe<TDqStage>()) { - const auto& stage = maybeStage.Cast(); - if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) { + const auto& stage = maybeStage.Cast(); + if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) { if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { if (const auto settings = NDq::TDqStageSettings::Parse(maybeStage.Cast()); settings.LogicalId) { stage2publicId[settings.LogicalId] = *publicId; @@ -876,10 +876,10 @@ private: }); YQL_ENSURE(!oneGraphPerQuery || graphsCount == 1, "Internal error: only one graph per query is allowed"); - if (hasStageError) { - return SyncError(); - } - + if (hasStageError) { + return SyncError(); + } + auto optimizedInput = pull.Input().Ptr(); THashMap<TString, TString> secureParams; NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams); diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in index 6e376574c5e..ef394c7a009 100644 --- a/ydb/library/yql/sql/v1/SQLv1.g.in +++ b/ydb/library/yql/sql/v1/SQLv1.g.in @@ -219,7 +219,7 @@ invoke_expr_tail: (null_treatment | filter_clause)? (OVER window_name_or_specification)? ; -using_call_expr: ((an_id_or_type NAMESPACE an_id_or_type) | an_id_expr | bind_parameter | (EXTERNAL FUNCTION)) invoke_expr; +using_call_expr: ((an_id_or_type NAMESPACE an_id_or_type) | an_id_expr | bind_parameter | (EXTERNAL FUNCTION)) invoke_expr; key_expr: LBRACE_SQUARE expr RBRACE_SQUARE; @@ -342,13 +342,13 @@ select_kind: (DISCARD)? (process_core | reduce_core | select_core) (INTO RESULT process_core: PROCESS STREAM? named_single_source (COMMA named_single_source)* (USING using_call_expr (AS an_id)? - (WITH external_call_settings)? + (WITH external_call_settings)? (WHERE expr)? (HAVING expr)? (ASSUME order_by_clause)?)? ; -external_call_param: an_id EQUALS expr; -external_call_settings: external_call_param (COMMA external_call_param)*; - +external_call_param: an_id EQUALS expr; +external_call_settings: external_call_param (COMMA external_call_param)*; + reduce_core: REDUCE named_single_source (COMMA named_single_source)* (PRESORT sort_specification_list)? ON column_list USING ALL? using_call_expr (AS an_id)? @@ -911,14 +911,14 @@ keyword_compat: ( | EXCLUSIVE | EXPLAIN | EXPORT - | EXTERNAL + | EXTERNAL | FAIL | FILTER | FLATTEN | FOLLOWING | FOR | FOREIGN - | FUNCTION + | FUNCTION | GLOB | GROUP | GROUPING @@ -1191,7 +1191,7 @@ EXCLUSION: E X C L U S I O N; EXISTS: E X I S T S; EXPLAIN: E X P L A I N; EXPORT: E X P O R T; -EXTERNAL: E X T E R N A L; +EXTERNAL: E X T E R N A L; FAIL: F A I L; FAMILY: F A M I L Y; FILTER: F I L T E R; @@ -1202,7 +1202,7 @@ FOR: F O R; FOREIGN: F O R E I G N; FROM: F R O M; FULL: F U L L; -FUNCTION: F U N C T I O N; +FUNCTION: F U N C T I O N; GLOB: G L O B; GLOBAL: G L O B A L; GROUP: G R O U P; diff --git a/ydb/library/yql/sql/v1/builtin.cpp b/ydb/library/yql/sql/v1/builtin.cpp index 7b2944907e9..43ef1243f10 100644 --- a/ydb/library/yql/sql/v1/builtin.cpp +++ b/ydb/library/yql/sql/v1/builtin.cpp @@ -3203,8 +3203,8 @@ TNodePtr BuildBuiltinFunc(TContext& ctx, TPosition pos, TString name, const TVec BuildTuple(pos, {BuildQuotedAtom(pos, ""), args[1]}), }; return new TCallNodeImpl(pos, "FlattenMembers", 2, 2, flattenMembersArgs); - } else if (normalizedName == "sqlexternalfunction") { - return new TCallNodeImpl(pos, "SqlExternalFunction", args); + } else if (normalizedName == "sqlexternalfunction") { + return new TCallNodeImpl(pos, "SqlExternalFunction", args); } else { return new TInvalidBuiltin(pos, TStringBuilder() << "Unknown builtin: " << name); } diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp index b724aa480d7..870bbd9cc7f 100644 --- a/ydb/library/yql/sql/v1/node.cpp +++ b/ydb/library/yql/sql/v1/node.cpp @@ -763,25 +763,25 @@ bool TWinLeadLag::DoInit(TContext& ctx, ISource* src) { TWinRank::TWinRank(TPosition pos, const TString& opName, i32 minArgs, i32 maxArgs, const TVector<TNodePtr>& args) : TWinAggrEmulation(pos, opName, minArgs, maxArgs, args) -{ - -} - -bool TExternalFunctionConfig::DoInit(TContext& ctx, ISource* src) { - for (auto& param: Config) { - auto paramName = Y(BuildQuotedAtom(Pos, param.first)); - if (!param.second->Init(ctx, src)) { - return false; - } - Nodes.push_back(Q(L(paramName, param.second))); - } - return true; -} - -INode::TPtr TExternalFunctionConfig::DoClone() const { - return {}; -} - +{ + +} + +bool TExternalFunctionConfig::DoInit(TContext& ctx, ISource* src) { + for (auto& param: Config) { + auto paramName = Y(BuildQuotedAtom(Pos, param.first)); + if (!param.second->Init(ctx, src)) { + return false; + } + Nodes.push_back(Q(L(paramName, param.second))); + } + return true; +} + +INode::TPtr TExternalFunctionConfig::DoClone() const { + return {}; +} + bool TWinRank::DoInit(TContext& ctx, ISource* src) { if (!ValidateArguments(ctx)) { return false; diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index ac06a6e7278..de82b45a238 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -412,23 +412,23 @@ namespace NSQLTranslationV1 { TString FuncAlias; }; - using TFunctionConfig = TMap<TString, TNodePtr>; - - class TExternalFunctionConfig final: public TAstListNode { - public: - TExternalFunctionConfig(TPosition pos, const TFunctionConfig& config) - : TAstListNode(pos) - , Config(config) - { - } - - bool DoInit(TContext& ctx, ISource* src) override; - TPtr DoClone() const final; - - private: - TFunctionConfig Config; - }; - + using TFunctionConfig = TMap<TString, TNodePtr>; + + class TExternalFunctionConfig final: public TAstListNode { + public: + TExternalFunctionConfig(TPosition pos, const TFunctionConfig& config) + : TAstListNode(pos) + , Config(config) + { + } + + bool DoInit(TContext& ctx, ISource* src) override; + TPtr DoClone() const final; + + private: + TFunctionConfig Config; + }; + class TWinRowNumber final: public TWinAggrEmulation { TPtr DoClone() const final { return CallNodeClone<TWinRowNumber>(); @@ -1300,7 +1300,7 @@ namespace NSQLTranslationV1 { TSourcePtr BuildReduce(TPosition pos, ReduceMode mode, TSourcePtr source, TVector<TSortSpecificationPtr>&& orderBy, TVector<TNodePtr>&& keys, TVector<TNodePtr>&& args, TNodePtr udf, TNodePtr having, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy, bool listCall); - TSourcePtr BuildProcess(TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall, + TSourcePtr BuildProcess(TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall, bool prcessStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy); TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery, TScopedStatePtr scoped); diff --git a/ydb/library/yql/sql/v1/select.cpp b/ydb/library/yql/sql/v1/select.cpp index 8bfb8eb88af..2841f05a5b5 100644 --- a/ydb/library/yql/sql/v1/select.cpp +++ b/ydb/library/yql/sql/v1/select.cpp @@ -2102,7 +2102,7 @@ public: TPosition pos, TSourcePtr source, TNodePtr with, - bool withExtFunction, + bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall, bool processStream, @@ -2112,7 +2112,7 @@ public: : IRealSource(pos) , Source(std::move(source)) , With(with) - , WithExtFunction(withExtFunction) + , WithExtFunction(withExtFunction) , Terms(std::move(terms)) , ListCall(listCall) , ProcessStream(processStream) @@ -2153,13 +2153,13 @@ public: return false; } - TSourcePtr fakeSource = nullptr; - if (ListCall && !WithExtFunction) { + TSourcePtr fakeSource = nullptr; + if (ListCall && !WithExtFunction) { fakeSource = BuildFakeSource(src->GetPos()); src->AllColumns(); } - auto processSource = fakeSource != nullptr ? fakeSource.Get() : src; + auto processSource = fakeSource != nullptr ? fakeSource.Get() : src; Y_VERIFY_DEBUG(processSource != nullptr); if (!With->Init(ctx, processSource)) { return false; @@ -2174,15 +2174,15 @@ public: Columns.Add(&With->GetLabel(), false); } - bool hasError = false; + bool hasError = false; - TNodePtr produce; - if (WithExtFunction) { - produce = Y(); - } else { - TString processCall = (ListCall ? "SqlProcess" : "Apply"); - produce = Y(processCall, With); - } + TNodePtr produce; + if (WithExtFunction) { + produce = Y(); + } else { + TString processCall = (ListCall ? "SqlProcess" : "Apply"); + produce = Y(processCall, With); + } TMaybe<ui32> listPosIndex; ui32 termIndex = 0; for (auto& term: Terms) { @@ -2211,7 +2211,7 @@ public: return false; } - if (ListCall && !WithExtFunction) { + if (ListCall && !WithExtFunction) { YQL_ENSURE(listPosIndex.Defined()); produce = L(produce, Q(ToString(*listPosIndex))); } @@ -2220,9 +2220,9 @@ public: hasError = true; } - if (!(WithExtFunction && Terms.empty())) { - TVector<TNodePtr>(1, produce).swap(Terms); - } + if (!(WithExtFunction && Terms.empty())) { + TVector<TNodePtr>(1, produce).swap(Terms); + } src->FinishColumns(); @@ -2268,15 +2268,15 @@ public: block = L(block, Y("let", inputLabel, filter)); } - if (WithExtFunction) { - auto preTransform = Y("RemoveSystemMembers", inputLabel); - if (Terms.size() > 0) { - preTransform = Y("Map", preTransform, BuildLambda(Pos, Y("row"), Q(Terms[0]))); - } - block = L(block, Y("let", inputLabel, preTransform)); - block = L(block, Y("let", "transform", With)); - block = L(block, Y("let", "core", Y("Apply", "transform", inputLabel))); - } else if (ListCall) { + if (WithExtFunction) { + auto preTransform = Y("RemoveSystemMembers", inputLabel); + if (Terms.size() > 0) { + preTransform = Y("Map", preTransform, BuildLambda(Pos, Y("row"), Q(Terms[0]))); + } + block = L(block, Y("let", inputLabel, preTransform)); + block = L(block, Y("let", "transform", With)); + block = L(block, Y("let", "core", Y("Apply", "transform", inputLabel))); + } else if (ListCall) { block = L(block, Y("let", "core", Terms[0])); } else { auto terms = BuildColumnsTerms(ctx); @@ -2315,7 +2315,7 @@ public: } TNodePtr DoClone() const final { - return new TProcessSource(Pos, Source->CloneSource(), SafeClone(With), WithExtFunction, + return new TProcessSource(Pos, Source->CloneSource(), SafeClone(With), WithExtFunction, CloneContainer(Terms), ListCall, ProcessStream, Settings, CloneContainer(AssumeOrderBy)); } @@ -2338,7 +2338,7 @@ private: private: TSourcePtr Source; TNodePtr With; - const bool WithExtFunction; + const bool WithExtFunction; TVector<TNodePtr> Terms; const bool ListCall; const bool ProcessStream; @@ -2350,14 +2350,14 @@ TSourcePtr BuildProcess( TPosition pos, TSourcePtr source, TNodePtr with, - bool withExtFunction, + bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall, bool processStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy ) { - return new TProcessSource(pos, std::move(source), with, withExtFunction, std::move(terms), listCall, processStream, settings, assumeOrderBy); + return new TProcessSource(pos, std::move(source), with, withExtFunction, std::move(terms), listCall, processStream, settings, assumeOrderBy); } class TNestedProxySource: public IProxySource { diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index 6c43c9775ad..ab5a8647885 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -2,7 +2,7 @@ #include "context.h" #include "node.h" -#include "sql_call_param.h" +#include "sql_call_param.h" #include "ydb/library/yql/ast/yql_ast.h" #include <ydb/library/yql/parser/proto_ast/collect_issues/collect_issues.h> #include <ydb/library/yql/parser/proto_ast/gen/v1/SQLv1Lexer.h> @@ -1102,8 +1102,8 @@ public: , AggMode(call.AggMode) , DistinctAllowed(call.DistinctAllowed) , UsingCallExpr(call.UsingCallExpr) - , IsExternalCall(call.IsExternalCall) - , CallConfig(call.CallConfig) + , IsExternalCall(call.IsExternalCall) + , CallConfig(call.CallConfig) { } @@ -1117,7 +1117,7 @@ public: bool Init(const TRule_using_call_expr& node); bool Init(const TRule_value_constructor& node); bool Init(const TRule_invoke_expr& node); - bool ConfigureExternalCall(const TRule_external_call_settings& node); + bool ConfigureExternalCall(const TRule_external_call_settings& node); void IncCounters(); TNodePtr BuildUdf(bool forReduce) { @@ -1149,27 +1149,27 @@ public: } args.emplace_back(BuildTuple(Pos, PositionalArgs)); args.emplace_back(BuildStructure(Pos, NamedArgs)); - } else if (IsExternalCall) { - Func = "SqlExternalFunction"; - if (Args.size() < 2 || Args.size() > 3) { - Ctx.Error(Pos) << "EXTERNAL FUNCTION requires from 2 to 3 arguments, but got: " << Args.size(); - return nullptr; - } - - if (Args.size() == 3) { - args.insert(args.end(), Args.begin(), Args.end() - 1); - Args.erase(Args.begin(), Args.end() - 1); - } else { - args.insert(args.end(), Args.begin(), Args.end()); - Args.erase(Args.begin(), Args.end()); - } - auto configNode = new TExternalFunctionConfig(Pos, CallConfig); - auto configList = new TAstListNodeImpl(Pos, { new TAstAtomNodeImpl(Pos, "quote", 0), configNode }); - args.push_back(configList); + } else if (IsExternalCall) { + Func = "SqlExternalFunction"; + if (Args.size() < 2 || Args.size() > 3) { + Ctx.Error(Pos) << "EXTERNAL FUNCTION requires from 2 to 3 arguments, but got: " << Args.size(); + return nullptr; + } + + if (Args.size() == 3) { + args.insert(args.end(), Args.begin(), Args.end() - 1); + Args.erase(Args.begin(), Args.end() - 1); + } else { + args.insert(args.end(), Args.begin(), Args.end()); + Args.erase(Args.begin(), Args.end()); + } + auto configNode = new TExternalFunctionConfig(Pos, CallConfig); + auto configList = new TAstListNodeImpl(Pos, { new TAstAtomNodeImpl(Pos, "quote", 0), configNode }); + args.push_back(configList); } else { args.insert(args.end(), Args.begin(), Args.end()); } - + auto result = BuildBuiltinFunc(Ctx, Pos, Func, args, Module, AggMode, &mustUseNamed, warnOnYqlNameSpace); if (mustUseNamed) { Error() << "Named args are used for call, but unsupported by function: " << Func; @@ -1200,16 +1200,16 @@ public: Func += "_IgnoreNulls"; } - bool IsExternal() { - return IsExternalCall; - } - + bool IsExternal() { + return IsExternalCall; + } + private: bool ExtractCallParam(const TRule_external_call_param& node); bool FillArg(const TString& module, const TString& func, size_t& idx, const TRule_named_expr& node); bool FillArgs(const TRule_named_expr_list& node); - -private: + +private: TPosition Pos; TString Func; TString Module; @@ -1221,8 +1221,8 @@ private: TString WindowName; bool DistinctAllowed = false; bool UsingCallExpr = false; - bool IsExternalCall = false; - TFunctionConfig CallConfig; + bool IsExternalCall = false; + TFunctionConfig CallConfig; }; TNodePtr TSqlTranslation::NamedExpr(const TRule_named_expr& node, EExpr exprMode) { @@ -3192,9 +3192,9 @@ bool TSqlCallExpr::Init(const TRule_value_constructor& node) { } bool TSqlCallExpr::ExtractCallParam(const TRule_external_call_param& node) { - TString paramName = Id(node.GetRule_an_id1(), *this); - paramName = to_lower(paramName); - + TString paramName = Id(node.GetRule_an_id1(), *this); + paramName = to_lower(paramName); + if (CallConfig.contains(paramName)) { Ctx.Error() << "WITH " << to_upper(paramName).Quote() << " clause should be specified only once"; @@ -3215,27 +3215,27 @@ bool TSqlCallExpr::ExtractCallParam(const TRule_external_call_param& node) { TDeferredAtom atom; MakeTableFromExpression(Ctx, value, atom); value = new TCallNodeImpl(Ctx.Pos(), "String", { atom.Build() }); - } - + } + if (!value) { return false; } CallConfig[paramName] = value; return true; -} - -bool TSqlCallExpr::ConfigureExternalCall(const TRule_external_call_settings& node) { +} + +bool TSqlCallExpr::ConfigureExternalCall(const TRule_external_call_settings& node) { bool success = ExtractCallParam(node.GetRule_external_call_param1()); for (auto& block: node.GetBlock2()) { success = ExtractCallParam(block.GetRule_external_call_param2()) && success; - } - - return success; -} - + } + + return success; +} + bool TSqlCallExpr::Init(const TRule_using_call_expr& node) { - // using_call_expr: ((an_id_or_type NAMESPACE an_id_or_type) | an_id_expr | bind_parameter | (EXTERNAL FUNCTION)) invoke_expr; + // using_call_expr: ((an_id_or_type NAMESPACE an_id_or_type) | an_id_expr | bind_parameter | (EXTERNAL FUNCTION)) invoke_expr; const auto& block = node.GetBlock1(); switch (block.Alt_case()) { case TRule_using_call_expr::TBlock1::kAlt1: { @@ -3259,10 +3259,10 @@ bool TSqlCallExpr::Init(const TRule_using_call_expr& node) { } break; } - case TRule_using_call_expr::TBlock1::kAlt4: { - IsExternalCall = true; - break; - } + case TRule_using_call_expr::TBlock1::kAlt4: { + IsExternalCall = true; + break; + } default: Y_FAIL("You should change implementation according to grammar changes"); } @@ -3373,11 +3373,11 @@ bool TSqlCallExpr::Init(const TRule_invoke_expr& node) { break; } case TRule_invoke_expr::TBlock2::kAlt2: - if (IsExternalCall) { - Ctx.Error() << "You should set EXTERNAL FUNCTION type. Example: EXTERNAL FUNCTION('YANDEX-CLOUD', ...)"; - } else { - Args.push_back(new TAsteriskNode(Pos)); - } + if (IsExternalCall) { + Ctx.Error() << "You should set EXTERNAL FUNCTION type. Example: EXTERNAL FUNCTION('YANDEX-CLOUD', ...)"; + } else { + Args.push_back(new TAsteriskNode(Pos)); + } break; default: Y_FAIL("You should change implementation according to grammar changes"); @@ -3387,11 +3387,11 @@ bool TSqlCallExpr::Init(const TRule_invoke_expr& node) { const auto& tail = node.GetRule_invoke_expr_tail4(); if (tail.HasBlock1()) { - if (IsExternalCall) { - Ctx.Error() << "Additional clause after EXTERNAL FUNCTION(...) is not supported"; - return false; - } - + if (IsExternalCall) { + Ctx.Error() << "Additional clause after EXTERNAL FUNCTION(...) is not supported"; + return false; + } + switch (tail.GetBlock1().Alt_case()) { case TRule_invoke_expr_tail::TBlock1::kAlt1: { if (!tail.HasBlock2()) { @@ -6321,7 +6321,7 @@ bool TSqlTranslation::RoleParameters(const TRule_create_user_option& node, TRole TSourcePtr TSqlSelect::ProcessCore(const TRule_process_core& node, const TWriteSettings& settings, TPosition& selectPos) { // PROCESS STREAM? named_single_source (COMMA named_single_source)* (USING using_call_expr (AS an_id)? - // (WITH external_call_settings)? + // (WITH external_call_settings)? // (WHERE expr)? (HAVING expr)? (ASSUME order_by_clause)?)? Token(node.GetToken1()); @@ -6352,14 +6352,14 @@ TSourcePtr TSqlSelect::ProcessCore(const TRule_process_core& node, const TWriteS const bool processStream = node.HasBlock2(); if (!hasUsing) { - return BuildProcess(startPos, std::move(source), nullptr, false, {}, false, processStream, settings, {}); + return BuildProcess(startPos, std::move(source), nullptr, false, {}, false, processStream, settings, {}); } const auto& block5 = node.GetBlock5(); - if (block5.HasBlock5()) { + if (block5.HasBlock5()) { TSqlExpression expr(Ctx, Mode); TColumnRefScope scope(Ctx, EColumnRefState::Allow); - TNodePtr where = expr.Build(block5.GetBlock5().GetRule_expr2()); + TNodePtr where = expr.Build(block5.GetBlock5().GetRule_expr2()); if (!where || !source->AddFilter(Ctx, where)) { return nullptr; } @@ -6368,7 +6368,7 @@ TSourcePtr TSqlSelect::ProcessCore(const TRule_process_core& node, const TWriteS Ctx.IncrementMonCounter("sql_features", processStream ? "ProcessStream" : "Process"); } - if (block5.HasBlock6()) { + if (block5.HasBlock6()) { Ctx.Error() << "PROCESS does not allow HAVING yet! You may request it on yql@ maillist."; return nullptr; } @@ -6395,45 +6395,45 @@ TSourcePtr TSqlSelect::ProcessCore(const TRule_process_core& node, const TWriteS } } - if (!call.IsExternal() && block5.HasBlock4()) { - Ctx.Error() << "PROCESS without USING EXTERNAL FUNCTION doesn't allow WITH block"; - return nullptr; - } - - if (block5.HasBlock4()) { - const auto& block54 = block5.GetBlock4(); - if (!call.ConfigureExternalCall(block54.GetRule_external_call_settings2())) { - return nullptr; - } - } - + if (!call.IsExternal() && block5.HasBlock4()) { + Ctx.Error() << "PROCESS without USING EXTERNAL FUNCTION doesn't allow WITH block"; + return nullptr; + } + + if (block5.HasBlock4()) { + const auto& block54 = block5.GetBlock4(); + if (!call.ConfigureExternalCall(block54.GetRule_external_call_settings2())) { + return nullptr; + } + } + TSqlCallExpr finalCall(call, args); - TNodePtr with(finalCall.IsExternal() ? finalCall.BuildCall() : finalCall.BuildUdf(/* forReduce = */ false)); + TNodePtr with(finalCall.IsExternal() ? finalCall.BuildCall() : finalCall.BuildUdf(/* forReduce = */ false)); if (!with) { return {}; } args = finalCall.GetArgs(); - if (call.IsExternal()) - listCall = true; + if (call.IsExternal()) + listCall = true; if (block5.HasBlock3()) { with->SetLabel(Id(block5.GetBlock3().GetRule_an_id2(), *this)); } - if (call.IsExternal() && block5.HasBlock7()) { - Ctx.Error() << "PROCESS with USING EXTERNAL FUNCTION doesn't allow ASSUME block"; - return nullptr; - } - + if (call.IsExternal() && block5.HasBlock7()) { + Ctx.Error() << "PROCESS with USING EXTERNAL FUNCTION doesn't allow ASSUME block"; + return nullptr; + } + TVector<TSortSpecificationPtr> assumeOrderBy; - if (block5.HasBlock7()) { - if (!OrderByClause(block5.GetBlock7().GetRule_order_by_clause2(), assumeOrderBy)) { + if (block5.HasBlock7()) { + if (!OrderByClause(block5.GetBlock7().GetRule_order_by_clause2(), assumeOrderBy)) { return nullptr; } Ctx.IncrementMonCounter("sql_features", IsColumnsOnly(assumeOrderBy) ? "AssumeOrderBy" : "AssumeOrderByExpr"); } - return BuildProcess(startPos, std::move(source), with, finalCall.IsExternal(), std::move(args), listCall, processStream, settings, assumeOrderBy); + return BuildProcess(startPos, std::move(source), with, finalCall.IsExternal(), std::move(args), listCall, processStream, settings, assumeOrderBy); } TSourcePtr TSqlSelect::ReduceCore(const TRule_reduce_core& node, const TWriteSettings& settings, TPosition& selectPos) { diff --git a/ydb/library/yql/sql/v1/sql_call_param.h b/ydb/library/yql/sql/v1/sql_call_param.h index 8b9f4488cb1..57495afd88f 100644 --- a/ydb/library/yql/sql/v1/sql_call_param.h +++ b/ydb/library/yql/sql/v1/sql_call_param.h @@ -1,20 +1,20 @@ -#pragma once - -#include <util/system/types.h> - -namespace NSQLTranslationV1 { - -/////////////////////////////////////////////////////////////////////////////////////////////// - -enum class ESqlCallParam: ui32 { - InputType /* "INPUT_TYPE" */, // as is - OutputType /* "OUTPUT_TYPE" */, // as is - Concurrency /* "CONCURRENCY" */, // as is - BatchSize /* "BATCH_SIZE" */, // as is - OptimizeFor /* "OPTIMIZE_FOR" */, // evaluate atom - Connection /* "CONNECTION" */, // evaluate atom - Init /* "INIT" */, // as is -}; - -/////////////////////////////////////////////////////////////////////////////////////////////// -} +#pragma once + +#include <util/system/types.h> + +namespace NSQLTranslationV1 { + +/////////////////////////////////////////////////////////////////////////////////////////////// + +enum class ESqlCallParam: ui32 { + InputType /* "INPUT_TYPE" */, // as is + OutputType /* "OUTPUT_TYPE" */, // as is + Concurrency /* "CONCURRENCY" */, // as is + BatchSize /* "BATCH_SIZE" */, // as is + OptimizeFor /* "OPTIMIZE_FOR" */, // evaluate atom + Connection /* "CONNECTION" */, // evaluate atom + Init /* "INIT" */, // as is +}; + +/////////////////////////////////////////////////////////////////////////////////////////////// +} diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 58a51a69199..85b6e0dfcd3 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -1604,55 +1604,55 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { } } -Y_UNIT_TEST_SUITE(ExternalFunction) { - Y_UNIT_TEST(ValidUseFunctions) { - - UNIT_ASSERT(SqlToYql( - "PROCESS plato.Input" - " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo', <|a: 123, b: a + 641|>)" - " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," - " CONCURRENCY=3, OPTIMIZE_FOR='CALLS'").IsOk()); - - // use CALLS without quotes, as keyword - UNIT_ASSERT(SqlToYql( - "PROCESS plato.Input" - " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo')" - " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," - " OPTIMIZE_FOR=CALLS").IsOk()); - - UNIT_ASSERT(SqlToYql( - "PROCESS plato.Input" - " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo', TableRow())" - " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," - " CONCURRENCY=3").IsOk()); - - UNIT_ASSERT(SqlToYql( - "PROCESS plato.Input" - " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo')" - " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," - " CONCURRENCY=3, BATCH_SIZE=1000000, CONNECTION='yc-folder34fse-con'," - " INIT=[0, 900]").IsOk()); - - UNIT_ASSERT(SqlToYql( - "PROCESS plato.Input" - " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'bar', TableRow())" - " WITH UNKNOWN_PARAM_1='837747712', UNKNOWN_PARAM_2=Tuple<Uint16, Utf8>," - " INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>").IsOk()); - } - - - Y_UNIT_TEST(InValidUseFunctions) { - ExpectFailWithError("PROCESS plato.Input USING some::udf(*) WITH INPUT_TYPE=Struct<a:Int32>", - "<main>:1:33: Error: PROCESS without USING EXTERNAL FUNCTION doesn't allow WITH block\n"); - - ExpectFailWithError("PROCESS plato.Input USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'jhhjfh88134d')" - " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>" - " ASSUME ORDER BY key", - "<main>:1:129: Error: PROCESS with USING EXTERNAL FUNCTION doesn't allow ASSUME block\n"); - - ExpectFailWithError("PROCESS plato.Input USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo', 'bar', 'baz')", - "<main>:1:15: Error: EXTERNAL FUNCTION requires from 2 to 3 arguments, but got: 4\n"); - +Y_UNIT_TEST_SUITE(ExternalFunction) { + Y_UNIT_TEST(ValidUseFunctions) { + + UNIT_ASSERT(SqlToYql( + "PROCESS plato.Input" + " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo', <|a: 123, b: a + 641|>)" + " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," + " CONCURRENCY=3, OPTIMIZE_FOR='CALLS'").IsOk()); + + // use CALLS without quotes, as keyword + UNIT_ASSERT(SqlToYql( + "PROCESS plato.Input" + " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo')" + " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," + " OPTIMIZE_FOR=CALLS").IsOk()); + + UNIT_ASSERT(SqlToYql( + "PROCESS plato.Input" + " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo', TableRow())" + " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," + " CONCURRENCY=3").IsOk()); + + UNIT_ASSERT(SqlToYql( + "PROCESS plato.Input" + " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo')" + " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>," + " CONCURRENCY=3, BATCH_SIZE=1000000, CONNECTION='yc-folder34fse-con'," + " INIT=[0, 900]").IsOk()); + + UNIT_ASSERT(SqlToYql( + "PROCESS plato.Input" + " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'bar', TableRow())" + " WITH UNKNOWN_PARAM_1='837747712', UNKNOWN_PARAM_2=Tuple<Uint16, Utf8>," + " INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>").IsOk()); + } + + + Y_UNIT_TEST(InValidUseFunctions) { + ExpectFailWithError("PROCESS plato.Input USING some::udf(*) WITH INPUT_TYPE=Struct<a:Int32>", + "<main>:1:33: Error: PROCESS without USING EXTERNAL FUNCTION doesn't allow WITH block\n"); + + ExpectFailWithError("PROCESS plato.Input USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'jhhjfh88134d')" + " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>" + " ASSUME ORDER BY key", + "<main>:1:129: Error: PROCESS with USING EXTERNAL FUNCTION doesn't allow ASSUME block\n"); + + ExpectFailWithError("PROCESS plato.Input USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo', 'bar', 'baz')", + "<main>:1:15: Error: EXTERNAL FUNCTION requires from 2 to 3 arguments, but got: 4\n"); + ExpectFailWithError("PROCESS plato.Input\n" " USING EXTERNAL FUNCTION('YANDEX-CLOUD', 'foo', <|field_1: a1, field_b: b1|>)\n" " WITH INPUT_TYPE=Struct<a:Int32>, OUTPUT_TYPE=Struct<b:Int32>,\n" @@ -1661,9 +1661,9 @@ Y_UNIT_TEST_SUITE(ExternalFunction) { " INIT=[0, 900]\n", "<main>:5:2: Error: WITH \"CONCURRENCY\" clause should be specified only once\n" "<main>:5:17: Error: WITH \"INPUT_TYPE\" clause should be specified only once\n"); - } -} - + } +} + Y_UNIT_TEST_SUITE(SqlToYQLErrors) { Y_UNIT_TEST(StrayUTF8) { /// 'c' in plato is russian here diff --git a/ydb/library/yql/sql/v1/ya.make b/ydb/library/yql/sql/v1/ya.make index 4e2aee58a66..432fdefbf71 100644 --- a/ydb/library/yql/sql/v1/ya.make +++ b/ydb/library/yql/sql/v1/ya.make @@ -40,7 +40,7 @@ YQL_LAST_ABI_VERSION() GENERATE_ENUM_SERIALIZATION(node.h) -GENERATE_ENUM_SERIALIZATION(sql_call_param.h) +GENERATE_ENUM_SERIALIZATION(sql_call_param.h) END() diff --git a/ydb/library/yql/udfs/common/ip_base/ip_base.cpp b/ydb/library/yql/udfs/common/ip_base/ip_base.cpp index 15a375a91fc..ddee9bb7cf6 100644 --- a/ydb/library/yql/udfs/common/ip_base/ip_base.cpp +++ b/ydb/library/yql/udfs/common/ip_base/ip_base.cpp @@ -1,7 +1,7 @@ #include <ydb/library/yql/public/udf/udf_helpers.h> - -#include "lib/ip_base_udf.h" - -SIMPLE_MODULE(TIpModule, EXPORTED_IP_BASE_UDF) + +#include "lib/ip_base_udf.h" + +SIMPLE_MODULE(TIpModule, EXPORTED_IP_BASE_UDF) REGISTER_MODULES(TIpModule) diff --git a/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.cpp b/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.cpp index 777acf72a13..a0617e77283 100644 --- a/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.cpp +++ b/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.cpp @@ -1 +1 @@ -#include "ip_base_udf.h"
\ No newline at end of file +#include "ip_base_udf.h"
\ No newline at end of file diff --git a/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.h b/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.h index f7afd997369..b87bd959129 100644 --- a/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.h +++ b/ydb/library/yql/udfs/common/ip_base/lib/ip_base_udf.h @@ -1,148 +1,148 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/public/udf/udf_helpers.h> - -#include <util/draft/ip.h> -#include <util/generic/buffer.h> - -namespace { - using TAutoMapString = NKikimr::NUdf::TAutoMap<char*>; - using TOptionalString = NKikimr::NUdf::TOptional<char*>; - using TOptionalByte = NKikimr::NUdf::TOptional<ui8>; - using TStringRef = NKikimr::NUdf::TStringRef; - using TUnboxedValue = NKikimr::NUdf::TUnboxedValue; - using TUnboxedValuePod = NKikimr::NUdf::TUnboxedValuePod; - - struct TSerializeIpVisitor { - TStringRef operator()(const TIp4& ip) const { - return TStringRef(reinterpret_cast<const char*>(&ip), 4); - } - TStringRef operator()(const TIp6& ip) const { - return TStringRef(reinterpret_cast<const char*>(&ip.Data), 16); - } - }; - - SIMPLE_UDF(TFromString, TOptionalString(TAutoMapString)) { - try { - TString input(args[0].AsStringRef()); - const TIp4Or6& ip = Ip4Or6FromString(input.c_str()); - return valueBuilder->NewString(std::visit(TSerializeIpVisitor(), ip)); - } catch (TSystemError&) { - return TUnboxedValue(); - } - } - - SIMPLE_UDF(TToString, char*(TAutoMapString)) { - const auto& ref = args[0].AsStringRef(); - if (ref.Size() == 4) { - TIp4 ip; - memcpy(&ip, ref.Data(), sizeof(ip)); - return valueBuilder->NewString(Ip4Or6ToString(ip)); - } else if (ref.Size() == 16) { - TIp6 ip; - memcpy(&ip.Data, ref.Data(), sizeof(ip.Data)); - return valueBuilder->NewString(Ip4Or6ToString(ip)); - } else { - ythrow yexception() << "Incorrect size of input, expected " - << "4 or 16, got " << ref.Size(); - } - } - - SIMPLE_UDF(TIsIPv4, bool(TOptionalString)) { - Y_UNUSED(valueBuilder); - bool result = false; - if (args[0]) { - const auto ref = args[0].AsStringRef(); - result = ref.Size() == 4; - } - return TUnboxedValuePod(result); - } - - SIMPLE_UDF(TIsIPv6, bool(TOptionalString)) { - Y_UNUSED(valueBuilder); - bool result = false; - if (args[0]) { - const auto ref = args[0].AsStringRef(); - result = ref.Size() == 16; - } - return TUnboxedValuePod(result); - } - - SIMPLE_UDF(TIsEmbeddedIPv4, bool(TOptionalString)) { - Y_UNUSED(valueBuilder); - bool result = false; - if (args[0]) { - const auto ref = args[0].AsStringRef(); - if (ref.Size() == 16 && ref.Data()[10] == -1) { - bool allZeroes = true; - for (int i = 0; i < 10; ++i) { - if (ref.Data()[i] != 0) { - allZeroes = false; - break; - } - } - result = allZeroes; - } - } - return TUnboxedValuePod(result); - } - - SIMPLE_UDF(TConvertToIPv6, char*(TAutoMapString)) { - const auto& ref = args[0].AsStringRef(); - if (ref.Size() == 16) { - return valueBuilder->NewString(ref); - } else if (ref.Size() == 4) { - TIp4 ipv4; - memcpy(&ipv4, ref.Data(), sizeof(ipv4)); - const TIp6 ipv6 = Ip6FromIp4(ipv4); - return valueBuilder->NewString(TStringRef(reinterpret_cast<const char*>(&ipv6.Data), 16)); - } else { - ythrow yexception() << "Incorrect size of input, expected " - << "4 or 16, got " << ref.Size(); - } - } - - SIMPLE_UDF_OPTIONS(TGetSubnet, char*(TAutoMapString, TOptionalByte), - builder.OptionalArgs(1)) { - const auto ref = args[0].AsStringRef(); - ui8 subnetSize = args[1].GetOrDefault<ui8>(0); - - if (ref.Size() == 4) { - if (!subnetSize) { - subnetSize = 24; - } - } else if (ref.Size() == 16) { - if (!subnetSize) { - subnetSize = 64; - } - } else { - ythrow yexception() << "Incorrect size of input, expected " - << "4 or 16, got " << ref.Size(); - } - TBuffer result(ref.Data(), ref.Size()); - int bytesToMask = ref.Size() * 8 - subnetSize; - ui8 currentByte = ref.Size() - 1; - while (bytesToMask > 0) { - if (bytesToMask > 8) { - result.Data()[currentByte] = 0; - } else { - result.Data()[currentByte] = result.Data()[currentByte] & (0xff << bytesToMask); - } - bytesToMask -= 8; - --currentByte; - } - - return valueBuilder->NewString(TStringRef(result.Data(), result.Size())); - } - -#define EXPORTED_IP_BASE_UDF \ - TFromString, \ - TToString, \ - TIsIPv4, \ - TIsIPv6, \ - TIsEmbeddedIPv4, \ - TConvertToIPv6, \ - TGetSubnet -} - - + +#include <util/draft/ip.h> +#include <util/generic/buffer.h> + +namespace { + using TAutoMapString = NKikimr::NUdf::TAutoMap<char*>; + using TOptionalString = NKikimr::NUdf::TOptional<char*>; + using TOptionalByte = NKikimr::NUdf::TOptional<ui8>; + using TStringRef = NKikimr::NUdf::TStringRef; + using TUnboxedValue = NKikimr::NUdf::TUnboxedValue; + using TUnboxedValuePod = NKikimr::NUdf::TUnboxedValuePod; + + struct TSerializeIpVisitor { + TStringRef operator()(const TIp4& ip) const { + return TStringRef(reinterpret_cast<const char*>(&ip), 4); + } + TStringRef operator()(const TIp6& ip) const { + return TStringRef(reinterpret_cast<const char*>(&ip.Data), 16); + } + }; + + SIMPLE_UDF(TFromString, TOptionalString(TAutoMapString)) { + try { + TString input(args[0].AsStringRef()); + const TIp4Or6& ip = Ip4Or6FromString(input.c_str()); + return valueBuilder->NewString(std::visit(TSerializeIpVisitor(), ip)); + } catch (TSystemError&) { + return TUnboxedValue(); + } + } + + SIMPLE_UDF(TToString, char*(TAutoMapString)) { + const auto& ref = args[0].AsStringRef(); + if (ref.Size() == 4) { + TIp4 ip; + memcpy(&ip, ref.Data(), sizeof(ip)); + return valueBuilder->NewString(Ip4Or6ToString(ip)); + } else if (ref.Size() == 16) { + TIp6 ip; + memcpy(&ip.Data, ref.Data(), sizeof(ip.Data)); + return valueBuilder->NewString(Ip4Or6ToString(ip)); + } else { + ythrow yexception() << "Incorrect size of input, expected " + << "4 or 16, got " << ref.Size(); + } + } + + SIMPLE_UDF(TIsIPv4, bool(TOptionalString)) { + Y_UNUSED(valueBuilder); + bool result = false; + if (args[0]) { + const auto ref = args[0].AsStringRef(); + result = ref.Size() == 4; + } + return TUnboxedValuePod(result); + } + + SIMPLE_UDF(TIsIPv6, bool(TOptionalString)) { + Y_UNUSED(valueBuilder); + bool result = false; + if (args[0]) { + const auto ref = args[0].AsStringRef(); + result = ref.Size() == 16; + } + return TUnboxedValuePod(result); + } + + SIMPLE_UDF(TIsEmbeddedIPv4, bool(TOptionalString)) { + Y_UNUSED(valueBuilder); + bool result = false; + if (args[0]) { + const auto ref = args[0].AsStringRef(); + if (ref.Size() == 16 && ref.Data()[10] == -1) { + bool allZeroes = true; + for (int i = 0; i < 10; ++i) { + if (ref.Data()[i] != 0) { + allZeroes = false; + break; + } + } + result = allZeroes; + } + } + return TUnboxedValuePod(result); + } + + SIMPLE_UDF(TConvertToIPv6, char*(TAutoMapString)) { + const auto& ref = args[0].AsStringRef(); + if (ref.Size() == 16) { + return valueBuilder->NewString(ref); + } else if (ref.Size() == 4) { + TIp4 ipv4; + memcpy(&ipv4, ref.Data(), sizeof(ipv4)); + const TIp6 ipv6 = Ip6FromIp4(ipv4); + return valueBuilder->NewString(TStringRef(reinterpret_cast<const char*>(&ipv6.Data), 16)); + } else { + ythrow yexception() << "Incorrect size of input, expected " + << "4 or 16, got " << ref.Size(); + } + } + + SIMPLE_UDF_OPTIONS(TGetSubnet, char*(TAutoMapString, TOptionalByte), + builder.OptionalArgs(1)) { + const auto ref = args[0].AsStringRef(); + ui8 subnetSize = args[1].GetOrDefault<ui8>(0); + + if (ref.Size() == 4) { + if (!subnetSize) { + subnetSize = 24; + } + } else if (ref.Size() == 16) { + if (!subnetSize) { + subnetSize = 64; + } + } else { + ythrow yexception() << "Incorrect size of input, expected " + << "4 or 16, got " << ref.Size(); + } + TBuffer result(ref.Data(), ref.Size()); + int bytesToMask = ref.Size() * 8 - subnetSize; + ui8 currentByte = ref.Size() - 1; + while (bytesToMask > 0) { + if (bytesToMask > 8) { + result.Data()[currentByte] = 0; + } else { + result.Data()[currentByte] = result.Data()[currentByte] & (0xff << bytesToMask); + } + bytesToMask -= 8; + --currentByte; + } + + return valueBuilder->NewString(TStringRef(result.Data(), result.Size())); + } + +#define EXPORTED_IP_BASE_UDF \ + TFromString, \ + TToString, \ + TIsIPv4, \ + TIsIPv6, \ + TIsEmbeddedIPv4, \ + TConvertToIPv6, \ + TGetSubnet +} + + diff --git a/ydb/library/yql/udfs/common/ip_base/lib/ya.make b/ydb/library/yql/udfs/common/ip_base/lib/ya.make index dd1dda6ec66..88baf0a0cf2 100644 --- a/ydb/library/yql/udfs/common/ip_base/lib/ya.make +++ b/ydb/library/yql/udfs/common/ip_base/lib/ya.make @@ -1,22 +1,22 @@ -LIBRARY() - -YQL_ABI_VERSION( - 2 - 9 - 0 -) - +LIBRARY() + +YQL_ABI_VERSION( + 2 + 9 + 0 +) + OWNER( g:yql g:yql_ydb_core ) - -SRCS( - ip_base_udf.cpp -) - -PEERDIR( + +SRCS( + ip_base_udf.cpp +) + +PEERDIR( ydb/library/yql/public/udf -) - -END() +) + +END() diff --git a/ydb/library/yql/udfs/common/ip_base/ya.make b/ydb/library/yql/udfs/common/ip_base/ya.make index 30bb9ad05b7..40442f6a7f9 100644 --- a/ydb/library/yql/udfs/common/ip_base/ya.make +++ b/ydb/library/yql/udfs/common/ip_base/ya.make @@ -1,19 +1,19 @@ -YQL_UDF(ip_udf) - -YQL_ABI_VERSION( - 2 - 9 - 0 -) - -OWNER(g:yql g:yql_ydb_core) - -SRCS( - ip_base.cpp -) - -PEERDIR( +YQL_UDF(ip_udf) + +YQL_ABI_VERSION( + 2 + 9 + 0 +) + +OWNER(g:yql g:yql_ydb_core) + +SRCS( + ip_base.cpp +) + +PEERDIR( ydb/library/yql/udfs/common/ip_base/lib -) - -END() +) + +END() diff --git a/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.cpp b/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.cpp index 13026313333..3e90765e405 100644 --- a/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.cpp +++ b/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.cpp @@ -1 +1 @@ -#include "unicode_base_udf.h"
\ No newline at end of file +#include "unicode_base_udf.h"
\ No newline at end of file diff --git a/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.h b/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.h index bf9b2b6e2dc..1f8b0d3a95b 100644 --- a/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.h +++ b/ydb/library/yql/udfs/common/unicode_base/lib/unicode_base_udf.h @@ -1,438 +1,438 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/public/udf/udf_allocator.h> #include <ydb/library/yql/public/udf/udf_helpers.h> #include <ydb/library/yql/utils/utf8.h> - -#include <library/cpp/string_utils/levenshtein_diff/levenshtein_diff.h> -#include <library/cpp/unicode/normalization/normalization.h> - -#include <library/cpp/deprecated/split/split_iterator.h> -#include <util/string/join.h> -#include <util/string/reverse.h> -#include <util/string/split.h> -#include <util/string/subst.h> -#include <util/charset/wide.h> - -using namespace NYql; -using namespace NUdf; -using namespace NUnicode; - -namespace { -#define NORMALIZE_UDF_MAP(XX) \ - XX(Normalize, NFC) \ - XX(NormalizeNFD, NFD) \ - XX(NormalizeNFC, NFC) \ - XX(NormalizeNFKD, NFKD) \ - XX(NormalizeNFKC, NFKC) - -#define NORMALIZE_UDF(name, mode) \ - SIMPLE_UDF(T##name, TUtf8(TAutoMap<TUtf8>)) { \ - const auto& inputRef = args[0].AsStringRef(); \ - const TUtf16String& input = UTF8ToWide(inputRef.Data(), inputRef.Size()); \ - const TString& output = WideToUTF8(Normalize<mode>(input)); \ - return valueBuilder->NewString(output); \ - } - - NORMALIZE_UDF_MAP(NORMALIZE_UDF) - - SIMPLE_UDF(TIsUtf, bool(TOptional<char*>)) { - Y_UNUSED(valueBuilder); - if (args[0]) { - return TUnboxedValuePod(IsUtf8(args[0].AsStringRef())); - } else { - return TUnboxedValuePod(false); - } - } - - SIMPLE_UDF(TGetLength, ui64(TAutoMap<TUtf8>)) { - Y_UNUSED(valueBuilder); - const auto& inputRef = args[0].AsStringRef(); - size_t result; - GetNumberOfUTF8Chars(inputRef.Data(), inputRef.Size(), result); - return TUnboxedValuePod(static_cast<ui64>(result)); - } - - SIMPLE_UDF_OPTIONS(TSubstring, TUtf8(TAutoMap<TUtf8>, TOptional<ui64>, TOptional<ui64>), - builder.OptionalArgs(1)) { - const TStringBuf input(args[0].AsStringRef()); - size_t from = args[1].GetOrDefault<ui64>(0); - size_t len = !args[2] ? TStringBuf::npos : size_t(args[2].Get<ui64>()); - return valueBuilder->NewString(SubstrUTF8(input, from, len)); - } - - SIMPLE_UDF_OPTIONS(TFind, TOptional<ui64>(TAutoMap<TUtf8>, TUtf8, TOptional<ui64>), builder.OptionalArgs(1)) { - Y_UNUSED(valueBuilder); - const std::string_view string(args[0].AsStringRef()); - const std::string_view needle(args[1].AsStringRef()); - std::string_view::size_type pos = 0U; - - if (auto p = args[2].GetOrDefault<ui64>(0ULL)) { - for (auto ptr = string.data(); p && pos < string.size(); --p) { - const auto width = WideCharSize(*ptr); - pos += width; - ptr += width; - } - } - - if (const auto find = string.find(needle, pos); std::string_view::npos != find) { - size_t result; - GetNumberOfUTF8Chars(string.data(), find, result); - return TUnboxedValuePod(static_cast<ui64>(result)); - } - return TUnboxedValuePod(); - } - - SIMPLE_UDF_OPTIONS(TRFind, TOptional<ui64>(TAutoMap<TUtf8>, TUtf8, TOptional<ui64>), builder.OptionalArgs(1)) { - Y_UNUSED(valueBuilder); - const std::string_view string(args[0].AsStringRef()); - const std::string_view needle(args[1].AsStringRef()); - std::string_view::size_type pos = std::string_view::npos; - - if (auto p = args[2].GetOrDefault<ui64>(std::string_view::npos); std::string_view::npos != p) { - pos = 0ULL; - for (auto ptr = string.data(); p && pos < string.size(); --p) { - const auto width = WideCharSize(*ptr); - pos += width; - ptr += width; - } - } - - if (const auto find = string.rfind(needle, pos); std::string_view::npos != find) { - size_t result; - GetNumberOfUTF8Chars(string.data(), find, result); - return TUnboxedValuePod(static_cast<ui64>(result)); - } - return TUnboxedValuePod(); - } - - using TTmpVector = TSmallVec<TUnboxedValue, TUnboxedValue::TAllocator>; - - template <typename TIt> - static void SplitToListImpl( - const IValueBuilder* valueBuilder, - const TUnboxedValue& input, - const std::string_view::const_iterator from, - const TIt& it, - TTmpVector& result) { - for (const auto& elem : it) { - result.emplace_back(valueBuilder->SubString(input, std::distance(from, elem.TokenStart()), std::distance(elem.TokenStart(), elem.TokenDelim()))); - } - } - - template <typename TIt> - static void SplitToListImpl( - const IValueBuilder* valueBuilder, - const TUnboxedValue& input, - const TUtf16String::const_iterator start, - const TIt& it, - TTmpVector& result) { - const std::string_view& original = input.AsStringRef(); - size_t charPos = 0U, bytePos = 0U; - for (const auto& elem : it) { - for (const size_t next = std::distance(start, elem.TokenStart()); charPos < next; ++charPos) - bytePos += WideCharSize(original[bytePos]); - const auto from = bytePos; - - for (const size_t next = charPos + std::distance(elem.TokenStart(), elem.TokenDelim()); charPos < next; ++charPos) - bytePos += WideCharSize(original[bytePos]); - const auto size = bytePos - from; - result.emplace_back(valueBuilder->SubString(input, from, size)); - } - } - - template <typename TIt, typename TStrIt> - static void SplitToListImpl( - const IValueBuilder* valueBuilder, - const TUnboxedValue& input, - const TStrIt from, - TIt& it, - bool skipEmpty, - TTmpVector& result) { - if (skipEmpty) { - SplitToListImpl(valueBuilder, input, from, it.SkipEmpty(), result); - } else { - SplitToListImpl(valueBuilder, input, from, it, result); - } - } - - constexpr char delimeterStringName[] = "DelimeterString"; - constexpr char skipEmptyName[] = "SkipEmpty"; - constexpr char limitName[] = "Limit"; - using TDelimeterStringArg = TNamedArg<bool, delimeterStringName>; - using TSkipEmptyArg = TNamedArg<bool, skipEmptyName>; - using TLimitArg = TNamedArg<ui64, limitName>; - - SIMPLE_UDF_OPTIONS(TSplitToList, TListType<TUtf8>( - TOptional<TUtf8>, - TUtf8, - TDelimeterStringArg, - TSkipEmptyArg, - TLimitArg - ), - builder.OptionalArgs(3)) { - TTmpVector result; - if (args[0]) { - const bool delimiterString = args[2].GetOrDefault<bool>(true); - const bool skipEmpty = args[3].GetOrDefault<bool>(false); - const auto limit = args[4].GetOrDefault<ui64>(0); - if (delimiterString) { - const std::string_view input(args[0].AsStringRef()); - const std::string_view delimeter(args[1].AsStringRef()); - if (limit) { - auto it = StringSplitter(input).SplitByString(delimeter).Limit(limit + 1); - SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); - } else { - auto it = StringSplitter(input).SplitByString(delimeter); - SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); - } - } else { - const auto& input = UTF8ToWide(args[0].AsStringRef()); - const auto& delimeter = UTF8ToWide(args[1].AsStringRef()); - if (limit) { - auto it = StringSplitter(input).SplitBySet(delimeter.c_str()).Limit(limit + 1); - SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); - } else { - auto it = StringSplitter(input).SplitBySet(delimeter.c_str()); - SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); - } - } - } - return valueBuilder->NewList(result.data(), result.size()); - } - - SIMPLE_UDF(TJoinFromList, TUtf8(TAutoMap<TListType<TOptional<TUtf8>>>, TUtf8)) { - const auto input = args[0].GetListIterator(); - const std::string_view delimeter(args[1].AsStringRef()); - std::vector<TString> items; - - for (TUnboxedValue current; input.Next(current);) { - if (current) { - items.emplace_back(current.AsStringRef()); - } - } - - return valueBuilder->NewString(JoinSeq(delimeter, items)); - } - - SIMPLE_UDF(TLevensteinDistance, ui64(TAutoMap<TUtf8>, TAutoMap<TUtf8>)) { - Y_UNUSED(valueBuilder); - const TStringBuf left(args[0].AsStringRef()); - const TStringBuf right(args[1].AsStringRef()); - const TUtf16String& leftWide = UTF8ToWide(left); - const TUtf16String& rightWide = UTF8ToWide(right); - const ui64 result = NLevenshtein::Distance(leftWide, rightWide); - return TUnboxedValuePod(result); - } - - SIMPLE_UDF(TReplaceAll, TUtf8(TAutoMap<TUtf8>, TUtf8, TUtf8)) { - const TStringBuf input(args[0].AsStringRef()); - const TStringBuf whatBuf(args[1].AsStringRef()); - const TStringBuf withBuf(args[2].AsStringRef()); - TUtf16String result = UTF8ToWide(input); - const TUtf16String& what = UTF8ToWide(whatBuf); - const TUtf16String& with = UTF8ToWide(withBuf); - if (TUtf16String result = UTF8ToWide(input); SubstGlobal(result, what, with)) - return valueBuilder->NewString(WideToUTF8(result)); - else - return args[0]; - } - - SIMPLE_UDF(TReplaceFirst, TUtf8(TAutoMap<TUtf8>, TUtf8, TUtf8)) { - const TStringBuf input(args[0].AsStringRef()); - const TStringBuf whatBuf(args[1].AsStringRef()); - const TStringBuf withBuf(args[2].AsStringRef()); - TUtf16String result = UTF8ToWide(input); - const TUtf16String& what = UTF8ToWide(whatBuf); - const TUtf16String& with = UTF8ToWide(withBuf); - if (what.size() != 1) { - ythrow yexception() << "Only one char is supported as second argument"; - } - if (with.size() != 1) { - ythrow yexception() << "Only one char is supported as third argument"; - } - if (const auto index = result.find(what[0]); index != TUtf16String::npos) { - result.replace(index, 1, with.data()); - return valueBuilder->NewString(WideToUTF8(result)); - } - return args[0]; - } - - SIMPLE_UDF(TReplaceLast, TUtf8(TAutoMap<TUtf8>, TUtf8, TUtf8)) { - const TStringBuf input(args[0].AsStringRef()); - const TStringBuf whatBuf(args[1].AsStringRef()); - const TStringBuf withBuf(args[2].AsStringRef()); - TUtf16String result = UTF8ToWide(input); - const TUtf16String& what = UTF8ToWide(whatBuf); - const TUtf16String& with = UTF8ToWide(withBuf); - if (what.size() != 1) { - ythrow yexception() << "Only one char is supported as second argument"; - } - if (with.size() != 1) { - ythrow yexception() << "Only one char is supported as third argument"; - } - if (const auto index = result.rfind(what[0]); index != TUtf16String::npos) { - result.replace(index, 1, with.data()); - return valueBuilder->NewString(WideToUTF8(result)); - } - return args[0]; - } - - SIMPLE_UDF(TRemoveAll, TUtf8(TAutoMap<TUtf8>, TUtf8)) { - const TStringBuf input(args[0].AsStringRef()); - const TStringBuf removeBuf(args[1].AsStringRef()); - TUtf16String result = UTF8ToWide(input); - const TUtf16String& remove = UTF8ToWide(removeBuf); - for (const wchar16 c : remove) { - RemoveAll(result, c); - } - return valueBuilder->NewString(WideToUTF8(result)); - } - - SIMPLE_UDF(TRemoveFirst, TUtf8(TAutoMap<TUtf8>, TUtf8)) { - const TStringBuf input(args[0].AsStringRef()); - const TStringBuf removeBuf(args[1].AsStringRef()); - TUtf16String result = UTF8ToWide(input); - const TUtf16String& remove = UTF8ToWide(removeBuf); - if (remove.size() != 1) { - ythrow yexception() << "Only one char is supported as second argument"; - } - if (const auto index = result.find(remove[0]); index != TUtf16String::npos) { - result.remove(index, 1); - return valueBuilder->NewString(WideToUTF8(result)); - } - return args[0]; - } - - SIMPLE_UDF(TRemoveLast, TUtf8(TAutoMap<TUtf8>, TUtf8)) { - const TStringBuf input(args[0].AsStringRef()); - const TStringBuf removeBuf(args[1].AsStringRef()); - TUtf16String result = UTF8ToWide(input); - const TUtf16String& remove = UTF8ToWide(removeBuf); - if (remove.size() != 1) { - ythrow yexception() << "Only one char is supported as second argument"; - } - if (const auto index = result.rfind(remove[0]); index != TUtf16String::npos) { - result.remove(index, 1); - return valueBuilder->NewString(WideToUTF8(result)); - } - return args[0]; - } - - SIMPLE_UDF(TToCodePointList, TListType<ui32>(TAutoMap<TUtf8>)) { - size_t codePointCount = 0; - const auto& inputRef = args[0].AsStringRef(); - if (!GetNumberOfUTF8Chars(inputRef.Data(), inputRef.Size(), codePointCount)) { - // should not happen but still we have to check return code - ythrow yexception() << "Unable to count code points"; - } - - TUnboxedValue* itemsPtr = nullptr; - auto result = valueBuilder->NewArray(codePointCount, itemsPtr); - const unsigned char* current = reinterpret_cast<const unsigned char*>(inputRef.Data()); - const unsigned char* end = current + inputRef.Size(); - wchar32 rune = BROKEN_RUNE; - ui32 codePointIndex = 0; - RECODE_RESULT retcode = RECODE_OK; - while (current < end && RECODE_OK == (retcode = ReadUTF8CharAndAdvance(rune, current, end))) { - if (codePointIndex >= codePointCount) { - // sanity check - ythrow yexception() << "Too big code point index " << codePointIndex << ", expecting only " << codePointCount << " code points"; - } - itemsPtr[codePointIndex++] = TUnboxedValuePod(static_cast<ui32>(rune)); - } - - if (retcode != RECODE_OK) { - ythrow yexception() << "Malformed UTF-8 string"; - } - - return result; - } - - SIMPLE_UDF(TFromCodePointList, TUtf8(TAutoMap<TListType<ui32>>)) { - auto input = args[0]; - if (auto elems = input.GetElements()) { - const auto elemCount = input.GetListLength(); - auto bufferSize = WideToUTF8BufferSize(elemCount); - TTempBuf buffer(bufferSize); - auto bufferPtr = buffer.Data(); - auto bufferEnd = buffer.Data() + bufferSize; - for (ui64 i = 0; i != elemCount; ++i) { - const auto& item = elems[i]; - const wchar32 rune = item.Get<ui32>(); - size_t written = 0; - WideToUTF8(&rune, 1, bufferPtr, written); - Y_ENSURE(written <= 4); - bufferPtr += written; - Y_ENSURE(bufferPtr <= bufferEnd); - } - return valueBuilder->NewString(TStringRef(buffer.Data(), bufferPtr - buffer.Data())); - } - - std::vector<char, NUdf::TStdAllocatorForUdf<char>> buffer; - buffer.reserve(TUnboxedValuePod::InternalBufferSize); - - const auto& iter = input.GetListIterator(); - char runeBuffer[4] = {}; - for (NUdf::TUnboxedValue item; iter.Next(item); ) { - const wchar32 rune = item.Get<ui32>(); - size_t written = 0; - WideToUTF8(&rune, 1, runeBuffer, written); - Y_ENSURE(written <= 4); - buffer.insert(buffer.end(), runeBuffer, runeBuffer + written); - } - - return valueBuilder->NewString(TStringRef(buffer.data(), buffer.size())); - } - - SIMPLE_UDF(TReverse, TUtf8(TAutoMap<TUtf8>)) { - auto wide = UTF8ToWide(args[0].AsStringRef()); - ReverseInPlace(wide); - return valueBuilder->NewString(WideToUTF8(wide)); - } - - SIMPLE_UDF(TToLower, TUtf8(TAutoMap<TUtf8>)) { - if (auto wide = UTF8ToWide(args->AsStringRef()); ToLower(wide)) - return valueBuilder->NewString(WideToUTF8(wide)); - else - return *args; - } - - SIMPLE_UDF(TToUpper, TUtf8(TAutoMap<TUtf8>)) { - if (auto wide = UTF8ToWide(args->AsStringRef()); ToUpper(wide)) - return valueBuilder->NewString(WideToUTF8(wide)); - else - return *args; - } - - SIMPLE_UDF(TToTitle, TUtf8(TAutoMap<TUtf8>)) { - if (auto wide = UTF8ToWide(args->AsStringRef()); ToTitle(wide)) - return valueBuilder->NewString(WideToUTF8(wide)); - else - return *args; - } - -#define REGISTER_NORMALIZE_UDF(name, mode) T##name, -#define EXPORTED_UNICODE_BASE_UDF \ - NORMALIZE_UDF_MAP(REGISTER_NORMALIZE_UDF) \ - TIsUtf, \ - TGetLength, \ - TSubstring, \ - TFind, \ - TRFind, \ - TSplitToList, \ - TJoinFromList, \ - TLevensteinDistance, \ - TReplaceAll, \ - TReplaceFirst, \ - TReplaceLast, \ - TRemoveAll, \ - TRemoveFirst, \ - TRemoveLast, \ - TToCodePointList, \ - TFromCodePointList, \ - TReverse, \ - TToLower, \ - TToUpper, \ - TToTitle -} + +#include <library/cpp/string_utils/levenshtein_diff/levenshtein_diff.h> +#include <library/cpp/unicode/normalization/normalization.h> + +#include <library/cpp/deprecated/split/split_iterator.h> +#include <util/string/join.h> +#include <util/string/reverse.h> +#include <util/string/split.h> +#include <util/string/subst.h> +#include <util/charset/wide.h> + +using namespace NYql; +using namespace NUdf; +using namespace NUnicode; + +namespace { +#define NORMALIZE_UDF_MAP(XX) \ + XX(Normalize, NFC) \ + XX(NormalizeNFD, NFD) \ + XX(NormalizeNFC, NFC) \ + XX(NormalizeNFKD, NFKD) \ + XX(NormalizeNFKC, NFKC) + +#define NORMALIZE_UDF(name, mode) \ + SIMPLE_UDF(T##name, TUtf8(TAutoMap<TUtf8>)) { \ + const auto& inputRef = args[0].AsStringRef(); \ + const TUtf16String& input = UTF8ToWide(inputRef.Data(), inputRef.Size()); \ + const TString& output = WideToUTF8(Normalize<mode>(input)); \ + return valueBuilder->NewString(output); \ + } + + NORMALIZE_UDF_MAP(NORMALIZE_UDF) + + SIMPLE_UDF(TIsUtf, bool(TOptional<char*>)) { + Y_UNUSED(valueBuilder); + if (args[0]) { + return TUnboxedValuePod(IsUtf8(args[0].AsStringRef())); + } else { + return TUnboxedValuePod(false); + } + } + + SIMPLE_UDF(TGetLength, ui64(TAutoMap<TUtf8>)) { + Y_UNUSED(valueBuilder); + const auto& inputRef = args[0].AsStringRef(); + size_t result; + GetNumberOfUTF8Chars(inputRef.Data(), inputRef.Size(), result); + return TUnboxedValuePod(static_cast<ui64>(result)); + } + + SIMPLE_UDF_OPTIONS(TSubstring, TUtf8(TAutoMap<TUtf8>, TOptional<ui64>, TOptional<ui64>), + builder.OptionalArgs(1)) { + const TStringBuf input(args[0].AsStringRef()); + size_t from = args[1].GetOrDefault<ui64>(0); + size_t len = !args[2] ? TStringBuf::npos : size_t(args[2].Get<ui64>()); + return valueBuilder->NewString(SubstrUTF8(input, from, len)); + } + + SIMPLE_UDF_OPTIONS(TFind, TOptional<ui64>(TAutoMap<TUtf8>, TUtf8, TOptional<ui64>), builder.OptionalArgs(1)) { + Y_UNUSED(valueBuilder); + const std::string_view string(args[0].AsStringRef()); + const std::string_view needle(args[1].AsStringRef()); + std::string_view::size_type pos = 0U; + + if (auto p = args[2].GetOrDefault<ui64>(0ULL)) { + for (auto ptr = string.data(); p && pos < string.size(); --p) { + const auto width = WideCharSize(*ptr); + pos += width; + ptr += width; + } + } + + if (const auto find = string.find(needle, pos); std::string_view::npos != find) { + size_t result; + GetNumberOfUTF8Chars(string.data(), find, result); + return TUnboxedValuePod(static_cast<ui64>(result)); + } + return TUnboxedValuePod(); + } + + SIMPLE_UDF_OPTIONS(TRFind, TOptional<ui64>(TAutoMap<TUtf8>, TUtf8, TOptional<ui64>), builder.OptionalArgs(1)) { + Y_UNUSED(valueBuilder); + const std::string_view string(args[0].AsStringRef()); + const std::string_view needle(args[1].AsStringRef()); + std::string_view::size_type pos = std::string_view::npos; + + if (auto p = args[2].GetOrDefault<ui64>(std::string_view::npos); std::string_view::npos != p) { + pos = 0ULL; + for (auto ptr = string.data(); p && pos < string.size(); --p) { + const auto width = WideCharSize(*ptr); + pos += width; + ptr += width; + } + } + + if (const auto find = string.rfind(needle, pos); std::string_view::npos != find) { + size_t result; + GetNumberOfUTF8Chars(string.data(), find, result); + return TUnboxedValuePod(static_cast<ui64>(result)); + } + return TUnboxedValuePod(); + } + + using TTmpVector = TSmallVec<TUnboxedValue, TUnboxedValue::TAllocator>; + + template <typename TIt> + static void SplitToListImpl( + const IValueBuilder* valueBuilder, + const TUnboxedValue& input, + const std::string_view::const_iterator from, + const TIt& it, + TTmpVector& result) { + for (const auto& elem : it) { + result.emplace_back(valueBuilder->SubString(input, std::distance(from, elem.TokenStart()), std::distance(elem.TokenStart(), elem.TokenDelim()))); + } + } + + template <typename TIt> + static void SplitToListImpl( + const IValueBuilder* valueBuilder, + const TUnboxedValue& input, + const TUtf16String::const_iterator start, + const TIt& it, + TTmpVector& result) { + const std::string_view& original = input.AsStringRef(); + size_t charPos = 0U, bytePos = 0U; + for (const auto& elem : it) { + for (const size_t next = std::distance(start, elem.TokenStart()); charPos < next; ++charPos) + bytePos += WideCharSize(original[bytePos]); + const auto from = bytePos; + + for (const size_t next = charPos + std::distance(elem.TokenStart(), elem.TokenDelim()); charPos < next; ++charPos) + bytePos += WideCharSize(original[bytePos]); + const auto size = bytePos - from; + result.emplace_back(valueBuilder->SubString(input, from, size)); + } + } + + template <typename TIt, typename TStrIt> + static void SplitToListImpl( + const IValueBuilder* valueBuilder, + const TUnboxedValue& input, + const TStrIt from, + TIt& it, + bool skipEmpty, + TTmpVector& result) { + if (skipEmpty) { + SplitToListImpl(valueBuilder, input, from, it.SkipEmpty(), result); + } else { + SplitToListImpl(valueBuilder, input, from, it, result); + } + } + + constexpr char delimeterStringName[] = "DelimeterString"; + constexpr char skipEmptyName[] = "SkipEmpty"; + constexpr char limitName[] = "Limit"; + using TDelimeterStringArg = TNamedArg<bool, delimeterStringName>; + using TSkipEmptyArg = TNamedArg<bool, skipEmptyName>; + using TLimitArg = TNamedArg<ui64, limitName>; + + SIMPLE_UDF_OPTIONS(TSplitToList, TListType<TUtf8>( + TOptional<TUtf8>, + TUtf8, + TDelimeterStringArg, + TSkipEmptyArg, + TLimitArg + ), + builder.OptionalArgs(3)) { + TTmpVector result; + if (args[0]) { + const bool delimiterString = args[2].GetOrDefault<bool>(true); + const bool skipEmpty = args[3].GetOrDefault<bool>(false); + const auto limit = args[4].GetOrDefault<ui64>(0); + if (delimiterString) { + const std::string_view input(args[0].AsStringRef()); + const std::string_view delimeter(args[1].AsStringRef()); + if (limit) { + auto it = StringSplitter(input).SplitByString(delimeter).Limit(limit + 1); + SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); + } else { + auto it = StringSplitter(input).SplitByString(delimeter); + SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); + } + } else { + const auto& input = UTF8ToWide(args[0].AsStringRef()); + const auto& delimeter = UTF8ToWide(args[1].AsStringRef()); + if (limit) { + auto it = StringSplitter(input).SplitBySet(delimeter.c_str()).Limit(limit + 1); + SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); + } else { + auto it = StringSplitter(input).SplitBySet(delimeter.c_str()); + SplitToListImpl(valueBuilder, args[0], input.cbegin(), it, skipEmpty, result); + } + } + } + return valueBuilder->NewList(result.data(), result.size()); + } + + SIMPLE_UDF(TJoinFromList, TUtf8(TAutoMap<TListType<TOptional<TUtf8>>>, TUtf8)) { + const auto input = args[0].GetListIterator(); + const std::string_view delimeter(args[1].AsStringRef()); + std::vector<TString> items; + + for (TUnboxedValue current; input.Next(current);) { + if (current) { + items.emplace_back(current.AsStringRef()); + } + } + + return valueBuilder->NewString(JoinSeq(delimeter, items)); + } + + SIMPLE_UDF(TLevensteinDistance, ui64(TAutoMap<TUtf8>, TAutoMap<TUtf8>)) { + Y_UNUSED(valueBuilder); + const TStringBuf left(args[0].AsStringRef()); + const TStringBuf right(args[1].AsStringRef()); + const TUtf16String& leftWide = UTF8ToWide(left); + const TUtf16String& rightWide = UTF8ToWide(right); + const ui64 result = NLevenshtein::Distance(leftWide, rightWide); + return TUnboxedValuePod(result); + } + + SIMPLE_UDF(TReplaceAll, TUtf8(TAutoMap<TUtf8>, TUtf8, TUtf8)) { + const TStringBuf input(args[0].AsStringRef()); + const TStringBuf whatBuf(args[1].AsStringRef()); + const TStringBuf withBuf(args[2].AsStringRef()); + TUtf16String result = UTF8ToWide(input); + const TUtf16String& what = UTF8ToWide(whatBuf); + const TUtf16String& with = UTF8ToWide(withBuf); + if (TUtf16String result = UTF8ToWide(input); SubstGlobal(result, what, with)) + return valueBuilder->NewString(WideToUTF8(result)); + else + return args[0]; + } + + SIMPLE_UDF(TReplaceFirst, TUtf8(TAutoMap<TUtf8>, TUtf8, TUtf8)) { + const TStringBuf input(args[0].AsStringRef()); + const TStringBuf whatBuf(args[1].AsStringRef()); + const TStringBuf withBuf(args[2].AsStringRef()); + TUtf16String result = UTF8ToWide(input); + const TUtf16String& what = UTF8ToWide(whatBuf); + const TUtf16String& with = UTF8ToWide(withBuf); + if (what.size() != 1) { + ythrow yexception() << "Only one char is supported as second argument"; + } + if (with.size() != 1) { + ythrow yexception() << "Only one char is supported as third argument"; + } + if (const auto index = result.find(what[0]); index != TUtf16String::npos) { + result.replace(index, 1, with.data()); + return valueBuilder->NewString(WideToUTF8(result)); + } + return args[0]; + } + + SIMPLE_UDF(TReplaceLast, TUtf8(TAutoMap<TUtf8>, TUtf8, TUtf8)) { + const TStringBuf input(args[0].AsStringRef()); + const TStringBuf whatBuf(args[1].AsStringRef()); + const TStringBuf withBuf(args[2].AsStringRef()); + TUtf16String result = UTF8ToWide(input); + const TUtf16String& what = UTF8ToWide(whatBuf); + const TUtf16String& with = UTF8ToWide(withBuf); + if (what.size() != 1) { + ythrow yexception() << "Only one char is supported as second argument"; + } + if (with.size() != 1) { + ythrow yexception() << "Only one char is supported as third argument"; + } + if (const auto index = result.rfind(what[0]); index != TUtf16String::npos) { + result.replace(index, 1, with.data()); + return valueBuilder->NewString(WideToUTF8(result)); + } + return args[0]; + } + + SIMPLE_UDF(TRemoveAll, TUtf8(TAutoMap<TUtf8>, TUtf8)) { + const TStringBuf input(args[0].AsStringRef()); + const TStringBuf removeBuf(args[1].AsStringRef()); + TUtf16String result = UTF8ToWide(input); + const TUtf16String& remove = UTF8ToWide(removeBuf); + for (const wchar16 c : remove) { + RemoveAll(result, c); + } + return valueBuilder->NewString(WideToUTF8(result)); + } + + SIMPLE_UDF(TRemoveFirst, TUtf8(TAutoMap<TUtf8>, TUtf8)) { + const TStringBuf input(args[0].AsStringRef()); + const TStringBuf removeBuf(args[1].AsStringRef()); + TUtf16String result = UTF8ToWide(input); + const TUtf16String& remove = UTF8ToWide(removeBuf); + if (remove.size() != 1) { + ythrow yexception() << "Only one char is supported as second argument"; + } + if (const auto index = result.find(remove[0]); index != TUtf16String::npos) { + result.remove(index, 1); + return valueBuilder->NewString(WideToUTF8(result)); + } + return args[0]; + } + + SIMPLE_UDF(TRemoveLast, TUtf8(TAutoMap<TUtf8>, TUtf8)) { + const TStringBuf input(args[0].AsStringRef()); + const TStringBuf removeBuf(args[1].AsStringRef()); + TUtf16String result = UTF8ToWide(input); + const TUtf16String& remove = UTF8ToWide(removeBuf); + if (remove.size() != 1) { + ythrow yexception() << "Only one char is supported as second argument"; + } + if (const auto index = result.rfind(remove[0]); index != TUtf16String::npos) { + result.remove(index, 1); + return valueBuilder->NewString(WideToUTF8(result)); + } + return args[0]; + } + + SIMPLE_UDF(TToCodePointList, TListType<ui32>(TAutoMap<TUtf8>)) { + size_t codePointCount = 0; + const auto& inputRef = args[0].AsStringRef(); + if (!GetNumberOfUTF8Chars(inputRef.Data(), inputRef.Size(), codePointCount)) { + // should not happen but still we have to check return code + ythrow yexception() << "Unable to count code points"; + } + + TUnboxedValue* itemsPtr = nullptr; + auto result = valueBuilder->NewArray(codePointCount, itemsPtr); + const unsigned char* current = reinterpret_cast<const unsigned char*>(inputRef.Data()); + const unsigned char* end = current + inputRef.Size(); + wchar32 rune = BROKEN_RUNE; + ui32 codePointIndex = 0; + RECODE_RESULT retcode = RECODE_OK; + while (current < end && RECODE_OK == (retcode = ReadUTF8CharAndAdvance(rune, current, end))) { + if (codePointIndex >= codePointCount) { + // sanity check + ythrow yexception() << "Too big code point index " << codePointIndex << ", expecting only " << codePointCount << " code points"; + } + itemsPtr[codePointIndex++] = TUnboxedValuePod(static_cast<ui32>(rune)); + } + + if (retcode != RECODE_OK) { + ythrow yexception() << "Malformed UTF-8 string"; + } + + return result; + } + + SIMPLE_UDF(TFromCodePointList, TUtf8(TAutoMap<TListType<ui32>>)) { + auto input = args[0]; + if (auto elems = input.GetElements()) { + const auto elemCount = input.GetListLength(); + auto bufferSize = WideToUTF8BufferSize(elemCount); + TTempBuf buffer(bufferSize); + auto bufferPtr = buffer.Data(); + auto bufferEnd = buffer.Data() + bufferSize; + for (ui64 i = 0; i != elemCount; ++i) { + const auto& item = elems[i]; + const wchar32 rune = item.Get<ui32>(); + size_t written = 0; + WideToUTF8(&rune, 1, bufferPtr, written); + Y_ENSURE(written <= 4); + bufferPtr += written; + Y_ENSURE(bufferPtr <= bufferEnd); + } + return valueBuilder->NewString(TStringRef(buffer.Data(), bufferPtr - buffer.Data())); + } + + std::vector<char, NUdf::TStdAllocatorForUdf<char>> buffer; + buffer.reserve(TUnboxedValuePod::InternalBufferSize); + + const auto& iter = input.GetListIterator(); + char runeBuffer[4] = {}; + for (NUdf::TUnboxedValue item; iter.Next(item); ) { + const wchar32 rune = item.Get<ui32>(); + size_t written = 0; + WideToUTF8(&rune, 1, runeBuffer, written); + Y_ENSURE(written <= 4); + buffer.insert(buffer.end(), runeBuffer, runeBuffer + written); + } + + return valueBuilder->NewString(TStringRef(buffer.data(), buffer.size())); + } + + SIMPLE_UDF(TReverse, TUtf8(TAutoMap<TUtf8>)) { + auto wide = UTF8ToWide(args[0].AsStringRef()); + ReverseInPlace(wide); + return valueBuilder->NewString(WideToUTF8(wide)); + } + + SIMPLE_UDF(TToLower, TUtf8(TAutoMap<TUtf8>)) { + if (auto wide = UTF8ToWide(args->AsStringRef()); ToLower(wide)) + return valueBuilder->NewString(WideToUTF8(wide)); + else + return *args; + } + + SIMPLE_UDF(TToUpper, TUtf8(TAutoMap<TUtf8>)) { + if (auto wide = UTF8ToWide(args->AsStringRef()); ToUpper(wide)) + return valueBuilder->NewString(WideToUTF8(wide)); + else + return *args; + } + + SIMPLE_UDF(TToTitle, TUtf8(TAutoMap<TUtf8>)) { + if (auto wide = UTF8ToWide(args->AsStringRef()); ToTitle(wide)) + return valueBuilder->NewString(WideToUTF8(wide)); + else + return *args; + } + +#define REGISTER_NORMALIZE_UDF(name, mode) T##name, +#define EXPORTED_UNICODE_BASE_UDF \ + NORMALIZE_UDF_MAP(REGISTER_NORMALIZE_UDF) \ + TIsUtf, \ + TGetLength, \ + TSubstring, \ + TFind, \ + TRFind, \ + TSplitToList, \ + TJoinFromList, \ + TLevensteinDistance, \ + TReplaceAll, \ + TReplaceFirst, \ + TReplaceLast, \ + TRemoveAll, \ + TRemoveFirst, \ + TRemoveLast, \ + TToCodePointList, \ + TFromCodePointList, \ + TReverse, \ + TToLower, \ + TToUpper, \ + TToTitle +} diff --git a/ydb/library/yql/udfs/common/unicode_base/lib/ya.make b/ydb/library/yql/udfs/common/unicode_base/lib/ya.make index f238105c358..495bade7c31 100644 --- a/ydb/library/yql/udfs/common/unicode_base/lib/ya.make +++ b/ydb/library/yql/udfs/common/unicode_base/lib/ya.make @@ -1,26 +1,26 @@ -LIBRARY() - -YQL_ABI_VERSION( - 2 - 9 - 0 -) - +LIBRARY() + +YQL_ABI_VERSION( + 2 + 9 + 0 +) + OWNER( g:yql g:yql_ydb_core ) - -SRCS( - unicode_base_udf.cpp -) - -PEERDIR( - library/cpp/deprecated/split - library/cpp/string_utils/levenshtein_diff - library/cpp/unicode/normalization + +SRCS( + unicode_base_udf.cpp +) + +PEERDIR( + library/cpp/deprecated/split + library/cpp/string_utils/levenshtein_diff + library/cpp/unicode/normalization ydb/library/yql/public/udf ydb/library/yql/utils -) - -END() +) + +END() diff --git a/ydb/library/yql/udfs/common/unicode_base/unicode_base.cpp b/ydb/library/yql/udfs/common/unicode_base/unicode_base.cpp index 674e1575587..366777ab0eb 100644 --- a/ydb/library/yql/udfs/common/unicode_base/unicode_base.cpp +++ b/ydb/library/yql/udfs/common/unicode_base/unicode_base.cpp @@ -1,4 +1,4 @@ -#include "lib/unicode_base_udf.h" - -SIMPLE_MODULE(TUnicodeModule, EXPORTED_UNICODE_BASE_UDF) -REGISTER_MODULES(TUnicodeModule) +#include "lib/unicode_base_udf.h" + +SIMPLE_MODULE(TUnicodeModule, EXPORTED_UNICODE_BASE_UDF) +REGISTER_MODULES(TUnicodeModule) diff --git a/ydb/library/yql/udfs/common/unicode_base/ya.make b/ydb/library/yql/udfs/common/unicode_base/ya.make index 45ca02a816c..ad722ed0ee8 100644 --- a/ydb/library/yql/udfs/common/unicode_base/ya.make +++ b/ydb/library/yql/udfs/common/unicode_base/ya.make @@ -9,7 +9,7 @@ YQL_ABI_VERSION( OWNER(g:yql g:yql_ydb_core) SRCS( - unicode_base.cpp + unicode_base.cpp ) PEERDIR( diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.cpp b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.cpp index 78e7f5f58ec..50a3ee8d1f1 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.cpp +++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.cpp @@ -1 +1 @@ -#include "url_base_udf.h"
\ No newline at end of file +#include "url_base_udf.h"
\ No newline at end of file diff --git a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h index a7119a2d4a7..a7d9fedd437 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h +++ b/ydb/library/yql/udfs/common/url_base/lib/url_base_udf.h @@ -1,348 +1,348 @@ -#pragma once - -#include "url_parse.h" +#pragma once + +#include "url_parse.h" #include "url_query.h" - + #include <ydb/library/yql/public/udf/udf_helpers.h> - -#include <library/cpp/tld/tld.h> -#include <library/cpp/charset/wide.h> -#include <library/cpp/unicode/punycode/punycode.h> -#include <library/cpp/string_utils/quote/quote.h> -#include <library/cpp/string_utils/url/url.h> - -#include <util/string/split.h> -#include <util/string/subst.h> - -using namespace NKikimr; -using namespace NUdf; -using namespace NTld; -using namespace NUrlUdf; - -namespace { - inline bool PrepareUrl(const std::string_view& keyStr, TUri& parser) { - const NUri::TParseFlags& parseFlags(TUri::FeaturesRecommended); - return parser.ParseAbs(keyStr, parseFlags) == TUri::ParsedOK; - } - - SIMPLE_UDF(TNormalize, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - TUri url; - const bool success = PrepareUrl(args[0].AsStringRef(), url); - return success - ? valueBuilder->NewString(url.PrintS(TUri::FlagNoFrag)) - : TUnboxedValue(); - } - - SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) { - const std::string_view url(args[0].AsStringRef()); - const std::string_view prefix(GetSchemePrefix(url)); - return valueBuilder->SubString(args[0], std::distance(url.begin(), prefix.begin()), prefix.size()); - } - - SIMPLE_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetOnlyHost(url)); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); - } - - SIMPLE_UDF(TGetHostPort, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetHostAndPort(CutSchemePrefix(url))); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); - } - - SIMPLE_UDF(TGetSchemeHost, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetHost(url)); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); - } - - SIMPLE_UDF(TGetSchemeHostPort, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetHostAndPort(url)); - return host.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); - } - - SIMPLE_UDF(TGetPort, TOptional<ui64>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - Y_UNUSED(valueBuilder); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetHostAndPort(url)); - const auto pos = host.find(':'); - ui64 port; - bool success = false; - if (pos != std::string_view::npos) { - success = TryFromString(host.substr(pos + 1), port); - } else { - const std::string_view scheme(GetSchemePrefix(url)); - if (scheme.empty() || scheme == "http://") { - port = 80; - } else if (scheme == "https://") { - port = 443; - } - } - return success ? TUnboxedValuePod(port) : TUnboxedValuePod(); - } - - SIMPLE_UDF(TGetTail, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - std::string_view cut(CutSchemePrefix(url)); - const auto s = cut.find('/'); - if (s == std::string_view::npos) { - return valueBuilder->NewString("/"); - } - - cut.remove_prefix(s); - return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } - - SIMPLE_UDF(TGetPath, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - std::string_view cut(CutSchemePrefix(url)); - const auto s = cut.find('/'); - if (s == std::string_view::npos) { - return valueBuilder->NewString("/"); - } - - cut.remove_prefix(s); - const auto end = cut.find_first_of("?#"); - if (std::string_view::npos != end) { - cut.remove_suffix(cut.size() - end); - } - - return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } - - SIMPLE_UDF(TGetFragment, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const auto pos = url.find('#'); - return pos == std::string_view::npos ? TUnboxedValue() : - valueBuilder->SubString(args[0], pos + 1U, url.length() - pos - 1U); - } - - SIMPLE_UDF(TGetDomain, TOptional<char*>(TOptional<char*>, ui8)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetOnlyHost(url)); - const ui8 level = args[1].Get<ui8>(); - std::vector<std::string_view> parts; - StringSplitter(host).Split('.').AddTo(&parts); - if (level && parts.size() >= level) { - const auto& result = host.substr(std::distance(host.begin(), parts[parts.size() - level].begin())); - return result.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), result.begin()), result.size()); - } - - return TUnboxedValue(); - } - - SIMPLE_UDF(TGetTLD, char*(TAutoMap<char*>)) { - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetOnlyHost(url)); - const auto offset = host.rfind('.'); - const auto& cut = std::string_view::npos == offset ? host : host.substr(offset + 1); - return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } - - SIMPLE_UDF(TGetDomainLevel, ui64(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - std::vector<std::string_view> parts; - StringSplitter(GetOnlyHost(args[0].AsStringRef())).Split('.').AddTo(&parts); - return TUnboxedValuePod(ui64(parts.size())); - } - - SIMPLE_UDF_OPTIONS(TGetSignificantDomain, char*(TAutoMap<char*>, TOptional<TListType<char*>>), - builder.OptionalArgs(1)) { - const std::string_view url(args[0].AsStringRef()); - const std::string_view host(GetOnlyHost(url)); - std::vector<std::string_view> parts; - StringSplitter(host).Split('.').AddTo(&parts); - if (parts.size() > 2) { - const auto& secondLevel = parts.at(parts.size() - 2); - bool secondLevelIsZone = false; - - if (args[1]) { - const auto& zonesIterator = args[1].GetListIterator(); - for (TUnboxedValue item; zonesIterator.Next(item);) { - if (secondLevel == item.AsStringRef()) { - secondLevelIsZone = true; - break; - } - } - } else { - static const std::set<std::string_view> zones{"com", "net", "org", "co", "gov", "edu"}; - secondLevelIsZone = zones.count(secondLevel); - } - - const auto from = parts[parts.size() - (secondLevelIsZone ? 3U : 2U)].begin(); - return valueBuilder->SubString(args[0], std::distance(url.begin(), from), std::distance(from, parts.back().end())); - } - return valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.length()); - } - - SIMPLE_UDF(TGetCGIParam, TOptional<char*>(TOptional<char*>, char*)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view key(args[1].AsStringRef()); - const auto queryStart = url.find('?'); - if (queryStart != std::string_view::npos) { - const auto from = queryStart + 1U; - const auto anc = url.find('#', from); - const auto end = anc == std::string_view::npos ? url.length() : anc; - for (auto pos = from; pos && pos < end; ++pos) { - const auto equal = url.find('=', pos); - const auto amper = url.find('&', pos); - if (equal < amper) { - const auto& param = url.substr(pos, equal - pos); - if (param == key) { - return valueBuilder->SubString(args[0], equal + 1U, std::min(amper, end) - equal - 1U); - } - } - - pos = amper; - } - } - - return TUnboxedValue(); - } - - SIMPLE_UDF(TCutScheme, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view cut(CutSchemePrefix(url)); - return cut.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } - - SIMPLE_UDF(TCutWWW, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view cut(CutWWWPrefix(url)); - return cut.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } - - SIMPLE_UDF(TCutWWW2, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view url(args[0].AsStringRef()); - const std::string_view cut(CutWWWNumberedPrefix(url)); - return cut.empty() ? TUnboxedValue() : - valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); - } - - SIMPLE_UDF(TCutQueryStringAndFragment, char*(TAutoMap<char*>)) { - const std::string_view input(args[0].AsStringRef()); - const auto cut = input.find_first_of("?#"); - return std::string_view::npos == cut ? NUdf::TUnboxedValue(args[0]) : valueBuilder->SubString(args[0], 0U, cut); - } - - SIMPLE_UDF(TEncode, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view input(args[0].AsStringRef()); - if (input.empty()) { - return NUdf::TUnboxedValuePod(); - } - TString url(input); - UrlEscape(url); - return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); - } - - SIMPLE_UDF(TDecode, TOptional<char*>(TOptional<char*>)) { - EMPTY_RESULT_ON_EMPTY_ARG(0); - const std::string_view input(args[0].AsStringRef()); - if (input.empty()) { - return NUdf::TUnboxedValuePod(); - } - TString url(input); - SubstGlobal(url, '+', ' '); - UrlUnescape(url); - return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); - } - - SIMPLE_UDF(TIsKnownTLD, bool(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - return TUnboxedValuePod(IsTld(args[0].AsStringRef())); - } - - SIMPLE_UDF(TIsWellKnownTLD, bool(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - return TUnboxedValuePod(IsVeryGoodTld(args[0].AsStringRef())); - } - - SIMPLE_UDF(THostNameToPunycode, TOptional<char*>(TAutoMap<char*>)) try { - const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); - return valueBuilder->NewString(HostNameToPunycode(input)); - } catch (TPunycodeError&) { - return TUnboxedValue(); - } - - SIMPLE_UDF(TForceHostNameToPunycode, char*(TAutoMap<char*>)) { - const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); - return valueBuilder->NewString(ForceHostNameToPunycode(input)); - } - - SIMPLE_UDF(TPunycodeToHostName, TOptional<char*>(TAutoMap<char*>)) try { - const TStringRef& input = args[0].AsStringRef(); - const auto& result = WideToUTF8(PunycodeToHostName(input)); - return valueBuilder->NewString(result); - } catch (TPunycodeError&) { - return TUnboxedValue(); - } - - SIMPLE_UDF(TForcePunycodeToHostName, char*(TAutoMap<char*>)) { - const TStringRef& input = args[0].AsStringRef(); - const auto& result = WideToUTF8(ForcePunycodeToHostName(input)); - return valueBuilder->NewString(result); - } - - SIMPLE_UDF(TCanBePunycodeHostName, bool(TAutoMap<char*>)) { - Y_UNUSED(valueBuilder); - return TUnboxedValuePod(CanBePunycodeHostName(args[0].AsStringRef())); - } - -#define EXPORTED_URL_BASE_UDF \ - TNormalize, \ - TParse, \ - TGetScheme, \ - TGetHost, \ - TGetHostPort, \ - TGetSchemeHost, \ - TGetSchemeHostPort, \ - TGetPort, \ - TGetTail, \ - TGetPath, \ - TGetFragment, \ - TGetDomain, \ - TGetTLD, \ - TGetDomainLevel, \ - TGetSignificantDomain, \ - TGetCGIParam, \ - TCutScheme, \ - TCutWWW, \ - TCutWWW2, \ - TCutQueryStringAndFragment, \ - TEncode, \ - TDecode, \ - TIsKnownTLD, \ - TIsWellKnownTLD, \ - THostNameToPunycode, \ - TForceHostNameToPunycode, \ - TPunycodeToHostName, \ - TForcePunycodeToHostName, \ + +#include <library/cpp/tld/tld.h> +#include <library/cpp/charset/wide.h> +#include <library/cpp/unicode/punycode/punycode.h> +#include <library/cpp/string_utils/quote/quote.h> +#include <library/cpp/string_utils/url/url.h> + +#include <util/string/split.h> +#include <util/string/subst.h> + +using namespace NKikimr; +using namespace NUdf; +using namespace NTld; +using namespace NUrlUdf; + +namespace { + inline bool PrepareUrl(const std::string_view& keyStr, TUri& parser) { + const NUri::TParseFlags& parseFlags(TUri::FeaturesRecommended); + return parser.ParseAbs(keyStr, parseFlags) == TUri::ParsedOK; + } + + SIMPLE_UDF(TNormalize, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + TUri url; + const bool success = PrepareUrl(args[0].AsStringRef(), url); + return success + ? valueBuilder->NewString(url.PrintS(TUri::FlagNoFrag)) + : TUnboxedValue(); + } + + SIMPLE_UDF(TGetScheme, char*(TAutoMap<char*>)) { + const std::string_view url(args[0].AsStringRef()); + const std::string_view prefix(GetSchemePrefix(url)); + return valueBuilder->SubString(args[0], std::distance(url.begin(), prefix.begin()), prefix.size()); + } + + SIMPLE_UDF(TGetHost, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); + } + + SIMPLE_UDF(TGetHostPort, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetHostAndPort(CutSchemePrefix(url))); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.size()); + } + + SIMPLE_UDF(TGetSchemeHost, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetHost(url)); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); + } + + SIMPLE_UDF(TGetSchemeHostPort, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetHostAndPort(url)); + return host.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], 0U, std::distance(url.begin(), host.end())); + } + + SIMPLE_UDF(TGetPort, TOptional<ui64>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + Y_UNUSED(valueBuilder); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetHostAndPort(url)); + const auto pos = host.find(':'); + ui64 port; + bool success = false; + if (pos != std::string_view::npos) { + success = TryFromString(host.substr(pos + 1), port); + } else { + const std::string_view scheme(GetSchemePrefix(url)); + if (scheme.empty() || scheme == "http://") { + port = 80; + } else if (scheme == "https://") { + port = 443; + } + } + return success ? TUnboxedValuePod(port) : TUnboxedValuePod(); + } + + SIMPLE_UDF(TGetTail, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + std::string_view cut(CutSchemePrefix(url)); + const auto s = cut.find('/'); + if (s == std::string_view::npos) { + return valueBuilder->NewString("/"); + } + + cut.remove_prefix(s); + return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); + } + + SIMPLE_UDF(TGetPath, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + std::string_view cut(CutSchemePrefix(url)); + const auto s = cut.find('/'); + if (s == std::string_view::npos) { + return valueBuilder->NewString("/"); + } + + cut.remove_prefix(s); + const auto end = cut.find_first_of("?#"); + if (std::string_view::npos != end) { + cut.remove_suffix(cut.size() - end); + } + + return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); + } + + SIMPLE_UDF(TGetFragment, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const auto pos = url.find('#'); + return pos == std::string_view::npos ? TUnboxedValue() : + valueBuilder->SubString(args[0], pos + 1U, url.length() - pos - 1U); + } + + SIMPLE_UDF(TGetDomain, TOptional<char*>(TOptional<char*>, ui8)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + const ui8 level = args[1].Get<ui8>(); + std::vector<std::string_view> parts; + StringSplitter(host).Split('.').AddTo(&parts); + if (level && parts.size() >= level) { + const auto& result = host.substr(std::distance(host.begin(), parts[parts.size() - level].begin())); + return result.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), result.begin()), result.size()); + } + + return TUnboxedValue(); + } + + SIMPLE_UDF(TGetTLD, char*(TAutoMap<char*>)) { + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + const auto offset = host.rfind('.'); + const auto& cut = std::string_view::npos == offset ? host : host.substr(offset + 1); + return valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); + } + + SIMPLE_UDF(TGetDomainLevel, ui64(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + std::vector<std::string_view> parts; + StringSplitter(GetOnlyHost(args[0].AsStringRef())).Split('.').AddTo(&parts); + return TUnboxedValuePod(ui64(parts.size())); + } + + SIMPLE_UDF_OPTIONS(TGetSignificantDomain, char*(TAutoMap<char*>, TOptional<TListType<char*>>), + builder.OptionalArgs(1)) { + const std::string_view url(args[0].AsStringRef()); + const std::string_view host(GetOnlyHost(url)); + std::vector<std::string_view> parts; + StringSplitter(host).Split('.').AddTo(&parts); + if (parts.size() > 2) { + const auto& secondLevel = parts.at(parts.size() - 2); + bool secondLevelIsZone = false; + + if (args[1]) { + const auto& zonesIterator = args[1].GetListIterator(); + for (TUnboxedValue item; zonesIterator.Next(item);) { + if (secondLevel == item.AsStringRef()) { + secondLevelIsZone = true; + break; + } + } + } else { + static const std::set<std::string_view> zones{"com", "net", "org", "co", "gov", "edu"}; + secondLevelIsZone = zones.count(secondLevel); + } + + const auto from = parts[parts.size() - (secondLevelIsZone ? 3U : 2U)].begin(); + return valueBuilder->SubString(args[0], std::distance(url.begin(), from), std::distance(from, parts.back().end())); + } + return valueBuilder->SubString(args[0], std::distance(url.begin(), host.begin()), host.length()); + } + + SIMPLE_UDF(TGetCGIParam, TOptional<char*>(TOptional<char*>, char*)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view key(args[1].AsStringRef()); + const auto queryStart = url.find('?'); + if (queryStart != std::string_view::npos) { + const auto from = queryStart + 1U; + const auto anc = url.find('#', from); + const auto end = anc == std::string_view::npos ? url.length() : anc; + for (auto pos = from; pos && pos < end; ++pos) { + const auto equal = url.find('=', pos); + const auto amper = url.find('&', pos); + if (equal < amper) { + const auto& param = url.substr(pos, equal - pos); + if (param == key) { + return valueBuilder->SubString(args[0], equal + 1U, std::min(amper, end) - equal - 1U); + } + } + + pos = amper; + } + } + + return TUnboxedValue(); + } + + SIMPLE_UDF(TCutScheme, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view cut(CutSchemePrefix(url)); + return cut.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); + } + + SIMPLE_UDF(TCutWWW, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view cut(CutWWWPrefix(url)); + return cut.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); + } + + SIMPLE_UDF(TCutWWW2, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view url(args[0].AsStringRef()); + const std::string_view cut(CutWWWNumberedPrefix(url)); + return cut.empty() ? TUnboxedValue() : + valueBuilder->SubString(args[0], std::distance(url.begin(), cut.begin()), cut.length()); + } + + SIMPLE_UDF(TCutQueryStringAndFragment, char*(TAutoMap<char*>)) { + const std::string_view input(args[0].AsStringRef()); + const auto cut = input.find_first_of("?#"); + return std::string_view::npos == cut ? NUdf::TUnboxedValue(args[0]) : valueBuilder->SubString(args[0], 0U, cut); + } + + SIMPLE_UDF(TEncode, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view input(args[0].AsStringRef()); + if (input.empty()) { + return NUdf::TUnboxedValuePod(); + } + TString url(input); + UrlEscape(url); + return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); + } + + SIMPLE_UDF(TDecode, TOptional<char*>(TOptional<char*>)) { + EMPTY_RESULT_ON_EMPTY_ARG(0); + const std::string_view input(args[0].AsStringRef()); + if (input.empty()) { + return NUdf::TUnboxedValuePod(); + } + TString url(input); + SubstGlobal(url, '+', ' '); + UrlUnescape(url); + return input == url ? NUdf::TUnboxedValue(args[0]) : valueBuilder->NewString(url); + } + + SIMPLE_UDF(TIsKnownTLD, bool(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + return TUnboxedValuePod(IsTld(args[0].AsStringRef())); + } + + SIMPLE_UDF(TIsWellKnownTLD, bool(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + return TUnboxedValuePod(IsVeryGoodTld(args[0].AsStringRef())); + } + + SIMPLE_UDF(THostNameToPunycode, TOptional<char*>(TAutoMap<char*>)) try { + const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); + return valueBuilder->NewString(HostNameToPunycode(input)); + } catch (TPunycodeError&) { + return TUnboxedValue(); + } + + SIMPLE_UDF(TForceHostNameToPunycode, char*(TAutoMap<char*>)) { + const TUtf16String& input = UTF8ToWide(args[0].AsStringRef()); + return valueBuilder->NewString(ForceHostNameToPunycode(input)); + } + + SIMPLE_UDF(TPunycodeToHostName, TOptional<char*>(TAutoMap<char*>)) try { + const TStringRef& input = args[0].AsStringRef(); + const auto& result = WideToUTF8(PunycodeToHostName(input)); + return valueBuilder->NewString(result); + } catch (TPunycodeError&) { + return TUnboxedValue(); + } + + SIMPLE_UDF(TForcePunycodeToHostName, char*(TAutoMap<char*>)) { + const TStringRef& input = args[0].AsStringRef(); + const auto& result = WideToUTF8(ForcePunycodeToHostName(input)); + return valueBuilder->NewString(result); + } + + SIMPLE_UDF(TCanBePunycodeHostName, bool(TAutoMap<char*>)) { + Y_UNUSED(valueBuilder); + return TUnboxedValuePod(CanBePunycodeHostName(args[0].AsStringRef())); + } + +#define EXPORTED_URL_BASE_UDF \ + TNormalize, \ + TParse, \ + TGetScheme, \ + TGetHost, \ + TGetHostPort, \ + TGetSchemeHost, \ + TGetSchemeHostPort, \ + TGetPort, \ + TGetTail, \ + TGetPath, \ + TGetFragment, \ + TGetDomain, \ + TGetTLD, \ + TGetDomainLevel, \ + TGetSignificantDomain, \ + TGetCGIParam, \ + TCutScheme, \ + TCutWWW, \ + TCutWWW2, \ + TCutQueryStringAndFragment, \ + TEncode, \ + TDecode, \ + TIsKnownTLD, \ + TIsWellKnownTLD, \ + THostNameToPunycode, \ + TForceHostNameToPunycode, \ + TPunycodeToHostName, \ + TForcePunycodeToHostName, \ TCanBePunycodeHostName, \ TQueryStringToList, \ TQueryStringToDict, \ TBuildQueryString -} +} diff --git a/ydb/library/yql/udfs/common/url_base/lib/ya.make b/ydb/library/yql/udfs/common/url_base/lib/ya.make index 20e2858d066..4b16b271e25 100644 --- a/ydb/library/yql/udfs/common/url_base/lib/ya.make +++ b/ydb/library/yql/udfs/common/url_base/lib/ya.make @@ -1,30 +1,30 @@ -LIBRARY() - -YQL_ABI_VERSION( - 2 +LIBRARY() + +YQL_ABI_VERSION( + 2 23 - 0 -) - + 0 +) + OWNER( g:yql g:yql_ydb_core ) - -SRCS( - url_base_udf.cpp - url_parse.cpp + +SRCS( + url_base_udf.cpp + url_parse.cpp url_query.cpp -) - -PEERDIR( - library/cpp/charset +) + +PEERDIR( + library/cpp/charset library/cpp/string_utils/quote library/cpp/string_utils/url - library/cpp/tld - library/cpp/unicode/punycode - library/cpp/uri + library/cpp/tld + library/cpp/unicode/punycode + library/cpp/uri ydb/library/yql/public/udf -) - -END() +) + +END() diff --git a/ydb/library/yql/udfs/common/url_base/url_base.cpp b/ydb/library/yql/udfs/common/url_base/url_base.cpp index d9f02e6e714..5a0b0182a96 100644 --- a/ydb/library/yql/udfs/common/url_base/url_base.cpp +++ b/ydb/library/yql/udfs/common/url_base/url_base.cpp @@ -1,7 +1,7 @@ #include <ydb/library/yql/public/udf/udf_helpers.h> - -#include "lib/url_base_udf.h" - -SIMPLE_MODULE(TUrlModule, EXPORTED_URL_BASE_UDF) + +#include "lib/url_base_udf.h" + +SIMPLE_MODULE(TUrlModule, EXPORTED_URL_BASE_UDF) REGISTER_MODULES(TUrlModule) diff --git a/ydb/library/yql/udfs/common/url_base/ya.make b/ydb/library/yql/udfs/common/url_base/ya.make index 475c8ef0263..9a2117fb3cb 100644 --- a/ydb/library/yql/udfs/common/url_base/ya.make +++ b/ydb/library/yql/udfs/common/url_base/ya.make @@ -12,7 +12,7 @@ OWNER( ) SRCS( - url_base.cpp + url_base.cpp ) PEERDIR( |