diff options
author | aneporada <aneporada@ydb.tech> | 2023-03-27 13:36:56 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-03-27 13:36:56 +0300 |
commit | c89b7b6f6f7deee5667e9b4289bce25064fbcada (patch) | |
tree | e07a35f8d1e7b57160e97f3bd532625c61028a1f | |
parent | b826ba6c4f21f1712cbab37e8d04ef1bed83adfa (diff) | |
download | ydb-c89b7b6f6f7deee5667e9b4289bce25064fbcada.tar.gz |
Support wide stage inputs and output (for DqPhyStage)
Support wide stage inputs and output (for DqPhyStage)
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.cpp | 24 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt.cpp | 11 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt.h | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_build.cpp | 260 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_build.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 112 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.h | 7 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 3 |
11 files changed, 413 insertions, 18 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 77c856e7bf7..162554768a5 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -22,7 +22,8 @@ using TStatus = IGraphTransformer::TStatus; namespace { TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowDependantConsumers) { - return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers); + bool useWideChannels = false; + return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers, useWideChannels); } class TKqpBuildTxTransformer : public TSyncTransformerBase { diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index e5cf68a6354..24ecb5ff63a 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -2742,6 +2742,30 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ return true; } +bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx) { + if (HasError(node.GetTypeAnn(), ctx) || !node.GetTypeAnn()) { + YQL_ENSURE(node.Type() == TExprNode::Lambda); + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected wide stream type, but got lambda")); + return false; + } + + if (node.GetTypeAnn()->GetKind() != ETypeAnnotationKind::Stream || node.GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->GetKind() != ETypeAnnotationKind::Multi) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected wide stream type, but got: " << *node.GetTypeAnn())); + return false; + } + + return true; +} + +bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx) { + if (HasError(&type, ctx) || type.GetKind() != ETypeAnnotationKind::Stream || type.Cast<TStreamExprType>()->GetItemType()->GetKind() != ETypeAnnotationKind::Multi) { + ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected wide stream type, but got: " << type)); + return false; + } + + return true; +} + bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) { if (!EnsureWideFlowType(node, ctx)) { return false; diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h index af62e050e6b..93941fa95e3 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.h +++ b/ydb/library/yql/core/yql_expr_type_annotation.h @@ -121,6 +121,8 @@ bool EnsureFlowType(const TExprNode& node, TExprContext& ctx); bool EnsureFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx); bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); +bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx); +bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true); bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx); bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index f27f3949f5a..cbde2de0e20 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -21,6 +21,9 @@ TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) { settings.LogicalId = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value()); } else if (name == SinglePartitionSettingName) { settings.SinglePartition = true; + } else if (name == WideChannelsSettingName) { + settings.WideChannels = true; + settings.OutputNarrowType = tuple.Value().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); } } @@ -62,6 +65,14 @@ NNodes::TCoNameValueTupleList TDqStageSettings::BuildNode(TExprContext& ctx, TPo .Done()); } + if (WideChannels) { + YQL_ENSURE(OutputNarrowType); + settings.push_back(Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(WideChannelsSettingName) + .Value(ExpandType(pos, *OutputNarrowType, ctx)) + .Done()); + } + return Build<TCoNameValueTupleList>(ctx, pos) .Add(settings) .Done(); diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h index c49a97266ea..84174e04fd1 100644 --- a/ydb/library/yql/dq/opt/dq_opt.h +++ b/ydb/library/yql/dq/opt/dq_opt.h @@ -12,12 +12,17 @@ struct TDqStageSettings { static constexpr TStringBuf LogicalIdSettingName = "_logical_id"; static constexpr TStringBuf IdSettingName = "_id"; static constexpr TStringBuf SinglePartitionSettingName = "_single_partition"; + static constexpr TStringBuf WideChannelsSettingName = "_wide_channels"; ui64 LogicalId = 0; TString Id; bool SinglePartition = false; + bool WideChannels = false; + const TStructExprType* OutputNarrowType = nullptr; + TDqStageSettings& SetSinglePartition(bool value = true) { SinglePartition = value; return *this; } + TDqStageSettings& SetWideChannels(const TStructExprType& narrowType) { WideChannels = true; OutputNarrowType = &narrowType; return *this; } static TDqStageSettings New(const NNodes::TDqStageBase& node); diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index b07be281fe9..22985a33519 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -227,7 +227,7 @@ void MakeConsumerReplaces( class TDqReplaceStageConsumersTransformer : public TSyncTransformerBase { public: explicit TDqReplaceStageConsumersTransformer(bool allowDependantConsumers) - : AllowDependantConsumers(allowDependantConsumers) {} + : AllowDependantConsumers_(allowDependantConsumers) {} TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { YQL_CLOG(TRACE, CoreDq) << "[DQ/Build/TransformConsumers] " << NCommon::ExprToPrettyString(ctx, *input); @@ -296,7 +296,7 @@ public: TNodeOnNodeOwnedMap replaces; for (const auto& [stage, info] : consumersMap) { - MakeConsumerReplaces(TDqStage(stage), info, AllowDependantConsumers, replaces, ctx); + MakeConsumerReplaces(TDqStage(stage), info, AllowDependantConsumers_, replaces, ctx); } if (replaces.empty()) { @@ -312,7 +312,7 @@ public: } private: - const bool AllowDependantConsumers; + const bool AllowDependantConsumers_; }; class TDqBuildPhysicalStagesTransformer : public TSyncTransformerBase { @@ -384,9 +384,252 @@ public: } }; +bool CanRebuildForWideChannelOutput(const TDqOutput& output) { + ui32 index = FromString<ui32>(output.Index().Value()); + if (index != 0) { + // stage has multiple outputs + return false; + } + + auto stage = output.Stage().Cast<TDqPhyStage>(); + auto stageSettings = TDqStageSettings::Parse(stage); + if (stageSettings.WideChannels) { + return false; + } + + auto stageOutputType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>(); + if (stageOutputType->GetSize() != 1 || stage.Outputs().IsValid()) { + return false; + } + + return true; +} + +bool CanRebuildForWideChannelOutput(const TDqConnection& conn) { + if (conn.Maybe<TDqCnResult>() || conn.Maybe<TDqCnValue>()) { + return false; + } + + ui32 index = FromString<ui32>(conn.Output().Index().Value()); + if (index != 0) { + // stage has multiple outputs + return false; + } + + auto stage = conn.Output().Stage().Cast<TDqPhyStage>(); + auto stageSettings = TDqStageSettings::Parse(stage); + if (stageSettings.WideChannels) { + return false; + } + + auto stageOutputType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>(); + if (stageOutputType->GetSize() != 1 || stage.Outputs().IsValid()) { + return false; + } + + return true; +} + +const TStructExprType* GetStageOutputItemType(const TDqPhyStage& stage) { + const TTupleExprType* stageType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>(); + YQL_ENSURE(stageType->GetSize() == 1); + return stageType->GetItems()[0]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); +} + +TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx) { + TVector<TCoArgument> newArgs; + newArgs.reserve(stage.Inputs().Size()); + TNodeOnNodeOwnedMap argsMap; + + YQL_ENSURE(stage.Inputs().Size() == stage.Program().Args().Size()); + + bool needRebuild = false; + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + TCoArgument arg = stage.Program().Args().Arg(i); + + auto newArg = TCoArgument(ctx.NewArgument(arg.Pos(), arg.Name())); + newArgs.emplace_back(newArg); + + auto maybeConn = stage.Inputs().Item(i).Maybe<TDqConnection>(); + + if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast().Output())) { + needRebuild = true; + auto itemType = arg.Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>(); + + // input will actually be wide stream - need to convert it back to stream + auto argReplace = ctx.Builder(arg.Pos()) + .Callable("FromFlow") + .Callable(0, "NarrowMap") + .Callable(0, "ToFlow") + .Add(0, newArg.Ptr()) + .Seal() + .Lambda(1) + .Params("fields", itemType->GetSize()) + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 i = 0U; + for (const auto& item : itemType->GetItems()) { + parent.List(i) + .Atom(0, item->GetName()) + .Arg(1, "fields", i) + .Seal(); + ++i; + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Seal() + .Build(); + + argsMap.emplace(arg.Raw(), argReplace); + } else { + argsMap.emplace(arg.Raw(), newArg.Ptr()); + } + } + + if (!needRebuild) { + return stage; + } + + return Build<TDqPhyStage>(ctx, stage.Pos()) + .InitFrom(stage) + .Program() + .Args(newArgs) + .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap)) + .Build() + .Done(); +} + +TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, TExprContext& ctx) { + // convert stream to wide stream + auto resultStream = ctx.Builder(stage.Program().Body().Pos()) + .Callable("FromFlow") + .Callable(0, "ExpandMap") + .Callable(0, "ToFlow") + .Add(0, stage.Program().Body().Ptr()) + .Seal() + .Lambda(1) + .Param("item") + .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& { + ui32 i = 0U; + for (const auto& item : outputItemType.GetItems()) { + lambda.Callable(i++, "Member") + .Arg(0, "item") + .Atom(1, item->GetName()) + .Seal(); + } + return lambda; + }) + .Seal() + .Seal() + .Seal() + .Build(); + + return Build<TDqPhyStage>(ctx, stage.Pos()) + .InitFrom(stage) + .Program() + .Args(stage.Program().Args()) + .Body(resultStream) + .Build() + .Settings(TDqStageSettings::New(stage).SetWideChannels(outputItemType).BuildNode(ctx, stage.Pos())) + .Done(); +} + +TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, TExprContext& ctx) { + const TStructExprType* outputItemType = GetStageOutputItemType(stage); + return RebuildStageOutputAsWide(RebuildStageInputsAsWide(stage, ctx), *outputItemType, ctx); +} + +IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode::TPtr& output, + TExprContext& ctx) +{ + output = input; + TNodeOnNodeOwnedMap replaces; + TNodeSet processedStages; + VisitExpr(input, [&replaces, &processedStages, &ctx](const TExprNode::TPtr& node) { + if (node->IsLambda()) { + return false; + } + + TExprBase expr{node}; + auto maybeConn = expr.Maybe<TDqConnection>(); + if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast())) { + auto conn = maybeConn.Cast(); + processedStages.insert(conn.Output().Stage().Raw()); + auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast<TDqPhyStage>(), ctx); + auto outputItemType = GetStageOutputItemType(conn.Output().Stage().Cast<TDqPhyStage>()); + + if (conn.Maybe<TDqCnHashShuffle>()) { + auto shuffle = conn.Maybe<TDqCnHashShuffle>().Cast(); + auto builder = Build<TCoAtomList>(ctx, shuffle.KeyColumns().Pos()); + for (auto key : shuffle.KeyColumns()) { + ui32 idx = *outputItemType->FindItem(key.Value()); + builder.Add<TCoAtom>().Build(ToString(idx)); + } + + replaces[conn.Raw()] = Build<TDqCnHashShuffle>(ctx, conn.Pos()) + .Output<TDqOutput>() + .InitFrom(conn.Output()) + .Stage(newStage) + .Build() + .KeyColumns(builder.Build().Value()) + .Done().Ptr(); + } else if (conn.Maybe<TDqCnMerge>()) { + auto merge = conn.Maybe<TDqCnMerge>().Cast(); + auto builder = Build<TDqSortColumnList>(ctx, merge.SortColumns().Pos()); + for (auto sortColumn : merge.SortColumns()) { + ui32 idx = *outputItemType->FindItem(sortColumn.Column().Value()); + builder.Add<TDqSortColumn>() + .Column<TCoAtom>().Build(ToString(idx)) + .SortDirection(sortColumn.SortDirection()) + .Build(); + } + + replaces[conn.Raw()] = Build<TDqCnMerge>(ctx, conn.Pos()) + .Output<TDqOutput>() + .InitFrom(conn.Output()) + .Stage(newStage) + .Build() + .SortColumns(builder.Build().Value()) + .Done().Ptr(); + } else { + auto newOutput = Build<TDqOutput>(ctx, conn.Output().Pos()) + .InitFrom(conn.Output()) + .Stage(newStage) + .Done(); + replaces[conn.Raw()] = ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr()); + } + } else if (expr.Maybe<TDqPhyStage>()) { + auto stage = expr.Maybe<TDqPhyStage>().Cast(); + if (!processedStages.contains(stage.Raw())) { + processedStages.insert(stage.Raw()); + auto newStage = RebuildStageInputsAsWide(stage, ctx); + if (newStage.Raw() != stage.Raw()) { + replaces[stage.Raw()] = newStage.Ptr(); + } + } + } + + return true; + }); + + if (replaces.empty()) { + return IGraphTransformer::TStatus::Ok; + } + + YQL_CLOG(INFO, CoreDq) << "[DQ/Build/EnableWideChannels] " << "Enabled wide channels for " << replaces.size() << " stages"; + TOptimizeExprSettings settings{nullptr}; + settings.VisitLambdas = false; + auto status = RemapExpr(input, output, replaces, ctx, settings); + YQL_CLOG(TRACE, CoreDq) << "[DQ/Build/EnableWideChannels] " << "Dump: " << NCommon::ExprToPrettyString(ctx, *output); + return status; +} + } // namespace -TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers) { +TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels) { TVector<TTransformStage> transformers; transformers.push_back(TTransformStage(CreateFunctorTransformer( @@ -406,6 +649,15 @@ TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependan "BuildPhysicalStages", TIssuesIds::DEFAULT_ERROR)); + if (useWideChannels) { + transformers.push_back(TTransformStage(CreateFunctorTransformer( + [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + return DqEnableWideChannels(input, output, ctx); + }), + "EnableWideChannels", + TIssuesIds::DEFAULT_ERROR)); + } + return CreateCompositeGraphTransformer(transformers, false); } diff --git a/ydb/library/yql/dq/opt/dq_opt_build.h b/ydb/library/yql/dq/opt/dq_opt_build.h index 456b1f0c812..3853d8a3be6 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.h +++ b/ydb/library/yql/dq/opt/dq_opt_build.h @@ -5,6 +5,6 @@ namespace NYql::NDq { -TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers); +TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 0d0d66c4c76..3ccd0b7b67b 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -16,6 +16,9 @@ using TStatus = NYql::IGraphTransformer::TStatus; namespace { +// TODO: relocate names to common code for type_ann and dq_opt.cpp +static constexpr TStringBuf WideChannelsSettingName = "_wide_channels"; + const TTypeAnnotationNode* GetDqOutputType(const TDqOutput& output, TExprContext& ctx) { auto stageResultTuple = output.Stage().Ref().GetTypeAnn()->Cast<TTupleExprType>(); @@ -49,6 +52,34 @@ const TTypeAnnotationNode* GetDqConnectionType(const TDqConnection& node, TExprC return GetDqOutputType(node.Output(), ctx); } +const TTypeAnnotationNode* GetColumnType(const TDqConnection& node, const TStructExprType& structType, TStringBuf name, TPositionHandle pos, TExprContext& ctx) { + if (HasSetting(node.Output().Stage().Settings().Ref(), WideChannelsSettingName)) { + auto multiType = node.Output().Stage().Program().Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + ui32 idx; + if (!TryFromString(name, idx)) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + TStringBuilder() << "Expecting integer as column name, but got '" << name << "'")); + return nullptr; + } + if (idx >= multiType->GetSize()) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + TStringBuilder() << "Column index too big: " << name << " >= " << multiType->GetSize())); + return nullptr; + } + + return multiType->GetItems()[idx]; + } + + auto result = structType.FindItemType(name); + if (!result) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + TStringBuilder() << "Missing column '" << name << "'")); + return nullptr; + } + + return result; +} + template <typename TStage> TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { if (!EnsureMinMaxArgsCount(*stage, 3, 4, ctx)) { @@ -67,6 +98,8 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { return TStatus::Error; } + bool useWideChannels = false; + const TStructExprType* outputNarrowType = nullptr; for (auto& setting: settingsTuple->Children()) { if (!EnsureTupleMinSize(*setting, 1, ctx)) { return TStatus::Error; @@ -74,6 +107,31 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { if (!EnsureAtom(*setting->Child(0), ctx)) { return TStatus::Error; } + + TStringBuf name = setting->Head().Content(); + if (name == WideChannelsSettingName) { + if (setting->ChildrenSize() != 2) { + ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value")); + return TStatus::Error; + } + auto value = setting->Child(1); + if (!EnsureType(*value, ctx)) { + return TStatus::Error; + } + + auto valueType = value->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureStructType(value->Pos(), *valueType, ctx)) { + return TStatus::Error; + } + + if constexpr (std::is_same_v<TStage, TDqStage>) { + ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " is not supported in " << stage->Content())); + return TStatus::Error; + } else { + useWideChannels = true; + outputNarrowType = valueType->Cast<TStructExprType>(); + } + } } if (!EnsureLambda(*programLambda, ctx)) { @@ -109,6 +167,13 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { return TStatus::Error; } } + + if (TDqConnection::Match(input.Get())) { + TDqConnection conn(input); + if (HasSetting(conn.Output().Stage().Settings().Ref(), WideChannelsSettingName)) { + argType = conn.Output().Stage().Program().Ref().GetTypeAnn(); + } + } } if (!TDqPhyPrecompute::Match(input.Get()) && input->Content() != "KqpTxResultBinding") { @@ -161,10 +226,40 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { } } + if (useWideChannels) { + if (!EnsureWideStreamType(*programLambda, ctx)) { + ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide channel stage requires exactly one output, but got " << programResultTypesTuple.size())); + return TStatus::Error; + } + YQL_ENSURE(programResultTypesTuple.size() == 1); + auto multiType = programLambda->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + if (multiType->GetSize() != outputNarrowType->GetSize()) { + ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types has different number of items: " << + multiType->GetSize() << " vs " << outputNarrowType->GetSize())); + return TStatus::Error; + } + + for (size_t i = 0; i < outputNarrowType->GetSize(); ++i) { + auto structItem = outputNarrowType->GetItems()[i]; + if (!IsSameAnnotation(*structItem->GetItemType(), *(multiType->GetItems()[i]))) { + ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types mismatch for column '" << + structItem->GetName() << "' : " << *(multiType->GetItems()[i]) << " vs " << *structItem->GetItemType())); + return TStatus::Error; + } + } + + programResultTypesTuple[0] = ctx.MakeType<TListExprType>(outputNarrowType); + } + TVector<const TTypeAnnotationNode*> stageResultTypes; if (TDqStageBase::idx_Outputs < stage->ChildrenSize()) { YQL_ENSURE(stage->Child(TDqStageBase::idx_Outputs)->ChildrenSize() != 0, "Stage.Outputs list exists but empty, stage: " << stage->Dump()); + if (useWideChannels) { + ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide channel stage is incompatible with Sink/Transform")); + return TStatus::Error; + } + auto outputsNumber = programResultTypesTuple.size(); TVector<TExprNode::TPtr> transforms; TVector<TExprNode::TPtr> sinks; @@ -522,13 +617,12 @@ TStatus AnnotateDqCnMerge(const TExprNode::TPtr& node, TExprContext& ctx) { { return TStatus::Error; } - TMaybe<ui32> colIndex = structType->FindItem(column.Column().StringValue()); - if (!colIndex) { - ctx.AddError(TIssue(ctx.GetPosition(column.Pos()), - TStringBuilder() << "Missing sort column: " << column.Column().StringValue())); + + auto colType = GetColumnType(TDqConnection(node), *structType, column.Column().Value(), column.Pos(), ctx); + if (!colType) { return TStatus::Error; } - const TTypeAnnotationNode* colType = (structType->GetItems())[*colIndex]->GetItemType(); + if (colType->GetKind() == ETypeAnnotationKind::Optional) { colType = colType->Cast<TOptionalExprType>()->GetItemType(); } @@ -582,12 +676,12 @@ TStatus AnnotateDqCnHashShuffle(const TExprNode::TPtr& input, TExprContext& ctx) if (!EnsureAtom(*column, ctx)) { return TStatus::Error; } - if (!structType->FindItem(column->Content())) { - ctx.AddError(TIssue(ctx.GetPosition(column->Pos()), - TStringBuilder() << "Missing key column: " << column->Content())); + auto ty = GetColumnType(TDqConnection(input), *structType, column->Content(), column->Pos(), ctx); + if (!ty) { return TStatus::Error; } - if (const auto ty = structType->FindItemType(column->Content()); !ty->IsHashable()) { + + if (!ty->IsHashable()) { ctx.AddError(TIssue(ctx.GetPosition(column->Pos()), TStringBuilder() << "Non-hashable key column: " << column->Content())); return TStatus::Error; diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 925f8beb7e0..2c2bfb9dc65 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -58,6 +58,8 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, HashJoinMode).Parser([](const TString& v) { return FromString<NDq::EHashJoinMode>(v); }); REGISTER_SETTING(*this, HashShuffleTasksRatio).Lower(0.5).Upper(5); REGISTER_SETTING(*this, HashShuffleMaxTasks).Lower(1).Upper(1000); + + REGISTER_SETTING(*this, UseWideChannels); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index ae15121f183..4c9244b5444 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -90,6 +90,8 @@ struct TDqSettings { NCommon::TConfSetting<double, false> HashShuffleTasksRatio; NCommon::TConfSetting<ui32, false> HashShuffleMaxTasks; + NCommon::TConfSetting<bool, false> UseWideChannels; + // This options will be passed to executor_actor and worker_actor template <typename TProtoConfig> void Save(TProtoConfig& config) { @@ -133,8 +135,9 @@ struct TDqSettings { SAVE_SETTING(WatermarksLateArrivalDelayMs); SAVE_SETTING(UseAggPhases); SAVE_SETTING(HashJoinMode); - SAVE_SETTING(HashShuffleTasksRatio); - SAVE_SETTING(HashShuffleMaxTasks); + SAVE_SETTING(HashShuffleTasksRatio); + SAVE_SETTING(HashShuffleMaxTasks); + SAVE_SETTING(UseWideChannels); #undef SAVE_SETTING } diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 1fa3b6571a2..35c630e19d2 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -230,7 +230,8 @@ private: void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes"); - pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false), "BuildPhy"); + bool useWideChannels = State_->Settings->UseWideChannels.Get().GetOrElse(false); + pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, useWideChannels), "BuildPhy"); pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables"); } |