about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2023-03-27 13:36:56 +0300
committer aneporada <aneporada@ydb.tech> 2023-03-27 13:36:56 +0300
commit c89b7b6f6f7deee5667e9b4289bce25064fbcada (patch)
tree e07a35f8d1e7b57160e97f3bd532625c61028a1f
parent b826ba6c4f21f1712cbab37e8d04ef1bed83adfa (diff)
download ydb-c89b7b6f6f7deee5667e9b4289bce25064fbcada.tar.gz
Support wide stage inputs and output (for DqPhyStage)
Support wide stage inputs and output (for DqPhyStage)
-rw-r--r-- ydb/core/kqp/opt/kqp_opt_build_txs.cpp 3
-rw-r--r-- ydb/library/yql/core/yql_expr_type_annotation.cpp 24
-rw-r--r-- ydb/library/yql/core/yql_expr_type_annotation.h 2
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt.cpp 11
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt.h 5
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_build.cpp 260
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_build.h 2
-rw-r--r-- ydb/library/yql/dq/type_ann/dq_type_ann.cpp 112
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_settings.cpp 2
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_settings.h 7
-rw-r--r-- ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp 3
11 files changed, 413 insertions, 18 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 77c856e7bf7..162554768a5 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -22,7 +22,8 @@ using TStatus = IGraphTransformer::TStatus;
namespace {
TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowDependantConsumers) {
- return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers);
+ bool useWideChannels = false; // wide channels are not enabled on this path: the flag is fixed to false
+ return NDq::CreateDqBuildPhyStagesTransformer(allowDependantConsumers, useWideChannels);
}
class TKqpBuildTxTransformer : public TSyncTransformerBase {
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index e5cf68a6354..24ecb5ff63a 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -2742,6 +2742,30 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ
return true;
}
+// Verifies that `node` is annotated with a wide stream type, i.e. a Stream
+// whose item type is Multi. Returns true on success; otherwise adds an issue
+// at the node position to `ctx` and returns false.
+bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx) {
+    const TTypeAnnotationNode* const type = node.GetTypeAnn();
+    if (HasError(type, ctx) || !type) {
+        // A node that gets here without a usable annotation must be a lambda.
+        YQL_ENSURE(node.Type() == TExprNode::Lambda);
+        ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected wide stream type, but got lambda"));
+        return false;
+    }
+
+    const bool isStream = type->GetKind() == ETypeAnnotationKind::Stream;
+    const bool isWide = isStream
+        && type->Cast<TStreamExprType>()->GetItemType()->GetKind() == ETypeAnnotationKind::Multi;
+    if (isWide) {
+        return true;
+    }
+
+    ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected wide stream type, but got: " << *type));
+    return false;
+}
+
+// Type-based variant: checks that `type` is a Stream whose item type is Multi.
+// Any failure (including an error reported by HasError) adds an
+// "Expected wide stream type" issue at `position` and yields false.
+bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx) {
+    const bool isWideStream = !HasError(&type, ctx)
+        && type.GetKind() == ETypeAnnotationKind::Stream
+        && type.Cast<TStreamExprType>()->GetItemType()->GetKind() == ETypeAnnotationKind::Multi;
+    if (isWideStream) {
+        return true;
+    }
+
+    ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected wide stream type, but got: " << type));
+    return false;
+}
+
bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
if (!EnsureWideFlowType(node, ctx)) {
return false;
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index af62e050e6b..93941fa95e3 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -121,6 +121,8 @@ bool EnsureFlowType(const TExprNode& node, TExprContext& ctx);
bool EnsureFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx);
bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
+bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx);
+bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx);
bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp
index f27f3949f5a..cbde2de0e20 100644
--- a/ydb/library/yql/dq/opt/dq_opt.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt.cpp
@@ -21,6 +21,9 @@ TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) {
settings.LogicalId = FromString<ui64>(tuple.Value().Cast<TCoAtom>().Value());
} else if (name == SinglePartitionSettingName) {
settings.SinglePartition = true;
+ } else if (name == WideChannelsSettingName) {
+ settings.WideChannels = true;
+ settings.OutputNarrowType = tuple.Value().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
}
}
@@ -62,6 +65,14 @@ NNodes::TCoNameValueTupleList TDqStageSettings::BuildNode(TExprContext& ctx, TPo
.Done());
}
+ if (WideChannels) {
+ YQL_ENSURE(OutputNarrowType);
+ settings.push_back(Build<TCoNameValueTuple>(ctx, pos)
+ .Name().Build(WideChannelsSettingName)
+ .Value(ExpandType(pos, *OutputNarrowType, ctx))
+ .Done());
+ }
+
return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
.Done();
diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h
index c49a97266ea..84174e04fd1 100644
--- a/ydb/library/yql/dq/opt/dq_opt.h
+++ b/ydb/library/yql/dq/opt/dq_opt.h
@@ -12,12 +12,17 @@ struct TDqStageSettings {
static constexpr TStringBuf LogicalIdSettingName = "_logical_id";
static constexpr TStringBuf IdSettingName = "_id";
static constexpr TStringBuf SinglePartitionSettingName = "_single_partition";
+ static constexpr TStringBuf WideChannelsSettingName = "_wide_channels";
ui64 LogicalId = 0;
TString Id;
bool SinglePartition = false;
+ bool WideChannels = false;
+ const TStructExprType* OutputNarrowType = nullptr;
+
TDqStageSettings& SetSinglePartition(bool value = true) { SinglePartition = value; return *this; }
+ TDqStageSettings& SetWideChannels(const TStructExprType& narrowType) { WideChannels = true; OutputNarrowType = &narrowType; return *this; }
static TDqStageSettings New(const NNodes::TDqStageBase& node);
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp
index b07be281fe9..22985a33519 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp
@@ -227,7 +227,7 @@ void MakeConsumerReplaces(
class TDqReplaceStageConsumersTransformer : public TSyncTransformerBase {
public:
explicit TDqReplaceStageConsumersTransformer(bool allowDependantConsumers)
- : AllowDependantConsumers(allowDependantConsumers) {}
+ : AllowDependantConsumers_(allowDependantConsumers) {}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
YQL_CLOG(TRACE, CoreDq) << "[DQ/Build/TransformConsumers] " << NCommon::ExprToPrettyString(ctx, *input);
@@ -296,7 +296,7 @@ public:
TNodeOnNodeOwnedMap replaces;
for (const auto& [stage, info] : consumersMap) {
- MakeConsumerReplaces(TDqStage(stage), info, AllowDependantConsumers, replaces, ctx);
+ MakeConsumerReplaces(TDqStage(stage), info, AllowDependantConsumers_, replaces, ctx);
}
if (replaces.empty()) {
@@ -312,7 +312,7 @@ public:
}
private:
- const bool AllowDependantConsumers;
+ const bool AllowDependantConsumers_;
};
class TDqBuildPhysicalStagesTransformer : public TSyncTransformerBase {
@@ -384,9 +384,252 @@ public:
}
};
+// Decides whether the stage behind `output` may be rebuilt to emit a wide
+// stream. The answer is false when the output index is not 0, when the stage
+// already carries the wide-channels setting, when the stage type is not a
+// one-element tuple, or when the stage has an Outputs child.
+bool CanRebuildForWideChannelOutput(const TDqOutput& output) {
+    if (FromString<ui32>(output.Index().Value()) != 0) {
+        // stage has multiple outputs
+        return false;
+    }
+
+    auto phyStage = output.Stage().Cast<TDqPhyStage>();
+    if (TDqStageSettings::Parse(phyStage).WideChannels) {
+        return false;
+    }
+
+    const auto* resultTuple = phyStage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
+    return resultTuple->GetSize() == 1 && !phyStage.Outputs().IsValid();
+}
+
+// Decides whether the stage feeding `conn` may be rebuilt to emit a wide
+// stream. Result and value connections are never rebuilt; for every other
+// connection the decision is delegated to the TDqOutput overload, so the
+// eligibility rules live in one place and the two checks cannot drift apart.
+bool CanRebuildForWideChannelOutput(const TDqConnection& conn) {
+    if (conn.Maybe<TDqCnResult>() || conn.Maybe<TDqCnValue>()) {
+        return false;
+    }
+
+    return CanRebuildForWideChannelOutput(conn.Output());
+}
+
+// Returns the struct item type of the stage's only output. The stage type is
+// required to be a one-element tuple whose element is a list of structs.
+const TStructExprType* GetStageOutputItemType(const TDqPhyStage& stage) {
+    const auto* resultTuple = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
+    YQL_ENSURE(resultTuple->GetSize() == 1);
+    const auto* outputList = resultTuple->GetItems()[0]->Cast<TListExprType>();
+    return outputList->GetItemType()->Cast<TStructExprType>();
+}
+
+// Rebuilds the program of `stage` so that inputs fed by wide-capable producers
+// are consumed as wide streams.
+//
+// Every program argument is replaced by a fresh argument. For an input that is
+// a TDqConnection whose output passes CanRebuildForWideChannelOutput(), uses of
+// the old argument in the program body are substituted with
+//     FromFlow(NarrowMap(ToFlow(newArg), (fields...) -> AsStruct(...)))
+// which reassembles the original struct from the wide columns, taking the
+// columns in the order of the struct type's items. All other inputs simply
+// switch to the fresh argument.
+//
+// Returns the original `stage` untouched when no input qualifies.
+//
+// NOTE(review): eligibility is checked through the TDqOutput overload, so the
+// TDqCnResult/TDqCnValue exclusion of the TDqConnection overload is not applied
+// here - confirm such connections cannot appear as stage inputs.
+TDqPhyStage RebuildStageInputsAsWide(const TDqPhyStage& stage, TExprContext& ctx) {
+    TVector<TCoArgument> newArgs;
+    newArgs.reserve(stage.Inputs().Size());
+    TNodeOnNodeOwnedMap argsMap;
+
+    YQL_ENSURE(stage.Inputs().Size() == stage.Program().Args().Size());
+
+    bool needRebuild = false;
+    for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+        TCoArgument arg = stage.Program().Args().Arg(i);
+
+        auto newArg = TCoArgument(ctx.NewArgument(arg.Pos(), arg.Name()));
+        newArgs.emplace_back(newArg);
+
+        auto maybeConn = stage.Inputs().Item(i).Maybe<TDqConnection>();
+
+        if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast().Output())) {
+            needRebuild = true;
+            auto itemType = arg.Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>();
+
+            // input will actually be wide stream - need to convert it back to stream
+            auto argReplace = ctx.Builder(arg.Pos())
+                .Callable("FromFlow")
+                    .Callable(0, "NarrowMap")
+                        .Callable(0, "ToFlow")
+                            .Add(0, newArg.Ptr())
+                        .Seal()
+                        .Lambda(1)
+                            .Params("fields", itemType->GetSize())
+                            .Callable("AsStruct")
+                                .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+                                    // Separate counter: must not shadow the input index `i` of the enclosing loop.
+                                    ui32 fieldIdx = 0U;
+                                    for (const auto& item : itemType->GetItems()) {
+                                        parent.List(fieldIdx)
+                                            .Atom(0, item->GetName())
+                                            .Arg(1, "fields", fieldIdx)
+                                        .Seal();
+                                        ++fieldIdx;
+                                    }
+                                    return parent;
+                                })
+                            .Seal()
+                        .Seal()
+                    .Seal()
+                .Seal()
+                .Build();
+
+            argsMap.emplace(arg.Raw(), argReplace);
+        } else {
+            argsMap.emplace(arg.Raw(), newArg.Ptr());
+        }
+    }
+
+    if (!needRebuild) {
+        return stage;
+    }
+
+    return Build<TDqPhyStage>(ctx, stage.Pos())
+        .InitFrom(stage)
+        .Program()
+            .Args(newArgs)
+            .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap))
+            .Build()
+        .Done();
+}
+
+// Produces a copy of `stage` whose program body is wrapped as
+//     FromFlow(ExpandMap(ToFlow(body), item -> Member(item, name)...))
+// with one Member per item of `outputItemType`, in the struct's item order.
+// The copy's settings are rebuilt from the stage with the wide-channels
+// setting added, recording `outputItemType` as the narrow output type.
+TDqPhyStage RebuildStageOutputAsWide(const TDqPhyStage& stage, const TStructExprType& outputItemType, TExprContext& ctx) {
+    const auto narrowBody = stage.Program().Body();
+
+    // Emits the lambda results: Member(item, <name>) for each struct field.
+    auto expandFields = [&outputItemType](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
+        ui32 column = 0U;
+        for (const auto& field : outputItemType.GetItems()) {
+            lambda.Callable(column, "Member")
+                .Arg(0, "item")
+                .Atom(1, field->GetName())
+            .Seal();
+            ++column;
+        }
+        return lambda;
+    };
+
+    // convert stream to wide stream
+    auto wideStream = ctx.Builder(narrowBody.Pos())
+        .Callable("FromFlow")
+            .Callable(0, "ExpandMap")
+                .Callable(0, "ToFlow")
+                    .Add(0, narrowBody.Ptr())
+                .Seal()
+                .Lambda(1)
+                    .Param("item")
+                    .Do(expandFields)
+                .Seal()
+            .Seal()
+        .Seal()
+        .Build();
+
+    return Build<TDqPhyStage>(ctx, stage.Pos())
+        .InitFrom(stage)
+        .Program()
+            .Args(stage.Program().Args())
+            .Body(wideStream)
+            .Build()
+        .Settings(TDqStageSettings::New(stage).SetWideChannels(outputItemType).BuildNode(ctx, stage.Pos()))
+        .Done();
+}
+
+// Rebuilds both sides of `stage`: first its inputs, then its output. The narrow
+// output item type is read from the original stage before any rewriting.
+TDqPhyStage RebuildStageAsWide(const TDqPhyStage& stage, TExprContext& ctx) {
+    const TStructExprType* narrowItemType = GetStageOutputItemType(stage);
+    const TDqPhyStage withWideInputs = RebuildStageInputsAsWide(stage, ctx);
+    return RebuildStageOutputAsWide(withWideInputs, *narrowItemType, ctx);
+}
+
+// Rewrites the expression so that eligible connections carry wide streams.
+// For every TDqConnection accepted by CanRebuildForWideChannelOutput() the
+// producing stage is rebuilt with RebuildStageAsWide() and the connection is
+// re-created on top of it; any other TDqPhyStage only gets its inputs rebuilt.
+// Returns Ok with `output` == `input` when nothing was replaced, otherwise the
+// status returned by RemapExpr().
+IGraphTransformer::TStatus DqEnableWideChannels(TExprNode::TPtr input, TExprNode::TPtr& output,
+ TExprContext& ctx)
+{
+ output = input;
+ TNodeOnNodeOwnedMap replaces;
+ // Stages already handled, so the stage branch below does not process them again.
+ TNodeSet processedStages;
+ VisitExpr(input, [&replaces, &processedStages, &ctx](const TExprNode::TPtr& node) {
+ if (node->IsLambda()) {
+ // presumably a false result keeps VisitExpr from descending into the lambda - TODO confirm
+ return false;
+ }
+
+ TExprBase expr{node};
+ auto maybeConn = expr.Maybe<TDqConnection>();
+ if (maybeConn && CanRebuildForWideChannelOutput(maybeConn.Cast())) {
+ auto conn = maybeConn.Cast();
+ processedStages.insert(conn.Output().Stage().Raw());
+ auto newStage = RebuildStageAsWide(conn.Output().Stage().Cast<TDqPhyStage>(), ctx);
+ // Narrow struct type of the original stage: used to turn column names into indexes.
+ auto outputItemType = GetStageOutputItemType(conn.Output().Stage().Cast<TDqPhyStage>());
+
+ if (conn.Maybe<TDqCnHashShuffle>()) {
+ // Key columns are re-expressed as the index of each name in the narrow struct type.
+ auto shuffle = conn.Maybe<TDqCnHashShuffle>().Cast();
+ auto builder = Build<TCoAtomList>(ctx, shuffle.KeyColumns().Pos());
+ for (auto key : shuffle.KeyColumns()) {
+ // NOTE(review): FindItem() result is dereferenced without an explicit check - confirm the column is guaranteed to exist here
+ ui32 idx = *outputItemType->FindItem(key.Value());
+ builder.Add<TCoAtom>().Build(ToString(idx));
+ }
+
+ replaces[conn.Raw()] = Build<TDqCnHashShuffle>(ctx, conn.Pos())
+ .Output<TDqOutput>()
+ .InitFrom(conn.Output())
+ .Stage(newStage)
+ .Build()
+ .KeyColumns(builder.Build().Value())
+ .Done().Ptr();
+ } else if (conn.Maybe<TDqCnMerge>()) {
+ // Sort columns get the same name-to-index conversion; the sort direction is kept.
+ auto merge = conn.Maybe<TDqCnMerge>().Cast();
+ auto builder = Build<TDqSortColumnList>(ctx, merge.SortColumns().Pos());
+ for (auto sortColumn : merge.SortColumns()) {
+ // NOTE(review): same unchecked FindItem() dereference as for hash shuffle keys
+ ui32 idx = *outputItemType->FindItem(sortColumn.Column().Value());
+ builder.Add<TDqSortColumn>()
+ .Column<TCoAtom>().Build(ToString(idx))
+ .SortDirection(sortColumn.SortDirection())
+ .Build();
+ }
+
+ replaces[conn.Raw()] = Build<TDqCnMerge>(ctx, conn.Pos())
+ .Output<TDqOutput>()
+ .InitFrom(conn.Output())
+ .Stage(newStage)
+ .Build()
+ .SortColumns(builder.Build().Value())
+ .Done().Ptr();
+ } else {
+ // Any other connection kind: only its Output child is swapped for one over the new stage.
+ auto newOutput = Build<TDqOutput>(ctx, conn.Output().Pos())
+ .InitFrom(conn.Output())
+ .Stage(newStage)
+ .Done();
+ replaces[conn.Raw()] = ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr());
+ }
+ } else if (expr.Maybe<TDqPhyStage>()) {
+ auto stage = expr.Maybe<TDqPhyStage>().Cast();
+ if (!processedStages.contains(stage.Raw())) {
+ processedStages.insert(stage.Raw());
+ // This stage's output is left as is; only its inputs are rebuilt.
+ auto newStage = RebuildStageInputsAsWide(stage, ctx);
+ if (newStage.Raw() != stage.Raw()) {
+ replaces[stage.Raw()] = newStage.Ptr();
+ }
+ }
+ }
+
+ return true;
+ });
+
+ if (replaces.empty()) {
+ return IGraphTransformer::TStatus::Ok;
+ }
+
+ // replaces holds rewritten connections as well as stages, so the logged count covers both.
+ YQL_CLOG(INFO, CoreDq) << "[DQ/Build/EnableWideChannels] " << "Enabled wide channels for " << replaces.size() << " stages";
+ TOptimizeExprSettings settings{nullptr};
+ settings.VisitLambdas = false;
+ auto status = RemapExpr(input, output, replaces, ctx, settings);
+ YQL_CLOG(TRACE, CoreDq) << "[DQ/Build/EnableWideChannels] " << "Dump: " << NCommon::ExprToPrettyString(ctx, *output);
+ return status;
+}
+
} // namespace
-TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers) {
+TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels) {
TVector<TTransformStage> transformers;
transformers.push_back(TTransformStage(CreateFunctorTransformer(
@@ -406,6 +649,15 @@ TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependan
"BuildPhysicalStages",
TIssuesIds::DEFAULT_ERROR));
+ if (useWideChannels) {
+ transformers.push_back(TTransformStage(CreateFunctorTransformer(
+ [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ return DqEnableWideChannels(input, output, ctx);
+ }),
+ "EnableWideChannels",
+ TIssuesIds::DEFAULT_ERROR));
+ }
+
return CreateCompositeGraphTransformer(transformers, false);
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.h b/ydb/library/yql/dq/opt/dq_opt_build.h
index 456b1f0c812..3853d8a3be6 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.h
+++ b/ydb/library/yql/dq/opt/dq_opt_build.h
@@ -5,6 +5,6 @@
namespace NYql::NDq {
-TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers);
+TAutoPtr<IGraphTransformer> CreateDqBuildPhyStagesTransformer(bool allowDependantConsumers, bool useWideChannels);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index 0d0d66c4c76..3ccd0b7b67b 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -16,6 +16,9 @@ using TStatus = NYql::IGraphTransformer::TStatus;
namespace {
+// TODO: relocate names to common code for type_ann and dq_opt.cpp
+static constexpr TStringBuf WideChannelsSettingName = "_wide_channels";
+
const TTypeAnnotationNode* GetDqOutputType(const TDqOutput& output, TExprContext& ctx) {
auto stageResultTuple = output.Stage().Ref().GetTypeAnn()->Cast<TTupleExprType>();
@@ -49,6 +52,34 @@ const TTypeAnnotationNode* GetDqConnectionType(const TDqConnection& node, TExprC
return GetDqOutputType(node.Output(), ctx);
}
+// Resolves the type of the connection column identified by `name`.
+// When the producing stage carries the WideChannelsSettingName setting, `name`
+// must parse as an unsigned index into the Multi item type of the stage
+// program; otherwise it is looked up as a member of `structType`.
+// On failure an issue is added at `pos` and nullptr is returned.
+const TTypeAnnotationNode* GetColumnType(const TDqConnection& node, const TStructExprType& structType, TStringBuf name, TPositionHandle pos, TExprContext& ctx) {
+    if (!HasSetting(node.Output().Stage().Settings().Ref(), WideChannelsSettingName)) {
+        // Narrow producer: columns are addressed by struct member name.
+        if (const auto* memberType = structType.FindItemType(name)) {
+            return memberType;
+        }
+
+        ctx.AddError(TIssue(ctx.GetPosition(pos),
+            TStringBuilder() << "Missing column '" << name << "'"));
+        return nullptr;
+    }
+
+    // Wide producer: columns are addressed by position.
+    const auto* wideType = node.Output().Stage().Program().Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
+    ui32 columnIndex = 0;
+    if (!TryFromString(name, columnIndex)) {
+        ctx.AddError(TIssue(ctx.GetPosition(pos),
+            TStringBuilder() << "Expecting integer as column name, but got '" << name << "'"));
+        return nullptr;
+    }
+
+    if (columnIndex >= wideType->GetSize()) {
+        ctx.AddError(TIssue(ctx.GetPosition(pos),
+            TStringBuilder() << "Column index too big: " << name << " >= " << wideType->GetSize()));
+        return nullptr;
+    }
+
+    return wideType->GetItems()[columnIndex];
+}
+
template <typename TStage>
TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*stage, 3, 4, ctx)) {
@@ -67,6 +98,8 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
return TStatus::Error;
}
+ bool useWideChannels = false;
+ const TStructExprType* outputNarrowType = nullptr;
for (auto& setting: settingsTuple->Children()) {
if (!EnsureTupleMinSize(*setting, 1, ctx)) {
return TStatus::Error;
@@ -74,6 +107,31 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
if (!EnsureAtom(*setting->Child(0), ctx)) {
return TStatus::Error;
}
+
+ TStringBuf name = setting->Head().Content();
+ if (name == WideChannelsSettingName) {
+ if (setting->ChildrenSize() != 2) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value"));
+ return TStatus::Error;
+ }
+ auto value = setting->Child(1);
+ if (!EnsureType(*value, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto valueType = value->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ if (!EnsureStructType(value->Pos(), *valueType, ctx)) {
+ return TStatus::Error;
+ }
+
+ if constexpr (std::is_same_v<TStage, TDqStage>) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " is not supported in " << stage->Content()));
+ return TStatus::Error;
+ } else {
+ useWideChannels = true;
+ outputNarrowType = valueType->Cast<TStructExprType>();
+ }
+ }
}
if (!EnsureLambda(*programLambda, ctx)) {
@@ -109,6 +167,13 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
return TStatus::Error;
}
}
+
+ if (TDqConnection::Match(input.Get())) {
+ TDqConnection conn(input);
+ if (HasSetting(conn.Output().Stage().Settings().Ref(), WideChannelsSettingName)) {
+ argType = conn.Output().Stage().Program().Ref().GetTypeAnn();
+ }
+ }
}
if (!TDqPhyPrecompute::Match(input.Get()) && input->Content() != "KqpTxResultBinding") {
@@ -161,10 +226,40 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
}
}
+ if (useWideChannels) {
+ if (!EnsureWideStreamType(*programLambda, ctx)) {
+ ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide channel stage requires exactly one output, but got " << programResultTypesTuple.size()));
+ return TStatus::Error;
+ }
+ YQL_ENSURE(programResultTypesTuple.size() == 1);
+ auto multiType = programLambda->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
+ if (multiType->GetSize() != outputNarrowType->GetSize()) {
+ ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types has different number of items: " <<
+ multiType->GetSize() << " vs " << outputNarrowType->GetSize()));
+ return TStatus::Error;
+ }
+
+ for (size_t i = 0; i < outputNarrowType->GetSize(); ++i) {
+ auto structItem = outputNarrowType->GetItems()[i];
+ if (!IsSameAnnotation(*structItem->GetItemType(), *(multiType->GetItems()[i]))) {
+ ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide/narrow types mismatch for column '" <<
+ structItem->GetName() << "' : " << *(multiType->GetItems()[i]) << " vs " << *structItem->GetItemType()));
+ return TStatus::Error;
+ }
+ }
+
+ programResultTypesTuple[0] = ctx.MakeType<TListExprType>(outputNarrowType);
+ }
+
TVector<const TTypeAnnotationNode*> stageResultTypes;
if (TDqStageBase::idx_Outputs < stage->ChildrenSize()) {
YQL_ENSURE(stage->Child(TDqStageBase::idx_Outputs)->ChildrenSize() != 0, "Stage.Outputs list exists but empty, stage: " << stage->Dump());
+ if (useWideChannels) {
+ ctx.AddError(TIssue(ctx.GetPosition(programLambda->Pos()),TStringBuilder() << "Wide channel stage is incompatible with Sink/Transform"));
+ return TStatus::Error;
+ }
+
auto outputsNumber = programResultTypesTuple.size();
TVector<TExprNode::TPtr> transforms;
TVector<TExprNode::TPtr> sinks;
@@ -522,13 +617,12 @@ TStatus AnnotateDqCnMerge(const TExprNode::TPtr& node, TExprContext& ctx) {
{
return TStatus::Error;
}
- TMaybe<ui32> colIndex = structType->FindItem(column.Column().StringValue());
- if (!colIndex) {
- ctx.AddError(TIssue(ctx.GetPosition(column.Pos()),
- TStringBuilder() << "Missing sort column: " << column.Column().StringValue()));
+
+ auto colType = GetColumnType(TDqConnection(node), *structType, column.Column().Value(), column.Pos(), ctx);
+ if (!colType) {
return TStatus::Error;
}
- const TTypeAnnotationNode* colType = (structType->GetItems())[*colIndex]->GetItemType();
+
if (colType->GetKind() == ETypeAnnotationKind::Optional) {
colType = colType->Cast<TOptionalExprType>()->GetItemType();
}
@@ -582,12 +676,12 @@ TStatus AnnotateDqCnHashShuffle(const TExprNode::TPtr& input, TExprContext& ctx)
if (!EnsureAtom(*column, ctx)) {
return TStatus::Error;
}
- if (!structType->FindItem(column->Content())) {
- ctx.AddError(TIssue(ctx.GetPosition(column->Pos()),
- TStringBuilder() << "Missing key column: " << column->Content()));
+ auto ty = GetColumnType(TDqConnection(input), *structType, column->Content(), column->Pos(), ctx);
+ if (!ty) {
return TStatus::Error;
}
- if (const auto ty = structType->FindItemType(column->Content()); !ty->IsHashable()) {
+
+ if (!ty->IsHashable()) {
ctx.AddError(TIssue(ctx.GetPosition(column->Pos()),
TStringBuilder() << "Non-hashable key column: " << column->Content()));
return TStatus::Error;
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 925f8beb7e0..2c2bfb9dc65 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -58,6 +58,8 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, HashJoinMode).Parser([](const TString& v) { return FromString<NDq::EHashJoinMode>(v); });
REGISTER_SETTING(*this, HashShuffleTasksRatio).Lower(0.5).Upper(5);
REGISTER_SETTING(*this, HashShuffleMaxTasks).Lower(1).Upper(1000);
+
+ REGISTER_SETTING(*this, UseWideChannels);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index ae15121f183..4c9244b5444 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -90,6 +90,8 @@ struct TDqSettings {
NCommon::TConfSetting<double, false> HashShuffleTasksRatio;
NCommon::TConfSetting<ui32, false> HashShuffleMaxTasks;
+ NCommon::TConfSetting<bool, false> UseWideChannels;
+
// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
void Save(TProtoConfig& config) {
@@ -133,8 +135,9 @@ struct TDqSettings {
SAVE_SETTING(WatermarksLateArrivalDelayMs);
SAVE_SETTING(UseAggPhases);
SAVE_SETTING(HashJoinMode);
- SAVE_SETTING(HashShuffleTasksRatio);
- SAVE_SETTING(HashShuffleMaxTasks);
+ SAVE_SETTING(HashShuffleTasksRatio);
+ SAVE_SETTING(HashShuffleMaxTasks);
+ SAVE_SETTING(UseWideChannels);
#undef SAVE_SETTING
}
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 1fa3b6571a2..35c630e19d2 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -230,7 +230,8 @@ private:
void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes");
- pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false), "BuildPhy");
+ bool useWideChannels = State_->Settings->UseWideChannels.Get().GetOrElse(false); // falls back to false when the UseWideChannels setting has no value
+ pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, useWideChannels), "BuildPhy");
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
}