aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/Aggregation
diff options
context:
space:
mode:
authorMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-11-20 17:37:57 +0000
committerGitHub <noreply@github.com>2024-11-20 17:37:57 +0000
commitf76323e9b295c15751e51e3443aa47a36bee8023 (patch)
tree4113c8cad473a33e0f746966e0cf087252fa1d7a /yql/essentials/tests/s-expressions/suites/Aggregation
parent753ecb8d410a4cb459c26f3a0082fb2d1724fe63 (diff)
parenta7b9a6afea2a9d7a7bfac4c5eb4c1a8e60adb9e6 (diff)
downloadydb-f76323e9b295c15751e51e3443aa47a36bee8023.tar.gz
Merge pull request #11788 from ydb-platform/mergelibs-241120-1113
Library import 241120-1113
Diffstat (limited to 'yql/essentials/tests/s-expressions/suites/Aggregation')
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggrAdd.yql21
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggrCount.yql33
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggrMinMax.yql28
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFields.yql44
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFieldsUsingTuple.yql52
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroupByOneField.yql50
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateOptState.yql38
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateRead.yql22
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinct.yql24
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctUsingTuple.yql36
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctWithParents.yql24
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadWithParents.yql22
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateSameDistinct.yql107
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutGroup.yql55
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutTraits.yql37
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Bug1.yql69
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Bug2.yql53
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.cfg3
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.yql40
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.cfg2
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.yql58
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregate.yql35
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateUsingTuple.yql40
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateWithParents.yql35
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZero.yql24
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZeroOpt.yql24
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.cfg7
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.yql168
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.cfg8
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.yql254
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.cfg5
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.sql17
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.yql73
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/Traits.yql17
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/default.cfg3
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/input.txt7
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/input.txt.attr30
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt8
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt.attr30
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt2
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt.attr10
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt10
-rw-r--r--yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt.attr3
43 files changed, 1628 insertions, 0 deletions
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggrAdd.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggrAdd.yql
new file mode 100644
index 0000000000..ae95d15a68
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggrAdd.yql
@@ -0,0 +1,21 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let res_sink (DataSink 'result))
+(let x (Int32 '34))
+(let y (Int32 '56))
+(let jx (Just x))
+(let jy (Just y))
+(let n (Nothing (TypeOf jx)))
+
+(let world (Write! world res_sink (Key) (AggrAdd x y) '('('type))))
+(let world (Write! world res_sink (Key) (AggrAdd jx jy) '('('type))))
+(let world (Write! world res_sink (Key) (AggrAdd jx n) '('('type))))
+(let world (Write! world res_sink (Key) (AggrAdd n jy) '('('type))))
+(let world (Write! world res_sink (Key) (AggrAdd n n) '('('type))))
+
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggrCount.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggrCount.yql
new file mode 100644
index 0000000000..f0a27ab8d5
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggrCount.yql
@@ -0,0 +1,33 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let res_sink (DataSink 'result))
+(let x (Int32 '34))
+(let jx (Just x))
+(let n (Nothing (TypeOf jx)))
+
+(let world (Write! world res_sink (Key) (AggrCountInit x) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountInit jx) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountInit n) '('('type))))
+
+(let world (Write! world res_sink (Key) (AggrCountUpdate x (Uint64 '10)) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountUpdate jx (Uint64 '10)) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountUpdate n (Uint64 '10)) '('('type))))
+
+(let cx (Callable (ParseType '"()->Int32") (lambda '() x)))
+(let cjx (Callable (ParseType '"()->Int32?") (lambda '() jx)))
+(let cn (Callable (ParseType '"()->Int32?") (lambda '() n)))
+
+(let world (Write! world res_sink (Key) (AggrCountInit (Apply cx)) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountInit (Apply cjx)) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountInit (Apply cn)) '('('type))))
+
+(let world (Write! world res_sink (Key) (AggrCountUpdate (Apply cx) (Uint64 '10)) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountUpdate (Apply cjx) (Uint64 '10)) '('('type))))
+(let world (Write! world res_sink (Key) (AggrCountUpdate (Apply cn) (Uint64 '10)) '('('type))))
+
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggrMinMax.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggrMinMax.yql
new file mode 100644
index 0000000000..ab948dc649
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggrMinMax.yql
@@ -0,0 +1,28 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let res_sink (DataSink 'result))
+(let x (Int32 '34))
+(let y (Int32 '56))
+(let jx (Just x))
+(let jy (Just y))
+(let n (Nothing (TypeOf jx)))
+
+(let world (Write! world res_sink (Key) (AggrMin x y) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMin jx jy) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMin jx n) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMin n jy) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMin n n) '('('type))))
+
+(let world (Write! world res_sink (Key) (AggrMax x y) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMax jx jy) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMax jx n) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMax n jy) '('('type))))
+(let world (Write! world res_sink (Key) (AggrMax n n) '('('type))))
+
+
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFields.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFields.yql
new file mode 100644
index 0000000000..e9565555ef
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFields.yql
@@ -0,0 +1,44 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
+ (let world (Left! x))
+ (let table1 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core table1)
+ (let core (block '(
+ (let Count0_create (lambda '(row) (Convert (Exists row) 'Uint64)))
+ (let Count0_update (lambda '(row state) (OptionalReduce state (Convert (Exists row) 'Uint64) (lambda '(a b) (+ a b)))))
+ (let Count0_save (lambda '(state) state))
+ (let Count0_load (lambda '(item) item))
+ (let Count0_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Count0_finish (lambda '(state) state))
+ (return (Aggregate core '('"key" '"subkey") '('('Count0 (AggregationTraits (StructMemberType (ListItemType (TypeOf core)) '"value") Count0_create Count0_update Count0_save Count0_load Count0_merge Count0_finish (Uint64 '0)) '"value"))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"subkey" (Member row '"subkey")))
+ (let res (AddMember res '"column2" (Member row 'Count0)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (let select (Sort select '((Bool 'true) (Bool 'true)) (lambda '(row) '((Member row '"key") (Member row '"subkey")))))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"key" '"subkey" '"column2")))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFieldsUsingTuple.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFieldsUsingTuple.yql
new file mode 100644
index 0000000000..502366d6d5
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroubByTwoFieldsUsingTuple.yql
@@ -0,0 +1,52 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
+ (let world (Left! x))
+ (let table1 (Right! x))
+ (let table1 (FlatMap table1 (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" '((Member row '"key") (Member row '"key"))))
+ (let res (AddMember res '"subkey" (Member row '"subkey")))
+ (let res (AddMember res '"value" (Member row 'value)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (let output (block '(
+ (let select (block '(
+ (let core table1)
+ (let core (block '(
+ (let Count0_create (lambda '(row) (Convert (Exists row) 'Uint64)))
+ (let Count0_update (lambda '(row state) (OptionalReduce state (Convert (Exists row) 'Uint64) (lambda '(a b) (+ a b)))))
+ (let Count0_save (lambda '(state) state))
+ (let Count0_load (lambda '(item) item))
+ (let Count0_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Count0_finish (lambda '(state) state))
+ (return (Aggregate core '('"key" '"subkey") '('('Count0 (AggregationTraits (StructMemberType (ListItemType (TypeOf core)) '"value") Count0_create Count0_update Count0_save Count0_load Count0_merge Count0_finish (Uint64 '0)) '"value"))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"subkey" (Member row '"subkey")))
+ (let res (AddMember res '"column2" (Member row 'Count0)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (let select (Sort select '((Bool 'true) (Bool 'true) (Bool 'true)) (lambda '(row) '((Nth (Member row '"key") '0) (Nth (Member row '"key") '1) (Member row '"subkey")))))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"key" '"subkey" '"column2")))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroupByOneField.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroupByOneField.yql
new file mode 100644
index 0000000000..b309a99b1c
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateGroupByOneField.yql
@@ -0,0 +1,50 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
+ (let world (Left! x))
+ (let table2 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core table2)
+ (let core (block '(
+ (let Sum0_create (lambda '(row) ("Coalesce" (Cast (Member row '"subkey") 'Uint64) (Int64 '"0"))))
+ (let Sum0_update (lambda '(row state) (OptionalReduce state ("Coalesce" (Cast (Member row '"subkey") 'Uint64) (Int64 '"0")) (lambda '(a b) (+ a b)))))
+ (let Sum0_save (lambda '(state) state))
+ (let Sum0_load (lambda '(item) item))
+ (let Sum0_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Sum0_finish (lambda '(state) state))
+ (let Count1_create (lambda '(row) (Convert (Exists row) 'Uint64)))
+ (let Count1_update (lambda '(row state) (OptionalReduce state (Convert (Exists row) 'Uint64) (lambda '(a b) (+ a b)))))
+ (let Count1_save (lambda '(state) state))
+ (let Count1_load (lambda '(item) item))
+ (let Count1_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Count1_finish (lambda '(state) state))
+ (return (Aggregate core '('"key") '('('Sum0 (AggregationTraits (ListItemType (TypeOf core)) Sum0_create Sum0_update Sum0_save Sum0_load Sum0_merge Sum0_finish (Null))) '('Count1 (AggregationTraits (StructMemberType (ListItemType (TypeOf core)) '"value") Count1_create Count1_update Count1_save Count1_load Count1_merge Count1_finish (Uint64 '0)) '"value"))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"column1" (Member row 'Sum0)))
+ (let res (AddMember res '"column2" (Member row 'Count1)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (let select (Sort select (Bool 'true) (lambda '(row) (Member row '"key"))))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"key" '"column1" '"column2")))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateOptState.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateOptState.yql
new file mode 100644
index 0000000000..bb96925785
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateOptState.yql
@@ -0,0 +1,38 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"value" '"key" '"subkey") '()))
+ (let world (Left! x))
+ (let table2 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core table2)
+ (let core (block '(
+ (let Sum1_create (lambda '(row) (FromString (Member row '"subkey") '"Int64")))
+ (let Sum1_update (lambda '(row state) (OptionalReduce state (FromString (Member row '"subkey") '"Int64") (lambda '(a b) (+ a b)))))
+ (let Sum1_save (lambda '(state) state))
+ (let Sum1_load (lambda '(item) item))
+ (let Sum1_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Sum1_finish (lambda '(state) state))
+ (return (Aggregate core '('"value") '('('"Sum1" (AggregationTraits (ListItemType (TypeOf core)) Sum1_create Sum1_update Sum1_save Sum1_load Sum1_merge Sum1_finish (Null))))))
+)
+))
+ (return core)
+)
+))
+ (return select)
+)
+))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let sortedOutput (Sort output (Bool 'false) (lambda '(x) (Member x 'value) )))
+ (let world (Write! world result_sink (Key) sortedOutput '('('type) '('autoref))))
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return (Commit! world result_sink))
+)
+))
+ (return world)
+)
+))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateRead.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateRead.yql
new file mode 100644
index 0000000000..70d8879fc0
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateRead.yql
@@ -0,0 +1,22 @@
+(
+#comment
+(let res_sink (DataSink 'result))
+(let mr_source (DataSource 'yt 'plato))
+(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
+(let world (Left! x))
+(let table1 (Right! x))
+(let init (lambda '(x) (StrictFromString (Member x 'value) 'Uint32)))
+(let update_min (lambda '(x y) (Min (StrictFromString (Member x 'value) 'Uint32) y)))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_min (lambda '(x y) (Min x y)))
+(let finish (lambda '(x) x))
+(let min (AggregationTraits (ListItemType (TypeOf table1)) init update_min save load merge_min finish (Null)))
+(let resAll (Aggregate table1 '() '('('minvalue min))))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate table1 '('key) '('('minvalue min))))
+(let sortedOutput (Sort resKey (Bool 'false) (lambda '(x) (Member x 'key) )))
+(let world (Write! world res_sink (Key) sortedOutput '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinct.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinct.yql
new file mode 100644
index 0000000000..94034d32af
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinct.yql
@@ -0,0 +1,24 @@
+(
+#comment
+(let res_sink (DataSink 'result))
+(let mr_source (DataSource 'yt 'plato))
+(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
+(let world (Left! x))
+(let table1 (Right! x))
+(let init (lambda '(x) (StrictFromString (Member x 'value) 'Uint32)))
+(let init_distinct (lambda '(x) (StrictFromString x 'Uint32)))
+(let update_sum_distinct (lambda '(x y) (+ (StrictFromString x 'Uint32) y)))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_sum (lambda '(x y) (+ x y)))
+(let finish (lambda '(x) x))
+# distinct process one column and requires data/data? type
+(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf table1)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
+(let resAll (Aggregate table1 '() '('('distsum sum 'value))))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate table1 '('key) '('('distsum sum 'value))))
+(let sortedOutput (Sort resKey (Bool 'false) (lambda '(x) (Member x 'key) )))
+(let world (Write! world res_sink (Key) sortedOutput '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctUsingTuple.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctUsingTuple.yql
new file mode 100644
index 0000000000..d4f0adca76
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctUsingTuple.yql
@@ -0,0 +1,36 @@
+(
+#comment
+(let res_sink (DataSink 'result))
+(let mr_source (DataSource 'yt 'plato))
+(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
+(let world (Left! x))
+(let table1 (Right! x))
+(let table1 (FlatMap table1 (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" '((Member row '"key") (Member row '"key"))))
+ (let res (AddMember res '"subkey" (Member row '"subkey")))
+ (let res (AddMember res '"value" (Member row 'value)))
+ (let res (AsList res))
+ (return res)
+)))))
+
+(let init (lambda '(x) (StrictFromString (Member x 'value) 'Uint32)))
+(let init_distinct (lambda '(x) (StrictFromString x 'Uint32)))
+(let update_sum_distinct (lambda '(x y) (+ (StrictFromString x 'Uint32) y)))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_sum (lambda '(x y) (+ x y)))
+(let finish (lambda '(x) x))
+# distinct process one column and requires data/data? type
+(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf table1)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
+(let resAll (Aggregate table1 '() '('('distsum sum 'value))))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate table1 '('key) '('('distsum sum 'value))))
+(let sortedOutput (Sort resKey '((Bool 'false) (Bool 'false)) (lambda '(x) (Member x 'key) )))
+(let world (Write! world res_sink (Key) sortedOutput '('('type))))
+(let resKey (Aggregate table1 '('key 'subkey) '('('distsum sum 'value))))
+(let sortedOutput (Sort resKey '((Bool 'false) (Bool 'false) (Bool 'false)) (lambda '(x) '((Nth (Member x 'key) '0) (Nth (Member x 'key) '1) (Member x 'subkey)) )))
+(let world (Write! world res_sink (Key) sortedOutput '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctWithParents.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctWithParents.yql
new file mode 100644
index 0000000000..27c4faf647
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadDistinctWithParents.yql
@@ -0,0 +1,24 @@
+(
+#comment
+(let res_sink (DataSink 'result))
+(let mr_source (DataSource 'yt 'plato))
+(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
+(let world (Left! x))
+(let table1 (Right! x))
+(let init (lambda '(x parent) '(parent (StrictFromString (Member x 'value) 'Uint32))))
+(let init_distinct (lambda '(x parent) '(parent (StrictFromString x 'Uint32))))
+(let update_sum_distinct (lambda '(x y parent) '(parent (+ (StrictFromString x 'Uint32) (Nth y '1)))))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_sum (lambda '(x y) '((Nth x '0) (AggrAdd (Nth x '1) (Nth y '1)))))
+(let finish (lambda '(x) (Nth x '1)))
+# distinct process one column and requires data/data? type
+(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf table1)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
+(let resAll (Aggregate table1 '() '('('distsum sum 'value))))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate table1 '('key) '('('distsum sum 'value))))
+(let sortedOutput (Sort resKey (Bool 'false) (lambda '(x) (Member x 'key) )))
+(let world (Write! world res_sink (Key) sortedOutput '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadWithParents.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadWithParents.yql
new file mode 100644
index 0000000000..b0b2f6a084
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateReadWithParents.yql
@@ -0,0 +1,22 @@
+(
+#comment
+(let res_sink (DataSink 'result))
+(let mr_source (DataSource 'yt 'plato))
+(let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
+(let world (Left! x))
+(let table1 (Right! x))
+(let init (lambda '(x parent) '(parent (StrictFromString (Member x 'value) 'Uint32))))
+(let update_min (lambda '(x y parent) '(parent (AggrMin (StrictFromString (Member x 'value) 'Uint32) (Nth y '1)))))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_min (lambda '(x y) (AggrMin x y)))
+(let finish (lambda '(x) (Nth x '1)))
+(let min (AggregationTraits (ListItemType (TypeOf table1)) init update_min save load merge_min finish (Null)))
+(let resAll (Aggregate table1 '() '('('minvalue min))))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate table1 '('key) '('('minvalue min))))
+(let sortedOutput (Sort resKey (Bool 'false) (lambda '(x) (Member x 'key) )))
+(let world (Write! world res_sink (Key) sortedOutput '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateSameDistinct.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateSameDistinct.yql
new file mode 100644
index 0000000000..9f4fb01757
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateSameDistinct.yql
@@ -0,0 +1,107 @@
+(
+(let $1 world)
+(let $2 (DataSource '"yt" '"plato"))
+(let $3 (String '"Input"))
+(let $4 '('table $3))
+(let $5 (Key $4))
+(let $6 '('"key"))
+(let $7 '())
+(let $8 (Read! $1 $2 $5 $6 $7))
+(let $9 (Left! $8))
+(let $10 (DataSink 'result))
+(let $11 (Key))
+(let $12 (Right! $8))
+(let $13 '())
+(let $14 (TypeOf $12))
+(let $15 (ListItemType $14))
+(let $16 (StructMemberType $15 '"key"))
+(let $17 (lambda '($49) (block '(
+ (let $51 (Exists $49))
+ (let $52 (Convert $51 'Uint64))
+ (return $52)
+))))
+(let $18 (lambda '($53 $54) (block '(
+ (let $56 (Exists $53))
+ (let $57 (Convert $56 'Uint64))
+ (let $58 (lambda '($60 $61) (block '(
+ (let $63 (+ $60 $61))
+ (return $63)
+ ))))
+ (let $59 (OptionalReduce $54 $57 $58))
+ (return $59)
+))))
+(let $19 (lambda '($64) (block '(
+ (return $64)
+))))
+(let $20 (lambda '($66) (block '(
+ (return $66)
+))))
+(let $21 (lambda '($68 $69) (block '(
+ (let $71 (lambda '($73 $74) (block '(
+ (let $76 (+ $73 $74))
+ (return $76)
+ ))))
+ (let $72 (OptionalReduce $68 $69 $71))
+ (return $72)
+))))
+(let $22 (lambda '($77) (block '(
+ (return $77)
+))))
+(let $23 (AggregationTraits $16 $17 $18 $19 $20 $21 $22 (Null)))
+(let $24 '('Count0 $23 '"key"))
+(let $25 (TypeOf $12))
+(let $26 (ListItemType $25))
+(let $27 (StructMemberType $26 '"key"))
+(let $28 (lambda '($79) (block '(
+ (return $79)
+))))
+(let $29 (lambda '($81 $82) (block '(
+ (let $84 (lambda '($86 $87) (block '(
+ (let $89 (Min $86 $87))
+ (return $89)
+ ))))
+ (let $85 (OptionalReduce $82 $81 $84))
+ (return $85)
+))))
+(let $30 (lambda '($90) (block '(
+ (return $90)
+))))
+(let $31 (lambda '($92) (block '(
+ (return $92)
+))))
+(let $32 (lambda '($94 $95) (block '(
+ (let $97 (lambda '($99 $100) (block '(
+ (let $102 (Min $99 $100))
+ (return $102)
+ ))))
+ (let $98 (OptionalReduce $94 $95 $97))
+ (return $98)
+))))
+(let $33 (lambda '($103) (block '(
+ (return $103)
+))))
+(let $34 (AggregationTraits $27 $28 $29 $30 $31 $32 $33 (Null)))
+(let $35 '('Min1 $34 '"key"))
+(let $36 '($24 $35))
+(let $37 (Aggregate $12 $13 $36))
+(let $38 (lambda '($105) (block '(
+ (let $107 (Struct))
+ (let $108 (Member $105 'Count0))
+ (let $109 (AddMember $107 '"column0" $108))
+ (let $110 (Member $105 'Min1))
+ (let $111 (AddMember $109 '"column1" $110))
+ (let $112 (AsList $111))
+ (return $112)
+))))
+(let $39 (FlatMap $37 $38))
+(let $40 '('type))
+(let $41 '('autoref))
+(let $42 '('"column0" '"column1"))
+(let $43 '('columns $42))
+(let $44 '($40 $41 $43))
+(let $45 (Write! $9 $10 $11 $39 $44))
+(let $46 (Commit! $45 $10))
+(let $47 (DataSink '"yt" '"plato"))
+(let $48 (Commit! $46 $47))
+(return $48)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutGroup.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutGroup.yql
new file mode 100644
index 0000000000..472d0234d1
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutGroup.yql
@@ -0,0 +1,55 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
+ (let world (Left! x))
+ (let table3 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core table3)
+ (let core (block '(
+ (let Sum0_create (lambda '(row) ("Coalesce" (Cast (Member row '"key") 'Uint64) (Int64 '"0"))))
+ (let Sum0_update (lambda '(row state) (OptionalReduce state ("Coalesce" (Cast (Member row '"key") 'Uint64) (Int64 '"0")) (lambda '(a b) (+ a b)))))
+ (let Sum0_save (lambda '(state) state))
+ (let Sum0_load (lambda '(item) item))
+ (let Sum0_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Sum0_finish (lambda '(state) state))
+ (let Count1_create (lambda '(row) (Convert (Exists row) 'Uint64)))
+ (let Count1_update (lambda '(row state) (OptionalReduce state (Convert (Exists row) 'Uint64) (lambda '(a b) (+ a b)))))
+ (let Count1_save (lambda '(state) state))
+ (let Count1_load (lambda '(item) item))
+ (let Count1_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Count1_finish (lambda '(state) state))
+ (let Count2_create (lambda '(row) (Convert (Exists row) 'Uint64)))
+ (let Count2_update (lambda '(row state) (OptionalReduce state (Convert (Exists row) 'Uint64) (lambda '(a b) (+ a b)))))
+ (let Count2_save (lambda '(state) state))
+ (let Count2_load (lambda '(item) item))
+ (let Count2_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Count2_finish (lambda '(state) state))
+ (return (Aggregate core '() '('('Sum0 (AggregationTraits (ListItemType (TypeOf core)) Sum0_create Sum0_update Sum0_save Sum0_load Sum0_merge Sum0_finish (Null))) '('Count1 (AggregationTraits (StructMemberType (ListItemType (TypeOf core)) '"subkey") Count1_create Count1_update Count1_save Count1_load Count1_merge Count1_finish (Uint64 '0)) '"subkey") '('Count2 (AggregationTraits (StructMemberType (ListItemType (TypeOf core)) '"value") Count2_create Count2_update Count2_save Count2_load Count2_merge Count2_finish (Uint64 '0)) '"value"))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"column0" (Member row 'Sum0)))
+ (let res (AddMember res '"column1" (Member row 'Count1)))
+ (let res (AddMember res '"column2" (Member row 'Count2)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"column0" '"column1" '"column2")))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutTraits.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutTraits.yql
new file mode 100644
index 0000000000..59dd4cabc3
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/AggregateWithoutTraits.yql
@@ -0,0 +1,37 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
+ (let world (Left! x))
+ (let input (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core input)
+ (let core (block '(
+ (return (Aggregate core '('key 'value) '()))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row 'key)))
+ (let res (AddMember res '"value" (Member row 'value)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (let core (Sort core '((Bool 'true) (Bool 'true)) (lambda '(x) '((Member x 'key) (Member x 'value)) )))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Bug1.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug1.yql
new file mode 100644
index 0000000000..0a2a83b345
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug1.yql
@@ -0,0 +1,69 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"value") '()))
+ (let world (Left! x))
+ (let table0 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table0)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"value1" (MatchType (Member row '"value") 'Optional (lambda '(item) (Coalesce (Map item (lambda '(val) (IfType val (DataType 'String) (lambda '(item) (FromString item '"Int64")) (lambda '(item) (Just (Convert item '"Int64")))))) (Nothing (OptionalType (DataType '"Int64"))))) (lambda '(item) (IfType item (DataType 'String) (lambda '(item) (FromString item '"Int64")) (lambda '(item) (Convert item '"Int64")))))))
+ (let res (AsList res))
+ (return res)
+)
+))))
+ (return core)
+)
+))
+ (return select)
+)
+))
+ (let core (block '(
+ (let Max1_create (lambda '(row) (Member row '"value1")))
+ (let Max1_update (lambda '(row state) (OptionalReduce state (Member row '"value1") (lambda '(a b) (Max a b)))))
+ (let Max1_save (lambda '(state) state))
+ (let Max1_load (lambda '(item) item))
+ (let Max1_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (Max a b)))))
+ (let Max1_finish (lambda '(state) state))
+ (let Count2_create (lambda '(row) (Convert (Exists row) 'Uint64)))
+ (let Count2_update (lambda '(row state) (OptionalReduce state (Convert (Exists row) 'Uint64) (lambda '(a b) (+ a b)))))
+ (let Count2_save (lambda '(state) state))
+ (let Count2_load (lambda '(item) item))
+ (let Count2_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Count2_finish (lambda '(state) state))
+ (return (Aggregate core '('"key") '('('Max1 (AggregationTraits (ListItemType (TypeOf core)) Max1_create Max1_update Max1_save Max1_load Max1_merge Max1_finish (Null))) '('Count2 (AggregationTraits (StructMemberType (ListItemType (TypeOf core)) '"value1") Count2_create Count2_update Count2_save Count2_load Count2_merge Count2_finish (Uint64 '0)) '"value1"))))
+)
+))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"column0" (Member row 'Max1)))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"column2" (Member row 'Count2)))
+ (let res (AsList res))
+ (return res)
+)
+))))
+ (return core)
+)
+))
+ (return select)
+)
+))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let sortedOutput (Sort output (Bool 'false) (lambda '(x) (Member x 'key) )))
+ (let world (Write! world result_sink (Key) sortedOutput '('('type) '('autoref) '('columns '('"column0" '"key" '"column2")))))
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return (Commit! world result_sink))
+)
+))
+ (return world)
+)
+))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Bug2.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug2.yql
new file mode 100644
index 0000000000..1558d5a3a9
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug2.yql
@@ -0,0 +1,53 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"value") '()))
+ (let world (Left! x))
+ (let table2 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core table2)
+ (let core (block '(
+ (let Max0_create (lambda '(row) (Member row '"value")))
+ (let Max0_update (lambda '(row state) (OptionalReduce state (Member row '"value") (lambda '(a b) (Max a b)))))
+ (let Max0_save (lambda '(state) state))
+ (let Max0_load (lambda '(item) item))
+ (let Max0_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (Max a b)))))
+ (let Max0_finish (lambda '(state) state))
+ (let Count1_create (lambda '(row) (Convert (Exists row) 'Uint64)))
+ (let Count1_update (lambda '(row state) (OptionalReduce state (Convert (Exists row) 'Uint64) (lambda '(a b) (+ a b)))))
+ (let Count1_save (lambda '(state) state))
+ (let Count1_load (lambda '(item) item))
+ (let Count1_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Count1_finish (lambda '(state) state))
+ (return (Aggregate core '('"key") '('('Max0 (AggregationTraits (ListItemType (TypeOf core)) Max0_create Max0_update Max0_save Max0_load Max0_merge Max0_finish (Null))) '('Count1 (AggregationTraits (StructMemberType (ListItemType (TypeOf core)) '"value") Count1_create Count1_update Count1_save Count1_load Count1_merge Count1_finish (Uint64 '0)) '"value"))))
+)
+))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"column0" (Member row 'Max0)))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"column2" (Member row 'Count1)))
+ (let res (AsList res))
+ (return res)
+)
+))))
+ (return core)
+)
+))
+ (return select)
+)
+))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let sortedOutput (Sort output (Bool 'false) (lambda '(x) (Member x 'key) )))
+ (let world (Write! world result_sink (Key) sortedOutput '('('type) '('autoref) '('columns '('"column0" '"key" '"column2")))))
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return (Commit! world result_sink))
+)
+))
+ (return world)
+)
+))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.cfg b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.cfg
new file mode 100644
index 0000000000..2c8f4d3baa
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.cfg
@@ -0,0 +1,3 @@
+in Input input3.txt
+res result.txt
+udf datetime2_udf
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.yql
new file mode 100644
index 0000000000..61e56c5a0f
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Bug3.yql
@@ -0,0 +1,40 @@
+(
+(import aggregate_module '"/lib/yql/aggregate.yql")
+(import window_module '"/lib/yql/window.yql")
+(import core_module '"/lib/yql/core.yql")
+(let world (Configure! world (DataSource '"config") 'SQL '0))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (MrTableConcat (Key '('table (String '"Input")))) (Void) '()))
+ (let world (Left! x))
+ (let table1 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core table1)
+ (let core (Aggregate core '() '('('Avg0 (Apply (bind aggregate_module '"avg_traits_factory") (TypeOf core) (lambda '(row) (PersistableRepr (Apply (lambda '("$UniqID") (block '(
+ (let $string_uniqid (Coalesce (SafeCast "$UniqID" (DataType 'String)) (String '"")))
+ (let $time (Substring "$string_uniqid" ("-" (Size "$string_uniqid") (SafeCast (Int32 '"10") (DataType 'Uint32))) (Int32 '"10")))
+ (return ("-" (SafeCast (SqlCall '"DateTime2.ToSeconds" '((PositionalArgs (SafeCast (String '"2017-06-29") (DataType 'Date))))) (DataType 'Int64)) (SafeCast (SqlCall '"DateTime2.FromSeconds" '((PositionalArgs (SafeCast "$time" (DataType 'Uint32))))) (DataType 'Int64))))
+ ))) (Member row '"key"))))))) '()))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsStruct '('"avg_timediff" (Cast (/ (Member row 'Avg0) (Uint64 '1000000000000)) 'Int32))))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"avg_timediff")))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.cfg b/yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.cfg
new file mode 100644
index 0000000000..c8889a670f
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.cfg
@@ -0,0 +1,2 @@
+in Input input1.txt
+res result.txt
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.yql
new file mode 100644
index 0000000000..312c757886
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/EmptyOptional.yql
@@ -0,0 +1,58 @@
+(
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
+ (let world (Left! x))
+ (let table0 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table0)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"subkey" (Member row '"subkey")))
+ (let res (AddMember res '"value" (Cast (Member row '"value") 'Double)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (let Sum1_create (lambda '(row) (Member row '"value")))
+ (let Sum1_update (lambda '(row state) (OptionalReduce state (Member row '"value") (lambda '(a b) (+ a b)))))
+ (let Sum1_save (lambda '(state) state))
+ (let Sum1_load (lambda '(item) item))
+ (let Sum1_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (+ a b)))))
+ (let Sum1_finish (lambda '(state) state))
+ (return (Aggregate core '('"key") '('('Sum1 (AggregationTraits (ListItemType (TypeOf core)) Sum1_create Sum1_update Sum1_save Sum1_load Sum1_merge Sum1_finish (Null))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"subkey" (String '"")))
+ (let res (AddMember res '"value" (Member row 'Sum1)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (let select (Sort select (Bool 'true) (lambda '(row) (Member row '"key"))))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"key" '"subkey" '"value")))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregate.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregate.yql
new file mode 100644
index 0000000000..b25953f460
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregate.yql
@@ -0,0 +1,35 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let init (lambda '(x) (Member x 'value)))
+(let init_distinct (lambda '(x) x))
+(let update_min (lambda '(x y) (Min (Member x 'value) y)))
+(let update_sum_distinct (lambda '(x y) (+ x y)))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_min (lambda '(x y) (Min x y)))
+(let merge_sum (lambda '(x y) (+ x y)))
+(let finish (lambda '(x) x))
+(let finish_min2 (lambda '(x) '(x (* (Uint32 '2) x))))
+(let list (AsList
+(AsStruct '('key (Uint32 '1)) '('value (Uint32 '2)))
+(AsStruct '('key (Uint32 '2)) '('value (Uint32 '3)))
+(AsStruct '('key (Uint32 '1)) '('value (Uint32 '4)))
+(AsStruct '('key (Uint32 '3)) '('value (Uint32 '10)))
+(AsStruct '('key (Uint32 '2)) '('value (Uint32 '5)))
+(AsStruct '('key (Uint32 '2)) '('value (Uint32 '5)))
+))
+# non-distinct processes row
+(let min (AggregationTraits (ListItemType (TypeOf list)) init update_min save load merge_min finish_min2 (Null)))
+# distinct process one column and requires data/data? type
+(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf list)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
+(let resAll (Aggregate list '() '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
+(let res_sink (DataSink 'result))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate list '('key) '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
+(let world (Write! world res_sink (Key) resKey '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateUsingTuple.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateUsingTuple.yql
new file mode 100644
index 0000000000..d3c8358342
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateUsingTuple.yql
@@ -0,0 +1,40 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let init (lambda '(x) (Member x 'value)))
+(let init_distinct (lambda '(x) x))
+(let update_min (lambda '(x y) (Min (Member x 'value) y)))
+(let update_sum_distinct (lambda '(x y) (+ x y)))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_min (lambda '(x y) (Min x y)))
+(let merge_sum (lambda '(x y) (+ x y)))
+(let finish (lambda '(x) x))
+(let finish_min2 (lambda '(x) '(x (* (Uint32 '2) x))))
+(let list (AsList
+(AsStruct '('key '((Uint32 '1) (Uint32 '1))) '('subkey (String '.)) '('value (Uint32 '2)))
+(AsStruct '('key '((Uint32 '2) (Uint32 '2))) '('subkey (String '.)) '('value (Uint32 '3)))
+(AsStruct '('key '((Uint32 '1) (Uint32 '1))) '('subkey (String '.)) '('value (Uint32 '4)))
+(AsStruct '('key '((Uint32 '3) (Uint32 '3))) '('subkey (String '.)) '('value (Uint32 '10)))
+(AsStruct '('key '((Uint32 '2) (Uint32 '2))) '('subkey (String '.)) '('value (Uint32 '5)))
+(AsStruct '('key '((Uint32 '2) (Uint32 '2))) '('subkey (String '.)) '('value (Uint32 '5)))
+))
+# non-distinct processes row
+(let min (AggregationTraits (ListItemType (TypeOf list)) init update_min save load merge_min finish_min2 (Null)))
+# distinct process one column and requires data/data? type
+(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf list)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
+(let resAll (Aggregate list '() '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
+(let res_sink (DataSink 'result))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate list '('key) '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
+(let keyExtractor (lambda '(x) '((Nth (Member x 'key) '0) (Nth (Member x 'key) '1))))
+(let resSort (ListSort resKey '((Bool 'true) (Bool 'false)) keyExtractor))
+(let world (Write! world res_sink (Key) resSort '('('type))))
+(let world (Commit! world res_sink))
+(let resKey (Aggregate list '('key 'subkey) '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
+(let world (Write! world res_sink (Key) resKey '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateWithParents.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateWithParents.yql
new file mode 100644
index 0000000000..4357fcf5c7
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateWithParents.yql
@@ -0,0 +1,35 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let init (lambda '(x parent) '(parent (Member x 'value))))
+(let init_distinct (lambda '(x parent) '(parent x)))
+(let update_min (lambda '(x y parent) '(parent (Min (Member x 'value) (Nth y '1)))))
+(let update_sum_distinct (lambda '(x y parent) '(parent (Add x (Nth y '1)))))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge_min (lambda '(x y) (AggrMin x y)))
+(let merge_sum (lambda '(x y) '((Nth x '0) (AggrAdd (Nth y '1) (Nth x '1)))))
+(let finish (lambda '(x) (Nth x '1)))
+(let finish_min2 (lambda '(x) '((Nth x '1) (* (Uint32 '2) (Nth x '1)))))
+(let list (AsList
+(AsStruct '('key (Uint32 '1)) '('value (Uint32 '2)))
+(AsStruct '('key (Uint32 '2)) '('value (Uint32 '3)))
+(AsStruct '('key (Uint32 '1)) '('value (Uint32 '4)))
+(AsStruct '('key (Uint32 '3)) '('value (Uint32 '10)))
+(AsStruct '('key (Uint32 '2)) '('value (Uint32 '5)))
+(AsStruct '('key (Uint32 '2)) '('value (Uint32 '5)))
+))
+# non-distinct processes row
+(let min (AggregationTraits (ListItemType (TypeOf list)) init update_min save load merge_min finish_min2 (Null)))
+# distinct process one column and requires data/data? type
+(let sum (AggregationTraits (StructMemberType (ListItemType (TypeOf list)) 'value) init_distinct update_sum_distinct save load merge_sum finish (Null)))
+(let resAll (Aggregate list '() '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
+(let res_sink (DataSink 'result))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let resKey (Aggregate list '('key) '('('('minvalue 'minvalue2) min) '('distsum sum 'value))))
+(let world (Write! world res_sink (Key) resKey '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZero.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZero.yql
new file mode 100644
index 0000000000..73069b7061
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZero.yql
@@ -0,0 +1,24 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let init_min (lambda '(x) (Member x 'value)))
+(let init_count (lambda '(x) (Uint64 '1)))
+(let update_min (lambda '(x y) (Min (Member x 'value) y)))
+(let update_count (lambda '(x y) (+ y (Uint64 '1))))
+(let id (lambda '(x) x))
+(let merge_min (lambda '(x y) (Min x y)))
+(let merge_count (lambda '(x y) (+ x y)))
+(let list (AsList
+(AsStruct '('key (Uint32 '1)) '('value (Uint32 '2)))
+))
+(let list (Take list (Uint64 '0)))
+(let min (AggregationTraits (ListItemType (TypeOf list)) init_min update_min id id merge_min id (Null)))
+(let count (AggregationTraits (ListItemType (TypeOf list)) init_count update_count id id merge_count id (Uint64 '0)))
+(let resAll (Aggregate list '() '('('minvalue min) '('count count))))
+(let res_sink (DataSink 'result))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZeroOpt.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZeroOpt.yql
new file mode 100644
index 0000000000..857859cc2c
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/InMemAggregateZeroOpt.yql
@@ -0,0 +1,24 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let init_min (lambda '(x) (Member x 'value)))
+(let init_count (lambda '(x) (Uint64 '1)))
+(let update_min (lambda '(x y) (Min (Member x 'value) y)))
+(let update_count (lambda '(x y) (+ y (Uint64 '1))))
+(let id (lambda '(x) x))
+(let merge_min (lambda '(x y) (Min x y)))
+(let merge_count (lambda '(x y) (+ x y)))
+(let list (AsList
+(AsStruct '('key (Uint32 '1)) '('value (Just (Uint32 '2))))
+))
+(let list (Take list (Uint64 '0)))
+(let min (AggregationTraits (ListItemType (TypeOf list)) init_min update_min id id merge_min id (Null)))
+(let count (AggregationTraits (ListItemType (TypeOf list)) init_count update_count id id merge_count id (Uint64 '0)))
+(let resAll (Aggregate list '() '('('minvalue min) '('count count))))
+(let res_sink (DataSink 'result))
+(let world (Write! world res_sink (Key) resAll '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.cfg b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.cfg
new file mode 100644
index 0000000000..e341fb79d2
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.cfg
@@ -0,0 +1,7 @@
+in Input many_columns.txt
+out Output output.txt
+out Output1 output1.txt
+out Output2 output2.txt
+res result.txt
+mount ..\mounts.txt
+udf datetime2_udf
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.yql
new file mode 100644
index 0000000000..f18d8573a4
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_4.yql
@@ -0,0 +1,168 @@
+(
+(import aggregate_module '"/lib/yql/aggregate.yql")
+(let world (Configure! world (DataSource '"config") 'SQL '0))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"assignment_gs_count" '"assignment_assignment_id" '"assignment_last_status_change_time" '"requester_id" '"project_id" '"assignment_gs_weight") '()))
+ (let world (Left! x))
+ (let table5 (Right! x))
+ (let values (block '(
+ (let select (block '(
+ (let core table5)
+ (let core (Filter core (lambda '(row) (Coalesce (">" (Member row '"assignment_gs_count") (Int64 '"0")) (Bool 'false)))))
+ (let core (block '(
+ (return (Aggregate core '('"assignment_assignment_id") '('('Some0 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"assignment_last_status_change_time")))) '('Some1 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"requester_id")))) '('Some2 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"project_id")))) '('Some3 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"assignment_gs_count")))) '('Some4 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"assignment_gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"assignment_id" (Member row '"assignment_assignment_id")))
+ (let res (AddMember res '"date" (SafeCast ("Apply" ("Udf" '"DateTime2.FromSeconds") (Cast (Member row 'Some0) 'Uint32)) (DataType 'Date))))
+ (let res (AddMember res '"requester_id" (Member row 'Some1)))
+ (let res (AddMember res '"project_id" (Member row 'Some2)))
+ (let res (AddMember res '"gs_count" (Member row 'Some3)))
+ (let res (AddMember res '"gs_weight" (Cast (Member row 'Some4) 'Double)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let sink (DataSink '"yt" '"plato"))
+ (let world (Write! world sink (Key '('table (String '"Output"))) values '('('mode 'renew))))
+ (return world)
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Output"))) (Void) '()))
+ (let world (Left! x))
+ (let table7 (Right! x))
+ (let values (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table7)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"project_id" '"requester_id") '('('Sum8 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum9 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"project_id" (Member row '"project_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum8)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum9)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let sink (DataSink '"yt" '"plato"))
+ (let world (Write! world sink (Key '('table (String '"Output1"))) values '('('mode 'renew))))
+ (return world)
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Output"))) (Void) '()))
+ (let world (Left! x))
+ (let table7 (Right! x))
+ (let values (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table7)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"project_id" '"requester_id") '('('Sum8 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum9 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"project_id" (Member row '"project_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum8)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum9)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"requester_id") '('('Sum11 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum12 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum11)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum12)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let sink (DataSink '"yt" '"plato"))
+ (let world (Write! world sink (Key '('table (String '"Output2"))) values '('('mode 'renew))))
+ (return world)
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.cfg b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.cfg
new file mode 100644
index 0000000000..f01a3e0180
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.cfg
@@ -0,0 +1,8 @@
+in Input many_columns.txt
+out Output output.txt
+out Output1 output1.txt
+out Output2 output2.txt
+out Output3 output3.txt
+res result.txt
+mount ..\mounts.txt
+udf datetime2_udf
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.yql
new file mode 100644
index 0000000000..18d89027da
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Level_5.yql
@@ -0,0 +1,254 @@
+(
+(import aggregate_module '"/lib/yql/aggregate.yql")
+(let world (Configure! world (DataSource '"config") 'SQL '0))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"assignment_gs_count" '"assignment_assignment_id" '"assignment_last_status_change_time" '"requester_id" '"project_id" '"assignment_gs_weight") '()))
+ (let world (Left! x))
+ (let table5 (Right! x))
+ (let values (block '(
+ (let select (block '(
+ (let core table5)
+ (let core (Filter core (lambda '(row) (Coalesce (">" (Member row '"assignment_gs_count") (Int64 '"0")) (Bool 'false)))))
+ (let core (block '(
+ (return (Aggregate core '('"assignment_assignment_id") '('('Some0 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"assignment_last_status_change_time")))) '('Some1 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"requester_id")))) '('Some2 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"project_id")))) '('Some3 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"assignment_gs_count")))) '('Some4 (Apply (bind aggregate_module '"some_traits_factory") (TypeOf core) (lambda '(row) (Member row '"assignment_gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"assignment_id" (Member row '"assignment_assignment_id")))
+ (let res (AddMember res '"date" (SafeCast ("Apply" ("Udf" '"DateTime2.FromSeconds") (Cast (Member row 'Some0) 'Uint32)) (DataType 'Date))))
+ (let res (AddMember res '"requester_id" (Member row 'Some1)))
+ (let res (AddMember res '"project_id" (Member row 'Some2)))
+ (let res (AddMember res '"gs_count" (Member row 'Some3)))
+ (let res (AddMember res '"gs_weight" (Cast (Member row 'Some4) 'Double)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let sink (DataSink '"yt" '"plato"))
+ (let world (Write! world sink (Key '('table (String '"Output"))) values '('('mode 'renew))))
+ (return world)
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Output"))) (Void) '()))
+ (let world (Left! x))
+ (let table7 (Right! x))
+ (let values (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table7)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"project_id" '"requester_id") '('('Sum8 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum9 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"project_id" (Member row '"project_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum8)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum9)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let sink (DataSink '"yt" '"plato"))
+ (let world (Write! world sink (Key '('table (String '"Output1"))) values '('('mode 'renew))))
+ (return world)
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Output"))) (Void) '()))
+ (let world (Left! x))
+ (let table7 (Right! x))
+ (let values (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table7)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"project_id" '"requester_id") '('('Sum8 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum9 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"project_id" (Member row '"project_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum8)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum9)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"requester_id") '('('Sum11 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum12 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum11)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum12)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let sink (DataSink '"yt" '"plato"))
+ (let world (Write! world sink (Key '('table (String '"Output2"))) values '('('mode 'renew))))
+ (return world)
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Output"))) (Void) '()))
+ (let world (Left! x))
+ (let table7 (Right! x))
+ (let values (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table7)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"project_id" '"requester_id") '('('Sum8 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum9 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"project_id" (Member row '"project_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum8)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum9)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date" '"requester_id") '('('Sum11 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum12 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"requester_id" (Member row '"requester_id")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum11)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum12)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (return (Aggregate core '('"date") '('('Sum14 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_count")))) '('Sum15 (Apply (bind aggregate_module '"sum_traits_factory") (TypeOf core) (lambda '(row) (Member row '"gs_weight")))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"date" (Member row '"date")))
+ (let res (AddMember res '"gs_count" (Member row 'Sum14)))
+ (let res (AddMember res '"gs_weight" (Member row 'Sum15)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (AsList row))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let world (block '(
+ (let sink (DataSink '"yt" '"plato"))
+ (let world (Write! world sink (Key '('table (String '"Output3"))) values '('('mode 'renew))))
+ (return world)
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.cfg b/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.cfg
new file mode 100644
index 0000000000..ed61022bf8
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.cfg
@@ -0,0 +1,5 @@
+in Input input.txt
+out Output output.txt
+res result.txt
+mount ..\mounts.txt
+udf stat_udf
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.sql b/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.sql
new file mode 100644
index 0000000000..aaa89e48d5
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.sql
@@ -0,0 +1,17 @@
+USE plato;
+
+SELECT
+ key as key,
+ "a" || min(subkey || "q") as x1,
+ "b" || min(subkey || "q") as x2,
+ "c" || max(value) as x3,
+ "d" || max(value) as x4,
+ "1" || min(distinct subkey) as y1,
+ "2" || min(distinct subkey) as y2,
+ "3" || max(distinct value) as y3,
+ "4" || max(distinct value) as y4,
+ percentile(x, 0.5),
+ percentile(x, 0.9)
+FROM (SELECT key, subkey, value, Length(key) as x from Input)
+GROUP BY key
+ORDER BY key; \ No newline at end of file
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.yql
new file mode 100644
index 0000000000..64ee248ed9
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/SameTrait.yql
@@ -0,0 +1,73 @@
+(
+(import aggregate_module '"/lib/yql/aggregate.yql")
+(let world (block '(
+ (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
+ (let world (Left! x))
+ (let table0 (Right! x))
+ (let output (block '(
+ (let select (block '(
+ (let core (block '(
+ (let select (block '(
+ (let core table0)
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"subkey" (Member row '"subkey")))
+ (let res (AddMember res '"value" (Member row '"value")))
+ (let res (AddMember res '"x" ("Size" (Member row '"key"))))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (return select)
+ )))
+ (let core (block '(
+ (let percentile_x_create (lambda '(row) (MatchType (Member row '"x") 'Optional (lambda '(optValue) (FlatMap optValue (lambda '(value) (Just (Apply (Udf 'Stat.TDigest_Create) value))))) (lambda '(value) (Apply (Udf 'Stat.TDigest_Create) value)))))
+ (let percentile_x_update (lambda '(row state) (MatchType (Member row '"x") 'Optional (lambda '(optValue) (IfPresent state (lambda '(state) (IfPresent optValue (lambda '(value) (Just (Apply (Udf 'Stat.TDigest_AddValue) state value))) (Just state))) (FlatMap optValue (lambda '(value) (Just (Apply (Udf 'Stat.TDigest_Create) value)))))) (lambda '(value) (Apply (Udf 'Stat.TDigest_AddValue) state value)))))
+ (let percentile_x_save (lambda '(state) (MatchType state 'Optional (lambda '(optState) (Map optState (lambda '(currState) (Apply (Udf 'Stat.TDigest_Serialize) currState)))) (lambda '(currState) (Apply (Udf 'Stat.TDigest_Serialize) currState)))))
+ (let percentile_x_load (lambda '(item) (MatchType item 'Optional (lambda '(optData) (FlatMap optData (lambda '(data) (Just (Apply (Udf 'Stat.TDigest_Deserialize) data))))) (lambda '(data) (Apply (Udf 'Stat.TDigest_Deserialize) data)))))
+ (let percentile_x_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (Apply (Udf 'Stat.TDigest_Merge) a b)))))
+ (let percentile_x_finish (lambda '(state) (block '(
+ (let res (Struct))
+ (let res (AddMember res 'Percentile9 (Apply (lambda '(state) (MatchType state 'Optional (lambda '(optData) (Map optData (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.5))))) (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.5))))) state)))
+ (let res (AddMember res 'Percentile10 (Apply (lambda '(state) (MatchType state 'Optional (lambda '(optData) (Map optData (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.9))))) (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.9))))) state)))
+ (return res)
+ ))))
+ (return (Aggregate core '('"key") '('('Min1 (Apply (bind aggregate_module '"min_traits_factory") (TypeOf core) (lambda '(row) ("Concat" (Member row '"subkey") (String '"q"))))) '('Min2 (Apply (bind aggregate_module '"min_traits_factory") (TypeOf core) (lambda '(row) ("Concat" (Member row '"subkey") (String '"q"))))) '('Max3 (Apply (bind aggregate_module '"max_traits_factory") (TypeOf core) (lambda '(row) (Member row '"value")))) '('Max4 (Apply (bind aggregate_module '"max_traits_factory") (TypeOf core) (lambda '(row) (Member row '"value")))) '('Min5 (Apply (bind aggregate_module '"min_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"subkey")) (lambda '(row) row)) '"subkey") '('Min6 (Apply (bind aggregate_module '"min_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"subkey")) (lambda '(row) row)) '"subkey") '('Max7 (Apply (bind aggregate_module '"max_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"value")) (lambda '(row) row)) '"value") '('Max8 (Apply (bind aggregate_module '"max_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"value")) (lambda '(row) row)) '"value") '('percentile_x (AggregationTraits (ListItemType (TypeOf core)) percentile_x_create percentile_x_update percentile_x_save percentile_x_load percentile_x_merge percentile_x_finish (Null))))))
+ )))
+ (let core (FlatMap core (lambda '(row) (block '(
+ (let res (Struct))
+ (let res (AddMember res '"key" (Member row '"key")))
+ (let res (AddMember res '"x1" ("Concat" (String '"a") (Member row 'Min1))))
+ (let res (AddMember res '"x2" ("Concat" (String '"b") (Member row 'Min2))))
+ (let res (AddMember res '"x3" ("Concat" (String '"c") (Member row 'Max3))))
+ (let res (AddMember res '"x4" ("Concat" (String '"d") (Member row 'Max4))))
+ (let res (AddMember res '"y1" ("Concat" (String '"1") (Member row 'Min5))))
+ (let res (AddMember res '"y2" ("Concat" (String '"2") (Member row 'Min6))))
+ (let res (AddMember res '"y3" ("Concat" (String '"3") (Member row 'Max7))))
+ (let res (AddMember res '"y4" ("Concat" (String '"4") (Member row 'Max8))))
+ (let res (AddMember res '"column9" (Member (Member row '"percentile_x") 'Percentile9)))
+ (let res (AddMember res '"column10" (Member (Member row '"percentile_x") 'Percentile10)))
+ (let res (AsList res))
+ (return res)
+ )))))
+ (return core)
+ )))
+ (let select (Sort select (Bool 'true) (lambda '(row) (Member row '"key"))))
+ (return select)
+ )))
+ (let world (block '(
+ (let result_sink (DataSink 'result))
+ (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"key" '"x1" '"x2" '"x3" '"x4" '"y1" '"y2" '"y3" '"y4" '"column9" '"column10")))))
+ (return (Commit! world result_sink))
+ )))
+ (return world)
+)))
+(let world (block '(
+ (let plato_sink (DataSink '"yt" '"plato"))
+ (let world (Commit! world plato_sink))
+ (return world)
+)))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/Traits.yql b/yql/essentials/tests/s-expressions/suites/Aggregation/Traits.yql
new file mode 100644
index 0000000000..7a8c8845dc
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/Traits.yql
@@ -0,0 +1,17 @@
+(
+#comment
+(let config (DataSource 'config))
+(let world (Configure! world config 'PureDataSource 'yt))
+
+(let init (lambda '(x) x))
+(let update (lambda '(x y) (+ x y)))
+(let save (lambda '(x) x))
+(let load (lambda '(x) x))
+(let merge (lambda '(x y) (+ x y)))
+(let finish (lambda '(x) x))
+(let sum (AggregationTraits (DataType 'Uint32) init update save load merge finish (Null)))
+(let res_sink (DataSink 'result))
+(let world (Write! world res_sink (Key) (FormatType (TypeOf sum)) '('('type))))
+(let world (Commit! world res_sink))
+(return world)
+)
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/default.cfg b/yql/essentials/tests/s-expressions/suites/Aggregation/default.cfg
new file mode 100644
index 0000000000..fe2b19a9ff
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/default.cfg
@@ -0,0 +1,3 @@
+in Input input.txt
+out Output output.txt
+res result.txt
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/input.txt b/yql/essentials/tests/s-expressions/suites/Aggregation/input.txt
new file mode 100644
index 0000000000..fc70ee2541
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/input.txt
@@ -0,0 +1,7 @@
+{"key"="075";"subkey"="1";"value"="2"};
+{"key"="075";"subkey"="2";"value"="5"};
+{"key"="075";"subkey"="3";"value"="5"};
+{"key"="800";"subkey"="4";"value"="4"};
+{"key"="020";"subkey"="5";"value"="10"};
+{"key"="150";"subkey"="6";"value"="2"};
+{"key"="150";"subkey"="7";"value"="3"};
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/input.txt.attr b/yql/essentials/tests/s-expressions/suites/Aggregation/input.txt.attr
new file mode 100644
index 0000000000..b6100e5fd0
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/input.txt.attr
@@ -0,0 +1,30 @@
+{
+ "_yql_row_spec" = {
+ "Type" = [
+ "StructType";
+ [
+ [
+ "key";
+ [
+ "DataType";
+ "String"
+ ]
+ ];
+ [
+ "subkey";
+ [
+ "DataType";
+ "String"
+ ]
+ ];
+ [
+ "value";
+ [
+ "DataType";
+ "String"
+ ]
+ ]
+ ]
+ ]
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt b/yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt
new file mode 100644
index 0000000000..d1fa0973d9
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt
@@ -0,0 +1,8 @@
+{"key"="075";"subkey"=".";"value"="abc"};
+{"key"="911";"subkey"=".";"value"="kkk"};
+{"key"="023";"subkey"=".";"value"="aaa"};
+{"key"="527";"subkey"=".";"value"="bbb"};
+{"key"="037";"subkey"=".";"value"="ddd"};
+{"key"="761";"subkey"=".";"value"="ccc"};
+{"key"="200";"subkey"=".";"value"="qqq"};
+{"key"="150";"subkey"=".";"value"="zzz"};
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt.attr b/yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt.attr
new file mode 100644
index 0000000000..b6100e5fd0
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/input1.txt.attr
@@ -0,0 +1,30 @@
+{
+ "_yql_row_spec" = {
+ "Type" = [
+ "StructType";
+ [
+ [
+ "key";
+ [
+ "DataType";
+ "String"
+ ]
+ ];
+ [
+ "subkey";
+ [
+ "DataType";
+ "String"
+ ]
+ ];
+ [
+ "value";
+ [
+ "DataType";
+ "String"
+ ]
+ ]
+ ]
+ ]
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt b/yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt
new file mode 100644
index 0000000000..6872d1e389
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt
@@ -0,0 +1,2 @@
+{"key"=1016143471372484394u};
+{"key"=1033900131482000140u};
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt.attr b/yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt.attr
new file mode 100644
index 0000000000..bdbbaad78d
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/input3.txt.attr
@@ -0,0 +1,10 @@
+{"_yql_row_spec"={
+ "Type"=["StructType";[
+ ["key";["DataType";"Uint64"]];
+ ]
+ ];
+ "SortDirections"=[1;];
+ "SortedBy"=["key";];
+ "SortedByTypes"=[["DataType";"Uint64";];];
+ "SortMembers"=["key";];
+}}
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt b/yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt
new file mode 100644
index 0000000000..c76c262542
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt
@@ -0,0 +1,10 @@
+{"assignment_assignment_id"=243u;"assignment_gs_count"=98u;"assignment_gs_weight"=89989u;"assignment_last_status_change_time"=43u;"project_id"=67u;"requester_id"=65u};
+{"assignment_assignment_id"=43u;"assignment_gs_count"=9u;"assignment_gs_weight"=8u;"assignment_last_status_change_time"=4563u;"project_id"=6897u;"requester_id"=765u};
+{"assignment_assignment_id"=25u;"assignment_gs_count"=678u;"assignment_gs_weight"=0u;"assignment_last_status_change_time"=45u;"project_id"=62u;"requester_id"=63u};
+{"assignment_assignment_id"=2563u;"assignment_gs_count"=98u;"assignment_gs_weight"=89989u;"assignment_last_status_change_time"=43u;"project_id"=67u;"requester_id"=65u};
+{"assignment_assignment_id"=233453u;"assignment_gs_count"=98u;"assignment_gs_weight"=89989u;"assignment_last_status_change_time"=43u;"project_id"=67u;"requester_id"=65u};
+{"assignment_assignment_id"=2456u;"assignment_gs_count"=978u;"assignment_gs_weight"=89u;"assignment_last_status_change_time"=403u;"project_id"=67u;"requester_id"=67u};
+{"assignment_assignment_id"=27643u;"assignment_gs_count"=9768u;"assignment_gs_weight"=86789u;"assignment_last_status_change_time"=49u;"project_id"=67u;"requester_id"=67u};
+{"assignment_assignment_id"=248u;"assignment_gs_count"=9768u;"assignment_gs_weight"=89989u;"assignment_last_status_change_time"=409u;"project_id"=67u;"requester_id"=625u};
+{"assignment_assignment_id"=248u;"assignment_gs_count"=9768u;"assignment_gs_weight"=89969u;"assignment_last_status_change_time"=4u;"project_id"=6347u;"requester_id"=65u};
+{"assignment_assignment_id"=24893u;"assignment_gs_count"=98u;"assignment_gs_weight"=8963989u;"assignment_last_status_change_time"=40u;"project_id"=6767u;"requester_id"=655u};
diff --git a/yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt.attr b/yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt.attr
new file mode 100644
index 0000000000..54d58bb57d
--- /dev/null
+++ b/yql/essentials/tests/s-expressions/suites/Aggregation/many_columns.txt.attr
@@ -0,0 +1,3 @@
+{
+ "_yql_row_spec"={"Type"=["StructType";[["assignment_assignment_id";["DataType";"Uint64"]];["assignment_last_status_change_time";["DataType";"Uint64"]];["requester_id";["DataType";"Uint64"]];["project_id";["DataType";"Uint64"]];["assignment_gs_count";["DataType";"Uint64"]];["assignment_gs_weight";["DataType";"Uint64"]]]]}
+}