diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-11-20 17:37:57 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 17:37:57 +0000 |
commit | f76323e9b295c15751e51e3443aa47a36bee8023 (patch) | |
tree | 4113c8cad473a33e0f746966e0cf087252fa1d7a /yql/essentials/tests/sql/suites/produce | |
parent | 753ecb8d410a4cb459c26f3a0082fb2d1724fe63 (diff) | |
parent | a7b9a6afea2a9d7a7bfac4c5eb4c1a8e60adb9e6 (diff) | |
download | ydb-f76323e9b295c15751e51e3443aa47a36bee8023.tar.gz |
Merge pull request #11788 from ydb-platform/mergelibs-241120-1113
Library import 241120-1113
Diffstat (limited to 'yql/essentials/tests/sql/suites/produce')
125 files changed, 1612 insertions, 0 deletions
diff --git a/yql/essentials/tests/sql/suites/produce/default.cfg b/yql/essentials/tests/sql/suites/produce/default.cfg new file mode 100644 index 0000000000..55e29c24ba --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/default.cfg @@ -0,0 +1,7 @@ +in Input0 input0.txt +in Input1 input1.txt +udf python3_udf +udf streaming_udf +udf simple_udf +udf string_udf +udf structs_udf diff --git a/yql/essentials/tests/sql/suites/produce/descending.txt b/yql/essentials/tests/sql/suites/produce/descending.txt new file mode 100644 index 0000000000..8e63b46e56 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/descending.txt @@ -0,0 +1,2 @@ +{"key"="075";"subkey"="2";"value"="abc"}; +{"key"="020";"subkey"="1";"value"="q"}; diff --git a/yql/essentials/tests/sql/suites/produce/descending.txt.attr b/yql/essentials/tests/sql/suites/produce/descending.txt.attr new file mode 100644 index 0000000000..4f8cb9b625 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/descending.txt.attr @@ -0,0 +1,25 @@ +{ + "schema"=< + "strict" = %true; + "unique_keys" = %false + >[ + { + "name" = "key"; + "type" = "string"; + "required" = %true; + "sort_order" = "descending"; + }; + { + "name" = "subkey"; + "type" = "string"; + "required" = %true; + "sort_order" = "descending"; + }; + { + "name" = "value"; + "type" = "string"; + "required" = %true; + "sort_order" = "descending"; + }; + ] +}
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/discard_process_with_lambda.sql b/yql/essentials/tests/sql/suites/produce/discard_process_with_lambda.sql new file mode 100644 index 0000000000..ea5a9252ff --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/discard_process_with_lambda.sql @@ -0,0 +1,8 @@ +/* syntax version 1 */ +/* postgres can not */ +$udf = YQL::@@(lambda '(x) +(FlatMap x + (lambda '(y) (AsList y y)) +))@@; + +discard process plato.Input0 using $udf(TableRows()); diff --git a/yql/essentials/tests/sql/suites/produce/discard_reduce_lambda.sql b/yql/essentials/tests/sql/suites/produce/discard_reduce_lambda.sql new file mode 100644 index 0000000000..132b492b4c --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/discard_reduce_lambda.sql @@ -0,0 +1,9 @@ +/* postgres can not */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item))))) +))@@; + +--INSERT INTO Output +DISCARD REDUCE Input1 ON key USING $udf(cast(value as uint32) ?? 0); diff --git a/yql/essentials/tests/sql/suites/produce/empty.txt b/yql/essentials/tests/sql/suites/produce/empty.txt new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/empty.txt diff --git a/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.cfg b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.cfg new file mode 100644 index 0000000000..612a5060aa --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.cfg @@ -0,0 +1 @@ +in Input input_sorted.txt
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.sql b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.sql new file mode 100644 index 0000000000..b4f5afd8c2 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.sql @@ -0,0 +1,69 @@ +USE plato; + +$reduce = ($_, $TableRows) -> { + return Yql::Condense1( + $TableRows, + ($item) -> ($item), + ($_, $_) -> (false), + ($item, $_) -> ($item) + ); +}; + +$stream = +select * from Input; + +-- + +$stream1 = Reduce $stream +presort value1 +on key, subkey +using $reduce(TableRow()) +assume order by key, subkey, value1; + +$stream1 = Reduce $stream1 +presort value1 +on key, subkey +using $reduce(TableRow()) +assume order by key, subkey, value1; + +-- + +$stream2 = Reduce $stream +presort value1, value2 +on key, subkey +using $reduce(TableRow()) +assume order by key, subkey, value1, value2; + +$stream2 = Reduce $stream2 +presort value1 +on key, subkey +using $reduce(TableRow()) +assume order by key, subkey, value1; + +-- + +$stream3 = Reduce $stream +presort value1, value2, value3 +on key, subkey +using $reduce(TableRow()) +assume order by key, subkey, value1, value2, value3; + +$stream3 = Reduce $stream3 +on key, subkey +using $reduce(TableRow()) +assume order by key, subkey; + +select + * +from $stream1 +ASSUME ORDER BY `key`, `subkey`; + +select + * +from $stream2 +ASSUME ORDER BY `key`, `subkey`; + +select + * +from $stream3 +ASSUME ORDER BY `key`, `subkey`; diff --git a/yql/essentials/tests/sql/suites/produce/input0.txt b/yql/essentials/tests/sql/suites/produce/input0.txt new file mode 100644 index 0000000000..65949ea745 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/input0.txt @@ -0,0 +1,4 @@ +{"key"="075";"subkey"="1";"value"="abc"}; +{"key"="800";"subkey"="2";"value"="ddd"}; +{"key"="020";"subkey"="3";"value"="q"}; +{"key"="150";"subkey"="4";"value"="qzz"}; diff --git a/yql/essentials/tests/sql/suites/produce/input1.txt b/yql/essentials/tests/sql/suites/produce/input1.txt new file mode 100644 index 0000000000..60ee525827 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/input1.txt @@ -0,0 +1,9 @@ +{"key"="foo";"subkey"="a";"value"="7"}; +{"key"="foo";"subkey"="b";"value"="1"}; +{"key"="foo";"subkey"="b";"value"="0"}; +{"key"="foo";"subkey"="a";"value"="2"}; +{"key"="bar";"subkey"="b";"value"="1"}; +{"key"="bar";"subkey"="u";"value"="2"}; +{"key"="bar";"subkey"="n";"value"="3"}; +{"key"="bar";"subkey"="n";"value"="4"}; +{"key"="bar";"subkey"="y";"value"="5"}; diff --git a/yql/essentials/tests/sql/suites/produce/input2.txt b/yql/essentials/tests/sql/suites/produce/input2.txt new file mode 100644 index 0000000000..b214aab0d9 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/input2.txt @@ -0,0 +1,10 @@ +{"key"="023";"subkey"="3";"value"="aaa"}; +{"key"="037";"subkey"="5";"value"="ddd"}; +{"key"="075";"subkey"="1";"value"="abc"}; +{"key"="150";"subkey"="1";"value"="aaa"}; +{"key"="150";"subkey"="3";"value"="iii"}; +{"key"="150";"subkey"="8";"value"="zzz"}; +{"key"="200";"subkey"="7";"value"="qqq"}; +{"key"="527";"subkey"="4";"value"="bbb"}; +{"key"="761";"subkey"="6";"value"="ccc"}; +{"key"="911";"subkey"="2";"value"="kkk"}; diff --git a/yql/essentials/tests/sql/suites/produce/input_sorted.txt b/yql/essentials/tests/sql/suites/produce/input_sorted.txt new file mode 100644 index 0000000000..070ba2f987 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/input_sorted.txt @@ -0,0 +1,4 @@ +{"key"="020";"subkey"="1";"value1"="abc";"value2"="aabbcc";"value3"="aa"}; +{"key"="075";"subkey"="2";"value1"="ddd";"value2"="dddddd";"value3"="bb"}; +{"key"="150";"subkey"="3";"value1"="q";"value2"="qq";"value3"="cc"}; +{"key"="800";"subkey"="4";"value1"="qzz";"value2"="qqzzzz";"value3"="dd"};
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/input_sorted.txt.attr b/yql/essentials/tests/sql/suites/produce/input_sorted.txt.attr new file mode 100644 index 0000000000..f091ca59dd --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/input_sorted.txt.attr @@ -0,0 +1,15 @@ +{ + "_yql_row_spec"={ + "Type"=["StructType";[ + ["key";["DataType";"String"]]; + ["subkey";["DataType";"String"]]; + ["value1";["DataType";"String"]]; + ["value2";["DataType";"String"]]; + ["value3";["DataType";"String"]] + ]]; + "SortDirections"=[1;1;1;1;1]; + "SortedBy"=["key";"subkey";"value1";"value2";"value3"]; + "SortedByTypes"=[["DataType";"String"];["DataType";"String"];["DataType";"String"];["DataType";"String"];["DataType";"String"]]; + "SortMembers"=["key";"subkey";"value1";"value2";"value3"]; + } +} diff --git a/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.cfg b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.cfg new file mode 100644 index 0000000000..faf73f95ff --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.cfg @@ -0,0 +1,3 @@ +in Input1 descending.txt +in Input2 sorted.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.sql b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.sql new file mode 100644 index 0000000000..4970a01bbe --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.sql @@ -0,0 +1,23 @@ +/* postgres can not */ +USE plato; +pragma yt.UseNativeDescSort; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('summ (Collect (Condense stream (Nothing (OptionalType (DataType 'String))) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Coalesce state (Just item)))))) +))@@; + +select * from ( + reduce Input1 presort value desc on key, subkey using $udf(value) --YtReduce +) order by key, summ; + +select * from ( + reduce Input1 presort subkey desc, value desc on key using $udf(value) --YtReduce +) order by key, summ; + +select * from ( + reduce Input1 presort value on key, subkey using $udf(value) --YtMapReduce +) order by key, summ; + +select * from ( + reduce concat(Input1, Input2) presort value desc on key, subkey using $udf(value) --YtMapReduce +) order by key, summ; diff --git a/yql/essentials/tests/sql/suites/produce/process_and_filter.sql b/yql/essentials/tests/sql/suites/produce/process_and_filter.sql new file mode 100644 index 0000000000..2bd992f4f4 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_and_filter.sql @@ -0,0 +1,2 @@ +/* postgres can not */ +PROCESS pLaTo.Input0 USING SimpleUdf::Echo(value) as val WHERE value == "abc"; diff --git a/yql/essentials/tests/sql/suites/produce/process_lambda_opt_args.sql b/yql/essentials/tests/sql/suites/produce/process_lambda_opt_args.sql new file mode 100644 index 0000000000..612885c95b --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_lambda_opt_args.sql @@ -0,0 +1,14 @@ +USE plato; + +$f = ($x, $optArg?)->{ + return Ensure($x, $optArg is null or len($optArg)>0); +}; + +PROCESS Input0 USING $f(TableRow()); + +PROCESS Input0 USING $f(TableRow(),'foo'); + +PROCESS Input0 USING $f(TableRows()); + +PROCESS Input0 USING $f(TableRows(),'foo'); + diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_in.cfg new file mode 100644 index 0000000000..19cfc046c5 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_in.cfg @@ -0,0 +1,3 @@ +in Input input0.txt +res result.txt +udf python3_udf diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in.sql b/yql/essentials/tests/sql/suites/produce/process_multi_in.sql new file mode 100644 index 0000000000..41d9c9be0e --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_in.sql @@ -0,0 +1,22 @@ +/* syntax version 1 */ +/* postgres can not */ +/* dq file can not */ +$udfScript = @@ +def MyFunc(stream): + return stream +@@; + +$record = (SELECT TableRow() FROM plato.Input); +$recordType = TypeOf(Unwrap($record)); +$streamType = StreamType(VariantType(TupleType($recordType, $recordType, $recordType))); +$udf = Python3::MyFunc(CallableType(0, $streamType, $streamType), $udfScript); + +$src = (select * from plato.Input where key > "200"); + +$i, $j, $k = (PROCESS plato.Input, (select * from plato.Input where key > "100"), $src USING $udf(TableRows())); + +select * from $i; + +select * from $j; + +select * from $k; diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.cfg new file mode 100644 index 0000000000..98996c5bf7 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.cfg @@ -0,0 +1,3 @@ +in Input0 input0.txt +out Output output.txt +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.sql b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.sql new file mode 100644 index 0000000000..11bc99aa5f --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.sql @@ -0,0 +1,25 @@ +/* syntax version 1 */ +/* postgres can not */ + +$udf = YQL::@@ +(lambda '(stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs (lambda '(pair) + (Map (Nth pair '1) (lambda '(elem) + (AsStruct + '('cnt (Visit elem '0 (lambda '(v) (Member v 'subkey)) '1 (lambda '(v) (Member v 'subkey)))) + '('src (Nth pair '0)) + ) + )) + )) + ) + ) +) +@@; + +INSERT INTO plato.Output WITH TRUNCATE +PROCESS plato.Input0, (select * from plato.Input0 where key > "100") USING $udf(TableRows()); diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.sql b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.sql new file mode 100644 index 0000000000..51c3de3aaf --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.sql @@ -0,0 +1,12 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$lambda = ($x) -> { return $x; }; + +$result = PROCESS Input, Input +USING + $lambda(TableRow()) +; + +SELECT * FROM AS_TABLE($result.0); diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_out.cfg new file mode 100644 index 0000000000..d5a164924d --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_out.cfg @@ -0,0 +1,4 @@ +in Input input0.txt +res result.txt +udf python2_udf +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_multi_out.sql new file mode 100644 index 0000000000..a6162a13a8 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_out.sql @@ -0,0 +1,24 @@ +/* syntax version 1 */ +/* postgres can not */ +$udfScript = @@ +def MyFunc(list): + return [(int(x.key) % 2, x) for x in list] +@@; + +$record = (SELECT TableRow() FROM plato.Input); +$recordType =TypeOf(Unwrap($record)); + +$udf = Python::MyFunc( + CallableType(0, + StreamType( + VariantType(TupleType($recordType, $recordType)) + ), + StreamType($recordType)), + $udfScript +); + +$i, $j = (PROCESS plato.Input USING $udf(TableRows())); + +select * from $i; + +select * from $j; diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.cfg new file mode 100644 index 0000000000..00bbe745ac --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.cfg @@ -0,0 +1,4 @@ +xfail +in Input input0.txt +res result.txt +udf python2_udf diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.sql b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.sql new file mode 100644 index 0000000000..fdd02d93e3 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.sql @@ -0,0 +1,24 @@ +/* syntax version 1 */ +/* postgres can not */ +$udfScript = @@ +def MyFunc(list): + return [(int(x.key) % 2, x) for x in list] +@@; + +$record = (SELECT TableRow() FROM plato.Input); +$recordType =TypeOf(Unwrap($record)); + +$udf = Python::MyFunc( + CallableType(0, + StreamType( + VariantType(TupleType($recordType, $recordType)) + ), + StreamType($recordType)), + $udfScript +); + +$i, $j, $k = (PROCESS plato.Input USING $udf(TableRows())); + +select * from $i; +select * from $j; +select * from $k; diff --git a/yql/essentials/tests/sql/suites/produce/process_pure_with_sort.sql b/yql/essentials/tests/sql/suites/produce/process_pure_with_sort.sql new file mode 100644 index 0000000000..c03cc257ce --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_pure_with_sort.sql @@ -0,0 +1,32 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$sorted = ($world, $input, $orderByColumns, $asc) -> { + $n = ListLength($orderByColumns); + + $keySelector = LambdaCode(($row) -> { + $items = ListMap($orderByColumns, + ($x) -> { + RETURN FuncCode("Member", $row, AtomCode($x)); + }); + RETURN ListCode($items); + }); + + $sort = EvaluateCode(LambdaCode(($x) -> { + return FuncCode("Sort", + $x, + ListCode(ListReplicate(ReprCode($asc), $n)), + $keySelector) + })); + + RETURN $sort($input($world)); +}; + +DEFINE SUBQUERY $source() AS + PROCESS Input0; +END DEFINE; + +PROCESS $sorted($source, AsList("key","subkey"), true); +PROCESS $sorted($source, AsList("value"), true); +PROCESS $sorted($source, ListCreate(TypeOf("")), true);
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/process_row_and_columns.sql b/yql/essentials/tests/sql/suites/produce/process_row_and_columns.sql new file mode 100644 index 0000000000..8ce580f615 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_row_and_columns.sql @@ -0,0 +1,19 @@ +/* syntax version 1 */ +/* postgres can not */ +$udfScript = @@ +def processRow(row, tag, separator): + return {"value":row.Name + separator + row.Value + separator + tag}; +@@; + +$udf = Python::processRow( + Callable<(Struct<Name:String, Tag:String, Value:String>, String, String)->Struct<value:String>>, + $udfScript +); + +$data = ( + SELECT key AS Name, value AS Value, subkey AS Tag FROM plato.Input0 +); + +$separator = "|"; + +PROCESS $data USING $udf(TableRow(), Tag, $separator); diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.cfg b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.cfg new file mode 100644 index 0000000000..c1a5b06634 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.cfg @@ -0,0 +1,3 @@ +in Input1 input1.txt +udf python2_udf +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.sql b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.sql new file mode 100644 index 0000000000..1af88830eb --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.sql @@ -0,0 +1,23 @@ +/* syntax version 1 */ +/* postgres can not */ +$udfScript = @@ +def processRows(prefix, rowList, separator): + result = []; + for row in rowList: + result.append({"Data" : prefix + row.Name + separator + row.Value}); + + return result; +@@; + +$udf = Python::processRows( + Callable<(String, Stream<Struct<Name:String, Value:String>>, String)->Stream<Struct<Data:String>>>, + $udfScript +); + +$data = ( + SELECT key AS Name, value AS Value FROM plato.Input1 +); + +$prefix = ">>"; + +PROCESS $data USING $udf($prefix, TableRows(), "=") WHERE Name != "foo"; diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.cfg new file mode 100644 index 0000000000..a1b36ede45 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.cfg @@ -0,0 +1,2 @@ +res result.txt +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.sql new file mode 100644 index 0000000000..97fb0222b6 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.sql @@ -0,0 +1,30 @@ +USE plato; + +$values = ListMap( + ListFromRange(0, 30), + ($x) -> (AsStruct($x as x)) +); + +INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x DESC; + +COMMIT; + +$splitter = ($rows) -> { + $recordType = StreamItemType(TypeOf($rows)); + $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType)); + RETURN Yql::OrderedMap($rows, ($row) -> { + RETURN CASE $row.x + WHEN 0 THEN Variant($row, "0", $varType) + WHEN 1 THEN Variant($row, "1", $varType) + WHEN 2 THEN Variant($row, "2", $varType) + ELSE Variant($row, "3", $varType) + END; + }); +}; + +$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRows())); + +SELECT * FROM $a; +SELECT * FROM $b; +SELECT * FROM $c ORDER BY x DESC; +SELECT * FROM $d ORDER BY x DESC;
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.cfg new file mode 100644 index 0000000000..a1b36ede45 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.cfg @@ -0,0 +1,2 @@ +res result.txt +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.sql new file mode 100644 index 0000000000..17ecc1de2b --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.sql @@ -0,0 +1,30 @@ +USE plato; + +$values = ListMap( + ListFromRange(0, 30), + ($x) -> (AsStruct($x as x)) +); + +INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x; + +COMMIT; + +$splitter = ($rows) -> { + $recordType = StreamItemType(TypeOf($rows)); + $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType)); + RETURN Yql::OrderedMap($rows, ($row) -> { + RETURN CASE $row.x + WHEN 0 THEN Variant($row, "0", $varType) + WHEN 1 THEN Variant($row, "1", $varType) + WHEN 2 THEN Variant($row, "2", $varType) + ELSE Variant($row, "3", $varType) + END; + }); +}; + +$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRows())); + +SELECT * FROM $a; +SELECT * FROM $b; +SELECT * FROM $c ORDER BY x; +SELECT * FROM $d ORDER BY x;
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.cfg new file mode 100644 index 0000000000..a1b36ede45 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.cfg @@ -0,0 +1,2 @@ +res result.txt +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.sql new file mode 100644 index 0000000000..478eda3d2c --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.sql @@ -0,0 +1,28 @@ +USE plato; + +$values = ListMap( + ListFromRange(0, 30), + ($x) -> (AsStruct($x as x)) +); + +INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x DESC; + +COMMIT; + +$splitter = ($row) -> { + $recordType = TypeOf($row); + $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType)); + RETURN CASE $row.x + WHEN 0 THEN Variant($row, "0", $varType) + WHEN 1 THEN Variant($row, "1", $varType) + WHEN 2 THEN Variant($row, "2", $varType) + ELSE Variant($row, "3", $varType) + END +}; + +$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRow())); + +SELECT * FROM $a; +SELECT * FROM $b; +SELECT * FROM $c ORDER BY x DESC; +SELECT * FROM $d ORDER BY x DESC;
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.cfg new file mode 100644 index 0000000000..a1b36ede45 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.cfg @@ -0,0 +1,2 @@ +res result.txt +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.sql new file mode 100644 index 0000000000..87a401a806 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.sql @@ -0,0 +1,28 @@ +USE plato; + +$values = ListMap( + ListFromRange(0, 30), + ($x) -> (AsStruct($x as x)) +); + +INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x; + +COMMIT; + +$splitter = ($row) -> { + $recordType = TypeOf($row); + $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType)); + RETURN CASE $row.x + WHEN 0 THEN Variant($row, "0", $varType) + WHEN 1 THEN Variant($row, "1", $varType) + WHEN 2 THEN Variant($row, "2", $varType) + ELSE Variant($row, "3", $varType) + END +}; + +$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRow())); + +SELECT * FROM $a; +SELECT * FROM $b; +SELECT * FROM $c ORDER BY x; +SELECT * FROM $d ORDER BY x;
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/process_streaming.sql b/yql/essentials/tests/sql/suites/produce/process_streaming.sql new file mode 100644 index 0000000000..1ad7274c79 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_streaming.sql @@ -0,0 +1,22 @@ +/* syntax version 1 */ +/* postgres can not */ +-- not supported on windows + +$input = ( + SELECT String::JoinFromList(AsList(key, subkey, value), ",") AS Data FROM plato.Input1 +); + +$processed = ( + PROCESS $input USING Streaming::Process(TableRows(), "grep", AsList("[14]")) +); + +$list = ( + SELECT String::SplitToList(Data, ',') AS DataList FROM $processed +); + +SELECT + input.DataList[0] AS key, + input.DataList[1] AS subkey, + input.DataList[2] AS value +FROM $list AS input; + diff --git a/yql/essentials/tests/sql/suites/produce/process_streaming_count.sql b/yql/essentials/tests/sql/suites/produce/process_streaming_count.sql new file mode 100644 index 0000000000..c8111d385a --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_streaming_count.sql @@ -0,0 +1,19 @@ +/* syntax version 1 */ +/* postgres can not */ +-- not supported on windows + +$input = ( + SELECT String::JoinFromList(AsList(key, subkey, value), ",") AS Data FROM plato.Input1 +); + +$processed = ( + PROCESS $input USING Streaming::Process(TableRows(), "grep", AsList("[14]")) +); + +SELECT + * +FROM $processed; + +SELECT + COUNT(*) +FROM $processed; diff --git a/yql/essentials/tests/sql/suites/produce/process_streaming_inline_bash.sql b/yql/essentials/tests/sql/suites/produce/process_streaming_inline_bash.sql new file mode 100644 index 0000000000..5737cf807b --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_streaming_inline_bash.sql @@ -0,0 +1,14 @@ +/* syntax version 1 */ +/* postgres can not */ +-- not supported on windows + +$script = @@ +#!/bin/bash +cat - | grep $1 | head -n 3 | grep [234] +@@; + +$input = ( + SELECT String::JoinFromList(AsList(key, subkey, value), ",") AS Data FROM plato.Input1 +); + +PROCESS $input USING Streaming::ProcessInline(TableRows(), $script, AsList("bar")); diff --git a/yql/essentials/tests/sql/suites/produce/process_trivial_as_struct.sql b/yql/essentials/tests/sql/suites/produce/process_trivial_as_struct.sql new file mode 100644 index 0000000000..a948124640 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_trivial_as_struct.sql @@ -0,0 +1,2 @@ +/* postgres can not */ +process plato.Input0 using SimpleUdf::Echo(value) as val;
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/process_with_assume.cfg b/yql/essentials/tests/sql/suites/produce/process_with_assume.cfg new file mode 100644 index 0000000000..66737248b8 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_assume.cfg @@ -0,0 +1,2 @@ +in Input sorted.txt +out Output output.txt diff --git a/yql/essentials/tests/sql/suites/produce/process_with_assume.sql b/yql/essentials/tests/sql/suites/produce/process_with_assume.sql new file mode 100644 index 0000000000..ddb1353299 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_assume.sql @@ -0,0 +1,12 @@ +/* postgres can not */ +/* multirun can not */ +/* syntax version 1 */ +use plato; + +$udf = YQL::@@(lambda '(x) +(FlatMap x + (lambda '(y) (Just (AsStruct '('key (Concat (String '"0") (Member y 'key))) '('subkey (Member y 'subkey)) '('value (Member y 'value))))) +))@@; + +insert into Output with truncate +process plato.Input using $udf(TableRows()) assume order by key; diff --git a/yql/essentials/tests/sql/suites/produce/process_with_lambda.sql b/yql/essentials/tests/sql/suites/produce/process_with_lambda.sql new file mode 100644 index 0000000000..e0886faaf8 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_lambda.sql @@ -0,0 +1,8 @@ +/* syntax version 1 */ +/* postgres can not */ +$udf = YQL::@@(lambda '(x) +(FlatMap x + (lambda '(y) (AsList y y)) +))@@; + +process plato.Input0 using $udf(TableRows()); diff --git a/yql/essentials/tests/sql/suites/produce/process_with_lambda_outstream.sql b/yql/essentials/tests/sql/suites/produce/process_with_lambda_outstream.sql new file mode 100644 index 0000000000..0c6c100764 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_lambda_outstream.sql @@ -0,0 +1,27 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$f1 = ($r)->{ + return $r; +}; + +PROCESS Input0 USING $f1(TableRow()); + +$f2 = ($r)->{ + return Just($r); +}; + +PROCESS Input0 USING $f2(TableRow()); + +$f3 = ($r)->{ + return AsList($r,$r); +}; + +PROCESS Input0 USING $f3(TableRow()); + +$f4 = ($r)->{ + return Yql::Iterator(AsList($r,$r)); +}; + +PROCESS Input0 USING $f4(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python.sql b/yql/essentials/tests/sql/suites/produce/process_with_python.sql new file mode 100644 index 0000000000..caeb8c2cb3 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_python.sql @@ -0,0 +1,10 @@ +/* postgres can not */ +/* syntax version 1 */ +$udfScript = @@ +def Dup(s): + return [{"value":s},{"value":s}] +@@; + +$udf = Python::Dup(Callable<(String)->List<Struct<value:String>>>, $udfScript); + +process plato.Input0 using $udf(value); diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_as_struct.sql b/yql/essentials/tests/sql/suites/produce/process_with_python_as_struct.sql new file mode 100644 index 0000000000..237c2abef8 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_python_as_struct.sql @@ -0,0 +1,10 @@ +/* postgres can not */ +/* syntax version 1 */ +$udfScript = @@ +def Dup(s): + return [s, s]; +@@; + +$udf = Python::Dup(Callable<(String)->List<String>>, $udfScript); + +process plato.Input0 using $udf(value) as val; diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_stream-empty.cfg b/yql/essentials/tests/sql/suites/produce/process_with_python_stream-empty.cfg new file mode 100644 index 0000000000..837223c684 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_python_stream-empty.cfg @@ -0,0 +1,2 @@ +in Input0 empty.txt +udf python3_udf diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_stream.cfg b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.cfg new file mode 100644 index 0000000000..e34e70ff55 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.cfg @@ -0,0 +1,3 @@ +in Input0 input0.txt +udf python3_udf +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_stream.sql b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.sql new file mode 100644 index 0000000000..33a0b8f8f7 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.sql @@ -0,0 +1,23 @@ +/* syntax version 1 */ +/* kikimr can not */ +USE plato; + +$udfScript = @@ +def f(input,x): + for i in input: + yield { + 'key': i.key, + 'subkey': i.subkey, + 'value': i.value, + 'pass': x + } +@@; + +$udf_stream = Python3::f( +Callable< + (Stream<Struct<key:String,subkey:String,value:String>>,Int32) + -> + Stream<Struct<key:String,subkey:String,value:String,pass:Int32>> +>, $udfScript); + +PROCESS Input0 using $udf_stream(TableRows(), 2); diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf.sql new file mode 100644 index 0000000000..f8fb326983 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_udf.sql @@ -0,0 +1,2 @@ +/* postgres can not */ +process plato.Input0 using Person::New(key, subkey, coalesce(cast(value as Uint32), 0));
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf_rows.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf_rows.sql new file mode 100644 index 0000000000..3df9f4df1b --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_udf_rows.sql @@ -0,0 +1,17 @@ +/* postgres can not */ +$udf = Python::process( +Callable< + ()->Stream<Struct<result:Int64>> +>, @@ +def process(): + for row in range(10): + result = row + yield locals() +@@); + +$users = ( + SELECT `key` AS age, `value` AS name FROM plato.Input0 +); + +PROCESS $users +USING $udf(); diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf_validate.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate.sql new file mode 100644 index 0000000000..e5082fdd77 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate.sql @@ -0,0 +1,7 @@ +/* postgres can not */ +$processed = ( + process plato.Input0 using Person::New(key, subkey, coalesce(cast(value as Uint32), 0)) +); + +PRAGMA config.flags("ValidateUdf", "Lazy"); +SELECT * FROM $processed; diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf_validate_ignore_broken.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate_ignore_broken.sql new file mode 100644 index 0000000000..de471f14f3 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate_ignore_broken.sql @@ -0,0 +1,8 @@ +/* postgres can not */ + +$processed = ( + process plato.Input0 using Person::New(key, subkey, Length(SimpleUdf::ReturnBrokenInt())) +); + +PRAGMA config.flags("ValidateUdf", "None"); +SELECT * FROM $processed; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all.sql b/yql/essentials/tests/sql/suites/produce/reduce_all.sql new file mode 100644 index 0000000000..89362a4454 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all.sql @@ -0,0 +1,16 @@ +/* syntax version 1 */ +/* postgres can not */ +/* dqfile can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(stream): + sums = [functools.reduce(lambda x,y: x + int(y.value), pair[1], 0) for pair in stream] + return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)} +@@; + +$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->Struct<sumByAllVal:Uint32>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING ALL $udf(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_expr.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_expr.sql new file mode 100644 index 0000000000..d4e194802f --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_expr.sql @@ -0,0 +1,16 @@ +/* postgres can not */ +/* syntax version 1 */ +/* dqfile can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(stream): + sums = [functools.reduce(lambda x,y: x + y, pair[1], 0) for pair in stream] + return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)} +@@; + +$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Uint32>>>)->Struct<sumByAllVal:Uint32>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING ALL $udf(cast(value as uint32) ?? 0); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_list.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_list.sql new file mode 100644 index 0000000000..ec08c7922d --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_list.sql @@ -0,0 +1,18 @@ +/* syntax version 1 */ +/* postgres can not */ +/* dq can not */ +/* dqfile can not */ +USE plato; + +$udfScript = @@ +import functools + +def Len(stream): + sums = [functools.reduce(lambda x,y: x + int(y.value), pair[1], 0) for pair in stream] + return [{"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}] +@@; + +$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->List<Struct<sumByAllVal:Uint32>>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING ALL $udf(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_list_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_list_stream.sql new file mode 100644 index 0000000000..5e154426c0 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_list_stream.sql @@ -0,0 +1,25 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools +from yql import TYieldIteration + +def Sum(stream): + def Gen(stream): + sums = [] + for pair in stream: + if isinstance(pair, TYieldIteration): + yield pair + else: + sums.append(functools.reduce(lambda x,y: x + int(y.value), pair[1], 0)) + + yield {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)} + return Gen(stream) +@@; + +$udf = Python3::Sum(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->Stream<Struct<sumByAllVal:Uint32>>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING ALL $udf(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_multi_in.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_multi_in.sql new file mode 100644 index 0000000000..6f6750898e --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_multi_in.sql @@ -0,0 +1,15 @@ +/* syntax version 1 */ +/* postgres can not */ +/* dqfile can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(stream): + sums = [functools.reduce(lambda x,y: x + int(y[1].value), pair[1], 0) for pair in stream] + return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)} +@@; + +$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Variant<Struct<key:String,subkey:String,value:String>,Struct<key:String,subkey:String,value:String>>>>>)->Struct<sumByAllVal:Uint32>>, $udfScript); + +REDUCE Input1, Input1 ON key USING ALL $udf(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_opt.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_opt.sql new file mode 100644 index 0000000000..467be50696 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_opt.sql @@ -0,0 +1,16 @@ +/* syntax version 1 */ +/* postgres can not */ +/* dqfile can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(stream): + sums = [functools.reduce(lambda x,y: x + int(y.value), pair[1], 0) for pair in stream] + return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)} +@@; + +$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->Optional<Struct<sumByAllVal:Uint32>>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING ALL $udf(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg new file mode 100644 index 0000000000..c426164726 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg @@ -0,0 +1,4 @@ +xfail +in Input1 input1.txt +udf python3_udf +providers dq diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql new file mode 100644 index 0000000000..82b3767cc5 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql @@ -0,0 +1,14 @@ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(stream): + sums = [functools.reduce(lambda x,y: x + y, pair[1], 0) for pair in stream] + return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)} +@@; + +$udf = Python3::Len(Callable<(Stream<Tuple<String,Stream<Uint32>>>)->Struct<sumByAllVal:Uint32>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING ALL $udf(cast(value as uint32) ?? 0); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg new file mode 100644 index 0000000000..13bb8734c4 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg @@ -0,0 +1,3 @@ +in Input1 input1.txt +udf python3_udf +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_by_struct.sql b/yql/essentials/tests/sql/suites/produce/reduce_by_struct.sql new file mode 100644 index 0000000000..a36223f7a4 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_by_struct.sql @@ -0,0 +1,16 @@ +USE plato; + +INSERT INTO @tmp +SELECT * FROM AS_TABLE([ + <|key: <|field1: 1, field2: 1|>, value: 1|>, + <|key: <|field1: 1, field2: 1|>, value: 2|>, + ]); + +COMMIT; + +$reducer = ($_key, $stream) -> ($stream); + +REDUCE @tmp +ON key +USING $reducer(TableRow()); + diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda.sql new file mode 100644 index 0000000000..74d1372d73 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda.sql @@ -0,0 +1,11 @@ +/* postgres can not */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item))))) +))@@; + +--INSERT INTO Output +$res = (REDUCE Input1 ON key USING $udf(cast(value as uint32) ?? 0)); + +select * from $res order by key; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_mem.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_mem.sql new file mode 100644 index 0000000000..4593e67ba3 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_mem.sql @@ -0,0 +1,11 @@ +/* postgres can not */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item))))) +))@@; + +--INSERT INTO Output +$res = (REDUCE (select AsList("foo") as key, "123" as value) ON key USING $udf(cast(value as uint32) ?? 0)); + +select * from $res order by key; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_table.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_table.sql new file mode 100644 index 0000000000..39aa5ec294 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_table.sql @@ -0,0 +1,11 @@ +/* postgres can not */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item))))) +))@@; + +--INSERT INTO Output +$res = (REDUCE (select AsList(key) as key, value from Input1) ON key USING $udf(cast(value as uint32) ?? 0)); + +select * from $res order by key; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin.sql new file mode 100644 index 0000000000..ca37608c70 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin.sql @@ -0,0 +1,13 @@ +/* postgres can not */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('superstring (Collect (Condense stream (String '"") (lambda '(item state) (Bool 'False)) (lambda '(item state) + (Concat state (Concat (Member item 'char) (Member item 'num))) + )))) +))@@; + +--INSERT INTO Output +$res = (REDUCE Input1 PRESORT subkey, value desc ON key USING $udf(AsStruct(subkey as char, value as num))); + +select * from $res order by key; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin_list.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin_list.sql new file mode 100644 index 0000000000..65225b9ffa --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin_list.sql @@ -0,0 +1,13 @@ +/* postgres can not */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('superstring (Collect (Condense stream (String '"") (lambda '(item state) (Bool 'False)) (lambda '(item state) + (Concat state (Concat (Member item 'char) (Member item 'num))) + )))) +))@@; + +--INSERT INTO Output +$res = (REDUCE Input1 PRESORT AsList(subkey), value desc ON key USING $udf(AsStruct(subkey as char, value as num))); + +select * from $res order by key; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in-empty.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-empty.cfg new file mode 100644 index 0000000000..ce36bb4337 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-empty.cfg @@ -0,0 +1,2 @@ +in Input empty.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in-sorted.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-sorted.cfg new file mode 100644 index 0000000000..3f5bebb5a2 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-sorted.cfg @@ -0,0 +1,2 @@ +in Input sorted.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.sql new file mode 100644 index 0000000000..c819fb4a58 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.sql @@ -0,0 +1,22 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$r = (REDUCE Input, Input ON key USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.sql new file mode 100644 index 0000000000..b867fc4fdc --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.sql @@ -0,0 +1,22 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$r = (REDUCE Input, `Input{key,subkey}` ON key USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.cfg new file mode 100644 index 0000000000..66737248b8 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.cfg @@ -0,0 +1,2 @@ +in Input sorted.txt +out Output output.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.sql new file mode 100644 index 0000000000..159e80f995 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.sql @@ -0,0 +1,46 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$user_process = ($key, $t1, $t2, $t3) -> { + return AsStruct( + $key AS key, + COALESCE(cast($t1.subkey as Int32), 0) + COALESCE(cast($t2.subkey as Int32), 0) + COALESCE(cast($t3.subkey as Int32), 0) AS subkey + ); +}; + +$reducer = ($key, $stream) -> { + $stream = YQL::OrderedMap($stream, ($item) -> { + return AsStruct( + YQL::Guess($item, AsAtom("0")).t1 AS t1, + YQL::Guess($item, AsAtom("1")).t2 AS t2, + YQL::Guess($item, AsAtom("2")).t3 AS t3, + ); + }); + $recs = YQL::Collect(YQL::Condense1( + $stream, + ($item) -> {return AsStruct( + $item.t1 AS t1, + $item.t2 AS t2, + $item.t3 AS t3, + );}, + ($_item, $_state) -> {return false;}, + ($item, $state) -> {return AsStruct( + COALESCE($state.t1, $item.t1) AS t1, + COALESCE($state.t2, $item.t2) AS t2, + COALESCE($state.t3, $item.t3) AS t3, + );}, + )); + $rec = Ensure($recs, ListLength($recs) == 1ul)[0]; + return $user_process($key, $rec.t1, $rec.t2, $rec.t3); +}; + +INSERT INTO Output WITH TRUNCATE +REDUCE + (SELECT key, TableRow() AS t1 FROM Input), + (SELECT key, TableRow() AS t2 FROM Input), + (SELECT key, TableRow() AS t3 FROM Input) +ON key +USING $reducer(TableRow()) +ASSUME ORDER BY key; + diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.cfg new file mode 100644 index 0000000000..66737248b8 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.cfg @@ -0,0 +1,2 @@ +in Input sorted.txt +out Output output.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.sql new file mode 100644 index 0000000000..e1f71c3953 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.sql @@ -0,0 +1,45 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$user_process = ($key, $t1, $t2, $t3) -> { + return AsStruct( + $key.0 AS key, + COALESCE(cast($t1.subkey as Int32), 0) + COALESCE(cast($t2.subkey as Int32), 0) + COALESCE(cast($t3.subkey as Int32), 0) AS subkey + ); +}; + +$reducer = ($key, $stream) -> { + $stream = YQL::OrderedMap($stream, ($item) -> { + return AsStruct( + YQL::Guess($item, AsAtom("0")).t1 AS t1, + YQL::Guess($item, AsAtom("1")).t2 AS t2, + YQL::Guess($item, AsAtom("2")).t3 AS t3, + ); + }); + $recs = YQL::Collect(YQL::Condense1( + $stream, + ($item) -> {return AsStruct( + $item.t1 AS t1, + $item.t2 AS t2, + $item.t3 AS t3, + );}, + ($_item, $_state) -> {return false;}, + ($item, $state) -> {return AsStruct( + COALESCE($state.t1, $item.t1) AS t1, + COALESCE($state.t2, $item.t2) AS t2, + COALESCE($state.t3, $item.t3) AS t3, + );}, + )); + $rec = Ensure($recs, ListLength($recs) == 1ul)[0]; + return $user_process($key, $rec.t1, $rec.t2, $rec.t3); +}; + +INSERT INTO Output WITH TRUNCATE +REDUCE + (SELECT key, subkey, TableRow() AS t1 FROM Input), + (SELECT key, subkey, TableRow() AS t2 FROM Input), + (SELECT key, subkey, TableRow() AS t3 FROM Input) +ON key, subkey +USING $reducer(TableRow()) +ASSUME ORDER BY key, subkey; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.sql new file mode 100644 index 0000000000..fe9bd349ae --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.sql @@ -0,0 +1,21 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key (Nth key '0)) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$r = (REDUCE Input, Input ON key,subkey USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.sql new file mode 100644 index 0000000000..ea6a236d22 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.sql @@ -0,0 +1,22 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key (Nth key '0)) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$r = (REDUCE Input, `Input{key,subkey}` ON key,subkey USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.sql new file mode 100644 index 0000000000..3712268059 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.sql @@ -0,0 +1,22 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$r = (REDUCE Input, Input PRESORT value, key || subkey ON key USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.sql new file mode 100644 index 0000000000..9587d1a453 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.sql @@ -0,0 +1,24 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$src = (select * from plato.Input where key > "200"); + +$r = (REDUCE Input, (select * from Input where key > "100"), $src ON key USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling-sorted.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling-sorted.cfg new file mode 100644 index 0000000000..2334ceb124 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling-sorted.cfg @@ -0,0 +1,2 @@ +in Input sorted1.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.cfg new file mode 100644 index 0000000000..fb4fb16059 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.cfg @@ -0,0 +1,2 @@ +in Input input2.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.sql new file mode 100644 index 0000000000..e82a139d2e --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.sql @@ -0,0 +1,23 @@ +/* syntax version 1 */ +/* postgres can not */ +/* custom check: len(yt_res_yson[0]['Write'][0]['Data']) < 16 */ +USE plato; + +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$r = (REDUCE Input SAMPLE(0.1), Input SAMPLE(0.1) ON key USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.cfg new file mode 100644 index 0000000000..4468d3ba29 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +res result.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.sql new file mode 100644 index 0000000000..1e694e0280 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.sql @@ -0,0 +1,24 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udf = YQL::@@ +(lambda '(key stream) + (PartitionByKey stream + (lambda '(item) (Way item)) + (Void) + (Void) + (lambda '(listOfPairs) + (FlatMap listOfPairs + (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1))))))) + ) + ) + ) +) +@@; + +$r = (REDUCE Input, AS_TABLE(ListMap(ListFromRange(0,10), ($val) -> { + RETURN AsStruct(Cast($val AS String) AS key, Cast($val AS String) AS subkey, Cast($val AS String) AS value) +})) ON key USING $udf(TableRow())); + +SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.cfg new file mode 100644 index 0000000000..f73dce605c --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.cfg @@ -0,0 +1,4 @@ +in Input input1.txt +res result.txt +udf python2_udf +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_out.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.sql new file mode 100644 index 0000000000..079ff55899 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.sql @@ -0,0 +1,18 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools + +def Len(key, input): + sumByValue = functools.reduce(lambda x,y: x + int(y.value), input, 0) + return (sumByValue % 2, {"sumByVal": sumByValue}) +@@; + +$udf = Python::Len(Callable<(String, Stream<Struct<key:String,subkey:String,value:String>>)->Variant<Struct<sumByVal:Uint32>,Struct<sumByVal:Uint32>>>, $udfScript); + +$i, $j = (REDUCE Input ON key USING $udf(TableRow())); + +select * from $i order by sumByVal; +select * from $j order by sumByVal; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_subfields-sorted.cfg b/yql/essentials/tests/sql/suites/produce/reduce_subfields-sorted.cfg new file mode 100644 index 0000000000..a709300fac --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_subfields-sorted.cfg @@ -0,0 +1,2 @@ +in Input sorted.txt +udf python3_udf diff --git a/yql/essentials/tests/sql/suites/produce/reduce_subfields.cfg b/yql/essentials/tests/sql/suites/produce/reduce_subfields.cfg new file mode 100644 index 0000000000..0245ac800c --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_subfields.cfg @@ -0,0 +1,2 @@ +in Input input0.txt +udf python3_udf diff --git a/yql/essentials/tests/sql/suites/produce/reduce_subfields.sql b/yql/essentials/tests/sql/suites/produce/reduce_subfields.sql new file mode 100644 index 0000000000..3518cac97e --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_subfields.sql @@ -0,0 +1,25 @@ +/* postgres can not */ +/* syntax version 1 */ +USE plato; + +$udfScript = @@ +def f(key, input): + for i in input: + yield { + 'key': i.key, + 'value': i.value, + 'pass': 10 + } +@@; + +$udf_stream = Python3::f( +Callable< + (String,Stream<Struct<key:String,value:String>>) + -> + Stream<Struct<key:String,value:String,pass:Int32>> +>, $udfScript); + + +REDUCE Input +ON key +USING $udf_stream(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.cfg b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.cfg new file mode 100644 index 0000000000..520f10d8f9 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.cfg @@ -0,0 +1,3 @@ +in Input0 input0.txt +udf simple_udf +providers yt
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.sql b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.sql new file mode 100644 index 0000000000..bebc744f63 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.sql @@ -0,0 +1,18 @@ +/* postgres can not */ +/* syntax version 1 */ +/* ignore runonopt plan diff */ +use plato; + +pragma warning("disable", "4510"); + +$r1 = REDUCE Input0 ON key USING ALL SimpleUdf::GenericAsStruct(TableRows()); +$r2 = REDUCE Input0 ON key USING SimpleUdf::GenericAsStruct(cast(TableRow().subkey as Int32)); +$r3 = REDUCE Input0 ON key USING ALL SimpleUdf::GenericAsStruct(TableRow().key); + + +select * from (select * from $r1 flatten list by arg_0) flatten columns order by key, subkey; +select arg_0 as key, ListSort(YQL::Collect(arg_1)) as values from $r2 order by key; + + +select FormatType(TypeOf(TableRow())) from $r1 limit 1; +select FormatType(TypeOf(TableRow())) from $r3 limit 1; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.cfg new file mode 100644 index 0000000000..66737248b8 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.cfg @@ -0,0 +1,2 @@ +in Input sorted.txt +out Output output.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.sql new file mode 100644 index 0000000000..3f51be0d12 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.sql @@ -0,0 +1,11 @@ +/* postgres can not */ +/* multirun can not */ +/* syntax version 1 */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('sum (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item))))) +))@@; + +INSERT INTO Output +REDUCE Input ON key USING $udf(cast(subkey as uint32) ?? 0) ASSUME ORDER BY key; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.cfg new file mode 100644 index 0000000000..144f0dd840 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.cfg @@ -0,0 +1 @@ +in Input input2.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.sql new file mode 100644 index 0000000000..3913353e12 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.sql @@ -0,0 +1,13 @@ +/* postgres can not */ +/* syntax version 1 */ +USE plato; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('summ (Collect (Condense stream (Nothing (OptionalType (DataType 'String))) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Coalesce state (Just item)))))) +))@@; + +$in = (SELECT * FROM Input ASSUME ORDER BY key, subkey); + +$res = (REDUCE $in ON key USING $udf(value)); + +SELECT * FROM $res ORDER BY key; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_flat_lambda.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_lambda.sql new file mode 100644 index 0000000000..b0e1e56200 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_lambda.sql @@ -0,0 +1,9 @@ +/* syntax version 1 */ +/* kikimr can not */ +USE plato; + +$udf_stream = ($input)->{ return $input }; + +$res = REDUCE Input0 ON key using all $udf_stream(TableRows()); + +select * from $res order by value; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_flat_python_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_python_stream.sql new file mode 100644 index 0000000000..8d164a65c9 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_python_stream.sql @@ -0,0 +1,30 @@ +/* syntax version 1 */ +/* kikimr can not */ +USE plato; + +$udfScript = @@ +def f(input): + s = [] + last_key = None + for i in input: + if last_key is not None and last_key != i.key: + s = [] + s.append(i.value) + last_key = i.key + yield { + 'key': i.key, + 'subkey1': i.subkey, + 'value': b''.join(s), + } +@@; + +$udf_stream = Python3::f( +Callable< + (Stream<Struct<key:String,subkey:String,value:String>>) + -> + Stream<Struct<key:String,subkey1:String,value:String>> +>, $udfScript); + +$res = REDUCE Input0 PRESORT value ON key using all $udf_stream(TableRows()) ; + +select * from $res order by key, value;
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.cfg new file mode 100644 index 0000000000..1dc9a1e3a7 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.cfg @@ -0,0 +1 @@ +in Input input0.txt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.sql new file mode 100644 index 0000000000..9bdca4b7d9 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.sql @@ -0,0 +1,32 @@ +USE plato; + +insert into @skv1v2 +select key, subkey, value as value1, value as value2 from Input order by subkey, key, value1, value2; + +insert into @skv2v1 +select key, subkey, value as value1, value as value2 from Input order by subkey, key, value2, value1; + +insert into @ksv1v2 +select key, subkey, value as value1, value as value2 from Input order by key, subkey, value1, value2; + +commit; + +$udf = YQL::@@(lambda '(key stream) (AsStruct + '('key key) '('summ (Collect (Condense stream (Nothing (OptionalType (DataType 'String))) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Coalesce state (Just item)))))) +))@@; + +select * from ( + reduce concat(@skv1v2, @skv1v2) presort value1, value2 on key, subkey using $udf(value1) --YtReduce +) order by key, summ; + +select * from ( + reduce @ksv1v2 presort value2, value1 on key, subkey using $udf(value1) --YtMapReduce +) order by key, summ; + +select * from ( + reduce concat(@skv1v2, @skv2v1) presort value1, value2 on key, subkey using $udf(value1) --YtMapReduce +) order by key, summ; + +select * from ( + reduce concat(@skv1v2, @ksv1v2) presort value1, value2 on key, subkey using $udf(value1) --YtMapReduce +) order by key, summ; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python.sql new file mode 100644 index 0000000000..6bdf0efb24 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python.sql @@ -0,0 +1,16 @@ +/* postgres can not */ +/* syntax version 1 */ +USE plato; + +$udfScript = @@ +import functools +def Len(key, input): + return {"value":functools.reduce(lambda x,y: x + 1, input, 0)} +@@; + +$udf = Python::Len(Callable<(String, Stream<String>)->Struct<value:Uint32>>, $udfScript); + +--INSERT INTO Output +$res = (REDUCE Input1 ON key USING $udf(value)); + +select * from $res order by value; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys.sql new file mode 100644 index 0000000000..346c977f16 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys.sql @@ -0,0 +1,15 @@ +/* postgres can not */ +/* syntax version 1 */ +$udfScript = @@ +import functools +def Len(val_key, input): + return {"zuza": {val_key[0] + b"-" + str(val_key[1]).encode('utf-8'): functools.reduce(lambda x,y: x + 1, input, 0)}} +@@; + +$udf = Python::Len(Callable<(Tuple<String,Uint32>, Stream<String>)->Struct<zuza:Dict<String, Uint32>>>, $udfScript); + +$data = (select Cast(value as uint32) ?? 0 as kk, value as ss, key as val from plato.Input1); + +$res = (reduce $data on val, kk using $udf(ss)); + +select * from $res order by Yql::ToOptional(Yql::DictKeys(zuza)); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys_stream.sql new file mode 100644 index 0000000000..aac5ae32b1 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys_stream.sql @@ -0,0 +1,19 @@ +/* postgres can not */ +/* syntax version 1 */ +use plato; + +$udfScript = @@ +import functools + +def Len(val_key, input): + return {"zuza": {val_key[0] + b"-" + str(val_key[1]).encode('utf-8'): functools.reduce(lambda x, y: x + 1, input, 0)}} +@@; + +$udf = Python3::Len(Callable<(Tuple<String,Uint32>, Stream<String>)->Struct<zuza:Dict<String, Uint32>>>, $udfScript); + +$data = (select Cast(value as uint32) ?? 0 as kk, value as ss, key as val from Input1); + +--insert into Output +$res = (reduce $data on val, kk using $udf(ss)); + +select * from $res order by DictKeys(zuza); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_filter_and_having.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_filter_and_having.sql new file mode 100644 index 0000000000..dc3beb4025 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_filter_and_having.sql @@ -0,0 +1,14 @@ +/* postgres can not */ +/* syntax version 1 */ +USE plato; + +$udfScript = @@ +import functools +def Len(key, input): + return {"total":functools.reduce(lambda x,y: x + 1, input, 0)} +@@; + +$udf = Python::Len(Callable<(String, Stream<String>)->Struct<total:Uint32>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING $udf(value) WHERE cast(value as int) > 1 HAVING total > 3; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_having.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_having.sql new file mode 100644 index 0000000000..b226e6f399 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_having.sql @@ -0,0 +1,14 @@ +/* postgres can not */ +/* syntax version 1 */ +USE plato; + +$udfScript = @@ +import functools +def Len(key, input): + return {"count":functools.reduce(lambda x,y: x + 1, input, 0)} +@@; + +$udf = Python::Len(Callable<(String, Stream<String>)->Struct<count:Uint32>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING $udf(value) HAVING count > 4; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream._sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream._sql new file mode 100644 index 0000000000..f244cd5c0d --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream._sql @@ -0,0 +1,14 @@ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(key, input): + return {"value":functools.reduce(lambda x,y: x + 1, input, 0)} +@@; + +$udf = Python::Len(Callable<(String, Stream<String>)->Struct<value:Uint32>>, $udfScript); + +$res = (REDUCE Input1 ON key USING $udf(value)); + +select * from $res order by value; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream.cfg new file mode 100644 index 0000000000..b16f832837 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream.cfg @@ -0,0 +1,2 @@ +in Input1 input1.txt +udf python3_udf diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort.sql new file mode 100644 index 0000000000..fa55b8bce2 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort.sql @@ -0,0 +1,15 @@ +/* postgres can not */ +/* syntax version 1 */ +USE plato; + +$udfScript = @@ +def Len(val_key, input): + return {"joined": {val_key: b", ".join(input)}} +@@; + +$udf = Python3::Len(Callable<(String, Stream<String>)->Struct<joined:Dict<String, String>>>, $udfScript); + +--INSERT INTO Output +$res = (REDUCE Input1 PRESORT value DESC ON key USING $udf(subkey)); + +select * from $res order by Yql::ToOptional(Yql::DictKeys(joined)); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort_stream.sql new file mode 100644 index 0000000000..276c70136c --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort_stream.sql @@ -0,0 +1,15 @@ +/* postgres can not */ +/* syntax version 1 */ +USE plato; + +$udfScript = @@ +def Len(val_key, input): + return {"joined": {val_key: b", ".join(input)}} +@@; + +$udf = Python::Len(Callable<(String, Stream<String>)->Struct<joined:Dict<String, String>>>, $udfScript); + +--INSERT INTO Output +$res = (REDUCE Input1 PRESORT value DESC ON key USING $udf(subkey)); + +select * from $res order by DictKeys(joined); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_row.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row.sql new file mode 100644 index 0000000000..6d3371d649 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row.sql @@ -0,0 +1,17 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools + +def Len(key, input): + return {"sumByVal": functools.reduce(lambda x,y: x + int(y.value), input, 0)} +@@; + +$udf = Python3::Len(Callable<(String, Stream<Struct<key:String,subkey:String,value:String>>)->Struct<sumByVal:Uint32>>, $udfScript); + +--INSERT INTO Output +$res = (REDUCE Input1 ON key USING $udf(TableRow())); + +select * from $res order by sumByVal; diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_row_repack.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row_repack.sql new file mode 100644 index 0000000000..1eba13ec1f --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row_repack.sql @@ -0,0 +1,16 @@ +/* syntax version 1 */ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(key, input): + return {"sumByValAndKeyLen":functools.reduce(lambda x,y: x + int(y.value) + len(y.key), input, 0)} +@@; + +$udf = Python::Len(Callable<(String, Stream<Struct<key:String,value:String>>)->Struct<sumByValAndKeyLen:Uint32>>, $udfScript); + +--INSERT INTO Output +$res = (REDUCE Input1 ON key USING $udf(AsStruct(TableRow().value as value, TableRow().subkey as key))); + +select * from $res order by sumByValAndKeyLen; diff --git a/yql/essentials/tests/sql/suites/produce/sorted.txt b/yql/essentials/tests/sql/suites/produce/sorted.txt new file mode 100644 index 0000000000..2ede97b886 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/sorted.txt @@ -0,0 +1,6 @@ +{"key"="023";"subkey"="3";"value"="aaa"}; +{"key"="037";"subkey"="5";"value"="ddd"}; +{"key"="075";"subkey"="1";"value"="abc"}; +{"key"="150";"subkey"="1";"value"="aaa"}; +{"key"="150";"subkey"="3";"value"="iii"}; +{"key"="150";"subkey"="8";"value"="zzz"}; diff --git a/yql/essentials/tests/sql/suites/produce/sorted.txt.attr b/yql/essentials/tests/sql/suites/produce/sorted.txt.attr new file mode 100644 index 0000000000..391c1a05f6 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/sorted.txt.attr @@ -0,0 +1,11 @@ +{"_yql_row_spec"={ + "Type"=["StructType";[ + ["key";["DataType";"String"]]; + ["subkey";["DataType";"String"]]; + ["value";["DataType";"String"]] + ]]; + "SortDirections"=[1;1;1;]; + "SortedBy"=["key";"subkey";"value";]; + "SortedByTypes"=[["DataType";"String";];["DataType";"String";];["DataType";"String";];]; + "SortMembers"=["key";"subkey";"value";]; +}} diff --git a/yql/essentials/tests/sql/suites/produce/sorted1.txt b/yql/essentials/tests/sql/suites/produce/sorted1.txt new file mode 100644 index 0000000000..b214aab0d9 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/sorted1.txt @@ -0,0 +1,10 @@ +{"key"="023";"subkey"="3";"value"="aaa"}; +{"key"="037";"subkey"="5";"value"="ddd"}; +{"key"="075";"subkey"="1";"value"="abc"}; +{"key"="150";"subkey"="1";"value"="aaa"}; +{"key"="150";"subkey"="3";"value"="iii"}; +{"key"="150";"subkey"="8";"value"="zzz"}; +{"key"="200";"subkey"="7";"value"="qqq"}; +{"key"="527";"subkey"="4";"value"="bbb"}; +{"key"="761";"subkey"="6";"value"="ccc"}; +{"key"="911";"subkey"="2";"value"="kkk"}; diff --git a/yql/essentials/tests/sql/suites/produce/sorted1.txt.attr b/yql/essentials/tests/sql/suites/produce/sorted1.txt.attr new file mode 100644 index 0000000000..391c1a05f6 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/sorted1.txt.attr @@ -0,0 +1,11 @@ +{"_yql_row_spec"={ + "Type"=["StructType";[ + ["key";["DataType";"String"]]; + ["subkey";["DataType";"String"]]; + ["value";["DataType";"String"]] + ]]; + "SortDirections"=[1;1;1;]; + "SortedBy"=["key";"subkey";"value";]; + "SortedByTypes"=[["DataType";"String";];["DataType";"String";];["DataType";"String";];]; + "SortMembers"=["key";"subkey";"value";]; +}} diff --git a/yql/essentials/tests/sql/suites/produce/yql-10297.sql b/yql/essentials/tests/sql/suites/produce/yql-10297.sql new file mode 100644 index 0000000000..9622221e15 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/yql-10297.sql @@ -0,0 +1,30 @@ +/* syntax version 1 */ +/* postgres can not */ +DEFINE SUBQUERY $t() AS + select * from as_table([<|key:"0"|>, <|key:"1"|>]); +END DEFINE; + +DEFINE SUBQUERY $split_formula_log($in) AS + $parition = ($row) -> { + $recordType = TypeOf($row); + $varType = VariantType(TupleType($recordType, + $recordType)); + RETURN case + when $row.key = "0" then + Variant($row, "0", $varType) + + when $row.key = "1" then + Variant($row, "1", $varType) + else null + end + ; + }; + + PROCESS $in() USING $parition(TableRow()); +END DEFINE; + + +$a, $b = (PROCESS $split_formula_log($t)); +select * from $a; +select * from $b; + |