diff options
author | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-10-11 10:14:42 +0300 |
---|---|---|
committer | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-10-11 10:14:42 +0300 |
commit | 0a1cad3a38e0df480eb17194b64ff1965cab487e (patch) | |
tree | 05a7d454fbb94687a62169fd9d686a44a8a5fa30 | |
parent | 60c4d2a23c79eb93eaf3c5f230a48c8861f84d2b (diff) | |
download | ydb-0a1cad3a38e0df480eb17194b64ff1965cab487e.tar.gz |
new hopping syntax
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_extr_members.cpp | 24 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_flow2.cpp | 26 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 65 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_list.cpp | 55 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/logical_optimize.cpp | 101 | ||||
-rw-r--r-- | ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/pq/common/pq_meta_fields.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp | 7 | ||||
-rw-r--r-- | ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp | 54 | ||||
-rw-r--r-- | ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/builtin.cpp | 100 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.cpp | 49 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.h | 40 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/select.cpp | 30 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql.cpp | 59 |
16 files changed, 459 insertions, 163 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp index 98c428f454d..f9ae2a9aba9 100644 --- a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp @@ -684,10 +684,21 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons sessionColumn = sessionSetting->Child(1)->Child(0)->Content(); } + TMaybe<TStringBuf> hoppingColumn = "_yql_time"; + const auto hoppingSetting = GetSetting(aggr.Settings().Ref(), "hopping"); + if (hoppingSetting) { + auto hoppingSettingValue = hoppingSetting->Child(1); + bool isLegacyHopping = !hoppingSettingValue->IsList(); + if (!isLegacyHopping) { + YQL_ENSURE(hoppingSettingValue->Child(0)->IsAtom()); + hoppingColumn = hoppingSettingValue->Child(0)->Content(); + } + } + TSet<TStringBuf> usedFields; - // all actual (non-session) keys will be used + // all actual (non-session/non-hopping) keys will be used for (const auto& key : aggr.Keys()) { - if (key.Value() != sessionColumn) { + if (key.Value() != sessionColumn && key.Value() != hoppingColumn) { usedFields.insert(key.Value()); } } @@ -736,10 +747,12 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons } } - auto settings = aggr.Settings(); - auto hoppingSetting = GetSetting(settings.Ref(), "hopping"); if (hoppingSetting) { - auto traits = TCoHoppingTraits(hoppingSetting->Child(1)); + auto traitsNode = hoppingSetting->ChildPtr(1); + if (traitsNode->IsList()) { + traitsNode = traitsNode->ChildPtr(1); + } + auto traits = TCoHoppingTraits(traitsNode); auto timeExtractor = traits.TimeExtractor(); auto usedType = traits.ItemType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); @@ -756,7 +769,6 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons if (sessionSetting) { TCoSessionWindowTraits traits(sessionSetting->Child(1)->ChildPtr(1)); - // TODO: same should be done for hopping auto usedType = traits.ListType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()-> GetItemType()->Cast<TStructExprType>(); for (const auto& item : usedType->GetItems()) { diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp index 8df3f4d297c..c5e95637c00 100644 --- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp @@ -33,9 +33,19 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon sessionColumn = sessionSetting->Child(1)->Child(0)->Content(); } + TMaybe<TStringBuf> hoppingColumn; + auto hoppingSetting = GetSetting(node.Settings().Ref(), "hopping"); + if (hoppingSetting) { + auto traitsNode = hoppingSetting->ChildPtr(1); + if (traitsNode->IsList()) { + YQL_ENSURE(traitsNode->Child(0)->IsAtom()); + hoppingColumn = traitsNode->Child(0)->Content(); + } + } + TSet<TStringBuf> usedFields; for (const auto& x : node.Keys()) { - if (x.Value() != sessionColumn) { + if (x.Value() != sessionColumn && x.Value() != hoppingColumn) { usedFields.insert(x.Value()); } } @@ -71,10 +81,13 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon } } - auto settings = node.Settings(); - auto hoppingSetting = GetSetting(settings.Ref(), "hopping"); if (hoppingSetting) { - auto traits = TCoHoppingTraits(hoppingSetting->Child(1)); + auto traitsNode = hoppingSetting->ChildPtr(1); + if (traitsNode->IsList()) { + traitsNode = traitsNode->ChildPtr(1); + } + auto traits = TCoHoppingTraits(traitsNode); + auto timeExtractor = traits.TimeExtractor(); auto usedType = traits.ItemType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); @@ -83,10 +96,9 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon } TSet<TStringBuf> lambdaSubset; - if (!HaveFieldsSubset(timeExtractor.Body().Ptr(), *timeExtractor.Args().Arg(0).Raw(), lambdaSubset, parentsMap)) { - return node.Ptr(); + if (HaveFieldsSubset(timeExtractor.Body().Ptr(), *timeExtractor.Args().Arg(0).Raw(), lambdaSubset, parentsMap)) { + usedFields.insert(lambdaSubset.cbegin(), lambdaSubset.cend()); } - usedFields.insert(lambdaSubset.cbegin(), lambdaSubset.cend()); if (usedFields.size() == structType->GetSize()) { return node.Ptr(); diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index f6383d58a78..9ab90ef4368 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -2237,10 +2237,10 @@ namespace NTypeAnnImpl { bool first_signed = IsDataTypeSigned(first); bool second_signed = IsDataTypeSigned(second); if (first_width > second_width && !first_signed && second_signed || - first_width == second_width && first_signed != second_signed) + first_width == second_width && first_signed != second_signed) { auto issue = TIssue( - ctx.Expr.GetPosition(input->Pos()), + ctx.Expr.GetPosition(input->Pos()), TStringBuilder() << "Integral type implicit bitcast: " << *input->Head().GetTypeAnn() << " and " << *input->Tail().GetTypeAnn() ); SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_IMPLICIT_BITCAST, issue); @@ -2278,7 +2278,7 @@ namespace NTypeAnnImpl { if (check_result != IGraphTransformer::TStatus::Ok) { return check_result; } - + const bool isLeftNumeric = IsDataTypeNumeric(dataType[0]->GetSlot()); const bool isRightNumeric = IsDataTypeNumeric(dataType[1]->GetSlot()); // bool isOk = false; @@ -2354,7 +2354,7 @@ namespace NTypeAnnImpl { if (check_result != IGraphTransformer::TStatus::Ok) { return check_result; } - + const bool isLeftNumeric = IsDataTypeNumeric(dataType[0]->GetSlot()); const bool isRightNumeric = IsDataTypeNumeric(dataType[1]->GetSlot()); // bool isOk = false; @@ -2428,7 +2428,7 @@ namespace NTypeAnnImpl { if (check_result != IGraphTransformer::TStatus::Ok) { return check_result; } - + if (IsDataTypeNumeric(dataType[0]->GetSlot()) && IsDataTypeNumeric(dataType[1]->GetSlot())) { auto commonTypeSlot = GetNumericDataTypeByLevel(Max(GetNumericDataTypeLevel(dataType[0]->GetSlot()), GetNumericDataTypeLevel(dataType[1]->GetSlot()))); @@ -2484,7 +2484,7 @@ namespace NTypeAnnImpl { haveOptional |= isOptional[i]; } - + auto check_result = CheckIntegralsWidth(input, ctx, dataType[0]->GetSlot(), dataType[1]->GetSlot()); if (check_result != IGraphTransformer::TStatus::Ok) { return check_result; @@ -2547,7 +2547,7 @@ namespace NTypeAnnImpl { if (check_result != IGraphTransformer::TStatus::Ok) { return check_result; } - + if (IsDataTypeNumeric(dataType[0]->GetSlot()) && IsDataTypeNumeric(dataType[1]->GetSlot())) { auto commonTypeSlot = GetNumericDataTypeByLevel(Max(GetNumericDataTypeLevel(dataType[0]->GetSlot()), GetNumericDataTypeLevel(dataType[1]->GetSlot()))); @@ -8934,6 +8934,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return IGraphTransformer::TStatus::Error; } + THashSet<TStringBuf> addedInProjectionFields; TVector<const TItemExprType*> allItems; for (auto& item : input->Child(1)->Children()) { if (!item->IsCallable({"SqlProjectItem", "SqlProjectStarItem"})) { @@ -8944,13 +8945,59 @@ template <NKikimr::NUdf::EDataSlot DataSlot> if (item->IsCallable("SqlProjectStarItem")) { auto& structItems = item->GetTypeAnn()->Cast<TStructExprType>()->GetItems(); - allItems.insert(allItems.end(), structItems.begin(), structItems.end()); + for (const auto& item : structItems) { + allItems.emplace_back(item); + addedInProjectionFields.emplace(item->GetName()); + } } else { YQL_ENSURE(item->Child(1)->IsAtom()); - allItems.push_back(ctx.Expr.MakeType<TItemExprType>(item->Child(1)->Content(), item->GetTypeAnn())); + const auto fieldName = item->Child(1)->Content(); + allItems.push_back(ctx.Expr.MakeType<TItemExprType>(fieldName, item->GetTypeAnn())); + addedInProjectionFields.emplace(fieldName); } } + TVector<TStringBuf> transparentFields; + for (auto& inputItem : itemType->Cast<TStructExprType>()->GetItems()) { + if (!inputItem->GetName().StartsWith("_yql_sys_tsp_") || + addedInProjectionFields.contains(inputItem->GetName())) + { + continue; + } + + transparentFields.push_back(inputItem->GetName()); + } + + if (!transparentFields.empty()) { + TVector<TExprNode::TPtr> newProjectItems; + for (const auto& item : input->Child(1)->Children()) { + newProjectItems.push_back(item); + } + + for (const auto& fieldName : transparentFields) { + auto lambdaArg = ctx.Expr.NewArgument(input->Pos(), "row"); + auto lambdaBody = ctx.Expr.NewCallable(input->Pos(), "Member", { + lambdaArg, + ctx.Expr.NewAtom(input->Pos(), fieldName) + }); + newProjectItems.push_back(ctx.Expr.NewCallable( + input->Pos(), + "SqlProjectItem", + { + ctx.Expr.NewCallable(input->Pos(), "TypeOf", { + input->Child(0) + }), + ctx.Expr.NewAtom(input->Pos(), fieldName), + ctx.Expr.NewLambda( + input->Pos(), + ctx.Expr.NewArguments(input->Pos(), {std::move(lambdaArg)}), + std::move(lambdaBody)) + })); + } + output = ctx.Expr.ChangeChild(*input, 1, ctx.Expr.NewList(input->Pos(), std::move(newProjectItems))); + return IGraphTransformer::TStatus::Repeat; + } + auto resultStructType = ctx.Expr.MakeType<TStructExprType>(allItems); if (!resultStructType->Validate(input->Pos(), ctx.Expr)) { return IGraphTransformer::TStatus::Error; diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index d12f33f801c..6db84f4c786 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -4637,6 +4637,7 @@ namespace { TVector<const TItemExprType*> rowColumns; TMaybe<TStringBuf> sessionColumnName; + TMaybe<TStringBuf> hoppingColumnName; for (const auto& setting : settings->Children()) { if (!EnsureTupleMinSize(*setting, 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -4653,13 +4654,52 @@ namespace { return IGraphTransformer::TStatus::Error; } - if (!setting->Child(1)->IsCallable("HoppingTraits")) { + auto value = setting->ChildPtr(1); + if (value->Type() == TExprNode::List) { + if (!EnsureTupleSize(*value, 2, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto hoppingOutputColumn = value->Child(0); + if (!EnsureAtom(*hoppingOutputColumn, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (hoppingOutputColumn->Content().Empty()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(hoppingOutputColumn->Pos()), + TStringBuilder() << "Hopping output column name can not be empty")); + return IGraphTransformer::TStatus::Error; + } + + hoppingColumnName = hoppingOutputColumn->Content(); + + auto traits = value->Child(1); + if (!traits->IsCallable("HoppingTraits")) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Child(1)->Pos()), + TStringBuilder() << "Expected HoppingTraits callable")); + return IGraphTransformer::TStatus::Error; + } + + bool seenAsKeyColumn = AnyOf(input->Child(1)->ChildrenList(), [&](const auto& keyColum) { + return hoppingColumnName == keyColum->Content(); + }); + + if (!seenAsKeyColumn) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Child(1)->Pos()), + TStringBuilder() << "Hopping column " << *hoppingColumnName << " is not listed in key columns")); + return IGraphTransformer::TStatus::Error; + } + } else if (setting->Child(1)->IsCallable("HoppingTraits")) { + hoppingColumnName = "_yql_time"; // legacy hopping + } else { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Child(1)->Pos()), TStringBuilder() << "Expected HoppingTraits callable")); return IGraphTransformer::TStatus::Error; } - rowColumns.push_back(ctx.Expr.MakeType<TItemExprType>("_yql_time", - ctx.Expr.MakeType<TOptionalExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Timestamp)))); + + rowColumns.push_back(ctx.Expr.MakeType<TItemExprType>( + *hoppingColumnName, + ctx.Expr.MakeType<TOptionalExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Timestamp)))); } else if (settingName == "session") { if (!EnsureTupleSize(*setting, 2, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -4718,7 +4758,7 @@ namespace { return IGraphTransformer::TStatus::Error; } - if (sessionColumnName == child->Content()) { + if (sessionColumnName == child->Content() || hoppingColumnName == child->Content()) { continue; } @@ -5976,12 +6016,7 @@ namespace { return IGraphTransformer::TStatus::Repeat; } - const TTypeAnnotationNode* timeType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Timestamp); - - if (!IsSameAnnotation(*RemoveOptionalType(lambdaTimeExtractor->GetTypeAnn()), *timeType)) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(lambdaTimeExtractor->Pos()), TStringBuilder() - << "Mismatch hopping window time extractor lambda output type, expected: " - << *timeType << ", but got: " << *lambdaTimeExtractor->GetTypeAnn())); + if (!EnsureSpecificDataType(*lambdaTimeExtractor, EDataSlot::Timestamp, ctx.Expr, true)) { return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index c60a2290c5d..ad857041be8 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -17,7 +17,6 @@ using namespace NYql::NNodes; namespace { -constexpr TStringBuf YQL_TIME = "_yql_time"; TString BuildColumnName(const TExprBase column) { if (const auto columnName = column.Maybe<TCoAtom>()) { @@ -37,6 +36,10 @@ TString BuildColumnName(const TExprBase column) { << column.Ptr()->Dump()); } +bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) { + return !hoppingSetting->Child(1)->IsList(); +} + } class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { @@ -81,7 +84,7 @@ protected: auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); if (input) { if (hopSetting) { - return RewriteAsHoppingWindow(node, ctx, input.Cast()).Cast(); + return RewriteAsHoppingWindow(node, ctx, input.Cast()); } else { return DqRewriteAggregate(node, ctx, TypesCtx, true); } @@ -141,8 +144,14 @@ private: EnsureNotDistinct(aggregate); + const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx); + if (!maybeHopTraits) { + return nullptr; + } + const auto [hoppingColumn, hopTraits] = *maybeHopTraits; + const auto aggregateInputType = GetSeqItemType(node.Ptr()->Head().GetTypeAnn())->Cast<TStructExprType>(); - TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys()); + TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hoppingColumn); if (keysDescription.NeedPickle()) { return Build<TCoMap>(ctx, pos) @@ -157,20 +166,14 @@ private: .Done(); } - const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx); - if (!maybeHopTraits) { - return nullptr; - } - const auto hopTraits = maybeHopTraits.Cast(); - - const auto keyLambda = BuildKeySelector(pos, *aggregateInputType, aggregate.Keys().Ptr(), ctx); + const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); const auto timeExtractorLambda = BuildTimeExtractor(hopTraits, ctx); const auto initLambda = BuildInitHopLambda(aggregate, ctx); const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx); const auto saveLambda = BuildSaveHopLambda(aggregate, ctx); const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); - const auto finishLambda = BuildFinishHopLambda(aggregate, ctx); + const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hoppingColumn, ctx); const auto watermarkMode = BuildWatermarkMode(aggregate, ctx); if (!watermarkMode) { return nullptr; @@ -215,7 +218,7 @@ private: .Done(); } else { auto wrappedInput = input.Ptr(); - if (!aggregate.Keys().Empty()) { + if (!keysDescription.MemberKeys.empty()) { // Shuffle input connection by keys wrappedInput = WrapToShuffle(keysDescription, aggregate, input, ctx); if (!wrappedInput) { @@ -250,19 +253,27 @@ private: } } - TMaybeNode<TCoHoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) { + TMaybe<std::pair<TString, TCoHoppingTraits>> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) { const auto pos = aggregate.Pos(); const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); if (!hopSetting) { ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); - return TMaybeNode<TCoHoppingTraits>(); + return Nothing(); } - const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(hopSetting->Child(1)); + const auto hoppingColumn = IsLegacyHopping(hopSetting) + ? "_yql_time" + : TString(hopSetting->Child(1)->Child(0)->Content()); + + const auto traitsNode = IsLegacyHopping(hopSetting) + ? hopSetting->Child(1) + : hopSetting->Child(1)->Child(1); + + const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(traitsNode); if (!maybeTraits) { ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); - return TMaybeNode<TCoHoppingTraits>(); + return Nothing(); } const auto traits = maybeTraits.Cast(); @@ -285,40 +296,48 @@ private: const auto hop = checkIntervalParam(traits.Hop()); if (!hop) { - return TMaybeNode<TCoHoppingTraits>(); + return Nothing(); } const auto interval = checkIntervalParam(traits.Interval()); if (!interval) { - return TMaybeNode<TCoHoppingTraits>(); + return Nothing(); } const auto delay = checkIntervalParam(traits.Delay()); if (!delay) { - return TMaybeNode<TCoHoppingTraits>(); + return Nothing(); } if (interval < hop) { ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); - return TMaybeNode<TCoHoppingTraits>(); + return Nothing(); } if (delay < hop) { ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); - return TMaybeNode<TCoHoppingTraits>(); + return Nothing(); } - return Build<TCoHoppingTraits>(ctx, aggregate.Pos()) + const auto newTraits = Build<TCoHoppingTraits>(ctx, aggregate.Pos()) .InitFrom(traits) .DataWatermarks(Config->AnalyticsHopping.Get().GetOrElse(false) ? ctx.NewAtom(aggregate.Pos(), "false") : traits.DataWatermarks().Ptr()) .Done(); + + return std::make_pair(hoppingColumn, newTraits); } struct TKeysDescription { TVector<TString> PickleKeys; TVector<TString> MemberKeys; + TVector<TString> FakeKeys; - explicit TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys) { + explicit TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { for (const auto& key : keys) { + if (key.StringValue() == hoppingColumn) { + FakeKeys.emplace_back(key.StringValue()); + continue; + } + const auto index = rowType.FindItem(key.StringValue()); Y_ENSURE(index); @@ -405,9 +424,25 @@ private: return res; } + TVector<TString> GetActualGroupKeys() { + TVector<TString> result; + result.reserve(PickleKeys.size() + MemberKeys.size()); + result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); + result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); + return result; + } + bool NeedPickle() const { return !PickleKeys.empty(); } + + TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { + auto builder = Build<TCoAtomList>(ctx, pos); + for (auto key : GetKeysList(ctx, pos)) { + builder.Add(std::move(key)); + } + return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); + } }; TExprNode::TPtr WrapToShuffle( @@ -622,9 +657,13 @@ private: .Ptr(); } - TExprNode::TPtr BuildFinishHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + TExprNode::TPtr BuildFinishHopLambda( + const TCoAggregate& aggregate, + const TVector<TString>& actualGroupKeys, + const TString& hoppingColumn, + TExprContext& ctx) + { const auto pos = aggregate.Pos(); - const auto keyColumns = aggregate.Keys(); const auto aggregateHandlers = aggregate.Handlers(); const auto finishKeyArg = Build<TCoArgument>(ctx, pos).Name("key").Done(); @@ -632,17 +671,17 @@ private: const auto finishTimeArg = Build<TCoArgument>(ctx, pos).Name("time").Done(); TVector<TExprBase> structItems; - structItems.reserve(keyColumns.Size() + aggregateHandlers.Size() + 1); + structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1); - if (keyColumns.Size() == 1) { + if (actualGroupKeys.size() == 1) { structItems.push_back(Build<TCoNameValueTuple>(ctx, pos) - .Name(keyColumns.Item(0)) + .Name().Build(actualGroupKeys[0]) .Value(finishKeyArg) .Done()); } else { - for (size_t i = 0; i < keyColumns.Size(); ++i) { + for (size_t i = 0; i < actualGroupKeys.size(); ++i) { structItems.push_back(Build<TCoNameValueTuple>(ctx, pos) - .Name(keyColumns.Item(i)) + .Name().Build(actualGroupKeys[i]) .Value<TCoNth>() .Tuple(finishKeyArg) .Index<TCoAtom>() @@ -701,7 +740,7 @@ private: } structItems.push_back(Build<TCoNameValueTuple>(ctx, pos) - .Name().Build(YQL_TIME) + .Name().Build(hoppingColumn) .Value(finishTimeArg) .Done()); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp index c7507a91e01..52a9ab5d34f 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp @@ -23,7 +23,7 @@ namespace { } }, { - "_yql_sys_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ + "_yql_sys_tsp_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>; return std::make_pair( NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetWriteTime().MicroSeconds())), diff --git a/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp index 2c9e94faacb..173cee83d45 100644 --- a/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp +++ b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp @@ -6,7 +6,7 @@ namespace { const std::vector<NYql::TMetaFieldDescriptor> PqMetaFields = { NYql::TMetaFieldDescriptor("create_time", "_yql_sys_create_time", NYql::NUdf::EDataSlot::Timestamp), - NYql::TMetaFieldDescriptor("write_time", "_yql_sys_write_time", NYql::NUdf::EDataSlot::Timestamp), + NYql::TMetaFieldDescriptor("write_time", "_yql_sys_tsp_write_time", NYql::NUdf::EDataSlot::Timestamp), NYql::TMetaFieldDescriptor("partition_id", "_yql_sys_partition_id", NYql::NUdf::EDataSlot::Uint64), NYql::TMetaFieldDescriptor("offset", "_yql_sys_offset", NYql::NUdf::EDataSlot::Uint64), NYql::TMetaFieldDescriptor("message_group_id", "_yql_sys_message_group_id", NYql::NUdf::EDataSlot::String), diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp index 380df33d529..090162046af 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp @@ -126,7 +126,9 @@ public: .World(write.World()) .DataSink(write.DataSink().Ptr()) .Topic(topicNode) - .Input(node->Child(3)) + .Input<TCoRemoveSystemMembers>() + .Input(node->Child(3)) + .Build() .Mode(settings.Mode.Cast()) .Settings(settings.Other) .Done().Ptr(); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp index ec7623ab155..582456cc1a5 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp @@ -120,11 +120,16 @@ public: .Metadata().Add(sourceMetadata).Build() .Done(); + auto format = topicKeyParser.GetFormat(); + if (format.Empty()) { + format = "raw"; + } + auto builder = Build<TPqReadTopic>(ctx, read.Pos()) .World(read.World()) .DataSource(read.DataSource()) .Topic(std::move(topicNode)) - .Format().Value(topicKeyParser.GetFormat()).Build() + .Format().Value(format).Build() .Compression().Value(topicKeyParser.GetCompression()).Build(); if (topicKeyParser.GetColumnOrder()) { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index 8fa309a590d..639dcf3024e 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -215,36 +215,36 @@ public: if (row->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Struct) { auto structType = row->GetTypeAnn()->Cast<TStructExprType>(); - if (auto pos = structType->FindItem(descriptor->SysColumn)) { - bool isOptional = false; - const TDataExprType* dataType = nullptr; - if (!EnsureDataOrOptionalOfData(row->Pos(), structType->GetItems()[*pos]->GetItemType(), isOptional, dataType, ctx)) { - return IGraphTransformer::TStatus::Error; - } + auto pos = structType->FindItem(descriptor->SysColumn); + if (!pos && descriptor->Key == "write_time") { + // Allow user to specify column manually. It is required for analytics hopping now. + // In future it will be removed (when custom event_time will be implemented) + pos = structType->FindItem("_row_time"); + } - if (!EnsureSpecificDataType(row->Pos(), *dataType, descriptor->Type, ctx)) { - return IGraphTransformer::TStatus::Error; - } + if (!pos) { + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Wrong place to use SystemMetadata")); + return IGraphTransformer::TStatus::Error; + } - output = ctx.Builder(input->Pos()) - .Callable("Member") - .Add(0, row) - .Atom(1, descriptor->SysColumn, TNodeFlags::Default) - .Seal() - .Build(); - - if (isOptional) { - output = ctx.Builder(input->Pos()) - .Callable("Coalesce") - .Add(0, output) - .Callable(1, "String") - .Atom(0, "0", TNodeFlags::Default) - .Seal() - .Seal() - .Build(); - } - return IGraphTransformer::TStatus::Repeat; + bool isOptional = false; + const TDataExprType* dataType = nullptr; + if (!EnsureDataOrOptionalOfData(row->Pos(), structType->GetItems()[*pos]->GetItemType(), isOptional, dataType, ctx)) { + return IGraphTransformer::TStatus::Error; } + + if (!EnsureSpecificDataType(row->Pos(), *dataType, descriptor->Type, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + output = ctx.Builder(input->Pos()) + .Callable("Member") + .Add(0, row) + .Atom(1, structType->GetItems()[*pos]->GetName(), TNodeFlags::Default) + .Seal() + .Build(); + + return IGraphTransformer::TStatus::Repeat; } input->SetTypeAnn(ctx.MakeType<TDataExprType>(descriptor->Type)); diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp index b09fd381a07..b8e952c920b 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp @@ -137,7 +137,9 @@ public: .World(write.World()) .DataSink(write.DataSink()) .Shard<TCoAtom>().Value(tablePath->Content()).Build() - .Input(write.Arg(3)) + .Input<TCoRemoveSystemMembers>() + .Input(write.Arg(3)) + .Build() .Done().Ptr(); } diff --git a/ydb/library/yql/sql/v1/builtin.cpp b/ydb/library/yql/sql/v1/builtin.cpp index 5ee117730cf..c3bf4a7c780 100644 --- a/ydb/library/yql/sql/v1/builtin.cpp +++ b/ydb/library/yql/sql/v1/builtin.cpp @@ -2067,6 +2067,84 @@ private: TNodePtr Node; }; +THoppingWindow::THoppingWindow(TPosition pos, const TVector<TNodePtr>& args) + : INode(pos) + , Args(args) + , FakeSource(BuildFakeSource(pos)) + , Valid(false) +{} + +void THoppingWindow::MarkValid() { + YQL_ENSURE(!HasState(ENodeState::Initialized)); + Valid = true; +} + +TNodePtr THoppingWindow::BuildTraits(const TString& label) const { + YQL_ENSURE(HasState(ENodeState::Initialized)); + + return Y( + "HoppingTraits", + Y("ListItemType", Y("TypeOf", label)), + BuildLambda(Pos, Y("row"), Y("Just", Y("SystemMetadata", Y("String", Q("write_time")), Y("DependsOn", "row")))), + Hop, + Interval, + Interval, + Q("true")); +} + +bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { + if (!src || src->IsFake()) { + ctx.Error(Pos) << "HoppingWindow requires data source"; + return false; + } + + if (!(Args.size() == 2)) { + ctx.Error(Pos) << "HoppingWindow requires two arguments"; + return false; + } + + if (!Valid) { + ctx.Error(Pos) << "HoppingWindow can only be used as a top-level GROUP BY expression"; + return false; + } + + auto hopExpr = Args[0]; + auto intervalExpr = Args[1]; + if (!(hopExpr->Init(ctx, FakeSource.Get()) && intervalExpr->Init(ctx, FakeSource.Get()))) { + return false; + } + + Hop = ProcessIntervalParam(hopExpr); + Interval = ProcessIntervalParam(intervalExpr); + + return true; +} + +TAstNode* THoppingWindow::Translate(TContext&) const { + YQL_ENSURE(false, "Translate is called for HoppingWindow"); + return nullptr; +} + +void THoppingWindow::DoUpdateState() const { + State.Set(ENodeState::Const, false); +} + +TNodePtr THoppingWindow::DoClone() const { + return new THoppingWindow(Pos, CloneContainer(Args)); +} + +TString THoppingWindow::GetOpName() const { + return "HoppingWindow"; +} + +TNodePtr THoppingWindow::ProcessIntervalParam(const TNodePtr& node) const { + auto literal = node->GetLiteral("String"); + if (!literal) { + return Y("EvaluateExpr", node); + } + + return new TYqlData(node->GetPos(), "Interval", {node}); +} TNodePtr BuildUdfUserTypeArg(TPosition pos, const TVector<TNodePtr>& args, TNodePtr customUserType) { TVector<TNodePtr> argsTypeItems; @@ -2414,22 +2492,31 @@ private: bool DoInit(TContext& ctx, ISource* src) override { Y_UNUSED(ctx); - auto window = src->GetHoppingWindowSpec(); - if (!window) { + auto legacySpec = src->GetLegacyHoppingWindowSpec(); + auto spec = src->GetHoppingWindowSpec(); + if (!legacySpec && !spec) { ctx.Error(Pos) << "No hopping window parameters in aggregation"; return false; } Nodes.clear(); + const auto fieldName = legacySpec + ? "_yql_time" + : spec->GetLabel(); + + const auto interval = legacySpec + ? legacySpec->Interval + : dynamic_cast<THoppingWindow*>(spec.Get())->Interval; + if (!IsStart) { - Add("Member", "row", Q("_yql_time")); + Add("Member", "row", Q(fieldName)); return true; } Add("Sub", - Y("Member", "row", Q("_yql_time")), - window->Interval); + Y("Member", "row", Q(fieldName)), + interval); return true; } @@ -2930,6 +3017,9 @@ struct TBuiltinFuncData { {"sessionstart", BuildSimpleBuiltinFactoryCallback<TSessionStart<true>>()}, {"sessionstate", BuildSimpleBuiltinFactoryCallback<TSessionStart<false>>()}, + // New hopping + {"hoppingwindow", BuildSimpleBuiltinFactoryCallback<THoppingWindow>()}, + // Hopping intervals time functions {"hopstart", BuildSimpleBuiltinFactoryCallback<THoppingTime<true>>()}, {"hopend", BuildSimpleBuiltinFactoryCallback<THoppingTime<false>>()}, diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp index 8a1e56b4c94..c8ad9f85671 100644 --- a/ydb/library/yql/sql/v1/node.cpp +++ b/ydb/library/yql/sql/v1/node.cpp @@ -1002,8 +1002,8 @@ TWindowSpecificationPtr TWindowSpecification::Clone() const { return res; } -THoppingWindowSpecPtr THoppingWindowSpec::Clone() const { - auto res = MakeIntrusive<THoppingWindowSpec>(); +TLegacyHoppingWindowSpecPtr TLegacyHoppingWindowSpec::Clone() const { + auto res = MakeIntrusive<TLegacyHoppingWindowSpec>(); res->TimeExtractor = TimeExtractor->Clone(); res->Hop = Hop->Clone(); res->Interval = Interval->Clone(); @@ -1423,6 +1423,14 @@ bool ISource::AddExpressions(TContext& ctx, const TVector<TNodePtr>& expressions } SessionWindow = expr; } + if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) { + if (HoppingWindow) { + ctx.Error(expr->GetPos()) << "Duplicate hopping window specification:"; + ctx.Error(HoppingWindow->GetPos()) << "Previous hopping window is declared here"; + return false; + } + HoppingWindow = expr; + } } Expressions(exprSeat).emplace_back(expr); } @@ -1481,18 +1489,22 @@ const TVector<TString>& ISource::GetTmpWindowColumns() const { return TmpWindowColumns; } -void ISource::SetHoppingWindowSpec(THoppingWindowSpecPtr spec) { - HoppingWindowSpec = spec; +void ISource::SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec) { + LegacyHoppingWindowSpec = spec; } -THoppingWindowSpecPtr ISource::GetHoppingWindowSpec() const { - return HoppingWindowSpec; +TLegacyHoppingWindowSpecPtr ISource::GetLegacyHoppingWindowSpec() const { + return LegacyHoppingWindowSpec; } TNodePtr ISource::GetSessionWindowSpec() const { return SessionWindow; } +TNodePtr ISource::GetHoppingWindowSpec() const { + return HoppingWindow; +} + TWindowSpecificationPtr ISource::FindWindowSpecification(TContext& ctx, const TString& windowName) const { auto winIter = WinSpecs.find(windowName); if (winIter == WinSpecs.end()) { @@ -1708,6 +1720,9 @@ TNodePtr BuildLambdaBodyForExprAliases(TPosition pos, const TVector<TNodePtr>& e if (dynamic_cast<const TSessionWindow*>(exprNode.Get())) { continue; } + if (dynamic_cast<const THoppingWindow*>(exprNode.Get())) { + continue; + } structObj = structObj->Y("AddMember", structObj, structObj->Q(name), exprNode); } return structObj->Y("AsList", structObj); @@ -1775,7 +1790,7 @@ bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) { } std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TContext& ctx) { - if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !HoppingWindowSpec) { + if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !LegacyHoppingWindowSpec) { return { nullptr, true }; } @@ -1819,15 +1834,15 @@ std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TConte options = L(options, Q(Y(Q("compact")))); } - if (HoppingWindowSpec) { + if (LegacyHoppingWindowSpec) { auto hoppingTraits = Y( "HoppingTraits", Y("ListItemType", listType), - BuildLambda(Pos, Y("row"), HoppingWindowSpec->TimeExtractor), - HoppingWindowSpec->Hop, - HoppingWindowSpec->Interval, - HoppingWindowSpec->Delay, - HoppingWindowSpec->DataWatermarks ? Q("true") : Q("false")); + BuildLambda(Pos, Y("row"), LegacyHoppingWindowSpec->TimeExtractor), + LegacyHoppingWindowSpec->Hop, + LegacyHoppingWindowSpec->Interval, + LegacyHoppingWindowSpec->Delay, + LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false")); options = L(options, Q(Y(Q("hopping"), hoppingTraits))); } @@ -1840,6 +1855,14 @@ std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TConte Q(Y(BuildQuotedAtom(Pos, SessionWindow->GetLabel()), sessionWindow->BuildTraits(label)))))); } + if (HoppingWindow) { + YQL_ENSURE(HoppingWindow->GetLabel()); + auto hoppingWindow = dynamic_cast<THoppingWindow*>(HoppingWindow.Get()); + YQL_ENSURE(hoppingWindow); + options = L(options, Q(Y(Q("hopping"), + Q(Y(BuildQuotedAtom(Pos, HoppingWindow->GetLabel()), hoppingWindow->BuildTraits(label)))))); + } + return { Y("AssumeColumnOrderPartial", Y("Aggregate" + GroupBySuffix, label, Q(keysTuple), Q(aggrArgs), Q(options)), Q(keysTuple)), true }; } diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index 6cdc0098ecc..7028f0bbaaa 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -606,17 +606,17 @@ namespace NSQLTranslationV1 { }; typedef TIntrusivePtr<TFrameSpecification> TFrameSpecificationPtr; - struct THoppingWindowSpec: public TSimpleRefCount<THoppingWindowSpec> { + struct TLegacyHoppingWindowSpec: public TSimpleRefCount<TLegacyHoppingWindowSpec> { TNodePtr TimeExtractor; TNodePtr Hop; TNodePtr Interval; TNodePtr Delay; bool DataWatermarks; - TIntrusivePtr<THoppingWindowSpec> Clone() const; - ~THoppingWindowSpec() {} + TIntrusivePtr<TLegacyHoppingWindowSpec> Clone() const; + ~TLegacyHoppingWindowSpec() {} }; - typedef TIntrusivePtr<THoppingWindowSpec> THoppingWindowSpecPtr; + typedef TIntrusivePtr<TLegacyHoppingWindowSpec> TLegacyHoppingWindowSpecPtr; struct TWindowSpecification: public TSimpleRefCount<TWindowSpecification> { TMaybe<TString> ExistingWindowName; @@ -838,9 +838,10 @@ namespace NSQLTranslationV1 { virtual void AddWindowSpecs(TWinSpecs winSpecs); virtual bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func); virtual bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func); - virtual void SetHoppingWindowSpec(THoppingWindowSpecPtr spec); - virtual THoppingWindowSpecPtr GetHoppingWindowSpec() const; + virtual void SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec); + virtual TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindowSpec() const; virtual TNodePtr GetSessionWindowSpec() const; + virtual TNodePtr GetHoppingWindowSpec() const; virtual bool IsCompositeSource() const; virtual bool IsGroupByColumn(const TString& column) const; virtual bool IsFlattenByColumns() const; @@ -916,8 +917,9 @@ namespace NSQLTranslationV1 { TMap<TString, TVector<TAggregationPtr>> AggregationOverWindow; TMap<TString, TVector<TNodePtr>> FuncOverWindow; TWinSpecs WinSpecs; - THoppingWindowSpecPtr HoppingWindowSpec; + TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec; TNodePtr SessionWindow; + TNodePtr HoppingWindow; TVector<ISource*> UsedSources; TString FlattenMode; bool FlattenColumns = false; @@ -1049,6 +1051,28 @@ namespace NSQLTranslationV1 { bool Valid; }; + class THoppingWindow final : public INode { + public: + THoppingWindow(TPosition pos, const TVector<TNodePtr>& args); + void MarkValid(); + TNodePtr BuildTraits(const TString& label) const; + public: + TNodePtr Hop; + TNodePtr Interval; + private: + bool DoInit(TContext& ctx, ISource* src) override; + TAstNode* Translate(TContext&) const override; + void DoUpdateState() const override; + TNodePtr DoClone() const override; + TString GetOpName() const override; + TNodePtr ProcessIntervalParam(const TNodePtr& val) const; + + TVector<TNodePtr> Args; + TSourcePtr FakeSource; + TNodePtr Node; + bool Valid; + }; + struct TStringContent { TString Content; NYql::NUdf::EDataSlot Type = NYql::NUdf::EDataSlot::String; @@ -1296,7 +1320,7 @@ namespace NSQLTranslationV1 { const TVector<TSortSpecificationPtr>& orderBy, TNodePtr having, TWinSpecs&& windowSpec, - THoppingWindowSpecPtr hoppingWindowSpec, + TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec, TVector<TNodePtr>&& terms, bool distinct, TVector<TNodePtr>&& without, diff --git a/ydb/library/yql/sql/v1/select.cpp b/ydb/library/yql/sql/v1/select.cpp index ac37571185c..77faabd4236 100644 --- a/ydb/library/yql/sql/v1/select.cpp +++ b/ydb/library/yql/sql/v1/select.cpp @@ -1409,7 +1409,7 @@ public: const TVector<TSortSpecificationPtr>& orderBy, TNodePtr having, TWinSpecs& winSpecs, - THoppingWindowSpecPtr hoppingWindowSpec, + TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec, const TVector<TNodePtr>& terms, bool distinct, const TVector<TNodePtr>& without, @@ -1429,7 +1429,7 @@ public: , Terms(terms) , Without(without) , Distinct(distinct) - , HoppingWindowSpec(hoppingWindowSpec) + , LegacyHoppingWindowSpec(legacyHoppingWindowSpec) , SelectStream(selectStream) , Settings(settings) { @@ -1495,6 +1495,10 @@ public: sessionWindow->MarkValid(); } + if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) { + hoppingWindow->MarkValid(); + } + // need to collect and Init() preaggregated exprs before calling Init() on GROUP BY expression TVector<TNodePtr> distinctAggrsInGroupBy; expr->CollectPreaggregateExprs(ctx, *src, distinctAggrsInGroupBy); @@ -1773,7 +1777,7 @@ public: } return new TSelectCore(Pos, Source->CloneSource(), CloneContainer(GroupByExpr), CloneContainer(GroupBy), CompactGroupBy, GroupBySuffix, AssumeSorted, CloneContainer(OrderBy), - SafeClone(Having), newSpecs, SafeClone(HoppingWindowSpec), + SafeClone(Having), newSpecs, SafeClone(LegacyHoppingWindowSpec), CloneContainer(Terms), Distinct, Without, SelectStream, Settings); } @@ -1808,11 +1812,11 @@ private: } } - if (HoppingWindowSpec) { - if (!HoppingWindowSpec->TimeExtractor->Init(ctx, src)) { + if (LegacyHoppingWindowSpec) { + if (!LegacyHoppingWindowSpec->TimeExtractor->Init(ctx, src)) { hasError = true; } - src->SetHoppingWindowSpec(HoppingWindowSpec); + src->SetLegacyHoppingWindowSpec(LegacyHoppingWindowSpec); } for (auto& term: Terms) { @@ -2109,7 +2113,7 @@ private: TVector<TNodePtr> Without; const bool Distinct; bool OrderByInit = false; - THoppingWindowSpecPtr HoppingWindowSpec; + TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec; const bool SelectStream; const TWriteSettings Settings; }; @@ -2480,7 +2484,7 @@ TSourcePtr DoBuildSelectCore( const TVector<TSortSpecificationPtr>& orderBy, TNodePtr having, TWinSpecs&& winSpecs, - THoppingWindowSpecPtr hoppingWindowSpec, + TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec, TVector<TNodePtr>&& terms, bool distinct, TVector<TNodePtr>&& without, @@ -2489,7 +2493,7 @@ TSourcePtr DoBuildSelectCore( ) { if (groupBy.empty() || !groupBy.front()->ContentListPtr()) { return new TSelectCore(pos, std::move(source), groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted, - orderBy, having, winSpecs, hoppingWindowSpec, terms, distinct, without, selectStream, settings); + orderBy, having, winSpecs, legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings); } if (groupBy.size() == 1) { /// actualy no big idea to use grouping function in this case (result allways 0) @@ -2497,7 +2501,7 @@ TSourcePtr DoBuildSelectCore( source = new TNestedProxySource(pos, *contentPtr, source); return DoBuildSelectCore(ctx, pos, originalSource, source, groupByExpr, *contentPtr, compactGroupBy, groupBySuffix, assumeSorted, orderBy, having, std::move(winSpecs), - hoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings); + legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings); } /// \todo some smart merge logic, generalize common part of grouping (expr, flatten, etc)? TIntrusivePtr<TCompositeSelect> compositeSelect = new TCompositeSelect(pos, std::move(source), originalSource->CloneSource(), settings); @@ -2524,7 +2528,7 @@ TSourcePtr DoBuildSelectCore( totalGroups += contentPtr->size(); TSelectCore* selectCore = new TSelectCore(pos, std::move(proxySource), CloneContainer(groupByExpr), CloneContainer(*contentPtr), compactGroupBy, groupBySuffix, assumeSorted, orderBy, SafeClone(having), winSpecs, - hoppingWindowSpec, terms, distinct, without, selectStream, settings); + legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings); subselects.emplace_back(selectCore); } if (totalGroups > ctx.PragmaGroupByLimit) { @@ -2549,7 +2553,7 @@ TSourcePtr BuildSelectCore( const TVector<TSortSpecificationPtr>& orderBy, TNodePtr having, TWinSpecs&& winSpecs, - THoppingWindowSpecPtr hoppingWindowSpec, + TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec, TVector<TNodePtr>&& terms, bool distinct, TVector<TNodePtr>&& without, @@ -2557,7 +2561,7 @@ TSourcePtr BuildSelectCore( const TWriteSettings& settings) { return DoBuildSelectCore(ctx, pos, source, source, groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted, orderBy, - having, std::move(winSpecs), hoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings); + having, std::move(winSpecs), legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings); } class TUnionAll: public IRealSource { diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index 1594a3095b9..b86952f1ba8 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -3593,13 +3593,13 @@ public: , CompactGroupBy(false) {} - bool Build(const TRule_group_by_clause& node, bool stream); + bool Build(const TRule_group_by_clause& node); bool ParseList(const TRule_grouping_element_list& groupingListNode, EGroupByFeatures featureContext); void SetFeatures(const TString& field) const; TVector<TNodePtr>& Content(); TMap<TString, TNodePtr>& Aliases(); - THoppingWindowSpecPtr GetHoppingWindow() const; + TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindow() const; bool IsCompactGroupBy() const; TString GetSuffix() const; @@ -3622,7 +3622,7 @@ private: TVector<TNodePtr> GroupBySet; TGroupByClauseCtx::TPtr GroupSetContext; - THoppingWindowSpecPtr HoppingWindowSpec; // stream queries + TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec; // stream queries static const TString AutogenerateNamePrefix; bool CompactGroupBy; TString Suffix; @@ -6710,23 +6710,30 @@ TSourcePtr TSqlSelect::SelectCore(const TRule_select_core& node, const TWriteSet /// \todo merge gtoupByExpr and groupBy in one TVector<TNodePtr> groupByExpr, groupBy; - THoppingWindowSpecPtr hoppingWindowSpec; + TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec; bool compactGroupBy = false; TString groupBySuffix; if (node.HasBlock11()) { TGroupByClause clause(Ctx, Mode); - if (!clause.Build(node.GetBlock11().GetRule_group_by_clause1(), source->IsStream())) { + if (!clause.Build(node.GetBlock11().GetRule_group_by_clause1())) { return nullptr; } + bool hasHopping = (bool)clause.GetLegacyHoppingWindow(); for (const auto& exprAlias: clause.Aliases()) { YQL_ENSURE(exprAlias.first == exprAlias.second->GetLabel()); groupByExpr.emplace_back(exprAlias.second); + hasHopping |= (bool)dynamic_cast<THoppingWindow*>(exprAlias.second.Get()); } groupBy = std::move(clause.Content()); clause.SetFeatures("sql_features"); - hoppingWindowSpec = clause.GetHoppingWindow(); + legacyHoppingWindowSpec = clause.GetLegacyHoppingWindow(); compactGroupBy = clause.IsCompactGroupBy(); groupBySuffix = clause.GetSuffix(); + + if (source->IsStream() && !hasHopping) { + Ctx.Error() << "Streaming group by query must have a hopping window specification."; + return nullptr; + } } TNodePtr having; @@ -6815,7 +6822,7 @@ TSourcePtr TSqlSelect::SelectCore(const TRule_select_core& node, const TWriteSet return nullptr; } return BuildSelectCore(Ctx, startPos, std::move(source), groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted, orderBy, having, - std::move(windowSpec), hoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings); + std::move(windowSpec), legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings); } TString TSqlTranslation::FrameSettingsToString(EFrameSettings settings, bool isUnbounded) { @@ -7188,7 +7195,7 @@ bool TSqlTranslation::OrderByClause(const TRule_order_by_clause& node, TVector<T return SortSpecificationList(node.GetRule_sort_specification_list3(), orderBy); } -bool TGroupByClause::Build(const TRule_group_by_clause& node, bool stream) { +bool TGroupByClause::Build(const TRule_group_by_clause& node) { // group_by_clause: GROUP COMPACT? BY opt_set_quantifier grouping_element_list (WITH an_id)?; CompactGroupBy = node.HasBlock2(); if (!CompactGroupBy) { @@ -7236,12 +7243,6 @@ bool TGroupByClause::Build(const TRule_group_by_clause& node, bool stream) { if (!ResolveGroupByAndGrouping()) { return false; } - if (stream) { - if (!HoppingWindowSpec) { - Ctx.Error() << "Streaming group by query must have a hopping window specification."; - return false; - } - } return true; } @@ -7288,8 +7289,8 @@ TMap<TString, TNodePtr>& TGroupByClause::Aliases() { return GroupSetContext->NodeAliases; } -THoppingWindowSpecPtr TGroupByClause::GetHoppingWindow() const { - return HoppingWindowSpec; +TLegacyHoppingWindowSpecPtr TGroupByClause::GetLegacyHoppingWindow() const { + return LegacyHoppingWindowSpec; } bool TGroupByClause::IsCompactGroupBy() const { @@ -7543,16 +7544,16 @@ bool TGroupByClause::OrdinaryGroupingSetList(const TRule_ordinary_grouping_set_l } bool TGroupByClause::HoppingWindow(const TRule_hopping_window_specification& node) { - if (HoppingWindowSpec) { + if (LegacyHoppingWindowSpec) { Ctx.Error() << "Duplicate hopping window specification."; return false; } - HoppingWindowSpec = new THoppingWindowSpec; + LegacyHoppingWindowSpec = new TLegacyHoppingWindowSpec; { TColumnRefScope scope(Ctx, EColumnRefState::Allow); TSqlExpression expr(Ctx, Mode); - HoppingWindowSpec->TimeExtractor = expr.Build(node.GetRule_expr3()); - if (!HoppingWindowSpec->TimeExtractor) { + LegacyHoppingWindowSpec->TimeExtractor = expr.Build(node.GetRule_expr3()); + if (!LegacyHoppingWindowSpec->TimeExtractor) { return false; } } @@ -7591,19 +7592,19 @@ bool TGroupByClause::HoppingWindow(const TRule_hopping_window_specification& nod }); }; - HoppingWindowSpec->Hop = processIntervalParam(node.GetRule_expr5()); - if (!HoppingWindowSpec->Hop) { + LegacyHoppingWindowSpec->Hop = processIntervalParam(node.GetRule_expr5()); + if (!LegacyHoppingWindowSpec->Hop) { return false; } - HoppingWindowSpec->Interval = processIntervalParam(node.GetRule_expr7()); - if (!HoppingWindowSpec->Interval) { + LegacyHoppingWindowSpec->Interval = processIntervalParam(node.GetRule_expr7()); + if (!LegacyHoppingWindowSpec->Interval) { return false; } - HoppingWindowSpec->Delay = processIntervalParam(node.GetRule_expr9()); - if (!HoppingWindowSpec->Delay) { + LegacyHoppingWindowSpec->Delay = processIntervalParam(node.GetRule_expr9()); + if (!LegacyHoppingWindowSpec->Delay) { return false; } - HoppingWindowSpec->DataWatermarks = Ctx.PragmaDataWatermarks; + LegacyHoppingWindowSpec->DataWatermarks = Ctx.PragmaDataWatermarks; return true; } @@ -7886,7 +7887,7 @@ TSourcePtr TSqlSelect::Build(const TRule& node, TPosition pos, TSelectKindResult TString groupBySuffix = ""; TNodePtr having; TWinSpecs winSpecs; - THoppingWindowSpecPtr hoppingWindowSpec; + TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec; bool distinct = false; TVector<TNodePtr> without; bool stream = false; @@ -7895,7 +7896,7 @@ TSourcePtr TSqlSelect::Build(const TRule& node, TPosition pos, TSelectKindResult terms.push_back(BuildColumn(unionPos, "*", "")); result = BuildSelectCore(Ctx, unionPos, std::move(result), groupByExpr, groupBy, compactGroupBy, groupBySuffix, - assumeOrderBy, orderBy, having, std::move(winSpecs), hoppingWindowSpec, std::move(terms), + assumeOrderBy, orderBy, having, std::move(winSpecs), legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), stream, settings); } else { result = BuildUnionAll(unionPos, std::move(sources), settings); |