diff options
author | vokayndzop <vokayndzop@yandex-team.com> | 2025-01-23 16:59:05 +0300 |
---|---|---|
committer | vokayndzop <vokayndzop@yandex-team.com> | 2025-01-23 17:39:07 +0300 |
commit | 8b11fd720733f22796dbcff181d79102d7ebd049 (patch) | |
tree | 98010b2a23fa2a492fd8c9f0a362d875134111ce | |
parent | 203ad1bbfb63b4c33a489508b126164e58d55271 (diff) | |
download | ydb-8b11fd720733f22796dbcff181d79102d7ebd049.tar.gz |
HOP: fix limitations
commit_hash:edf4149577b2957f7f4de19b59737b4df4654924
9 files changed, 107 insertions, 49 deletions
diff --git a/yql/essentials/core/yql_opt_hopping.cpp b/yql/essentials/core/yql_opt_hopping.cpp index c1cfb7ec91..dfa5073986 100644 --- a/yql/essentials/core/yql_opt_hopping.cpp +++ b/yql/essentials/core/yql_opt_hopping.cpp @@ -159,10 +159,13 @@ void EnsureNotDistinct(const TCoAggregate& aggregate) { TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { const auto pos = aggregate.Pos(); + const auto addError = [&](TStringBuf message) { + ctx.AddError(TIssue(ctx.GetPosition(pos), message)); + }; const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); if (!hopSetting) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); + addError("Aggregate over stream must have 'hopping' setting"); return Nothing(); } @@ -176,47 +179,67 @@ TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprCont const auto maybeTraits = TMaybeNode<TCoHoppingTraits>(traitsNode); if (!maybeTraits) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); + addError("Invalid 'hopping' setting in Aggregate"); return Nothing(); } const auto traits = maybeTraits.Cast(); - const auto checkIntervalParam = [&] (TExprBase param) -> ui64 { + const auto checkIntervalParam = [&](TExprBase param) -> TMaybe<i64> { if (param.Maybe<TCoJust>()) { param = param.Cast<TCoJust>().Input(); } if (!param.Maybe<TCoInterval>()) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor")); - return 0; - } - auto value = FromString<i64>(param.Cast<TCoInterval>().Literal().Value()); - if (value <= 0) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive")); - return 0; + addError("Not an interval data ctor"); + return Nothing(); } - return (ui64)value; + return FromString<i64>(param.Cast<TCoInterval>().Literal().Value()); }; const auto hop = checkIntervalParam(traits.Hop()); if (!hop) { return Nothing(); } + const auto hopTime = *hop; + const auto interval = checkIntervalParam(traits.Interval()); if (!interval) { return Nothing(); } + const auto intervalTime = *interval; + const auto delay = checkIntervalParam(traits.Delay()); if (!delay) { return Nothing(); } + const auto delayTime = *delay; - if (interval < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); + if (hopTime <= 0) { + addError("Hop time must be positive"); + return Nothing(); + } + if (intervalTime <= 0) { + addError("Interval time must be positive"); + return Nothing(); + } + if (delayTime < 0) { + addError("Delay time must be non-negative"); + return Nothing(); + } + if (intervalTime % hopTime) { + addError("Interval time must be divisible by hop time"); + return Nothing(); + } + if (delayTime % hopTime) { + addError("Delay time must be divisible by hop time"); + return Nothing(); + } + if (intervalTime / hopTime > 100'000) { + addError("Too many hops in interval"); return Nothing(); } - if (delay < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); + if (delayTime / hopTime > 100'000) { + addError("Too many hops in delay"); return Nothing(); } @@ -230,9 +253,9 @@ TMaybe<THoppingTraits> ExtractHopTraits(const TCoAggregate& aggregate, TExprCont return THoppingTraits { hoppingColumn, newTraits, - hop, - interval, - delay + static_cast<ui64>(hopTime), + static_cast<ui64>(intervalTime), + static_cast<ui64>(delayTime), }; } diff --git a/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp b/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp index 5fc8843292..8866dfe3a0 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_hopping.cpp @@ -265,22 +265,13 @@ public: } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - const auto hopTime = Hop->GetValue(ctx).Get<i64>(); - const auto interval = Interval->GetValue(ctx).Get<i64>(); - const auto delay = Delay->GetValue(ctx).Get<i64>(); - - // TODO: move checks from here - MKQL_ENSURE(hopTime > 0, "hop must be positive"); - MKQL_ENSURE(interval >= hopTime, "interval should be greater or equal to hop"); - MKQL_ENSURE(delay >= hopTime, "delay should be greater or equal to hop"); - + const auto hopTime = Hop->GetValue(ctx).Get<ui64>(); + const auto interval = Interval->GetValue(ctx).Get<ui64>(); + const auto delay = Delay->GetValue(ctx).Get<ui64>(); const auto intervalHopCount = interval / hopTime; const auto delayHopCount = delay / hopTime; - MKQL_ENSURE(intervalHopCount <= 100000, "too many hops in interval"); - MKQL_ENSURE(delayHopCount <= 100000, "too many hops in delay"); - - return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, (ui64)hopTime, (ui64)intervalHopCount, (ui64)delayHopCount, ctx); + return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, hopTime, intervalHopCount, delayHopCount, ctx); } private: diff --git a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp index a7385531c6..735cd9cab8 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_multihopping.cpp @@ -483,29 +483,27 @@ public: } NUdf::TUnboxedValuePod CreateStream(TComputationContext& ctx) const { - const auto hopTime = Hop->GetValue(ctx).Get<i64>(); - const auto interval = Interval->GetValue(ctx).Get<i64>(); - const auto delay = Delay->GetValue(ctx).Get<i64>(); + const auto hopTime = Hop->GetValue(ctx).Get<ui64>(); + const auto interval = Interval->GetValue(ctx).Get<ui64>(); + const auto delay = Delay->GetValue(ctx).Get<ui64>(); const auto dataWatermarks = DataWatermarks->GetValue(ctx).Get<bool>(); const auto watermarkMode = WatermarkMode->GetValue(ctx).Get<bool>(); - - // TODO: move checks from here - MKQL_ENSURE(hopTime > 0, "hop must be positive"); - MKQL_ENSURE(interval >= hopTime, "interval should be greater or equal to hop"); - MKQL_ENSURE(delay >= hopTime, "delay should be greater or equal to hop"); - const auto intervalHopCount = interval / hopTime; const auto delayHopCount = delay / hopTime; - MKQL_ENSURE(intervalHopCount <= 100000, "too many hops in interval"); - MKQL_ENSURE(delayHopCount <= 100000, "too many hops in delay"); - - return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, (ui64)hopTime, - (ui64)intervalHopCount, (ui64)delayHopCount, - dataWatermarks, watermarkMode, ctx, - TValueHasher(KeyTypes, IsTuple, Hash.Get()), - TValueEqual(KeyTypes, IsTuple, Equate.Get()), - Watermark); + return ctx.HolderFactory.Create<TStreamValue>( + Stream->GetValue(ctx), + this, + hopTime, + intervalHopCount, + delayHopCount, + dataWatermarks, + watermarkMode, + ctx, + TValueHasher(KeyTypes, IsTuple, Hash.Get()), + TValueEqual(KeyTypes, IsTuple, Equate.Get()), + Watermark + ); } NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override { diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.cfg b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.cfg new file mode 100644 index 0000000000..f03d38e18c --- /dev/null +++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.cfg @@ -0,0 +1,2 @@ +in Input session1.txt +xfail diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.sql b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.sql new file mode 100644 index 0000000000..e38246948c --- /dev/null +++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_delay.sql @@ -0,0 +1,14 @@ +/* syntax version 1 */ +/* postgres can not */ +/* ytfile can not */ +/* yt can not */ +/* custom error: Delay time must be divisible by hop time */ + +PRAGMA dq.AnalyticsHopping="true"; + +SELECT + user, + HOP_START() as ts, + SUM(payload) as payload +FROM plato.Input +GROUP BY HOP(DateTime::FromSeconds(CAST(ts as Uint32)), "PT10S", "PT10S", "PT11S"), user; diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.cfg b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.cfg new file mode 100644 index 0000000000..f03d38e18c --- /dev/null +++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.cfg @@ -0,0 +1,2 @@ +in Input session1.txt +xfail diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.sql b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.sql new file mode 100644 index 0000000000..59c4da1a90 --- /dev/null +++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_bad_interval.sql @@ -0,0 +1,14 @@ +/* syntax version 1 */ +/* postgres can not */ +/* ytfile can not */ +/* yt can not */ +/* custom error: Interval time must be divisible by hop time */ + +PRAGMA dq.AnalyticsHopping="true"; + +SELECT + user, + HOP_START() as ts, + SUM(payload) as payload +FROM plato.Input +GROUP BY HOP(DateTime::FromSeconds(CAST(ts as Uint32)), "PT10S", "PT11S", "PT10S"), user; diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.cfg b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.cfg new file mode 100644 index 0000000000..c788a7d1ec --- /dev/null +++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.cfg @@ -0,0 +1 @@ +in Input session1.txt diff --git a/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.sql b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.sql new file mode 100644 index 0000000000..3472455d18 --- /dev/null +++ b/yt/yql/tests/sql/suites/aggregate/group_by_hop_zero_delay.sql @@ -0,0 +1,13 @@ +/* syntax version 1 */ +/* postgres can not */ +/* ytfile can not */ +/* yt can not */ + +PRAGMA dq.AnalyticsHopping="true"; + +SELECT + user, + HOP_START() as ts, + SUM(payload) as payload +FROM plato.Input +GROUP BY HOP(DateTime::FromSeconds(CAST(ts as Uint32)), "PT10S", "PT10S", "PT0S"), user; |