aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-08-29 18:45:56 +0300
committeraneporada <aneporada@ydb.tech>2023-08-30 00:27:19 +0300
commitd2b503b8590310a4102e31d96a5ec1320343cc76 (patch)
treed142951f585cf3a78395f0abc6b88cd67a79b3e9
parent8dc173a6fdfd2c3ac36d4395f0b9a1f95944adf0 (diff)
downloadydb-d2b503b8590310a4102e31d96a5ec1320343cc76.tar.gz
Do not require system column _yql_group_stream_index when UseBlocks is set
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp23
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp4
2 files changed, 13 insertions, 14 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index ffc737c0ed..d2c247ca28 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -4683,10 +4683,16 @@ namespace {
return IGraphTransformer::TStatus::Repeat;
}
- if (isMany && ctx.Types.UseBlocks && !inputStructType->FindItem("_yql_group_stream_index")) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
- TStringBuilder() << "Missing service column: _yql_group_stream_index"));
- return IGraphTransformer::TStatus::Error;
+ if (isMany && ctx.Types.UseBlocks) {
+ auto streamIndex = inputStructType->FindItem("_yql_group_stream_index");
+ if (streamIndex) {
+ const TTypeAnnotationNode* streamIndexType = inputStructType->GetItems()[*streamIndex]->GetItemType();
+ if (streamIndexType->GetKind() != ETypeAnnotationKind::Data || streamIndexType->Cast<TDataExprType>()->GetSlot() != EDataSlot::Uint32) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Invalid type for service column _yql_group_stream_index: expecting Uint32, got: " << *streamIndexType));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
}
auto status = NormalizeTupleOfAtoms(input, 1, output, ctx.Expr);
@@ -4717,7 +4723,6 @@ namespace {
}
bool isHopping = false;
- bool hasManyStreams = false;
const auto settings = input->Child(3);
if (!EnsureTuple(*settings, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
@@ -4847,8 +4852,6 @@ namespace {
if (!ValidateAggManyStreams(*value, input->Child(2)->ChildrenSize(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
-
- hasManyStreams = true;
} else {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Head().Pos()),
TStringBuilder() << "Unexpected setting: " << settingName));
@@ -4856,12 +4859,6 @@ namespace {
}
}
- if (isMany && !hasManyStreams && ctx.Types.UseBlocks) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(settings->Head().Pos()),
- "Missing setting: many_streams"));
- return IGraphTransformer::TStatus::Error;
- }
-
for (auto& child : input->Child(1)->Children()) {
if (!EnsureAtom(*child, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 17ee8b5d51..c8428507b9 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -550,7 +550,9 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea
if (many) {
auto rowIndex = RowType->FindItem("_yql_group_stream_index");
- YQL_ENSURE(rowIndex, "Unknown column: _yql_group_stream_index");
+ if (!rowIndex) {
+ return nullptr;
+ }
if (streamIdxColumn) {
*streamIdxColumn = extractorRoots.size();
}