aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author vokayndzop <vokayndzop@yandex-team.com> 2025-01-23 16:59:05 +0300
committer vokayndzop <vokayndzop@yandex-team.com> 2025-01-23 17:39:07 +0300
commit 8b11fd720733f22796dbcff181d79102d7ebd049 (patch)
tree 98010b2a23fa2a492fd8c9f0a362d875134111ce
parent 203ad1bbfb63b4c33a489508b126164e58d55271 (diff)
download ydb-8b11fd720733f22796dbcff181d79102d7ebd049.tar.gz
HOP: fix limitations
commit_hash:edf4149577b2957f7f4de19b59737b4df4654924
-rw-r--r-- yql/essentials/core/yql_opt_hopping.cpp 59
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_hopping.cpp 17
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp 34
-rw-r--r-- yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.cfg 2
-rw-r--r-- yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.sql 14
-rw-r--r-- yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.cfg 2
-rw-r--r-- yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.sql 14
-rw-r--r-- yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.cfg 1
-rw-r--r-- yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.sql 13
9 files changed, 107 insertions, 49 deletions
diff --git a/yql/essentials/core/yql_opt_hopping.cpp b/yql/essentials/core/yql_opt_hopping.cpp
index c1cfb7ec91..dfa5073986 100644
--- a/yql/essentials/core/yql_opt_hopping.cpp
+++ b/yql/essentials/core/yql_opt_hopping.cpp
@@ -159,10 +159,13 @@ void EnsureNotDistinct(const TCoAggregate& aggregate) {
TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) {
const auto pos = aggregate.Pos();
+ const auto addError = [&](TStringBuf message) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), message));
+ };
const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
if (!hopSetting) {
- ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting"));
+ addError("Aggregate over stream must have 'hopping' setting");
return Nothing();
}
@@ -176,47 +179,67 @@ TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprCont
const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(traitsNode);
if (!maybeTraits) {
- ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate"));
+ addError("Invalid 'hopping' setting in Aggregate");
return Nothing();
}
const auto traits = maybeTraits.Cast();
- const auto checkIntervalParam = [&] (TExprBase param) -> ui64 {
+ const auto checkIntervalParam = [&](TExprBase param) -> TMaybe<i64> {
if (param.Maybe<TCoJust>()) {
param = param.Cast<TCoJust>().Input();
}
if (!param.Maybe<TCoInterval>()) {
- ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor"));
- return 0;
- }
- auto value = FromString<i64>(param.Cast<TCoInterval>().Literal().Value());
- if (value <= 0) {
- ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive"));
- return 0;
+ addError("Not an interval data ctor");
+ return Nothing();
}
- return (ui64)value;
+ return FromString<i64>(param.Cast<TCoInterval>().Literal().Value());
};
const auto hop = checkIntervalParam(traits.Hop());
if (!hop) {
return Nothing();
}
+ const auto hopTime = *hop;
+
const auto interval = checkIntervalParam(traits.Interval());
if (!interval) {
return Nothing();
}
+ const auto intervalTime = *interval;
+
const auto delay = checkIntervalParam(traits.Delay());
if (!delay) {
return Nothing();
}
+ const auto delayTime = *delay;
- if (interval < hop) {
- ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop"));
+ if (hopTime <= 0) {
+ addError("Hop time must be positive");
+ return Nothing();
+ }
+ if (intervalTime <= 0) {
+ addError("Interval time must be positive");
+ return Nothing();
+ }
+ if (delayTime < 0) {
+ addError("Delay time must be non-negative");
+ return Nothing();
+ }
+ if (intervalTime % hopTime) {
+ addError("Interval time must be divisible by hop time");
+ return Nothing();
+ }
+ if (delayTime % hopTime) {
+ addError("Delay time must be divisible by hop time");
+ return Nothing();
+ }
+ if (intervalTime / hopTime > 100'000) {
+ addError("Too many hops in interval");
return Nothing();
}
- if (delay < hop) {
- ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop"));
+ if (delayTime / hopTime > 100'000) {
+ addError("Too many hops in delay");
return Nothing();
}
@@ -230,9 +253,9 @@ TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprCont
return THoppingTraits {
hoppingColumn,
newTraits,
- hop,
- interval,
- delay
+ static_cast<ui64>(hopTime),
+ static_cast<ui64>(intervalTime),
+ static_cast<ui64>(delayTime),
};
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp b/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp
index 5fc8843292..8866dfe3a0 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp
@@ -265,22 +265,13 @@ public:
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- const auto hopTime = Hop->GetValue(ctx).Get<i64>();
- const auto interval = Interval->GetValue(ctx).Get<i64>();
- const auto delay = Delay->GetValue(ctx).Get<i64>();
-
- // TODO: move checks from here
- MKQL_ENSURE(hopTime > 0, "hop must be positive");
- MKQL_ENSURE(interval >= hopTime, "interval should be greater or equal to hop");
- MKQL_ENSURE(delay >= hopTime, "delay should be greater or equal to hop");
-
+ const auto hopTime = Hop->GetValue(ctx).Get<ui64>();
+ const auto interval = Interval->GetValue(ctx).Get<ui64>();
+ const auto delay = Delay->GetValue(ctx).Get<ui64>();
const auto intervalHopCount = interval / hopTime;
const auto delayHopCount = delay / hopTime;
- MKQL_ENSURE(intervalHopCount <= 100000, "too many hops in interval");
- MKQL_ENSURE(delayHopCount <= 100000, "too many hops in delay");
-
- return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, (ui64)hopTime, (ui64)intervalHopCount, (ui64)delayHopCount, ctx);
+ return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, hopTime, intervalHopCount, delayHopCount, ctx);
}
private:
diff --git a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp
index a7385531c6..735cd9cab8 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp
@@ -483,29 +483,27 @@ public:
}
NUdf::TUnboxedValuePod CreateStream(TComputationContext& ctx) const {
- const auto hopTime = Hop->GetValue(ctx).Get<i64>();
- const auto interval = Interval->GetValue(ctx).Get<i64>();
- const auto delay = Delay->GetValue(ctx).Get<i64>();
+ const auto hopTime = Hop->GetValue(ctx).Get<ui64>();
+ const auto interval = Interval->GetValue(ctx).Get<ui64>();
+ const auto delay = Delay->GetValue(ctx).Get<ui64>();
const auto dataWatermarks = DataWatermarks->GetValue(ctx).Get<bool>();
const auto watermarkMode = WatermarkMode->GetValue(ctx).Get<bool>();
-
- // TODO: move checks from here
- MKQL_ENSURE(hopTime > 0, "hop must be positive");
- MKQL_ENSURE(interval >= hopTime, "interval should be greater or equal to hop");
- MKQL_ENSURE(delay >= hopTime, "delay should be greater or equal to hop");
-
const auto intervalHopCount = interval / hopTime;
const auto delayHopCount = delay / hopTime;
- MKQL_ENSURE(intervalHopCount <= 100000, "too many hops in interval");
- MKQL_ENSURE(delayHopCount <= 100000, "too many hops in delay");
-
- return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, (ui64)hopTime,
- (ui64)intervalHopCount, (ui64)delayHopCount,
- dataWatermarks, watermarkMode, ctx,
- TValueHasher(KeyTypes, IsTuple, Hash.Get()),
- TValueEqual(KeyTypes, IsTuple, Equate.Get()),
- Watermark);
+ return ctx.HolderFactory.Create<TStreamValue>(
+ Stream->GetValue(ctx),
+ this,
+ hopTime,
+ intervalHopCount,
+ delayHopCount,
+ dataWatermarks,
+ watermarkMode,
+ ctx,
+ TValueHasher(KeyTypes, IsTuple, Hash.Get()),
+ TValueEqual(KeyTypes, IsTuple, Equate.Get()),
+ Watermark
+ );
}
NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override {
diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.cfg b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.cfg
new file mode 100644
index 0000000000..f03d38e18c
--- /dev/null
+++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.cfg
@@ -0,0 +1,2 @@
+in Input session1.txt
+xfail
diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.sql b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.sql
new file mode 100644
index 0000000000..e38246948c
--- /dev/null
+++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.sql
@@ -0,0 +1,14 @@
+/* syntax version 1 */
+/* postgres can not */
+/* ytfile can not */
+/* yt can not */
+/* custom error: Delay time must be divisible by hop time */
+
+PRAGMA dq.AnalyticsHopping="true";
+
+SELECT
+ user,
+ HOP_START() as ts,
+ SUM(payload) as payload
+FROM plato.Input
+GROUP BY HOP(DateTime::FromSeconds(CAST(ts as Uint32)), "PT10S", "PT10S", "PT11S"), user;
diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.cfg b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.cfg
new file mode 100644
index 0000000000..f03d38e18c
--- /dev/null
+++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.cfg
@@ -0,0 +1,2 @@
+in Input session1.txt
+xfail
diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.sql b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.sql
new file mode 100644
index 0000000000..59c4da1a90
--- /dev/null
+++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.sql
@@ -0,0 +1,14 @@
+/* syntax version 1 */
+/* postgres can not */
+/* ytfile can not */
+/* yt can not */
+/* custom error: Interval time must be divisible by hop time */
+
+PRAGMA dq.AnalyticsHopping="true";
+
+SELECT
+ user,
+ HOP_START() as ts,
+ SUM(payload) as payload
+FROM plato.Input
+GROUP BY HOP(DateTime::FromSeconds(CAST(ts as Uint32)), "PT10S", "PT11S", "PT10S"), user;
diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.cfg b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.cfg
new file mode 100644
index 0000000000..c788a7d1ec
--- /dev/null
+++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.cfg
@@ -0,0 +1 @@
+in Input session1.txt
diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.sql b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.sql
new file mode 100644
index 0000000000..3472455d18
--- /dev/null
+++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.sql
@@ -0,0 +1,13 @@
+/* syntax version 1 */
+/* postgres can not */
+/* ytfile can not */
+/* yt can not */
+
+PRAGMA dq.AnalyticsHopping="true";
+
+SELECT
+ user,
+ HOP_START() as ts,
+ SUM(payload) as payload
+FROM plato.Input
+GROUP BY HOP(DateTime::FromSeconds(CAST(ts as Uint32)), "PT10S", "PT10S", "PT0S"), user;