diff options
author | aneporada <aneporada@ydb.tech> | 2023-08-29 18:45:56 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-08-30 00:27:19 +0300 |
commit | d2b503b8590310a4102e31d96a5ec1320343cc76 (patch) | |
tree | d142951f585cf3a78395f0abc6b88cd67a79b3e9 | |
parent | 8dc173a6fdfd2c3ac36d4395f0b9a1f95944adf0 (diff) | |
download | ydb-d2b503b8590310a4102e31d96a5ec1320343cc76.tar.gz |
Do not require system column _yql_group_stream_index when UseBlocks is set
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_list.cpp | 23 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp | 4 |
2 files changed, 13 insertions, 14 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index ffc737c0ed..d2c247ca28 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -4683,10 +4683,16 @@ namespace { return IGraphTransformer::TStatus::Repeat; } - if (isMany && ctx.Types.UseBlocks && !inputStructType->FindItem("_yql_group_stream_index")) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), - TStringBuilder() << "Missing service column: _yql_group_stream_index")); - return IGraphTransformer::TStatus::Error; + if (isMany && ctx.Types.UseBlocks) { + auto streamIndex = inputStructType->FindItem("_yql_group_stream_index"); + if (streamIndex) { + const TTypeAnnotationNode* streamIndexType = inputStructType->GetItems()[*streamIndex]->GetItemType(); + if (streamIndexType->GetKind() != ETypeAnnotationKind::Data || streamIndexType->Cast<TDataExprType>()->GetSlot() != EDataSlot::Uint32) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + TStringBuilder() << "Invalid type for service column _yql_group_stream_index: expecting Uint32, got: " << *streamIndexType)); + return IGraphTransformer::TStatus::Error; + } + } } auto status = NormalizeTupleOfAtoms(input, 1, output, ctx.Expr); @@ -4717,7 +4723,6 @@ namespace { } bool isHopping = false; - bool hasManyStreams = false; const auto settings = input->Child(3); if (!EnsureTuple(*settings, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -4847,8 +4852,6 @@ namespace { if (!ValidateAggManyStreams(*value, input->Child(2)->ChildrenSize(), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - - hasManyStreams = true; } else { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Head().Pos()), TStringBuilder() << "Unexpected setting: " << settingName)); @@ -4856,12 +4859,6 @@ namespace { } } - if (isMany && !hasManyStreams && ctx.Types.UseBlocks) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(settings->Head().Pos()), - "Missing setting: many_streams")); - return IGraphTransformer::TStatus::Error; - } - for (auto& child : input->Child(1)->Children()) { if (!EnsureAtom(*child, ctx.Expr)) { return IGraphTransformer::TStatus::Error; diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 17ee8b5d51..c8428507b9 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -550,7 +550,9 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea if (many) { auto rowIndex = RowType->FindItem("_yql_group_stream_index"); - YQL_ENSURE(rowIndex, "Unknown column: _yql_group_stream_index"); + if (!rowIndex) { + return nullptr; + } if (streamIdxColumn) { *streamIdxColumn = extractorRoots.size(); } |