aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-07-27 12:09:43 +0300
committeraneporada <aneporada@ydb.tech>2023-07-27 12:09:43 +0300
commit3a6dc213690b48dfa360d9ced0091d077a2dca6e (patch)
treebc3919ad8848e22f42b8f25dc84f2b97ae070237
parent930489b1ca5ecac97640bf891c5141397a645d44 (diff)
downloadydb-3a6dc213690b48dfa360d9ced0091d077a2dca6e.tar.gz
Implement new pragma dq.UseWideBlockChannels
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp10
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp43
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.h2
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_build.cpp71
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_build.h9
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp29
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp7
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp11
9 files changed, 144 insertions, 40 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 162554768a..4997f007f9 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -21,9 +21,9 @@ using TStatus = IGraphTransformer::TStatus;
namespace {
-TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowDependantConsumers) {
- bool useWideChannels = false;
- return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers, useWideChannels);
+TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx) {
+ EChannelMode mode = EChannelMode::CHANNEL_SCALAR;
+ return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers, typesCtx, mode);
}
class TKqpBuildTxTransformer : public TSyncTransformerBase {
@@ -471,7 +471,7 @@ public:
.Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt")
.Add(*TypeAnnTransformer, "TypeAnnotation")
.AddPostTypeAnnotation(/* forSubgraph */ true)
- .Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false), "BuildPhysicalStages")
+ .Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false, typesCtx), "BuildPhysicalStages")
.Add(*BuildTxTransformer, "BuildPhysicalTx")
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole")
.Build(false);
@@ -481,7 +481,7 @@ public:
.Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt")
.Add(*TypeAnnTransformer, "TypeAnnotation")
.AddPostTypeAnnotation(/* forSubgraph */ true)
- .Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled()), "BuildPhysicalStages")
+ .Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled(), typesCtx), "BuildPhysicalStages")
.Add(*BuildTxTransformer, "BuildPhysicalTx")
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole")
.Build(false);
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 3564e110ae..bc3c46f512 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -2792,43 +2792,64 @@ bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& t
return true;
}
-bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
- if (!EnsureWideFlowType(node, ctx)) {
+bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
+ if (HasError(&type, ctx)) {
+ return false;
+ }
+
+ if (type.GetKind() != ETypeAnnotationKind::Multi) {
+ ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected wide type, but got: " << type));
return false;
}
- auto& items = node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
+ auto& items = type.Cast<TMultiExprType>()->GetItems();
if (items.empty()) {
- ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Expected at least one column"));
- return IGraphTransformer::TStatus::Error;
+ ctx.AddError(TIssue(ctx.GetPosition(position), "Expected at least one column"));
+ return false;
}
bool isScalar;
for (ui32 i = 0; i < items.size(); ++i) {
- const auto& type = items[i];
- if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx)) {
+ const auto& itemType = items[i];
+ if (!EnsureBlockOrScalarType(position, *itemType, ctx)) {
return false;
}
- blockItemTypes.push_back(GetBlockItemType(*type, isScalar));
+ blockItemTypes.push_back(GetBlockItemType(*itemType, isScalar));
if (!allowScalar && isScalar && (i + 1 != items.size())) {
- ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Scalars are not allowed"));
+ ctx.AddError(TIssue(ctx.GetPosition(position), "Scalars are not allowed"));
return false;
}
}
if (!isScalar) {
- ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Last column should be a scalar"));
+ ctx.AddError(TIssue(ctx.GetPosition(position), "Last column should be a scalar"));
return false;
}
- if (!EnsureSpecificDataType(node.Pos(), *blockItemTypes.back(), EDataSlot::Uint64, ctx)) {
+ if (!EnsureSpecificDataType(position, *blockItemTypes.back(), EDataSlot::Uint64, ctx)) {
return false;
}
return true;
}
+bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
+ if (!EnsureWideFlowType(node, ctx)) {
+ return false;
+ }
+
+ return EnsureWideBlockType(node.Pos(), *node.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType(), blockItemTypes, ctx, allowScalar);
+}
+
+bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
+ if (!EnsureWideStreamType(node, ctx)) {
+ return false;
+ }
+
+ return EnsureWideBlockType(node.Pos(), *node.GetTypeAnn()->Cast<TStreamExprType>()->GetItemType(), blockItemTypes, ctx, allowScalar);
+}
+
bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx) {
if (!node.GetTypeAnn()) {
YQL_ENSURE(node.Type() == TExprNode::Lambda);
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 3a929bf772..f2b6430ff5 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -124,7 +124,9 @@ bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx);
bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx);
bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
+bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
+bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx);
bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureType(const TExprNode& node, TExprContext& ctx);
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp
index a7b00b7c70..10abdb7bcf 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp
@@ -432,7 +432,18 @@ const TStructExprType* GetStageOutputItemType(const TDqPhyStage& stage) {
return stageType->GetItems()[0]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
}
-TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx) {
+bool IsCompatibleWithBlocks(TPositionHandle pos, const TStructExprType& type, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
+ TVector<const TTypeAnnotationNode*> types;
+ for (auto& item : type.GetItems()) {
+ types.emplace_back(item->GetItemType());
+ }
+
+ auto resolveStatus = typesCtx.ArrowResolver->AreTypesSupported(ctx.GetPosition(pos), types, ctx);
+ YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
+ return resolveStatus == IArrowResolver::OK;
+}
+
+TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, bool useChannelBlocks, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
TVector<TCoArgument> newArgs;
newArgs.reserve(stage.Inputs().Size());
TNodeOnNodeOwnedMap argsMap;
@@ -451,13 +462,25 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx
if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast().Output())) {
needRebuild = true;
auto itemType = arg.Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>();
-
+ TExprNode::TPtr newArgNode = newArg.Ptr();
+ if (useChannelBlocks && IsCompatibleWithBlocks(arg.Pos(), *itemType, ctx, typesCtx)) {
+ // input will actually be wide block stream - convert it to wide stream first
+ newArgNode = ctx.Builder(arg.Pos())
+ .Callable("FromFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "ToFlow")
+ .Add(0, newArg.Ptr())
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
// input will actually be wide stream - need to convert it back to stream
auto argReplace = ctx.Builder(arg.Pos())
.Callable("FromFlow")
.Callable(0, "NarrowMap")
.Callable(0, "ToFlow")
- .Add(0, newArg.Ptr())
+ .Add(0, newArgNode)
.Seal()
.Lambda(1)
.Params("fields", itemType->GetSize())
@@ -498,7 +521,9 @@ TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx
.Done();
}
-TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, TExprContext& ctx) {
+TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, bool useChannelBlocks,
+ TExprContext& ctx, TTypeAnnotationContext& typesCtx)
+{
// convert stream to wide stream
auto resultStream = ctx.Builder(stage.Program().Body().Pos())
.Callable("FromFlow")
@@ -523,6 +548,19 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr
.Seal()
.Build();
+ if (useChannelBlocks && IsCompatibleWithBlocks(resultStream->Pos(), outputItemType, ctx, typesCtx)) {
+ // convert wide stream to wide block stream
+ resultStream = ctx.Builder(resultStream->Pos())
+ .Callable("FromFlow")
+ .Callable(0, "WideToBlocks")
+ .Callable(0, "ToFlow")
+ .Add(0, resultStream)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
return Build<TDqPhyStage>(ctx, stage.Pos())
.InitFrom(stage)
.Program()
@@ -533,18 +571,21 @@ TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExpr
.Done();
}
-TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, TExprContext& ctx) {
+TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, bool useChannelBlocks, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
const TStructExprType* outputItemType = GetStageOutputItemType(stage);
- return RebuildStageOutputAsWide(RebuildStageInputsAsWide(stage, ctx), *outputItemType, ctx);
+ return RebuildStageOutputAsWide(RebuildStageInputsAsWide(stage, useChannelBlocks, ctx, typesCtx),
+ *outputItemType, useChannelBlocks, ctx, typesCtx);
}
-IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode::TPtr& output,
- TExprContext& ctx)
+IGraphTransformer::TStatus DqEnableWideChannels(EChannelMode mode, TExprNode::TPtr input, TExprNode::TPtr& output,
+ TExprContext& ctx, TTypeAnnotationContext& typesCtx)
{
output = input;
TNodeOnNodeOwnedMap replaces;
TNodeSet processedStages;
- VisitExpr(input, [&replaces, &processedStages, &ctx](const TExprNode::TPtr& node) {
+ YQL_ENSURE(mode == CHANNEL_WIDE || mode == CHANNEL_WIDE_BLOCK);
+ const bool useChannelBlocks = mode == CHANNEL_WIDE_BLOCK;
+ VisitExpr(input, [&](const TExprNode::TPtr& node) {
if (node->IsLambda()) {
return false;
}
@@ -554,7 +595,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode
if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast())) {
auto conn = maybeConn.Cast();
processedStages.insert(conn.Output().Stage().Raw());
- auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast<TDqPhyStage>(), ctx);
+ auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast<TDqPhyStage>(), useChannelBlocks, ctx, typesCtx);
auto outputItemType = GetStageOutputItemType(conn.Output().Stage().Cast<TDqPhyStage>());
if (conn.Maybe<TDqCnHashShuffle>()) {
@@ -601,7 +642,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode
auto stage = expr.Maybe<TDqPhyStage>().Cast();
if (!processedStages.contains(stage.Raw())) {
processedStages.insert(stage.Raw());
- auto newStage = RebuildStageInputsAsWide(stage, ctx);
+ auto newStage = RebuildStageInputsAsWide(stage, useChannelBlocks, ctx, typesCtx);
if (newStage.Raw() != stage.Raw()) {
replaces[stage.Raw()] = newStage.Ptr();
}
@@ -625,7 +666,7 @@ IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode
} // namespace
-TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels) {
+TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx, EChannelMode mode) {
TVector<TTransformStage> transformers;
transformers.push_back(TTransformStage(CreateFunctorTransformer(
@@ -645,10 +686,10 @@ TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependan
"BuildPhysicalStages",
TIssuesIds::DEFAULT_ERROR));
- if (useWideChannels) {
+ if (mode != CHANNEL_SCALAR) {
transformers.push_back(TTransformStage(CreateFunctorTransformer(
- [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- return DqEnableWideChannels(input, output, ctx);
+ [mode, &typesCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ return DqEnableWideChannels(mode, input, output, ctx, typesCtx);
}),
"EnableWideChannels",
TIssuesIds::DEFAULT_ERROR));
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.h b/ydb/library/yql/dq/opt/dq_opt_build.h
index 3853d8a3be..6cd9911630 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.h
+++ b/ydb/library/yql/dq/opt/dq_opt_build.h
@@ -1,10 +1,17 @@
#pragma once
#include <ydb/library/yql/core/yql_graph_transformer.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
#include <ydb/library/yql/dq/common/dq_common.h>
namespace NYql::NDq {
-TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels);
+enum EChannelMode {
+ CHANNEL_SCALAR,
+ CHANNEL_WIDE,
+ CHANNEL_WIDE_BLOCK,
+};
+
+TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, TTypeAnnotationContext& typesCtx, EChannelMode mode);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index 83de70ed7d..9955b5ee14 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -61,13 +61,20 @@ const TTypeAnnotationNode* GetColumnType(const TDqConnection& node, const TStruc
TStringBuilder() << "Expecting integer as column name, but got '" << name << "'"));
return nullptr;
}
- if (idx >= multiType->GetSize()) {
+ const bool isBlock = AnyOf(multiType->GetItems(), [](const TTypeAnnotationNode* item) { return item->IsBlockOrScalar(); });
+ const ui32 width = isBlock ? (multiType->GetSize() - 1) : multiType->GetSize();
+ if (idx >= width) {
ctx.AddError(TIssue(ctx.GetPosition(pos),
- TStringBuilder() << "Column index too big: " << name << " >= " << multiType->GetSize()));
+ TStringBuilder() << "Column index too big: " << name << " >= " << width));
return nullptr;
}
- return multiType->GetItems()[idx];
+ auto itemType = multiType->GetItems()[idx];
+ if (isBlock) {
+ itemType = itemType->IsBlock() ? itemType->Cast<TBlockExprType>()->GetItemType() :
+ itemType->Cast<TScalarExprType>()->GetItemType();
+ }
+ return itemType;
}
auto result = structType.FindItemType(name);
@@ -233,17 +240,25 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
}
YQL_ENSURE(programResultTypesTuple.size() == 1);
auto multiType = programLambda->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
- if (multiType->GetSize() != outputNarrowType->GetSize()) {
+ const bool isBlock = AnyOf(multiType->GetItems(), [](const TTypeAnnotationNode* item) { return item->IsBlockOrScalar(); });
+ TTypeAnnotationNode::TListType blockItemTypes;
+ if (isBlock && !EnsureWideStreamBlockType(*programLambda, blockItemTypes, ctx)) {
+ return TStatus::Error;
+ }
+
+ const ui32 width = isBlock ? (blockItemTypes.size() - 1) : multiType->GetSize();
+ if (width != outputNarrowType->GetSize()) {
ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types has different number of items: " <<
- multiType->GetSize() << " vs " << outputNarrowType->GetSize()));
+ width << " vs " << outputNarrowType->GetSize()));
return TStatus::Error;
}
for (size_t i = 0; i < outputNarrowType->GetSize(); ++i) {
auto structItem = outputNarrowType->GetItems()[i];
- if (!IsSameAnnotation(*structItem->GetItemType(), *(multiType->GetItems()[i]))) {
+ auto wideItem = isBlock ? blockItemTypes[i] : multiType->GetItems()[i];
+ if (!IsSameAnnotation(*structItem->GetItemType(), *wideItem)) {
ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types mismatch for column '" <<
- structItem->GetName() << "' : " << *(multiType->GetItems()[i]) << " vs " << *structItem->GetItemType()));
+ structItem->GetName() << "' : " << *wideItem << " vs " << *structItem->GetItemType()));
return TStatus::Error;
}
}
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index f38f300135..61512abd44 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -60,6 +60,13 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, HashShuffleMaxTasks).Lower(1).Upper(1000);
REGISTER_SETTING(*this, UseWideChannels);
+ REGISTER_SETTING(*this, UseWideBlockChannels)
+ .ValueSetter([this](const TString&, bool value) {
+ UseWideBlockChannels = value;
+ if (value) {
+ UseWideChannels = true;
+ }
+ });
REGISTER_SETTING(*this, UseFastPickleTransport);
REGISTER_SETTING(*this, UseOOBTransport);
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 1c29292bfa..aa30d90a50 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -98,6 +98,7 @@ struct TDqSettings {
NCommon::TConfSetting<ui32, false> HashShuffleMaxTasks;
NCommon::TConfSetting<bool, false> UseWideChannels;
+ NCommon::TConfSetting<bool, false> UseWideBlockChannels;
NCommon::TConfSetting<bool, false> UseFastPickleTransport;
NCommon::TConfSetting<bool, false> UseOOBTransport;
@@ -151,6 +152,7 @@ struct TDqSettings {
SAVE_SETTING(HashShuffleTasksRatio);
SAVE_SETTING(HashShuffleMaxTasks);
SAVE_SETTING(UseWideChannels);
+ SAVE_SETTING(UseWideBlockChannels);
SAVE_SETTING(UseFastPickleTransport);
SAVE_SETTING(UseOOBTransport);
SAVE_SETTING(AggregateStatsByStage);
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 60515cba7e..4268395f59 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -234,7 +234,16 @@ private:
void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes");
bool useWideChannels = State_->Settings->UseWideChannels.Get().GetOrElse(false);
- pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, useWideChannels), "BuildPhy");
+ bool useChannelBlocks = State_->Settings->UseWideBlockChannels.Get().GetOrElse(false);
+ NDq::EChannelMode mode;
+ if (!useWideChannels) {
+ mode = NDq::EChannelMode::CHANNEL_SCALAR;
+ } else if (!useChannelBlocks) {
+ mode = NDq::EChannelMode::CHANNEL_WIDE;
+ } else {
+ mode = NDq::EChannelMode::CHANNEL_WIDE_BLOCK;
+ }
+ pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, *pipeline->GetTypeAnnotationContext(), mode), "BuildPhy");
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
}