diff options
author | aneporada <aneporada@ydb.tech> | 2023-07-27 12:09:43 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-07-27 12:09:43 +0300 |
commit | 3a6dc213690b48dfa360d9ced0091d077a2dca6e (patch) | |
tree | bc3919ad8848e22f42b8f25dc84f2b97ae070237 | |
parent | 930489b1ca5ecac97640bf891c5141397a645d44 (diff) | |
download | ydb-3a6dc213690b48dfa360d9ced0091d077a2dca6e.tar.gz |
Implement new pragma dq.UseWideBlockChannels
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 10 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.cpp | 43 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_build.cpp | 71 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_build.h | 9 | ||||
-rw-r--r-- | ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 29 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.cpp | 7 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 11 |
9 files changed, 144 insertions, 40 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 162554768a..4997f007f9 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -21,9 +21,9 @@ using TStatus = IGraphTransformer::TStatus; namespace { -TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowDependantConsumers) { - bool useWideChannels = false; - return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers, useWideChannels); +TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx) { + EChannelMode mode = EChannelMode::CHANNEL_SCALAR; + return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers, typesCtx, mode); } class TKqpBuildTxTransformer : public TSyncTransformerBase { @@ -471,7 +471,7 @@ public: .Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt") .Add(*TypeAnnTransformer, "TypeAnnotation") .AddPostTypeAnnotation(/* forSubgraph */ true) - .Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false), "BuildPhysicalStages") + .Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false, typesCtx), "BuildPhysicalStages") .Add(*BuildTxTransformer, "BuildPhysicalTx") .Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole") .Build(false); @@ -481,7 +481,7 @@ public: .Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt") .Add(*TypeAnnTransformer, "TypeAnnotation") .AddPostTypeAnnotation(/* forSubgraph */ true) - .Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled()), "BuildPhysicalStages") + .Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled(), typesCtx), "BuildPhysicalStages") .Add(*BuildTxTransformer, "BuildPhysicalTx") .Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole") .Build(false); diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index 3564e110ae..bc3c46f512 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -2792,43 +2792,64 @@ bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& t return true; } -bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) { - if (!EnsureWideFlowType(node, ctx)) { +bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) { + if (HasError(&type, ctx)) { + return false; + } + + if (type.GetKind() != ETypeAnnotationKind::Multi) { + ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected wide type, but got: " << type)); return false; } - auto& items = node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); + auto& items = type.Cast<TMultiExprType>()->GetItems(); if (items.empty()) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Expected at least one column")); - return IGraphTransformer::TStatus::Error; + ctx.AddError(TIssue(ctx.GetPosition(position), "Expected at least one column")); + return false; } bool isScalar; for (ui32 i = 0; i < items.size(); ++i) { - const auto& type = items[i]; - if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx)) { + const auto& itemType = items[i]; + if (!EnsureBlockOrScalarType(position, *itemType, ctx)) { return false; } - blockItemTypes.push_back(GetBlockItemType(*type, isScalar)); + blockItemTypes.push_back(GetBlockItemType(*itemType, isScalar)); if (!allowScalar && isScalar && (i + 1 != items.size())) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Scalars are not allowed")); + ctx.AddError(TIssue(ctx.GetPosition(position), "Scalars are not allowed")); return false; } } if (!isScalar) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Last column should be a scalar")); + ctx.AddError(TIssue(ctx.GetPosition(position), "Last column should be a scalar")); return false; } - if (!EnsureSpecificDataType(node.Pos(), *blockItemTypes.back(), EDataSlot::Uint64, ctx)) { + if (!EnsureSpecificDataType(position, *blockItemTypes.back(), EDataSlot::Uint64, ctx)) { return false; } return true; } +bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) { + if (!EnsureWideFlowType(node, ctx)) { + return false; + } + + return EnsureWideBlockType(node.Pos(), *node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType(), blockItemTypes, ctx, allowScalar); +} + +bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) { + if (!EnsureWideStreamType(node, ctx)) { + return false; + } + + return EnsureWideBlockType(node.Pos(), *node.GetTypeAnn()->Cast<TStreamExprType>()->GetItemType(), blockItemTypes, ctx, allowScalar); +} + bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx) { if (!node.GetTypeAnn()) { YQL_ENSURE(node.Type() == TExprNode::Lambda); diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h index 3a929bf772..f2b6430ff5 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.h +++ b/ydb/library/yql/core/yql_expr_type_annotation.h @@ -124,7 +124,9 @@ bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx); bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx); bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); +bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true); bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true); +bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true); bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx); bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureType(const TExprNode& node, TExprContext& ctx); diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index a7b00b7c70..10abdb7bcf 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -432,7 +432,18 @@ const TStructExprType* GetStageOutputItemType(const TDqPhyStage& stage) { return stageType->GetItems()[0]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); } -TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx) { +bool IsCompatibleWithBlocks(TPositionHandle pos, const TStructExprType& type, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + TVector<const TTypeAnnotationNode*> types; + for (auto& item : type.GetItems()) { + types.emplace_back(item->GetItemType()); + } + + auto resolveStatus = typesCtx.ArrowResolver->AreTypesSupported(ctx.GetPosition(pos), types, ctx); + YQL_ENSURE(resolveStatus != IArrowResolver::ERROR); + return resolveStatus == IArrowResolver::OK; +} + +TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, bool useChannelBlocks, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { TVector<TCoArgument> newArgs; newArgs.reserve(stage.Inputs().Size()); TNodeOnNodeOwnedMap argsMap; @@ -451,13 +462,25 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast().Output())) { needRebuild = true; auto itemType = arg.Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>(); - + TExprNode::TPtr newArgNode = newArg.Ptr(); + if (useChannelBlocks && IsCompatibleWithBlocks(arg.Pos(), *itemType, ctx, typesCtx)) { + // input will actually be wide block stream - convert it to wide stream first + newArgNode = ctx.Builder(arg.Pos()) + .Callable("FromFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "ToFlow") + .Add(0, newArg.Ptr()) + .Seal() + .Seal() + .Seal() + .Build(); + } // input will actually be wide stream - need to convert it back to stream auto argReplace = ctx.Builder(arg.Pos()) .Callable("FromFlow") .Callable(0, "NarrowMap") .Callable(0, "ToFlow") - .Add(0, newArg.Ptr()) + .Add(0, newArgNode) .Seal() .Lambda(1) .Params("fields", itemType->GetSize()) @@ -498,7 +521,9 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx .Done(); } -TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, TExprContext& ctx) { +TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, bool useChannelBlocks, + TExprContext& ctx, TTypeAnnotationContext& typesCtx) +{ // convert stream to wide stream auto resultStream = ctx.Builder(stage.Program().Body().Pos()) .Callable("FromFlow") @@ -523,6 +548,19 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr .Seal() .Build(); + if (useChannelBlocks && IsCompatibleWithBlocks(resultStream->Pos(), outputItemType, ctx, typesCtx)) { + // convert wide stream to wide block stream + resultStream = ctx.Builder(resultStream->Pos()) + .Callable("FromFlow") + .Callable(0, "WideToBlocks") + .Callable(0, "ToFlow") + .Add(0, resultStream) + .Seal() + .Seal() + .Seal() + .Build(); + } + return Build<TDqPhyStage>(ctx, stage.Pos()) .InitFrom(stage) .Program() @@ -533,18 +571,21 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr .Done(); } -TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, TExprContext& ctx) { +TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, bool useChannelBlocks, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { const TStructExprType* outputItemType = GetStageOutputItemType(stage); - return RebuildStageOutputAsWide(RebuildStageInputsAsWide(stage, ctx), *outputItemType, ctx); + return RebuildStageOutputAsWide(RebuildStageInputsAsWide(stage, useChannelBlocks, ctx, typesCtx), + *outputItemType, useChannelBlocks, ctx, typesCtx); } -IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode::TPtr& output, - TExprContext& ctx) +IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TPtr input, TExprNode::TPtr& output, + TExprContext& ctx, TTypeAnnotationContext& typesCtx) { output = input; TNodeOnNodeOwnedMap replaces; TNodeSet processedStages; - VisitExpr(input, [&replaces, &processedStages, &ctx](const TExprNode::TPtr& node) { + YQL_ENSURE(mode == CHANNEL_WIDE || mode == CHANNEL_WIDE_BLOCK); + const bool useChannelBlocks = mode == CHANNEL_WIDE_BLOCK; + VisitExpr(input, [&](const TExprNode::TPtr& node) { if (node->IsLambda()) { return false; } @@ -554,7 +595,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast())) { auto conn = maybeConn.Cast(); processedStages.insert(conn.Output().Stage().Raw()); - auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast<TDqPhyStage>(), ctx); + auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast<TDqPhyStage>(), useChannelBlocks, ctx, typesCtx); auto outputItemType = GetStageOutputItemType(conn.Output().Stage().Cast<TDqPhyStage>()); if (conn.Maybe<TDqCnHashShuffle>()) { @@ -601,7 +642,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode auto stage = expr.Maybe<TDqPhyStage>().Cast(); if (!processedStages.contains(stage.Raw())) { processedStages.insert(stage.Raw()); - auto newStage = RebuildStageInputsAsWide(stage, ctx); + auto newStage = RebuildStageInputsAsWide(stage, useChannelBlocks, ctx, typesCtx); if (newStage.Raw() != stage.Raw()) { replaces[stage.Raw()] = newStage.Ptr(); } @@ -625,7 +666,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode } // namespace -TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels) { +TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx, EChannelMode mode) { TVector<TTransformStage> transformers; transformers.push_back(TTransformStage(CreateFunctorTransformer( @@ -645,10 +686,10 @@ TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependan "BuildPhysicalStages", TIssuesIds::DEFAULT_ERROR)); - if (useWideChannels) { + if (mode != CHANNEL_SCALAR) { transformers.push_back(TTransformStage(CreateFunctorTransformer( - [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - return DqEnableWideChannels(input, output, ctx); + [mode, &typesCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + return DqEnableWideChannels(mode, input, output, ctx, typesCtx); }), "EnableWideChannels", TIssuesIds::DEFAULT_ERROR)); diff --git a/ydb/library/yql/dq/opt/dq_opt_build.h b/ydb/library/yql/dq/opt/dq_opt_build.h index 3853d8a3be..6cd9911630 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.h +++ b/ydb/library/yql/dq/opt/dq_opt_build.h @@ -1,10 +1,17 @@ #pragma once #include <ydb/library/yql/core/yql_graph_transformer.h> +#include <ydb/library/yql/core/yql_type_annotation.h> #include <ydb/library/yql/dq/common/dq_common.h> namespace NYql::NDq { -TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels); +enum EChannelMode { + CHANNEL_SCALAR, + CHANNEL_WIDE, + CHANNEL_WIDE_BLOCK, +}; + +TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx, EChannelMode mode); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 83de70ed7d..9955b5ee14 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -61,13 +61,20 @@ const TTypeAnnotationNode* GetColumnType(const TDqConnection& node, const TStruc TStringBuilder() << "Expecting integer as column name, but got '" << name << "'")); return nullptr; } - if (idx >= multiType->GetSize()) { + const bool isBlock = AnyOf(multiType->GetItems(), [](const TTypeAnnotationNode* item) { return item->IsBlockOrScalar(); }); + const ui32 width = isBlock ? (multiType->GetSize() - 1) : multiType->GetSize(); + if (idx >= width) { ctx.AddError(TIssue(ctx.GetPosition(pos), - TStringBuilder() << "Column index too big: " << name << " >= " << multiType->GetSize())); + TStringBuilder() << "Column index too big: " << name << " >= " << width)); return nullptr; } - return multiType->GetItems()[idx]; + auto itemType = multiType->GetItems()[idx]; + if (isBlock) { + itemType = itemType->IsBlock() ? itemType->Cast<TBlockExprType>()->GetItemType() : + itemType->Cast<TScalarExprType>()->GetItemType(); + } + return itemType; } auto result = structType.FindItemType(name); @@ -233,17 +240,25 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { } YQL_ENSURE(programResultTypesTuple.size() == 1); auto multiType = programLambda->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); - if (multiType->GetSize() != outputNarrowType->GetSize()) { + const bool isBlock = AnyOf(multiType->GetItems(), [](const TTypeAnnotationNode* item) { return item->IsBlockOrScalar(); }); + TTypeAnnotationNode::TListType blockItemTypes; + if (isBlock && !EnsureWideStreamBlockType(*programLambda, blockItemTypes, ctx)) { + return TStatus::Error; + } + + const ui32 width = isBlock ? (blockItemTypes.size() - 1) : multiType->GetSize(); + if (width != outputNarrowType->GetSize()) { ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types has different number of items: " << - multiType->GetSize() << " vs " << outputNarrowType->GetSize())); + width << " vs " << outputNarrowType->GetSize())); return TStatus::Error; } for (size_t i = 0; i < outputNarrowType->GetSize(); ++i) { auto structItem = outputNarrowType->GetItems()[i]; - if (!IsSameAnnotation(*structItem->GetItemType(), *(multiType->GetItems()[i]))) { + auto wideItem = isBlock ? blockItemTypes[i] : multiType->GetItems()[i]; + if (!IsSameAnnotation(*structItem->GetItemType(), *wideItem)) { ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types mismatch for column '" << - structItem->GetName() << "' : " << *(multiType->GetItems()[i]) << " vs " << *structItem->GetItemType())); + structItem->GetName() << "' : " << *wideItem << " vs " << *structItem->GetItemType())); return TStatus::Error; } } diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index f38f300135..61512abd44 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -60,6 +60,13 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, HashShuffleMaxTasks).Lower(1).Upper(1000); REGISTER_SETTING(*this, UseWideChannels); + REGISTER_SETTING(*this, UseWideBlockChannels) + .ValueSetter([this](const TString&, bool value) { + UseWideBlockChannels = value; + if (value) { + UseWideChannels = true; + } + }); REGISTER_SETTING(*this, UseFastPickleTransport); REGISTER_SETTING(*this, UseOOBTransport); diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 1c29292bfa..aa30d90a50 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -98,6 +98,7 @@ struct TDqSettings { NCommon::TConfSetting<ui32, false> HashShuffleMaxTasks; NCommon::TConfSetting<bool, false> UseWideChannels; + NCommon::TConfSetting<bool, false> UseWideBlockChannels; NCommon::TConfSetting<bool, false> UseFastPickleTransport; NCommon::TConfSetting<bool, false> UseOOBTransport; @@ -151,6 +152,7 @@ struct TDqSettings { SAVE_SETTING(HashShuffleTasksRatio); SAVE_SETTING(HashShuffleMaxTasks); SAVE_SETTING(UseWideChannels); + SAVE_SETTING(UseWideBlockChannels); SAVE_SETTING(UseFastPickleTransport); SAVE_SETTING(UseOOBTransport); SAVE_SETTING(AggregateStatsByStage); diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 60515cba7e..4268395f59 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -234,7 +234,16 @@ private: void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes"); bool useWideChannels = State_->Settings->UseWideChannels.Get().GetOrElse(false); - pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, useWideChannels), "BuildPhy"); + bool useChannelBlocks = State_->Settings->UseWideBlockChannels.Get().GetOrElse(false); + NDq::EChannelMode mode; + if (!useWideChannels) { + mode = NDq::EChannelMode::CHANNEL_SCALAR; + } else if (!useChannelBlocks) { + mode = NDq::EChannelMode::CHANNEL_WIDE; + } else { + mode = NDq::EChannelMode::CHANNEL_WIDE_BLOCK; + } + pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, *pipeline->GetTypeAnnotationContext(), mode), "BuildPhy"); pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables"); } |