diff options
author | aneporada <aneporada@ydb.tech> | 2023-11-07 18:05:23 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-11-07 19:04:47 +0300 |
commit | 6fb44d264c2506caaab59ed462d729da9fbe7653 (patch) | |
tree | a8641bddd043a7228715a4bb770453aac26c3fdc | |
parent | 4382353c3985417a93ae2b65ad96fe22531e39df (diff) | |
download | ydb-6fb44d264c2506caaab59ed462d729da9fbe7653.tar.gz |
Refactoring: fix code duplication when working with DqStage settings
40 files changed, 204 insertions, 146 deletions
diff --git a/ydb/core/kqp/opt/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/CMakeLists.darwin-x86_64.txt index 6e1f5dcb11..3499cb9fc7 100644 --- a/ydb/core/kqp/opt/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/opt/CMakeLists.darwin-x86_64.txt @@ -29,6 +29,7 @@ target_link_libraries(core-kqp-opt PUBLIC kqp-opt-physical yql-dq-common yql-dq-opt + yql-dq-type_ann providers-s3-expr_nodes core-kqp-provider tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt index 7b8adeb4e8..bb161bfebc 100644 --- a/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt @@ -30,6 +30,7 @@ target_link_libraries(core-kqp-opt PUBLIC kqp-opt-physical yql-dq-common yql-dq-opt + yql-dq-type_ann providers-s3-expr_nodes core-kqp-provider tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/opt/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/CMakeLists.linux-x86_64.txt index 7b8adeb4e8..bb161bfebc 100644 --- a/ydb/core/kqp/opt/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/opt/CMakeLists.linux-x86_64.txt @@ -30,6 +30,7 @@ target_link_libraries(core-kqp-opt PUBLIC kqp-opt-physical yql-dq-common yql-dq-opt + yql-dq-type_ann providers-s3-expr_nodes core-kqp-provider tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/opt/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/CMakeLists.windows-x86_64.txt index 6e1f5dcb11..3499cb9fc7 100644 --- a/ydb/core/kqp/opt/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/opt/CMakeLists.windows-x86_64.txt @@ -29,6 +29,7 @@ target_link_libraries(core-kqp-opt PUBLIC kqp-opt-physical yql-dq-common yql-dq-opt + yql-dq-type_ann providers-s3-expr_nodes core-kqp-provider tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 857cffe859..755e58441e 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/dq/opt/dq_opt.h> #include <ydb/library/yql/dq/opt/dq_opt_build.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/core/services/yql_out_transformers.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index cab38fb67b..5065723df9 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -10,6 +10,7 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/dq/tasks/dq_tasks_graph.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> diff --git a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt index f78e4a6016..4a838cacd6 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + yql-dq-type_ann providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt index cdcab0fa74..fa95a0b09c 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + yql-dq-type_ann providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt index cdcab0fa74..fa95a0b09c 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + yql-dq-type_ann providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE diff --git a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt index f78e4a6016..4a838cacd6 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + yql-dq-type_ann providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 22688e7505..bb329f4dae 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -8,6 +8,7 @@ #include <ydb/public/lib/scheme_types/scheme_type_id.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/core/yql_opt_utils.h> namespace NKikimr::NKqp::NOpt { diff --git a/ydb/core/kqp/opt/physical/ya.make b/ydb/core/kqp/opt/physical/ya.make index 07b0cdf0e0..ca28f8057b 100644 --- a/ydb/core/kqp/opt/physical/ya.make +++ b/ydb/core/kqp/opt/physical/ya.make @@ -18,6 +18,7 @@ PEERDIR( ydb/core/kqp/opt/physical/effects ydb/library/yql/dq/common ydb/library/yql/dq/opt + ydb/library/yql/dq/type_ann ydb/library/yql/providers/common/pushdown ) diff --git a/ydb/core/kqp/opt/ya.make b/ydb/core/kqp/opt/ya.make index bc13e15d34..9dbadad013 100644 --- a/ydb/core/kqp/opt/ya.make +++ b/ydb/core/kqp/opt/ya.make @@ -22,6 +22,7 @@ PEERDIR( ydb/core/kqp/opt/physical ydb/library/yql/dq/common ydb/library/yql/dq/opt + ydb/library/yql/dq/type_ann ydb/library/yql/providers/s3/expr_nodes ydb/core/kqp/provider ) diff --git a/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt index 90d3f4b7c2..4bb9c30770 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.darwin-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-arrow_kernels-request yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks library-yql-minikql providers-common-mkql diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt index 30947e3dc2..f06e2a8133 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt @@ -22,6 +22,7 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-arrow_kernels-request yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks library-yql-minikql providers-common-mkql diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt index 30947e3dc2..f06e2a8133 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-arrow_kernels-request yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks library-yql-minikql providers-common-mkql diff --git a/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt index 90d3f4b7c2..4bb9c30770 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.windows-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(core-kqp-query_compiler PUBLIC core-arrow_kernels-request yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks library-yql-minikql providers-common-mkql diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 1bc3082812..8d9ab4e6e0 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -13,6 +13,7 @@ #include <ydb/library/yql/dq/integration/yql_dq_integration.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/dq/tasks/dq_task_program.h> #include <ydb/library/yql/minikql/mkql_node_serialization.h> #include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> diff --git a/ydb/core/kqp/query_compiler/ya.make b/ydb/core/kqp/query_compiler/ya.make index 735b2c41d4..c2ab1f96c8 100644 --- a/ydb/core/kqp/query_compiler/ya.make +++ b/ydb/core/kqp/query_compiler/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/library/yql/core/arrow_kernels/request ydb/library/yql/dq/integration ydb/library/yql/dq/opt + ydb/library/yql/dq/type_ann ydb/library/yql/dq/tasks ydb/library/yql/minikql ydb/library/yql/providers/common/mkql diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index cbde2de0e2..7954b61b55 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -9,74 +9,6 @@ using namespace NYql::NNodes; namespace NYql::NDq { -TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) { - TDqStageSettings settings{}; - - for (const auto& tuple : node.Settings()) { - if (const auto name = tuple.Name().Value(); name == IdSettingName) { - YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); - settings.Id = tuple.Value().Cast<TCoAtom>().Value(); - } else if (name == LogicalIdSettingName) { - YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); - settings.LogicalId = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value()); - } else if (name == SinglePartitionSettingName) { - settings.SinglePartition = true; - } else if (name == WideChannelsSettingName) { - settings.WideChannels = true; - settings.OutputNarrowType = tuple.Value().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); - } - } - - return settings; -} - -TDqStageSettings TDqStageSettings::New(const NNodes::TDqStageBase& node) { - auto settings = Parse(node); - - if (!settings.Id) { - settings.Id = CreateGuidAsString(); - } - - return settings; -} - -NNodes::TCoNameValueTupleList TDqStageSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const { - TVector<TCoNameValueTuple> settings; - auto logicalId = LogicalId; - if (!logicalId) { - logicalId = ctx.NextUniqueId; - } - - settings.push_back(Build<TCoNameValueTuple>(ctx, pos) - .Name().Build(LogicalIdSettingName) - .Value<TCoAtom>().Build(logicalId) - .Done()); - - if (Id) { - settings.push_back(Build<TCoNameValueTuple>(ctx, pos) - .Name().Build(IdSettingName) - .Value<TCoAtom>().Build(Id) - .Done()); - } - - if (SinglePartition) { - settings.push_back(Build<TCoNameValueTuple>(ctx, pos) - .Name().Build(SinglePartitionSettingName) - .Done()); - } - - if (WideChannels) { - YQL_ENSURE(OutputNarrowType); - settings.push_back(Build<TCoNameValueTuple>(ctx, pos) - .Name().Build(WideChannelsSettingName) - .Value(ExpandType(pos, *OutputNarrowType, ctx)) - .Done()); - } - - return Build<TCoNameValueTupleList>(ctx, pos) - .Add(settings) - .Done(); -} TCoAtom BuildAtom(TStringBuf value, TPositionHandle pos, TExprContext& ctx) { return Build<TCoAtom>(ctx, pos) diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h index 8b7bd7ea98..686ff84ec6 100644 --- a/ydb/library/yql/dq/opt/dq_opt.h +++ b/ydb/library/yql/dq/opt/dq_opt.h @@ -8,34 +8,6 @@ namespace NYql::NDq { -struct TDqStageSettings { - static constexpr TStringBuf LogicalIdSettingName = "_logical_id"; - static constexpr TStringBuf IdSettingName = "_id"; - static constexpr TStringBuf SinglePartitionSettingName = "_single_partition"; - static constexpr TStringBuf WideChannelsSettingName = "_wide_channels"; - - ui64 LogicalId = 0; - TString Id; - bool SinglePartition = false; - - bool WideChannels = false; - const TStructExprType* OutputNarrowType = nullptr; - - TDqStageSettings& SetSinglePartition(bool value = true) { SinglePartition = value; return *this; } - TDqStageSettings& SetWideChannels(const TStructExprType& narrowType) { WideChannels = true; OutputNarrowType = &narrowType; return *this; } - - static TDqStageSettings New(const NNodes::TDqStageBase& node); - - static TDqStageSettings New() { - TDqStageSettings s; - s.Id = CreateGuidAsString(); - return s; - } - - static TDqStageSettings Parse(const NNodes::TDqStageBase& node); - NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; -}; - NNodes::TCoAtom BuildAtom(TStringBuf value, TPositionHandle pos, TExprContext& ctx); NNodes::TCoAtomList BuildAtomList(TStringBuf value, TPositionHandle pos, TExprContext& ctx); NNodes::TCoLambda BuildIdentityLambda(TPositionHandle pos, TExprContext& ctx); diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index 5d49d3c67d..51f6fccade 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 4cc5d09056..013be5f2d2 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -16,9 +16,6 @@ using TStatus = NYql::IGraphTransformer::TStatus; namespace { -// TODO: relocate names to common code for type_ann and dq_opt.cpp -static constexpr TStringBuf WideChannelsSettingName = "_wide_channels"; - const TTypeAnnotationNode* GetDqOutputType(const TDqOutput& output, TExprContext& ctx) { auto stageResultTuple = output.Stage().Ref().GetTypeAnn()->Cast<TTupleExprType>(); @@ -53,7 +50,8 @@ const TTypeAnnotationNode* GetDqConnectionType(const TDqConnection& node, TExprC } const TTypeAnnotationNode* GetColumnType(const TDqConnection& node, const TStructExprType& structType, TStringBuf name, TPositionHandle pos, TExprContext& ctx) { - if (HasSetting(node.Output().Stage().Settings().Ref(), WideChannelsSettingName)) { + TDqStageSettings settings = TDqStageSettings::Parse(node.Output().Stage()); + if (settings.WideChannels) { auto multiType = node.Output().Stage().Program().Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); ui32 idx; if (!TryFromString(name, idx)) { @@ -95,52 +93,15 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { auto* inputsTuple = stage->Child(TDqStageBase::idx_Inputs); auto& programLambda = stage->ChildRef(TDqStageBase::idx_Program); - auto* settingsTuple = stage->Child(TDqPhyStage::idx_Settings); if (!EnsureTuple(*inputsTuple, ctx)) { return TStatus::Error; } - if (!EnsureTuple(*settingsTuple, ctx)) { + if (!TDqStageSettings::Validate(*stage, ctx)) { return TStatus::Error; } - bool useWideChannels = false; - const TStructExprType* outputNarrowType = nullptr; - for (auto& setting: settingsTuple->Children()) { - if (!EnsureTupleMinSize(*setting, 1, ctx)) { - return TStatus::Error; - } - if (!EnsureAtom(*setting->Child(0), ctx)) { - return TStatus::Error; - } - - TStringBuf name = setting->Head().Content(); - if (name == WideChannelsSettingName) { - if (setting->ChildrenSize() != 2) { - ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value")); - return TStatus::Error; - } - auto value = setting->Child(1); - if (!EnsureType(*value, ctx)) { - return TStatus::Error; - } - - auto valueType = value->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - if (!EnsureStructType(value->Pos(), *valueType, ctx)) { - return TStatus::Error; - } - - if constexpr (std::is_same_v<TStage, TDqStage>) { - ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " is not supported in " << stage->Content())); - return TStatus::Error; - } else { - useWideChannels = true; - outputNarrowType = valueType->Cast<TStructExprType>(); - } - } - } - if (!EnsureLambda(*programLambda, ctx)) { return TStatus::Error; } @@ -177,7 +138,7 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { if (TDqConnection::Match(input.Get())) { TDqConnection conn(input); - if (HasSetting(conn.Output().Stage().Settings().Ref(), WideChannelsSettingName)) { + if (TDqStageSettings::Parse(conn.Output().Stage()).WideChannels) { argType = conn.Output().Stage().Program().Ref().GetTypeAnn(); } } @@ -233,7 +194,8 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { } } - if (useWideChannels) { + const TDqStageSettings settings = TDqStageSettings::Parse(TDqStageBase(stage)); + if (settings.WideChannels) { if (!EnsureWideStreamType(*programLambda, ctx)) { ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide channel stage requires exactly one output, but got " << programResultTypesTuple.size())); return TStatus::Error; @@ -247,14 +209,14 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { } const ui32 width = isBlock ? (blockItemTypes.size() - 1) : multiType->GetSize(); - if (width != outputNarrowType->GetSize()) { + if (width != settings.OutputNarrowType->GetSize()) { ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types has different number of items: " << - width << " vs " << outputNarrowType->GetSize())); + width << " vs " << settings.OutputNarrowType->GetSize())); return TStatus::Error; } - for (size_t i = 0; i < outputNarrowType->GetSize(); ++i) { - auto structItem = outputNarrowType->GetItems()[i]; + for (size_t i = 0; i < settings.OutputNarrowType->GetSize(); ++i) { + auto structItem = settings.OutputNarrowType->GetItems()[i]; auto wideItem = isBlock ? blockItemTypes[i] : multiType->GetItems()[i]; if (!IsSameAnnotation(*structItem->GetItemType(), *wideItem)) { ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types mismatch for column '" << @@ -263,14 +225,14 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { } } - programResultTypesTuple[0] = ctx.MakeType<TListExprType>(outputNarrowType); + programResultTypesTuple[0] = ctx.MakeType<TListExprType>(settings.OutputNarrowType); } TVector<const TTypeAnnotationNode*> stageResultTypes; if (TDqStageBase::idx_Outputs < stage->ChildrenSize()) { YQL_ENSURE(stage->Child(TDqStageBase::idx_Outputs)->ChildrenSize() != 0, "Stage.Outputs list exists but empty, stage: " << stage->Dump()); - if (useWideChannels) { + if (settings.WideChannels) { ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide channel stage is incompatible with Sink/Transform")); return TStatus::Error; } @@ -1142,6 +1104,137 @@ bool IsMergeConnectionApplicable(const TVector<const TTypeAnnotationNode*>& sort return true; } +TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) { + TDqStageSettings settings{}; + + for (const auto& tuple : node.Settings()) { + if (const auto name = tuple.Name().Value(); name == IdSettingName) { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + settings.Id = tuple.Value().Cast<TCoAtom>().Value(); + } else if (name == LogicalIdSettingName) { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + settings.LogicalId = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value()); + } else if (name == SinglePartitionSettingName) { + settings.SinglePartition = true; + } else if (name == WideChannelsSettingName) { + settings.WideChannels = true; + settings.OutputNarrowType = tuple.Value().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); + } + } + + return settings; +} + +bool TDqStageSettings::Validate(const TExprNode& stage, TExprContext& ctx) { + auto& settings = *stage.Child(TDqStageBase::idx_Settings); + if (!EnsureTuple(settings, ctx)) { + return false; + } + + for (auto& setting: settings.Children()) { + if (!EnsureTupleMinSize(*setting, 1, ctx)) { + return false; + } + + if (!EnsureAtom(*setting->Child(0), ctx)) { + return false; + } + + TStringBuf name = setting->Head().Content(); + if (name == IdSettingName || name == LogicalIdSettingName) { + if (setting->ChildrenSize() != 2) { + ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value")); + return false; + } + auto value = setting->Child(1); + if (!EnsureAtom(*value, ctx)) { + return false; + } + + if (name == LogicalIdSettingName && !TryFromString<ui64>(value->Content())) { + ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain ui64 value, but got: " << value->Content())); + return false; + } + } else if (name == WideChannelsSettingName) { + if (setting->ChildrenSize() != 2) { + ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value")); + return false; + } + auto value = setting->Child(1); + if (!EnsureType(*value, ctx)) { + return false; + } + + auto valueType = value->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureStructType(value->Pos(), *valueType, ctx)) { + return false; + } + } else if (name == SinglePartitionSettingName) { + if (setting->ChildrenSize() != 1) { + ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should not contain any value")); + return false; + } + } + } + + return true; +} + +TDqStageSettings TDqStageSettings::New(const NNodes::TDqStageBase& node) { + auto settings = Parse(node); + + if (!settings.Id) { + settings.Id = CreateGuidAsString(); + } + + return settings; +} + +TDqStageSettings TDqStageSettings::New() { + TDqStageSettings s; + s.Id = CreateGuidAsString(); + return s; +} + +NNodes::TCoNameValueTupleList TDqStageSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const { + TVector<TCoNameValueTuple> settings; + auto logicalId = LogicalId; + if (!logicalId) { + logicalId = ctx.NextUniqueId; + } + + settings.push_back(Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(LogicalIdSettingName) + .Value<TCoAtom>().Build(logicalId) + .Done()); + + if (Id) { + settings.push_back(Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(IdSettingName) + .Value<TCoAtom>().Build(Id) + .Done()); + } + + if (SinglePartition) { + settings.push_back(Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(SinglePartitionSettingName) + .Done()); + } + + if (WideChannels) { + YQL_ENSURE(OutputNarrowType); + settings.push_back(Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(WideChannelsSettingName) + .Value(ExpandType(pos, *OutputNarrowType, ctx)) + .Done()); + } + + return Build<TCoNameValueTupleList>(ctx, pos) + .Add(settings) + .Done(); +} + + TString PrintDqStageOnly(const TDqStageBase& stage, TExprContext& ctx) { if (stage.Inputs().Empty()) { return NCommon::ExprToPrettyString(ctx, stage.Ref()); diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h index 9c96d67b8e..1df5cbbc02 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.h +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h @@ -33,6 +33,31 @@ bool IsTypeSupportedInMergeCn(EDataSlot type); bool IsTypeSupportedInMergeCn(const TDataExprType* dataType); bool IsMergeConnectionApplicable(const TVector<const TTypeAnnotationNode*>& sortKeyTypes); +struct TDqStageSettings { + static constexpr TStringBuf LogicalIdSettingName = "_logical_id"; + static constexpr TStringBuf IdSettingName = "_id"; + static constexpr TStringBuf SinglePartitionSettingName = "_single_partition"; + static constexpr TStringBuf WideChannelsSettingName = "_wide_channels"; + + ui64 LogicalId = 0; + TString Id; + bool SinglePartition = false; + + bool WideChannels = false; + const TStructExprType* OutputNarrowType = nullptr; + + TDqStageSettings& SetSinglePartition(bool value = true) { SinglePartition = value; return *this; } + TDqStageSettings& SetWideChannels(const TStructExprType& narrowType) { WideChannels = true; OutputNarrowType = &narrowType; return *this; } + + static TDqStageSettings New(const NNodes::TDqStageBase& node); + static TDqStageSettings New(); + + static TDqStageSettings Parse(const NNodes::TDqStageBase& node); + static bool Validate(const TExprNode& stage, TExprContext& ctx); + NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; +}; + + TString PrintDqStageOnly(const NNodes::TDqStageBase& stage, TExprContext& ctx); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index 6104801fc7..387f355205 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -8,6 +8,7 @@ #include <ydb/library/yql/dq/integration/yql_dq_optimization.h> #include <ydb/library/yql/dq/opt/dq_opt_log.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/utils/log/log.h> diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index a935041d29..e2ff322c9d 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/dq/opt/dq_opt_phy.h> #include <ydb/library/yql/dq/opt/dq_opt_join.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/yql_opt_utils.h> diff --git a/ydb/library/yql/providers/dq/planner/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/planner/CMakeLists.darwin-x86_64.txt index df735d780b..e75e5ea430 100644 --- a/ydb/library/yql/providers/dq/planner/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/dq/planner/CMakeLists.darwin-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(providers-dq-planner PUBLIC minikql-comp_nodes-llvm yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks providers-common-mkql dq-api-protos diff --git a/ydb/library/yql/providers/dq/planner/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/planner/CMakeLists.linux-aarch64.txt index 2e9717c394..301b127603 100644 --- a/ydb/library/yql/providers/dq/planner/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/dq/planner/CMakeLists.linux-aarch64.txt @@ -19,6 +19,7 @@ target_link_libraries(providers-dq-planner PUBLIC minikql-comp_nodes-llvm yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks providers-common-mkql dq-api-protos diff --git a/ydb/library/yql/providers/dq/planner/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/planner/CMakeLists.linux-x86_64.txt index 2e9717c394..301b127603 100644 --- a/ydb/library/yql/providers/dq/planner/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/dq/planner/CMakeLists.linux-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(providers-dq-planner PUBLIC minikql-comp_nodes-llvm yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks providers-common-mkql dq-api-protos diff --git a/ydb/library/yql/providers/dq/planner/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/planner/CMakeLists.windows-x86_64.txt index df735d780b..e75e5ea430 100644 --- a/ydb/library/yql/providers/dq/planner/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/dq/planner/CMakeLists.windows-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(providers-dq-planner PUBLIC minikql-comp_nodes-llvm yql-dq-integration yql-dq-opt + yql-dq-type_ann yql-dq-tasks providers-common-mkql dq-api-protos diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index a41d672447..bf404a4410 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -21,6 +21,7 @@ #include <ydb/library/yql/dq/opt/dq_opt.h> #include <ydb/library/yql/dq/tasks/dq_connection_builder.h> #include <ydb/library/yql/dq/tasks/dq_task_program.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> #include <ydb/library/yql/minikql/aligned_page_pool.h> diff --git a/ydb/library/yql/providers/dq/planner/ya.make b/ydb/library/yql/providers/dq/planner/ya.make index c6ba1141c8..df1e402ef4 100644 --- a/ydb/library/yql/providers/dq/planner/ya.make +++ b/ydb/library/yql/providers/dq/planner/ya.make @@ -5,6 +5,7 @@ PEERDIR( ydb/library/yql/minikql/comp_nodes/llvm ydb/library/yql/dq/integration ydb/library/yql/dq/opt + ydb/library/yql/dq/type_ann ydb/library/yql/dq/tasks ydb/library/yql/providers/common/mkql ydb/library/yql/providers/dq/api/protos diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp index 7a4cb98e69..6821a6502e 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp @@ -20,6 +20,7 @@ #include <ydb/library/yql/dq/opt/dq_opt_build.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt index bcd890489a..c3499508b5 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt @@ -42,6 +42,7 @@ target_link_libraries(providers-yt-provider PUBLIC core-url_lister-interface yql-dq-integration yql-dq-opt + yql-dq-type_ann library-yql-minikql providers-common-codec providers-common-config diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt index 4fb7023356..01abb47076 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt @@ -43,6 +43,7 @@ target_link_libraries(providers-yt-provider PUBLIC core-url_lister-interface yql-dq-integration yql-dq-opt + yql-dq-type_ann library-yql-minikql providers-common-codec providers-common-config diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt index 4fb7023356..01abb47076 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt @@ -43,6 +43,7 @@ target_link_libraries(providers-yt-provider PUBLIC core-url_lister-interface yql-dq-integration yql-dq-opt + yql-dq-type_ann library-yql-minikql providers-common-codec providers-common-config diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt index bcd890489a..c3499508b5 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt @@ -42,6 +42,7 @@ target_link_libraries(providers-yt-provider PUBLIC core-url_lister-interface yql-dq-integration yql-dq-opt + yql-dq-type_ann library-yql-minikql providers-common-codec providers-common-config diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make index 744053b65e..b07485f878 100644 --- a/ydb/library/yql/providers/yt/provider/ya.make +++ b/ydb/library/yql/providers/yt/provider/ya.make @@ -64,6 +64,7 @@ PEERDIR( ydb/library/yql/core/url_lister/interface ydb/library/yql/dq/integration ydb/library/yql/dq/opt + ydb/library/yql/dq/type_ann ydb/library/yql/minikql ydb/library/yql/providers/common/codec ydb/library/yql/providers/common/config diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 1f9866025b..92ffdcdd2d 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -18,6 +18,7 @@ #include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/opt/dq_opt_phy.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/utils/yql_panic.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp index d28baa9ef5..a7ed8ced5a 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp @@ -30,6 +30,7 @@ #include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/opt/dq_opt_phy.h> +#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/extract_predicate/extract_predicate.h> |