aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authord-mokhnatkin <d-mokhnatkin@ydb.tech>2022-10-11 10:14:42 +0300
committerd-mokhnatkin <d-mokhnatkin@ydb.tech>2022-10-11 10:14:42 +0300
commit0a1cad3a38e0df480eb17194b64ff1965cab487e (patch)
tree05a7d454fbb94687a62169fd9d686a44a8a5fa30
parent60c4d2a23c79eb93eaf3c5f230a48c8861f84d2b (diff)
downloadydb-0a1cad3a38e0df480eb17194b64ff1965cab487e.tar.gz
new hopping syntax
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_extr_members.cpp24
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_flow2.cpp26
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp65
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp55
-rw-r--r--ydb/library/yql/providers/dq/opt/logical_optimize.cpp101
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp2
-rw-r--r--ydb/library/yql/providers/pq/common/pq_meta_fields.cpp2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp4
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp7
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp54
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp4
-rw-r--r--ydb/library/yql/sql/v1/builtin.cpp100
-rw-r--r--ydb/library/yql/sql/v1/node.cpp49
-rw-r--r--ydb/library/yql/sql/v1/node.h40
-rw-r--r--ydb/library/yql/sql/v1/select.cpp30
-rw-r--r--ydb/library/yql/sql/v1/sql.cpp59
16 files changed, 459 insertions, 163 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
index 98c428f454d..f9ae2a9aba9 100644
--- a/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_extr_members.cpp
@@ -684,10 +684,21 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons
sessionColumn = sessionSetting->Child(1)->Child(0)->Content();
}
+ TMaybe<TStringBuf> hoppingColumn = "_yql_time";
+ const auto hoppingSetting = GetSetting(aggr.Settings().Ref(), "hopping");
+ if (hoppingSetting) {
+ auto hoppingSettingValue = hoppingSetting->Child(1);
+ bool isLegacyHopping = !hoppingSettingValue->IsList();
+ if (!isLegacyHopping) {
+ YQL_ENSURE(hoppingSettingValue->Child(0)->IsAtom());
+ hoppingColumn = hoppingSettingValue->Child(0)->Content();
+ }
+ }
+
TSet<TStringBuf> usedFields;
- // all actual (non-session) keys will be used
+ // all actual (non-session/non-hopping) keys will be used
for (const auto& key : aggr.Keys()) {
- if (key.Value() != sessionColumn) {
+ if (key.Value() != sessionColumn && key.Value() != hoppingColumn) {
usedFields.insert(key.Value());
}
}
@@ -736,10 +747,12 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons
}
}
- auto settings = aggr.Settings();
- auto hoppingSetting = GetSetting(settings.Ref(), "hopping");
if (hoppingSetting) {
- auto traits = TCoHoppingTraits(hoppingSetting->Child(1));
+ auto traitsNode = hoppingSetting->ChildPtr(1);
+ if (traitsNode->IsList()) {
+ traitsNode = traitsNode->ChildPtr(1);
+ }
+ auto traits = TCoHoppingTraits(traitsNode);
auto timeExtractor = traits.TimeExtractor();
auto usedType = traits.ItemType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
@@ -756,7 +769,6 @@ TExprNode::TPtr ApplyExtractMembersToAggregate(const TExprNode::TPtr& node, cons
if (sessionSetting) {
TCoSessionWindowTraits traits(sessionSetting->Child(1)->ChildPtr(1));
- // TODO: same should be done for hopping
auto usedType = traits.ListType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()->
GetItemType()->Cast<TStructExprType>();
for (const auto& item : usedType->GetItems()) {
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
index 8df3f4d297c..c5e95637c00 100644
--- a/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_flow2.cpp
@@ -33,9 +33,19 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
sessionColumn = sessionSetting->Child(1)->Child(0)->Content();
}
+ TMaybe<TStringBuf> hoppingColumn;
+ auto hoppingSetting = GetSetting(node.Settings().Ref(), "hopping");
+ if (hoppingSetting) {
+ auto traitsNode = hoppingSetting->ChildPtr(1);
+ if (traitsNode->IsList()) {
+ YQL_ENSURE(traitsNode->Child(0)->IsAtom());
+ hoppingColumn = traitsNode->Child(0)->Content();
+ }
+ }
+
TSet<TStringBuf> usedFields;
for (const auto& x : node.Keys()) {
- if (x.Value() != sessionColumn) {
+ if (x.Value() != sessionColumn && x.Value() != hoppingColumn) {
usedFields.insert(x.Value());
}
}
@@ -71,10 +81,13 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
}
}
- auto settings = node.Settings();
- auto hoppingSetting = GetSetting(settings.Ref(), "hopping");
if (hoppingSetting) {
- auto traits = TCoHoppingTraits(hoppingSetting->Child(1));
+ auto traitsNode = hoppingSetting->ChildPtr(1);
+ if (traitsNode->IsList()) {
+ traitsNode = traitsNode->ChildPtr(1);
+ }
+ auto traits = TCoHoppingTraits(traitsNode);
+
auto timeExtractor = traits.TimeExtractor();
auto usedType = traits.ItemType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
@@ -83,10 +96,9 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
}
TSet<TStringBuf> lambdaSubset;
- if (!HaveFieldsSubset(timeExtractor.Body().Ptr(), *timeExtractor.Args().Arg(0).Raw(), lambdaSubset, parentsMap)) {
- return node.Ptr();
+ if (HaveFieldsSubset(timeExtractor.Body().Ptr(), *timeExtractor.Args().Arg(0).Raw(), lambdaSubset, parentsMap)) {
+ usedFields.insert(lambdaSubset.cbegin(), lambdaSubset.cend());
}
- usedFields.insert(lambdaSubset.cbegin(), lambdaSubset.cend());
if (usedFields.size() == structType->GetSize()) {
return node.Ptr();
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index f6383d58a78..9ab90ef4368 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -2237,10 +2237,10 @@ namespace NTypeAnnImpl {
bool first_signed = IsDataTypeSigned(first);
bool second_signed = IsDataTypeSigned(second);
if (first_width > second_width && !first_signed && second_signed ||
- first_width == second_width && first_signed != second_signed)
+ first_width == second_width && first_signed != second_signed)
{
auto issue = TIssue(
- ctx.Expr.GetPosition(input->Pos()),
+ ctx.Expr.GetPosition(input->Pos()),
TStringBuilder() << "Integral type implicit bitcast: " << *input->Head().GetTypeAnn() << " and " << *input->Tail().GetTypeAnn()
);
SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_IMPLICIT_BITCAST, issue);
@@ -2278,7 +2278,7 @@ namespace NTypeAnnImpl {
if (check_result != IGraphTransformer::TStatus::Ok) {
return check_result;
}
-
+
const bool isLeftNumeric = IsDataTypeNumeric(dataType[0]->GetSlot());
const bool isRightNumeric = IsDataTypeNumeric(dataType[1]->GetSlot());
// bool isOk = false;
@@ -2354,7 +2354,7 @@ namespace NTypeAnnImpl {
if (check_result != IGraphTransformer::TStatus::Ok) {
return check_result;
}
-
+
const bool isLeftNumeric = IsDataTypeNumeric(dataType[0]->GetSlot());
const bool isRightNumeric = IsDataTypeNumeric(dataType[1]->GetSlot());
// bool isOk = false;
@@ -2428,7 +2428,7 @@ namespace NTypeAnnImpl {
if (check_result != IGraphTransformer::TStatus::Ok) {
return check_result;
}
-
+
if (IsDataTypeNumeric(dataType[0]->GetSlot()) && IsDataTypeNumeric(dataType[1]->GetSlot())) {
auto commonTypeSlot = GetNumericDataTypeByLevel(Max(GetNumericDataTypeLevel(dataType[0]->GetSlot()),
GetNumericDataTypeLevel(dataType[1]->GetSlot())));
@@ -2484,7 +2484,7 @@ namespace NTypeAnnImpl {
haveOptional |= isOptional[i];
}
-
+
auto check_result = CheckIntegralsWidth(input, ctx, dataType[0]->GetSlot(), dataType[1]->GetSlot());
if (check_result != IGraphTransformer::TStatus::Ok) {
return check_result;
@@ -2547,7 +2547,7 @@ namespace NTypeAnnImpl {
if (check_result != IGraphTransformer::TStatus::Ok) {
return check_result;
}
-
+
if (IsDataTypeNumeric(dataType[0]->GetSlot()) && IsDataTypeNumeric(dataType[1]->GetSlot())) {
auto commonTypeSlot = GetNumericDataTypeByLevel(Max(GetNumericDataTypeLevel(dataType[0]->GetSlot()),
GetNumericDataTypeLevel(dataType[1]->GetSlot())));
@@ -8934,6 +8934,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
return IGraphTransformer::TStatus::Error;
}
+ THashSet<TStringBuf> addedInProjectionFields;
TVector<const TItemExprType*> allItems;
for (auto& item : input->Child(1)->Children()) {
if (!item->IsCallable({"SqlProjectItem", "SqlProjectStarItem"})) {
@@ -8944,13 +8945,59 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
if (item->IsCallable("SqlProjectStarItem")) {
auto& structItems = item->GetTypeAnn()->Cast<TStructExprType>()->GetItems();
- allItems.insert(allItems.end(), structItems.begin(), structItems.end());
+ for (const auto& item : structItems) {
+ allItems.emplace_back(item);
+ addedInProjectionFields.emplace(item->GetName());
+ }
} else {
YQL_ENSURE(item->Child(1)->IsAtom());
- allItems.push_back(ctx.Expr.MakeType<TItemExprType>(item->Child(1)->Content(), item->GetTypeAnn()));
+ const auto fieldName = item->Child(1)->Content();
+ allItems.push_back(ctx.Expr.MakeType<TItemExprType>(fieldName, item->GetTypeAnn()));
+ addedInProjectionFields.emplace(fieldName);
}
}
+ TVector<TStringBuf> transparentFields;
+ for (auto& inputItem : itemType->Cast<TStructExprType>()->GetItems()) {
+ if (!inputItem->GetName().StartsWith("_yql_sys_tsp_") ||
+ addedInProjectionFields.contains(inputItem->GetName()))
+ {
+ continue;
+ }
+
+ transparentFields.push_back(inputItem->GetName());
+ }
+
+ if (!transparentFields.empty()) {
+ TVector<TExprNode::TPtr> newProjectItems;
+ for (const auto& item : input->Child(1)->Children()) {
+ newProjectItems.push_back(item);
+ }
+
+ for (const auto& fieldName : transparentFields) {
+ auto lambdaArg = ctx.Expr.NewArgument(input->Pos(), "row");
+ auto lambdaBody = ctx.Expr.NewCallable(input->Pos(), "Member", {
+ lambdaArg,
+ ctx.Expr.NewAtom(input->Pos(), fieldName)
+ });
+ newProjectItems.push_back(ctx.Expr.NewCallable(
+ input->Pos(),
+ "SqlProjectItem",
+ {
+ ctx.Expr.NewCallable(input->Pos(), "TypeOf", {
+ input->Child(0)
+ }),
+ ctx.Expr.NewAtom(input->Pos(), fieldName),
+ ctx.Expr.NewLambda(
+ input->Pos(),
+ ctx.Expr.NewArguments(input->Pos(), {std::move(lambdaArg)}),
+ std::move(lambdaBody))
+ }));
+ }
+ output = ctx.Expr.ChangeChild(*input, 1, ctx.Expr.NewList(input->Pos(), std::move(newProjectItems)));
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
auto resultStructType = ctx.Expr.MakeType<TStructExprType>(allItems);
if (!resultStructType->Validate(input->Pos(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index d12f33f801c..6db84f4c786 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -4637,6 +4637,7 @@ namespace {
TVector<const TItemExprType*> rowColumns;
TMaybe<TStringBuf> sessionColumnName;
+ TMaybe<TStringBuf> hoppingColumnName;
for (const auto& setting : settings->Children()) {
if (!EnsureTupleMinSize(*setting, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
@@ -4653,13 +4654,52 @@ namespace {
return IGraphTransformer::TStatus::Error;
}
- if (!setting->Child(1)->IsCallable("HoppingTraits")) {
+ auto value = setting->ChildPtr(1);
+ if (value->Type() == TExprNode::List) {
+ if (!EnsureTupleSize(*value, 2, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto hoppingOutputColumn = value->Child(0);
+ if (!EnsureAtom(*hoppingOutputColumn, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (hoppingOutputColumn->Content().Empty()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(hoppingOutputColumn->Pos()),
+ TStringBuilder() << "Hopping output column name can not be empty"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ hoppingColumnName = hoppingOutputColumn->Content();
+
+ auto traits = value->Child(1);
+ if (!traits->IsCallable("HoppingTraits")) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Child(1)->Pos()),
+ TStringBuilder() << "Expected HoppingTraits callable"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool seenAsKeyColumn = AnyOf(input->Child(1)->ChildrenList(), [&](const auto& keyColum) {
+ return hoppingColumnName == keyColum->Content();
+ });
+
+ if (!seenAsKeyColumn) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Child(1)->Pos()),
+ TStringBuilder() << "Hopping column " << *hoppingColumnName << " is not listed in key columns"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else if (setting->Child(1)->IsCallable("HoppingTraits")) {
+ hoppingColumnName = "_yql_time"; // legacy hopping
+ } else {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(setting->Child(1)->Pos()),
TStringBuilder() << "Expected HoppingTraits callable"));
return IGraphTransformer::TStatus::Error;
}
- rowColumns.push_back(ctx.Expr.MakeType<TItemExprType>("_yql_time",
- ctx.Expr.MakeType<TOptionalExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Timestamp))));
+
+ rowColumns.push_back(ctx.Expr.MakeType<TItemExprType>(
+ *hoppingColumnName,
+ ctx.Expr.MakeType<TOptionalExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Timestamp))));
} else if (settingName == "session") {
if (!EnsureTupleSize(*setting, 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
@@ -4718,7 +4758,7 @@ namespace {
return IGraphTransformer::TStatus::Error;
}
- if (sessionColumnName == child->Content()) {
+ if (sessionColumnName == child->Content() || hoppingColumnName == child->Content()) {
continue;
}
@@ -5976,12 +6016,7 @@ namespace {
return IGraphTransformer::TStatus::Repeat;
}
- const TTypeAnnotationNode* timeType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Timestamp);
-
- if (!IsSameAnnotation(*RemoveOptionalType(lambdaTimeExtractor->GetTypeAnn()), *timeType)) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(lambdaTimeExtractor->Pos()), TStringBuilder()
- << "Mismatch hopping window time extractor lambda output type, expected: "
- << *timeType << ", but got: " << *lambdaTimeExtractor->GetTypeAnn()));
+ if (!EnsureSpecificDataType(*lambdaTimeExtractor, EDataSlot::Timestamp, ctx.Expr, true)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
index c60a2290c5d..ad857041be8 100644
--- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
@@ -17,7 +17,6 @@ using namespace NYql::NNodes;
namespace {
-constexpr TStringBuf YQL_TIME = "_yql_time";
TString BuildColumnName(const TExprBase column) {
if (const auto columnName = column.Maybe<TCoAtom>()) {
@@ -37,6 +36,10 @@ TString BuildColumnName(const TExprBase column) {
<< column.Ptr()->Dump());
}
+bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) {
+ return !hoppingSetting->Child(1)->IsList();
+}
+
}
class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
@@ -81,7 +84,7 @@ protected:
auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
if (input) {
if (hopSetting) {
- return RewriteAsHoppingWindow(node, ctx, input.Cast()).Cast();
+ return RewriteAsHoppingWindow(node, ctx, input.Cast());
} else {
return DqRewriteAggregate(node, ctx, TypesCtx, true);
}
@@ -141,8 +144,14 @@ private:
EnsureNotDistinct(aggregate);
+ const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx);
+ if (!maybeHopTraits) {
+ return nullptr;
+ }
+ const auto [hoppingColumn, hopTraits] = *maybeHopTraits;
+
const auto aggregateInputType = GetSeqItemType(node.Ptr()->Head().GetTypeAnn())->Cast<TStructExprType>();
- TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys());
+ TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hoppingColumn);
if (keysDescription.NeedPickle()) {
return Build<TCoMap>(ctx, pos)
@@ -157,20 +166,14 @@ private:
.Done();
}
- const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx);
- if (!maybeHopTraits) {
- return nullptr;
- }
- const auto hopTraits = maybeHopTraits.Cast();
-
- const auto keyLambda = BuildKeySelector(pos, *aggregateInputType, aggregate.Keys().Ptr(), ctx);
+ const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
const auto timeExtractorLambda = BuildTimeExtractor(hopTraits, ctx);
const auto initLambda = BuildInitHopLambda(aggregate, ctx);
const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx);
const auto saveLambda = BuildSaveHopLambda(aggregate, ctx);
const auto loadLambda = BuildLoadHopLambda(aggregate, ctx);
const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx);
- const auto finishLambda = BuildFinishHopLambda(aggregate, ctx);
+ const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hoppingColumn, ctx);
const auto watermarkMode = BuildWatermarkMode(aggregate, ctx);
if (!watermarkMode) {
return nullptr;
@@ -215,7 +218,7 @@ private:
.Done();
} else {
auto wrappedInput = input.Ptr();
- if (!aggregate.Keys().Empty()) {
+ if (!keysDescription.MemberKeys.empty()) {
// Shuffle input connection by keys
wrappedInput = WrapToShuffle(keysDescription, aggregate, input, ctx);
if (!wrappedInput) {
@@ -250,19 +253,27 @@ private:
}
}
- TMaybeNode<TCoHoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) {
+ TMaybe<std::pair<TString, TCoHoppingTraits>> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) {
const auto pos = aggregate.Pos();
const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
if (!hopSetting) {
ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting"));
- return TMaybeNode<TCoHoppingTraits>();
+ return Nothing();
}
- const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(hopSetting->Child(1));
+ const auto hoppingColumn = IsLegacyHopping(hopSetting)
+ ? "_yql_time"
+ : TString(hopSetting->Child(1)->Child(0)->Content());
+
+ const auto traitsNode = IsLegacyHopping(hopSetting)
+ ? hopSetting->Child(1)
+ : hopSetting->Child(1)->Child(1);
+
+ const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(traitsNode);
if (!maybeTraits) {
ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate"));
- return TMaybeNode<TCoHoppingTraits>();
+ return Nothing();
}
const auto traits = maybeTraits.Cast();
@@ -285,40 +296,48 @@ private:
const auto hop = checkIntervalParam(traits.Hop());
if (!hop) {
- return TMaybeNode<TCoHoppingTraits>();
+ return Nothing();
}
const auto interval = checkIntervalParam(traits.Interval());
if (!interval) {
- return TMaybeNode<TCoHoppingTraits>();
+ return Nothing();
}
const auto delay = checkIntervalParam(traits.Delay());
if (!delay) {
- return TMaybeNode<TCoHoppingTraits>();
+ return Nothing();
}
if (interval < hop) {
ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop"));
- return TMaybeNode<TCoHoppingTraits>();
+ return Nothing();
}
if (delay < hop) {
ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop"));
- return TMaybeNode<TCoHoppingTraits>();
+ return Nothing();
}
- return Build<TCoHoppingTraits>(ctx, aggregate.Pos())
+ const auto newTraits = Build<TCoHoppingTraits>(ctx, aggregate.Pos())
.InitFrom(traits)
.DataWatermarks(Config->AnalyticsHopping.Get().GetOrElse(false)
? ctx.NewAtom(aggregate.Pos(), "false")
: traits.DataWatermarks().Ptr())
.Done();
+
+ return std::make_pair(hoppingColumn, newTraits);
}
struct TKeysDescription {
TVector<TString> PickleKeys;
TVector<TString> MemberKeys;
+ TVector<TString> FakeKeys;
- explicit TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys) {
+ explicit TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) {
for (const auto& key : keys) {
+ if (key.StringValue() == hoppingColumn) {
+ FakeKeys.emplace_back(key.StringValue());
+ continue;
+ }
+
const auto index = rowType.FindItem(key.StringValue());
Y_ENSURE(index);
@@ -405,9 +424,25 @@ private:
return res;
}
+ TVector<TString> GetActualGroupKeys() {
+ TVector<TString> result;
+ result.reserve(PickleKeys.size() + MemberKeys.size());
+ result.insert(result.end(), PickleKeys.begin(), PickleKeys.end());
+ result.insert(result.end(), MemberKeys.begin(), MemberKeys.end());
+ return result;
+ }
+
bool NeedPickle() const {
return !PickleKeys.empty();
}
+
+ TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) {
+ auto builder = Build<TCoAtomList>(ctx, pos);
+ for (auto key : GetKeysList(ctx, pos)) {
+ builder.Add(std::move(key));
+ }
+ return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx);
+ }
};
TExprNode::TPtr WrapToShuffle(
@@ -622,9 +657,13 @@ private:
.Ptr();
}
- TExprNode::TPtr BuildFinishHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) {
+ TExprNode::TPtr BuildFinishHopLambda(
+ const TCoAggregate& aggregate,
+ const TVector<TString>& actualGroupKeys,
+ const TString& hoppingColumn,
+ TExprContext& ctx)
+ {
const auto pos = aggregate.Pos();
- const auto keyColumns = aggregate.Keys();
const auto aggregateHandlers = aggregate.Handlers();
const auto finishKeyArg = Build<TCoArgument>(ctx, pos).Name("key").Done();
@@ -632,17 +671,17 @@ private:
const auto finishTimeArg = Build<TCoArgument>(ctx, pos).Name("time").Done();
TVector<TExprBase> structItems;
- structItems.reserve(keyColumns.Size() + aggregateHandlers.Size() + 1);
+ structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1);
- if (keyColumns.Size() == 1) {
+ if (actualGroupKeys.size() == 1) {
structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
- .Name(keyColumns.Item(0))
+ .Name().Build(actualGroupKeys[0])
.Value(finishKeyArg)
.Done());
} else {
- for (size_t i = 0; i < keyColumns.Size(); ++i) {
+ for (size_t i = 0; i < actualGroupKeys.size(); ++i) {
structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
- .Name(keyColumns.Item(i))
+ .Name().Build(actualGroupKeys[i])
.Value<TCoNth>()
.Tuple(finishKeyArg)
.Index<TCoAtom>()
@@ -701,7 +740,7 @@ private:
}
structItems.push_back(Build<TCoNameValueTuple>(ctx, pos)
- .Name().Build(YQL_TIME)
+ .Name().Build(hoppingColumn)
.Value(finishTimeArg)
.Done());
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp
index c7507a91e01..52a9ab5d34f 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp
@@ -23,7 +23,7 @@ namespace {
}
},
{
- "_yql_sys_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
+ "_yql_sys_tsp_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
return std::make_pair(
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetWriteTime().MicroSeconds())),
diff --git a/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp
index 2c9e94faacb..173cee83d45 100644
--- a/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp
+++ b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp
@@ -6,7 +6,7 @@
namespace {
const std::vector<NYql::TMetaFieldDescriptor> PqMetaFields = {
NYql::TMetaFieldDescriptor("create_time", "_yql_sys_create_time", NYql::NUdf::EDataSlot::Timestamp),
- NYql::TMetaFieldDescriptor("write_time", "_yql_sys_write_time", NYql::NUdf::EDataSlot::Timestamp),
+ NYql::TMetaFieldDescriptor("write_time", "_yql_sys_tsp_write_time", NYql::NUdf::EDataSlot::Timestamp),
NYql::TMetaFieldDescriptor("partition_id", "_yql_sys_partition_id", NYql::NUdf::EDataSlot::Uint64),
NYql::TMetaFieldDescriptor("offset", "_yql_sys_offset", NYql::NUdf::EDataSlot::Uint64),
NYql::TMetaFieldDescriptor("message_group_id", "_yql_sys_message_group_id", NYql::NUdf::EDataSlot::String),
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp
index 380df33d529..090162046af 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink.cpp
@@ -126,7 +126,9 @@ public:
.World(write.World())
.DataSink(write.DataSink().Ptr())
.Topic(topicNode)
- .Input(node->Child(3))
+ .Input<TCoRemoveSystemMembers>()
+ .Input(node->Child(3))
+ .Build()
.Mode(settings.Mode.Cast())
.Settings(settings.Other)
.Done().Ptr();
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
index ec7623ab155..582456cc1a5 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
@@ -120,11 +120,16 @@ public:
.Metadata().Add(sourceMetadata).Build()
.Done();
+ auto format = topicKeyParser.GetFormat();
+ if (format.Empty()) {
+ format = "raw";
+ }
+
auto builder = Build<TPqReadTopic>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
.Topic(std::move(topicNode))
- .Format().Value(topicKeyParser.GetFormat()).Build()
+ .Format().Value(format).Build()
.Compression().Value(topicKeyParser.GetCompression()).Build();
if (topicKeyParser.GetColumnOrder()) {
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
index 8fa309a590d..639dcf3024e 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
@@ -215,36 +215,36 @@ public:
if (row->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Struct) {
auto structType = row->GetTypeAnn()->Cast<TStructExprType>();
- if (auto pos = structType->FindItem(descriptor->SysColumn)) {
- bool isOptional = false;
- const TDataExprType* dataType = nullptr;
- if (!EnsureDataOrOptionalOfData(row->Pos(), structType->GetItems()[*pos]->GetItemType(), isOptional, dataType, ctx)) {
- return IGraphTransformer::TStatus::Error;
- }
+ auto pos = structType->FindItem(descriptor->SysColumn);
+ if (!pos && descriptor->Key == "write_time") {
+ // Allow user to specify column manually. It is required for analytics hopping now.
+ // In future it will be removed (when custom event_time will be implemented)
+ pos = structType->FindItem("_row_time");
+ }
- if (!EnsureSpecificDataType(row->Pos(), *dataType, descriptor->Type, ctx)) {
- return IGraphTransformer::TStatus::Error;
- }
+ if (!pos) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Wrong place to use SystemMetadata"));
+ return IGraphTransformer::TStatus::Error;
+ }
- output = ctx.Builder(input->Pos())
- .Callable("Member")
- .Add(0, row)
- .Atom(1, descriptor->SysColumn, TNodeFlags::Default)
- .Seal()
- .Build();
-
- if (isOptional) {
- output = ctx.Builder(input->Pos())
- .Callable("Coalesce")
- .Add(0, output)
- .Callable(1, "String")
- .Atom(0, "0", TNodeFlags::Default)
- .Seal()
- .Seal()
- .Build();
- }
- return IGraphTransformer::TStatus::Repeat;
+ bool isOptional = false;
+ const TDataExprType* dataType = nullptr;
+ if (!EnsureDataOrOptionalOfData(row->Pos(), structType->GetItems()[*pos]->GetItemType(), isOptional, dataType, ctx)) {
+ return IGraphTransformer::TStatus::Error;
}
+
+ if (!EnsureSpecificDataType(row->Pos(), *dataType, descriptor->Type, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ output = ctx.Builder(input->Pos())
+ .Callable("Member")
+ .Add(0, row)
+ .Atom(1, structType->GetItems()[*pos]->GetName(), TNodeFlags::Default)
+ .Seal()
+ .Build();
+
+ return IGraphTransformer::TStatus::Repeat;
}
input->SetTypeAnn(ctx.MakeType<TDataExprType>(descriptor->Type));
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp
index b09fd381a07..b8e952c920b 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasink.cpp
@@ -137,7 +137,9 @@ public:
.World(write.World())
.DataSink(write.DataSink())
.Shard<TCoAtom>().Value(tablePath->Content()).Build()
- .Input(write.Arg(3))
+ .Input<TCoRemoveSystemMembers>()
+ .Input(write.Arg(3))
+ .Build()
.Done().Ptr();
}
diff --git a/ydb/library/yql/sql/v1/builtin.cpp b/ydb/library/yql/sql/v1/builtin.cpp
index 5ee117730cf..c3bf4a7c780 100644
--- a/ydb/library/yql/sql/v1/builtin.cpp
+++ b/ydb/library/yql/sql/v1/builtin.cpp
@@ -2067,6 +2067,84 @@ private:
TNodePtr Node;
};
+THoppingWindow::THoppingWindow(TPosition pos, const TVector<TNodePtr>& args)
+ : INode(pos)
+ , Args(args)
+ , FakeSource(BuildFakeSource(pos))
+ , Valid(false)
+{}
+
+void THoppingWindow::MarkValid() {
+ YQL_ENSURE(!HasState(ENodeState::Initialized));
+ Valid = true;
+}
+
+TNodePtr THoppingWindow::BuildTraits(const TString& label) const {
+ YQL_ENSURE(HasState(ENodeState::Initialized));
+
+ return Y(
+ "HoppingTraits",
+ Y("ListItemType", Y("TypeOf", label)),
+ BuildLambda(Pos, Y("row"), Y("Just", Y("SystemMetadata", Y("String", Q("write_time")), Y("DependsOn", "row")))),
+ Hop,
+ Interval,
+ Interval,
+ Q("true"));
+}
+
+bool THoppingWindow::DoInit(TContext& ctx, ISource* src) {
+ if (!src || src->IsFake()) {
+ ctx.Error(Pos) << "HoppingWindow requires data source";
+ return false;
+ }
+
+ if (!(Args.size() == 2)) {
+ ctx.Error(Pos) << "HoppingWindow requires two arguments";
+ return false;
+ }
+
+ if (!Valid) {
+ ctx.Error(Pos) << "HoppingWindow can only be used as a top-level GROUP BY expression";
+ return false;
+ }
+
+ auto hopExpr = Args[0];
+ auto intervalExpr = Args[1];
+ if (!(hopExpr->Init(ctx, FakeSource.Get()) && intervalExpr->Init(ctx, FakeSource.Get()))) {
+ return false;
+ }
+
+ Hop = ProcessIntervalParam(hopExpr);
+ Interval = ProcessIntervalParam(intervalExpr);
+
+ return true;
+}
+
+TAstNode* THoppingWindow::Translate(TContext&) const {
+ YQL_ENSURE(false, "Translate is called for HoppingWindow");
+ return nullptr;
+}
+
+void THoppingWindow::DoUpdateState() const {
+ State.Set(ENodeState::Const, false);
+}
+
+TNodePtr THoppingWindow::DoClone() const {
+ return new THoppingWindow(Pos, CloneContainer(Args));
+}
+
+TString THoppingWindow::GetOpName() const {
+ return "HoppingWindow";
+}
+
+TNodePtr THoppingWindow::ProcessIntervalParam(const TNodePtr& node) const {
+ auto literal = node->GetLiteral("String");
+ if (!literal) {
+ return Y("EvaluateExpr", node);
+ }
+
+ return new TYqlData(node->GetPos(), "Interval", {node});
+}
TNodePtr BuildUdfUserTypeArg(TPosition pos, const TVector<TNodePtr>& args, TNodePtr customUserType) {
TVector<TNodePtr> argsTypeItems;
@@ -2414,22 +2492,31 @@ private:
bool DoInit(TContext& ctx, ISource* src) override {
Y_UNUSED(ctx);
- auto window = src->GetHoppingWindowSpec();
- if (!window) {
+ auto legacySpec = src->GetLegacyHoppingWindowSpec();
+ auto spec = src->GetHoppingWindowSpec();
+ if (!legacySpec && !spec) {
ctx.Error(Pos) << "No hopping window parameters in aggregation";
return false;
}
Nodes.clear();
+ const auto fieldName = legacySpec
+ ? "_yql_time"
+ : spec->GetLabel();
+
+ const auto interval = legacySpec
+ ? legacySpec->Interval
+ : dynamic_cast<THoppingWindow*>(spec.Get())->Interval;
+
if (!IsStart) {
- Add("Member", "row", Q("_yql_time"));
+ Add("Member", "row", Q(fieldName));
return true;
}
Add("Sub",
- Y("Member", "row", Q("_yql_time")),
- window->Interval);
+ Y("Member", "row", Q(fieldName)),
+ interval);
return true;
}
@@ -2930,6 +3017,9 @@ struct TBuiltinFuncData {
{"sessionstart", BuildSimpleBuiltinFactoryCallback<TSessionStart<true>>()},
{"sessionstate", BuildSimpleBuiltinFactoryCallback<TSessionStart<false>>()},
+ // New hopping
+ {"hoppingwindow", BuildSimpleBuiltinFactoryCallback<THoppingWindow>()},
+
// Hopping intervals time functions
{"hopstart", BuildSimpleBuiltinFactoryCallback<THoppingTime<true>>()},
{"hopend", BuildSimpleBuiltinFactoryCallback<THoppingTime<false>>()},
diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp
index 8a1e56b4c94..c8ad9f85671 100644
--- a/ydb/library/yql/sql/v1/node.cpp
+++ b/ydb/library/yql/sql/v1/node.cpp
@@ -1002,8 +1002,8 @@ TWindowSpecificationPtr TWindowSpecification::Clone() const {
return res;
}
-THoppingWindowSpecPtr THoppingWindowSpec::Clone() const {
- auto res = MakeIntrusive<THoppingWindowSpec>();
+TLegacyHoppingWindowSpecPtr TLegacyHoppingWindowSpec::Clone() const {
+ auto res = MakeIntrusive<TLegacyHoppingWindowSpec>();
res->TimeExtractor = TimeExtractor->Clone();
res->Hop = Hop->Clone();
res->Interval = Interval->Clone();
@@ -1423,6 +1423,14 @@ bool ISource::AddExpressions(TContext& ctx, const TVector<TNodePtr>& expressions
}
SessionWindow = expr;
}
+ if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) {
+ if (HoppingWindow) {
+ ctx.Error(expr->GetPos()) << "Duplicate hopping window specification:";
+ ctx.Error(HoppingWindow->GetPos()) << "Previous hopping window is declared here";
+ return false;
+ }
+ HoppingWindow = expr;
+ }
}
Expressions(exprSeat).emplace_back(expr);
}
@@ -1481,18 +1489,22 @@ const TVector<TString>& ISource::GetTmpWindowColumns() const {
return TmpWindowColumns;
}
-void ISource::SetHoppingWindowSpec(THoppingWindowSpecPtr spec) {
- HoppingWindowSpec = spec;
+void ISource::SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec) {
+ LegacyHoppingWindowSpec = spec;
}
-THoppingWindowSpecPtr ISource::GetHoppingWindowSpec() const {
- return HoppingWindowSpec;
+TLegacyHoppingWindowSpecPtr ISource::GetLegacyHoppingWindowSpec() const {
+ return LegacyHoppingWindowSpec;
}
TNodePtr ISource::GetSessionWindowSpec() const {
return SessionWindow;
}
+TNodePtr ISource::GetHoppingWindowSpec() const {
+ return HoppingWindow;
+}
+
TWindowSpecificationPtr ISource::FindWindowSpecification(TContext& ctx, const TString& windowName) const {
auto winIter = WinSpecs.find(windowName);
if (winIter == WinSpecs.end()) {
@@ -1708,6 +1720,9 @@ TNodePtr BuildLambdaBodyForExprAliases(TPosition pos, const TVector<TNodePtr>& e
if (dynamic_cast<const TSessionWindow*>(exprNode.Get())) {
continue;
}
+ if (dynamic_cast<const THoppingWindow*>(exprNode.Get())) {
+ continue;
+ }
structObj = structObj->Y("AddMember", structObj, structObj->Q(name), exprNode);
}
return structObj->Y("AsList", structObj);
@@ -1775,7 +1790,7 @@ bool ISource::SetSamplingRate(TContext& ctx, TNodePtr samplingRate) {
}
std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TContext& ctx) {
- if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !HoppingWindowSpec) {
+ if (GroupKeys.empty() && Aggregations.empty() && !IsCompositeSource() && !LegacyHoppingWindowSpec) {
return { nullptr, true };
}
@@ -1819,15 +1834,15 @@ std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TConte
options = L(options, Q(Y(Q("compact"))));
}
- if (HoppingWindowSpec) {
+ if (LegacyHoppingWindowSpec) {
auto hoppingTraits = Y(
"HoppingTraits",
Y("ListItemType", listType),
- BuildLambda(Pos, Y("row"), HoppingWindowSpec->TimeExtractor),
- HoppingWindowSpec->Hop,
- HoppingWindowSpec->Interval,
- HoppingWindowSpec->Delay,
- HoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"));
+ BuildLambda(Pos, Y("row"), LegacyHoppingWindowSpec->TimeExtractor),
+ LegacyHoppingWindowSpec->Hop,
+ LegacyHoppingWindowSpec->Interval,
+ LegacyHoppingWindowSpec->Delay,
+ LegacyHoppingWindowSpec->DataWatermarks ? Q("true") : Q("false"));
options = L(options, Q(Y(Q("hopping"), hoppingTraits)));
}
@@ -1840,6 +1855,14 @@ std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TConte
Q(Y(BuildQuotedAtom(Pos, SessionWindow->GetLabel()), sessionWindow->BuildTraits(label))))));
}
+ if (HoppingWindow) {
+ YQL_ENSURE(HoppingWindow->GetLabel());
+ auto hoppingWindow = dynamic_cast<THoppingWindow*>(HoppingWindow.Get());
+ YQL_ENSURE(hoppingWindow);
+ options = L(options, Q(Y(Q("hopping"),
+ Q(Y(BuildQuotedAtom(Pos, HoppingWindow->GetLabel()), hoppingWindow->BuildTraits(label))))));
+ }
+
return { Y("AssumeColumnOrderPartial", Y("Aggregate" + GroupBySuffix, label, Q(keysTuple), Q(aggrArgs), Q(options)), Q(keysTuple)), true };
}
diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h
index 6cdc0098ecc..7028f0bbaaa 100644
--- a/ydb/library/yql/sql/v1/node.h
+++ b/ydb/library/yql/sql/v1/node.h
@@ -606,17 +606,17 @@ namespace NSQLTranslationV1 {
};
typedef TIntrusivePtr<TFrameSpecification> TFrameSpecificationPtr;
- struct THoppingWindowSpec: public TSimpleRefCount<THoppingWindowSpec> {
+ struct TLegacyHoppingWindowSpec: public TSimpleRefCount<TLegacyHoppingWindowSpec> {
TNodePtr TimeExtractor;
TNodePtr Hop;
TNodePtr Interval;
TNodePtr Delay;
bool DataWatermarks;
- TIntrusivePtr<THoppingWindowSpec> Clone() const;
- ~THoppingWindowSpec() {}
+ TIntrusivePtr<TLegacyHoppingWindowSpec> Clone() const;
+ ~TLegacyHoppingWindowSpec() {}
};
- typedef TIntrusivePtr<THoppingWindowSpec> THoppingWindowSpecPtr;
+ typedef TIntrusivePtr<TLegacyHoppingWindowSpec> TLegacyHoppingWindowSpecPtr;
struct TWindowSpecification: public TSimpleRefCount<TWindowSpecification> {
TMaybe<TString> ExistingWindowName;
@@ -838,9 +838,10 @@ namespace NSQLTranslationV1 {
virtual void AddWindowSpecs(TWinSpecs winSpecs);
virtual bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func);
virtual bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func);
- virtual void SetHoppingWindowSpec(THoppingWindowSpecPtr spec);
- virtual THoppingWindowSpecPtr GetHoppingWindowSpec() const;
+ virtual void SetLegacyHoppingWindowSpec(TLegacyHoppingWindowSpecPtr spec);
+ virtual TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindowSpec() const;
virtual TNodePtr GetSessionWindowSpec() const;
+ virtual TNodePtr GetHoppingWindowSpec() const;
virtual bool IsCompositeSource() const;
virtual bool IsGroupByColumn(const TString& column) const;
virtual bool IsFlattenByColumns() const;
@@ -916,8 +917,9 @@ namespace NSQLTranslationV1 {
TMap<TString, TVector<TAggregationPtr>> AggregationOverWindow;
TMap<TString, TVector<TNodePtr>> FuncOverWindow;
TWinSpecs WinSpecs;
- THoppingWindowSpecPtr HoppingWindowSpec;
+ TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec;
TNodePtr SessionWindow;
+ TNodePtr HoppingWindow;
TVector<ISource*> UsedSources;
TString FlattenMode;
bool FlattenColumns = false;
@@ -1049,6 +1051,28 @@ namespace NSQLTranslationV1 {
bool Valid;
};
+ class THoppingWindow final : public INode {
+ public:
+ THoppingWindow(TPosition pos, const TVector<TNodePtr>& args);
+ void MarkValid();
+ TNodePtr BuildTraits(const TString& label) const;
+ public:
+ TNodePtr Hop;
+ TNodePtr Interval;
+ private:
+ bool DoInit(TContext& ctx, ISource* src) override;
+ TAstNode* Translate(TContext&) const override;
+ void DoUpdateState() const override;
+ TNodePtr DoClone() const override;
+ TString GetOpName() const override;
+ TNodePtr ProcessIntervalParam(const TNodePtr& val) const;
+
+ TVector<TNodePtr> Args;
+ TSourcePtr FakeSource;
+ TNodePtr Node;
+ bool Valid;
+ };
+
struct TStringContent {
TString Content;
NYql::NUdf::EDataSlot Type = NYql::NUdf::EDataSlot::String;
@@ -1296,7 +1320,7 @@ namespace NSQLTranslationV1 {
const TVector<TSortSpecificationPtr>& orderBy,
TNodePtr having,
TWinSpecs&& windowSpec,
- THoppingWindowSpecPtr hoppingWindowSpec,
+ TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
TVector<TNodePtr>&& terms,
bool distinct,
TVector<TNodePtr>&& without,
diff --git a/ydb/library/yql/sql/v1/select.cpp b/ydb/library/yql/sql/v1/select.cpp
index ac37571185c..77faabd4236 100644
--- a/ydb/library/yql/sql/v1/select.cpp
+++ b/ydb/library/yql/sql/v1/select.cpp
@@ -1409,7 +1409,7 @@ public:
const TVector<TSortSpecificationPtr>& orderBy,
TNodePtr having,
TWinSpecs& winSpecs,
- THoppingWindowSpecPtr hoppingWindowSpec,
+ TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
const TVector<TNodePtr>& terms,
bool distinct,
const TVector<TNodePtr>& without,
@@ -1429,7 +1429,7 @@ public:
, Terms(terms)
, Without(without)
, Distinct(distinct)
- , HoppingWindowSpec(hoppingWindowSpec)
+ , LegacyHoppingWindowSpec(legacyHoppingWindowSpec)
, SelectStream(selectStream)
, Settings(settings)
{
@@ -1495,6 +1495,10 @@ public:
sessionWindow->MarkValid();
}
+ if (auto hoppingWindow = dynamic_cast<THoppingWindow*>(expr.Get())) {
+ hoppingWindow->MarkValid();
+ }
+
// need to collect and Init() preaggregated exprs before calling Init() on GROUP BY expression
TVector<TNodePtr> distinctAggrsInGroupBy;
expr->CollectPreaggregateExprs(ctx, *src, distinctAggrsInGroupBy);
@@ -1773,7 +1777,7 @@ public:
}
return new TSelectCore(Pos, Source->CloneSource(), CloneContainer(GroupByExpr),
CloneContainer(GroupBy), CompactGroupBy, GroupBySuffix, AssumeSorted, CloneContainer(OrderBy),
- SafeClone(Having), newSpecs, SafeClone(HoppingWindowSpec),
+ SafeClone(Having), newSpecs, SafeClone(LegacyHoppingWindowSpec),
CloneContainer(Terms), Distinct, Without, SelectStream, Settings);
}
@@ -1808,11 +1812,11 @@ private:
}
}
- if (HoppingWindowSpec) {
- if (!HoppingWindowSpec->TimeExtractor->Init(ctx, src)) {
+ if (LegacyHoppingWindowSpec) {
+ if (!LegacyHoppingWindowSpec->TimeExtractor->Init(ctx, src)) {
hasError = true;
}
- src->SetHoppingWindowSpec(HoppingWindowSpec);
+ src->SetLegacyHoppingWindowSpec(LegacyHoppingWindowSpec);
}
for (auto& term: Terms) {
@@ -2109,7 +2113,7 @@ private:
TVector<TNodePtr> Without;
const bool Distinct;
bool OrderByInit = false;
- THoppingWindowSpecPtr HoppingWindowSpec;
+ TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec;
const bool SelectStream;
const TWriteSettings Settings;
};
@@ -2480,7 +2484,7 @@ TSourcePtr DoBuildSelectCore(
const TVector<TSortSpecificationPtr>& orderBy,
TNodePtr having,
TWinSpecs&& winSpecs,
- THoppingWindowSpecPtr hoppingWindowSpec,
+ TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
TVector<TNodePtr>&& terms,
bool distinct,
TVector<TNodePtr>&& without,
@@ -2489,7 +2493,7 @@ TSourcePtr DoBuildSelectCore(
) {
if (groupBy.empty() || !groupBy.front()->ContentListPtr()) {
return new TSelectCore(pos, std::move(source), groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted,
- orderBy, having, winSpecs, hoppingWindowSpec, terms, distinct, without, selectStream, settings);
+ orderBy, having, winSpecs, legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings);
}
if (groupBy.size() == 1) {
/// actualy no big idea to use grouping function in this case (result allways 0)
@@ -2497,7 +2501,7 @@ TSourcePtr DoBuildSelectCore(
source = new TNestedProxySource(pos, *contentPtr, source);
return DoBuildSelectCore(ctx, pos, originalSource, source, groupByExpr, *contentPtr, compactGroupBy, groupBySuffix,
assumeSorted, orderBy, having, std::move(winSpecs),
- hoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings);
+ legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings);
}
/// \todo some smart merge logic, generalize common part of grouping (expr, flatten, etc)?
TIntrusivePtr<TCompositeSelect> compositeSelect = new TCompositeSelect(pos, std::move(source), originalSource->CloneSource(), settings);
@@ -2524,7 +2528,7 @@ TSourcePtr DoBuildSelectCore(
totalGroups += contentPtr->size();
TSelectCore* selectCore = new TSelectCore(pos, std::move(proxySource), CloneContainer(groupByExpr),
CloneContainer(*contentPtr), compactGroupBy, groupBySuffix, assumeSorted, orderBy, SafeClone(having), winSpecs,
- hoppingWindowSpec, terms, distinct, without, selectStream, settings);
+ legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings);
subselects.emplace_back(selectCore);
}
if (totalGroups > ctx.PragmaGroupByLimit) {
@@ -2549,7 +2553,7 @@ TSourcePtr BuildSelectCore(
const TVector<TSortSpecificationPtr>& orderBy,
TNodePtr having,
TWinSpecs&& winSpecs,
- THoppingWindowSpecPtr hoppingWindowSpec,
+ TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
TVector<TNodePtr>&& terms,
bool distinct,
TVector<TNodePtr>&& without,
@@ -2557,7 +2561,7 @@ TSourcePtr BuildSelectCore(
const TWriteSettings& settings)
{
return DoBuildSelectCore(ctx, pos, source, source, groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted, orderBy,
- having, std::move(winSpecs), hoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings);
+ having, std::move(winSpecs), legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings);
}
class TUnionAll: public IRealSource {
diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp
index 1594a3095b9..b86952f1ba8 100644
--- a/ydb/library/yql/sql/v1/sql.cpp
+++ b/ydb/library/yql/sql/v1/sql.cpp
@@ -3593,13 +3593,13 @@ public:
, CompactGroupBy(false)
{}
- bool Build(const TRule_group_by_clause& node, bool stream);
+ bool Build(const TRule_group_by_clause& node);
bool ParseList(const TRule_grouping_element_list& groupingListNode, EGroupByFeatures featureContext);
void SetFeatures(const TString& field) const;
TVector<TNodePtr>& Content();
TMap<TString, TNodePtr>& Aliases();
- THoppingWindowSpecPtr GetHoppingWindow() const;
+ TLegacyHoppingWindowSpecPtr GetLegacyHoppingWindow() const;
bool IsCompactGroupBy() const;
TString GetSuffix() const;
@@ -3622,7 +3622,7 @@ private:
TVector<TNodePtr> GroupBySet;
TGroupByClauseCtx::TPtr GroupSetContext;
- THoppingWindowSpecPtr HoppingWindowSpec; // stream queries
+ TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec; // stream queries
static const TString AutogenerateNamePrefix;
bool CompactGroupBy;
TString Suffix;
@@ -6710,23 +6710,30 @@ TSourcePtr TSqlSelect::SelectCore(const TRule_select_core& node, const TWriteSet
/// \todo merge gtoupByExpr and groupBy in one
TVector<TNodePtr> groupByExpr, groupBy;
- THoppingWindowSpecPtr hoppingWindowSpec;
+ TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec;
bool compactGroupBy = false;
TString groupBySuffix;
if (node.HasBlock11()) {
TGroupByClause clause(Ctx, Mode);
- if (!clause.Build(node.GetBlock11().GetRule_group_by_clause1(), source->IsStream())) {
+ if (!clause.Build(node.GetBlock11().GetRule_group_by_clause1())) {
return nullptr;
}
+ bool hasHopping = (bool)clause.GetLegacyHoppingWindow();
for (const auto& exprAlias: clause.Aliases()) {
YQL_ENSURE(exprAlias.first == exprAlias.second->GetLabel());
groupByExpr.emplace_back(exprAlias.second);
+ hasHopping |= (bool)dynamic_cast<THoppingWindow*>(exprAlias.second.Get());
}
groupBy = std::move(clause.Content());
clause.SetFeatures("sql_features");
- hoppingWindowSpec = clause.GetHoppingWindow();
+ legacyHoppingWindowSpec = clause.GetLegacyHoppingWindow();
compactGroupBy = clause.IsCompactGroupBy();
groupBySuffix = clause.GetSuffix();
+
+ if (source->IsStream() && !hasHopping) {
+ Ctx.Error() << "Streaming group by query must have a hopping window specification.";
+ return nullptr;
+ }
}
TNodePtr having;
@@ -6815,7 +6822,7 @@ TSourcePtr TSqlSelect::SelectCore(const TRule_select_core& node, const TWriteSet
return nullptr;
}
return BuildSelectCore(Ctx, startPos, std::move(source), groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted, orderBy, having,
- std::move(windowSpec), hoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings);
+ std::move(windowSpec), legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings);
}
TString TSqlTranslation::FrameSettingsToString(EFrameSettings settings, bool isUnbounded) {
@@ -7188,7 +7195,7 @@ bool TSqlTranslation::OrderByClause(const TRule_order_by_clause& node, TVector<T
return SortSpecificationList(node.GetRule_sort_specification_list3(), orderBy);
}
-bool TGroupByClause::Build(const TRule_group_by_clause& node, bool stream) {
+bool TGroupByClause::Build(const TRule_group_by_clause& node) {
// group_by_clause: GROUP COMPACT? BY opt_set_quantifier grouping_element_list (WITH an_id)?;
CompactGroupBy = node.HasBlock2();
if (!CompactGroupBy) {
@@ -7236,12 +7243,6 @@ bool TGroupByClause::Build(const TRule_group_by_clause& node, bool stream) {
if (!ResolveGroupByAndGrouping()) {
return false;
}
- if (stream) {
- if (!HoppingWindowSpec) {
- Ctx.Error() << "Streaming group by query must have a hopping window specification.";
- return false;
- }
- }
return true;
}
@@ -7288,8 +7289,8 @@ TMap<TString, TNodePtr>& TGroupByClause::Aliases() {
return GroupSetContext->NodeAliases;
}
-THoppingWindowSpecPtr TGroupByClause::GetHoppingWindow() const {
- return HoppingWindowSpec;
+TLegacyHoppingWindowSpecPtr TGroupByClause::GetLegacyHoppingWindow() const {
+ return LegacyHoppingWindowSpec;
}
bool TGroupByClause::IsCompactGroupBy() const {
@@ -7543,16 +7544,16 @@ bool TGroupByClause::OrdinaryGroupingSetList(const TRule_ordinary_grouping_set_l
}
bool TGroupByClause::HoppingWindow(const TRule_hopping_window_specification& node) {
- if (HoppingWindowSpec) {
+ if (LegacyHoppingWindowSpec) {
Ctx.Error() << "Duplicate hopping window specification.";
return false;
}
- HoppingWindowSpec = new THoppingWindowSpec;
+ LegacyHoppingWindowSpec = new TLegacyHoppingWindowSpec;
{
TColumnRefScope scope(Ctx, EColumnRefState::Allow);
TSqlExpression expr(Ctx, Mode);
- HoppingWindowSpec->TimeExtractor = expr.Build(node.GetRule_expr3());
- if (!HoppingWindowSpec->TimeExtractor) {
+ LegacyHoppingWindowSpec->TimeExtractor = expr.Build(node.GetRule_expr3());
+ if (!LegacyHoppingWindowSpec->TimeExtractor) {
return false;
}
}
@@ -7591,19 +7592,19 @@ bool TGroupByClause::HoppingWindow(const TRule_hopping_window_specification& nod
});
};
- HoppingWindowSpec->Hop = processIntervalParam(node.GetRule_expr5());
- if (!HoppingWindowSpec->Hop) {
+ LegacyHoppingWindowSpec->Hop = processIntervalParam(node.GetRule_expr5());
+ if (!LegacyHoppingWindowSpec->Hop) {
return false;
}
- HoppingWindowSpec->Interval = processIntervalParam(node.GetRule_expr7());
- if (!HoppingWindowSpec->Interval) {
+ LegacyHoppingWindowSpec->Interval = processIntervalParam(node.GetRule_expr7());
+ if (!LegacyHoppingWindowSpec->Interval) {
return false;
}
- HoppingWindowSpec->Delay = processIntervalParam(node.GetRule_expr9());
- if (!HoppingWindowSpec->Delay) {
+ LegacyHoppingWindowSpec->Delay = processIntervalParam(node.GetRule_expr9());
+ if (!LegacyHoppingWindowSpec->Delay) {
return false;
}
- HoppingWindowSpec->DataWatermarks = Ctx.PragmaDataWatermarks;
+ LegacyHoppingWindowSpec->DataWatermarks = Ctx.PragmaDataWatermarks;
return true;
}
@@ -7886,7 +7887,7 @@ TSourcePtr TSqlSelect::Build(const TRule& node, TPosition pos, TSelectKindResult
TString groupBySuffix = "";
TNodePtr having;
TWinSpecs winSpecs;
- THoppingWindowSpecPtr hoppingWindowSpec;
+ TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec;
bool distinct = false;
TVector<TNodePtr> without;
bool stream = false;
@@ -7895,7 +7896,7 @@ TSourcePtr TSqlSelect::Build(const TRule& node, TPosition pos, TSelectKindResult
terms.push_back(BuildColumn(unionPos, "*", ""));
result = BuildSelectCore(Ctx, unionPos, std::move(result), groupByExpr, groupBy, compactGroupBy, groupBySuffix,
- assumeOrderBy, orderBy, having, std::move(winSpecs), hoppingWindowSpec, std::move(terms),
+ assumeOrderBy, orderBy, having, std::move(winSpecs), legacyHoppingWindowSpec, std::move(terms),
distinct, std::move(without), stream, settings);
} else {
result = BuildUnionAll(unionPos, std::move(sources), settings);