aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/sql/suites/produce
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.com>2024-11-19 14:58:38 +0300
committerudovichenko-r <udovichenko-r@yandex-team.com>2024-11-19 15:16:27 +0300
commit24521403b1c44303e043ba540c09b1fe991c7474 (patch)
tree341d1e7206bc7c143d04d2d96f05b6dc0655606d /yql/essentials/tests/sql/suites/produce
parent72b3cd51dc3fb9d16975d353ea82fd85701393cc (diff)
downloadydb-24521403b1c44303e043ba540c09b1fe991c7474.tar.gz
YQL-19206 Move contrib/ydb/library/yql/tests/sql/suites -> yql/essentials/tests/sql/suites
commit_hash:d0ef1f92b09c94db7c2408f946d2a4c62b603f00
Diffstat (limited to 'yql/essentials/tests/sql/suites/produce')
-rw-r--r--yql/essentials/tests/sql/suites/produce/default.cfg7
-rw-r--r--yql/essentials/tests/sql/suites/produce/descending.txt2
-rw-r--r--yql/essentials/tests/sql/suites/produce/descending.txt.attr25
-rw-r--r--yql/essentials/tests/sql/suites/produce/discard_process_with_lambda.sql8
-rw-r--r--yql/essentials/tests/sql/suites/produce/discard_reduce_lambda.sql9
-rw-r--r--yql/essentials/tests/sql/suites/produce/empty.txt0
-rw-r--r--yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.cfg1
-rw-r--r--yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.sql69
-rw-r--r--yql/essentials/tests/sql/suites/produce/input0.txt4
-rw-r--r--yql/essentials/tests/sql/suites/produce/input1.txt9
-rw-r--r--yql/essentials/tests/sql/suites/produce/input2.txt10
-rw-r--r--yql/essentials/tests/sql/suites/produce/input_sorted.txt4
-rw-r--r--yql/essentials/tests/sql/suites/produce/input_sorted.txt.attr15
-rw-r--r--yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.sql23
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_and_filter.sql2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_lambda_opt_args.sql14
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_in.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_in.sql22
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.sql25
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.sql12
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_out.cfg4
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_out.sql24
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.cfg4
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.sql24
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_pure_with_sort.sql32
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_row_and_columns.sql19
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_rows_and_filter.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_rows_and_filter.sql23
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.sql30
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.sql30
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.sql28
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.sql28
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_streaming.sql22
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_streaming_count.sql19
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_streaming_inline_bash.sql14
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_trivial_as_struct.sql2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_assume.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_assume.sql12
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_lambda.sql8
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_lambda_outstream.sql27
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_python.sql10
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_python_as_struct.sql10
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_python_stream-empty.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_python_stream.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_python_stream.sql23
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_udf.sql2
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_udf_rows.sql17
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_udf_validate.sql7
-rw-r--r--yql/essentials/tests/sql/suites/produce/process_with_udf_validate_ignore_broken.sql8
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all.sql16
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_expr.sql16
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_list.sql18
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_list_stream.sql25
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_multi_in.sql15
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_opt.sql16
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg4
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql14
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_by_struct.sql16
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_lambda.sql11
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_lambda_list_mem.sql11
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_lambda_list_table.sql11
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin.sql13
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin_list.sql13
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in-empty.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in-sorted.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in.sql22
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.sql22
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.sql46
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.sql45
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.sql21
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.sql22
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.sql22
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.sql24
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling-sorted.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.sql23
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.sql24
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_out.cfg4
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_multi_out.sql18
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_subfields-sorted.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_subfields.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_subfields.sql25
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_typeinfo.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_typeinfo.sql18
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_assume.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_assume.sql11
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.cfg1
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.sql13
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_flat_lambda.sql9
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_flat_python_stream.sql30
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.cfg1
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.sql32
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python.sql16
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys.sql15
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys_stream.sql19
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_filter_and_having.sql14
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_having.sql14
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream._sql14
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream.cfg2
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_presort.sql15
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_presort_stream.sql15
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_row.sql17
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_python_row_repack.sql16
-rw-r--r--yql/essentials/tests/sql/suites/produce/sorted.txt6
-rw-r--r--yql/essentials/tests/sql/suites/produce/sorted.txt.attr11
-rw-r--r--yql/essentials/tests/sql/suites/produce/sorted1.txt10
-rw-r--r--yql/essentials/tests/sql/suites/produce/sorted1.txt.attr11
-rw-r--r--yql/essentials/tests/sql/suites/produce/yql-10297.sql30
125 files changed, 1612 insertions, 0 deletions
diff --git a/yql/essentials/tests/sql/suites/produce/default.cfg b/yql/essentials/tests/sql/suites/produce/default.cfg
new file mode 100644
index 0000000000..55e29c24ba
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/default.cfg
@@ -0,0 +1,7 @@
+in Input0 input0.txt
+in Input1 input1.txt
+udf python3_udf
+udf streaming_udf
+udf simple_udf
+udf string_udf
+udf structs_udf
diff --git a/yql/essentials/tests/sql/suites/produce/descending.txt b/yql/essentials/tests/sql/suites/produce/descending.txt
new file mode 100644
index 0000000000..8e63b46e56
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/descending.txt
@@ -0,0 +1,2 @@
+{"key"="075";"subkey"="2";"value"="abc"};
+{"key"="020";"subkey"="1";"value"="q"};
diff --git a/yql/essentials/tests/sql/suites/produce/descending.txt.attr b/yql/essentials/tests/sql/suites/produce/descending.txt.attr
new file mode 100644
index 0000000000..4f8cb9b625
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/descending.txt.attr
@@ -0,0 +1,25 @@
+{
+ "schema"=<
+ "strict" = %true;
+ "unique_keys" = %false
+ >[
+ {
+ "name" = "key";
+ "type" = "string";
+ "required" = %true;
+ "sort_order" = "descending";
+ };
+ {
+ "name" = "subkey";
+ "type" = "string";
+ "required" = %true;
+ "sort_order" = "descending";
+ };
+ {
+ "name" = "value";
+ "type" = "string";
+ "required" = %true;
+ "sort_order" = "descending";
+ };
+ ]
+} \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/discard_process_with_lambda.sql b/yql/essentials/tests/sql/suites/produce/discard_process_with_lambda.sql
new file mode 100644
index 0000000000..ea5a9252ff
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/discard_process_with_lambda.sql
@@ -0,0 +1,8 @@
+/* syntax version 1 */
+/* postgres can not */
+$udf = YQL::@@(lambda '(x)
+(FlatMap x
+ (lambda '(y) (AsList y y))
+))@@;
+
+discard process plato.Input0 using $udf(TableRows());
diff --git a/yql/essentials/tests/sql/suites/produce/discard_reduce_lambda.sql b/yql/essentials/tests/sql/suites/produce/discard_reduce_lambda.sql
new file mode 100644
index 0000000000..132b492b4c
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/discard_reduce_lambda.sql
@@ -0,0 +1,9 @@
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item)))))
+))@@;
+
+--INSERT INTO Output
+DISCARD REDUCE Input1 ON key USING $udf(cast(value as uint32) ?? 0);
diff --git a/yql/essentials/tests/sql/suites/produce/empty.txt b/yql/essentials/tests/sql/suites/produce/empty.txt
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/empty.txt
diff --git a/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.cfg b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.cfg
new file mode 100644
index 0000000000..612a5060aa
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.cfg
@@ -0,0 +1 @@
+in Input input_sorted.txt \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.sql b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.sql
new file mode 100644
index 0000000000..b4f5afd8c2
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/fuse_reduces_with_presort.sql
@@ -0,0 +1,69 @@
+USE plato;
+
+$reduce = ($_, $TableRows) -> {
+ return Yql::Condense1(
+ $TableRows,
+ ($item) -> ($item),
+ ($_, $_) -> (false),
+ ($item, $_) -> ($item)
+ );
+};
+
+$stream =
+select * from Input;
+
+--
+
+$stream1 = Reduce $stream
+presort value1
+on key, subkey
+using $reduce(TableRow())
+assume order by key, subkey, value1;
+
+$stream1 = Reduce $stream1
+presort value1
+on key, subkey
+using $reduce(TableRow())
+assume order by key, subkey, value1;
+
+--
+
+$stream2 = Reduce $stream
+presort value1, value2
+on key, subkey
+using $reduce(TableRow())
+assume order by key, subkey, value1, value2;
+
+$stream2 = Reduce $stream2
+presort value1
+on key, subkey
+using $reduce(TableRow())
+assume order by key, subkey, value1;
+
+--
+
+$stream3 = Reduce $stream
+presort value1, value2, value3
+on key, subkey
+using $reduce(TableRow())
+assume order by key, subkey, value1, value2, value3;
+
+$stream3 = Reduce $stream3
+on key, subkey
+using $reduce(TableRow())
+assume order by key, subkey;
+
+select
+ *
+from $stream1
+ASSUME ORDER BY `key`, `subkey`;
+
+select
+ *
+from $stream2
+ASSUME ORDER BY `key`, `subkey`;
+
+select
+ *
+from $stream3
+ASSUME ORDER BY `key`, `subkey`;
diff --git a/yql/essentials/tests/sql/suites/produce/input0.txt b/yql/essentials/tests/sql/suites/produce/input0.txt
new file mode 100644
index 0000000000..65949ea745
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/input0.txt
@@ -0,0 +1,4 @@
+{"key"="075";"subkey"="1";"value"="abc"};
+{"key"="800";"subkey"="2";"value"="ddd"};
+{"key"="020";"subkey"="3";"value"="q"};
+{"key"="150";"subkey"="4";"value"="qzz"};
diff --git a/yql/essentials/tests/sql/suites/produce/input1.txt b/yql/essentials/tests/sql/suites/produce/input1.txt
new file mode 100644
index 0000000000..60ee525827
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/input1.txt
@@ -0,0 +1,9 @@
+{"key"="foo";"subkey"="a";"value"="7"};
+{"key"="foo";"subkey"="b";"value"="1"};
+{"key"="foo";"subkey"="b";"value"="0"};
+{"key"="foo";"subkey"="a";"value"="2"};
+{"key"="bar";"subkey"="b";"value"="1"};
+{"key"="bar";"subkey"="u";"value"="2"};
+{"key"="bar";"subkey"="n";"value"="3"};
+{"key"="bar";"subkey"="n";"value"="4"};
+{"key"="bar";"subkey"="y";"value"="5"};
diff --git a/yql/essentials/tests/sql/suites/produce/input2.txt b/yql/essentials/tests/sql/suites/produce/input2.txt
new file mode 100644
index 0000000000..b214aab0d9
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/input2.txt
@@ -0,0 +1,10 @@
+{"key"="023";"subkey"="3";"value"="aaa"};
+{"key"="037";"subkey"="5";"value"="ddd"};
+{"key"="075";"subkey"="1";"value"="abc"};
+{"key"="150";"subkey"="1";"value"="aaa"};
+{"key"="150";"subkey"="3";"value"="iii"};
+{"key"="150";"subkey"="8";"value"="zzz"};
+{"key"="200";"subkey"="7";"value"="qqq"};
+{"key"="527";"subkey"="4";"value"="bbb"};
+{"key"="761";"subkey"="6";"value"="ccc"};
+{"key"="911";"subkey"="2";"value"="kkk"};
diff --git a/yql/essentials/tests/sql/suites/produce/input_sorted.txt b/yql/essentials/tests/sql/suites/produce/input_sorted.txt
new file mode 100644
index 0000000000..070ba2f987
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/input_sorted.txt
@@ -0,0 +1,4 @@
+{"key"="020";"subkey"="1";"value1"="abc";"value2"="aabbcc";"value3"="aa"};
+{"key"="075";"subkey"="2";"value1"="ddd";"value2"="dddddd";"value3"="bb"};
+{"key"="150";"subkey"="3";"value1"="q";"value2"="qq";"value3"="cc"};
+{"key"="800";"subkey"="4";"value1"="qzz";"value2"="qqzzzz";"value3"="dd"}; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/input_sorted.txt.attr b/yql/essentials/tests/sql/suites/produce/input_sorted.txt.attr
new file mode 100644
index 0000000000..f091ca59dd
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/input_sorted.txt.attr
@@ -0,0 +1,15 @@
+{
+ "_yql_row_spec"={
+ "Type"=["StructType";[
+ ["key";["DataType";"String"]];
+ ["subkey";["DataType";"String"]];
+ ["value1";["DataType";"String"]];
+ ["value2";["DataType";"String"]];
+ ["value3";["DataType";"String"]]
+ ]];
+ "SortDirections"=[1;1;1;1;1];
+ "SortedBy"=["key";"subkey";"value1";"value2";"value3"];
+ "SortedByTypes"=[["DataType";"String"];["DataType";"String"];["DataType";"String"];["DataType";"String"];["DataType";"String"]];
+ "SortMembers"=["key";"subkey";"value1";"value2";"value3"];
+ }
+}
diff --git a/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.cfg b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.cfg
new file mode 100644
index 0000000000..faf73f95ff
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.cfg
@@ -0,0 +1,3 @@
+in Input1 descending.txt
+in Input2 sorted.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.sql b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.sql
new file mode 100644
index 0000000000..4970a01bbe
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/native_desc_reduce_with_presort.sql
@@ -0,0 +1,23 @@
+/* postgres can not */
+USE plato;
+pragma yt.UseNativeDescSort;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('summ (Collect (Condense stream (Nothing (OptionalType (DataType 'String))) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Coalesce state (Just item))))))
+))@@;
+
+select * from (
+ reduce Input1 presort value desc on key, subkey using $udf(value) --YtReduce
+) order by key, summ;
+
+select * from (
+ reduce Input1 presort subkey desc, value desc on key using $udf(value) --YtReduce
+) order by key, summ;
+
+select * from (
+ reduce Input1 presort value on key, subkey using $udf(value) --YtMapReduce
+) order by key, summ;
+
+select * from (
+ reduce concat(Input1, Input2) presort value desc on key, subkey using $udf(value) --YtMapReduce
+) order by key, summ;
diff --git a/yql/essentials/tests/sql/suites/produce/process_and_filter.sql b/yql/essentials/tests/sql/suites/produce/process_and_filter.sql
new file mode 100644
index 0000000000..2bd992f4f4
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_and_filter.sql
@@ -0,0 +1,2 @@
+/* postgres can not */
+PROCESS pLaTo.Input0 USING SimpleUdf::Echo(value) as val WHERE value == "abc";
diff --git a/yql/essentials/tests/sql/suites/produce/process_lambda_opt_args.sql b/yql/essentials/tests/sql/suites/produce/process_lambda_opt_args.sql
new file mode 100644
index 0000000000..612885c95b
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_lambda_opt_args.sql
@@ -0,0 +1,14 @@
+USE plato;
+
+$f = ($x, $optArg?)->{
+ return Ensure($x, $optArg is null or len($optArg)>0);
+};
+
+PROCESS Input0 USING $f(TableRow());
+
+PROCESS Input0 USING $f(TableRow(),'foo');
+
+PROCESS Input0 USING $f(TableRows());
+
+PROCESS Input0 USING $f(TableRows(),'foo');
+
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_in.cfg
new file mode 100644
index 0000000000..19cfc046c5
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_in.cfg
@@ -0,0 +1,3 @@
+in Input input0.txt
+res result.txt
+udf python3_udf
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in.sql b/yql/essentials/tests/sql/suites/produce/process_multi_in.sql
new file mode 100644
index 0000000000..41d9c9be0e
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_in.sql
@@ -0,0 +1,22 @@
+/* syntax version 1 */
+/* postgres can not */
+/* dq file can not */
+$udfScript = @@
+def MyFunc(stream):
+ return stream
+@@;
+
+$record = (SELECT TableRow() FROM plato.Input);
+$recordType = TypeOf(Unwrap($record));
+$streamType = StreamType(VariantType(TupleType($recordType, $recordType, $recordType)));
+$udf = Python3::MyFunc(CallableType(0, $streamType, $streamType), $udfScript);
+
+$src = (select * from plato.Input where key > "200");
+
+$i, $j, $k = (PROCESS plato.Input, (select * from plato.Input where key > "100"), $src USING $udf(TableRows()));
+
+select * from $i;
+
+select * from $j;
+
+select * from $k;
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.cfg
new file mode 100644
index 0000000000..98996c5bf7
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.cfg
@@ -0,0 +1,3 @@
+in Input0 input0.txt
+out Output output.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.sql b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.sql
new file mode 100644
index 0000000000..11bc99aa5f
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_single_out.sql
@@ -0,0 +1,25 @@
+/* syntax version 1 */
+/* postgres can not */
+
+$udf = YQL::@@
+(lambda '(stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs (lambda '(pair)
+ (Map (Nth pair '1) (lambda '(elem)
+ (AsStruct
+ '('cnt (Visit elem '0 (lambda '(v) (Member v 'subkey)) '1 (lambda '(v) (Member v 'subkey))))
+ '('src (Nth pair '0))
+ )
+ ))
+ ))
+ )
+ )
+)
+@@;
+
+INSERT INTO plato.Output WITH TRUNCATE
+PROCESS plato.Input0, (select * from plato.Input0 where key > "100") USING $udf(TableRows());
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.sql b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.sql
new file mode 100644
index 0000000000..51c3de3aaf
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_in_trivial_lambda.sql
@@ -0,0 +1,12 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$lambda = ($x) -> { return $x; };
+
+$result = PROCESS Input, Input
+USING
+ $lambda(TableRow())
+;
+
+SELECT * FROM AS_TABLE($result.0);
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_out.cfg
new file mode 100644
index 0000000000..d5a164924d
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_out.cfg
@@ -0,0 +1,4 @@
+in Input input0.txt
+res result.txt
+udf python2_udf
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_multi_out.sql
new file mode 100644
index 0000000000..a6162a13a8
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_out.sql
@@ -0,0 +1,24 @@
+/* syntax version 1 */
+/* postgres can not */
+$udfScript = @@
+def MyFunc(list):
+ return [(int(x.key) % 2, x) for x in list]
+@@;
+
+$record = (SELECT TableRow() FROM plato.Input);
+$recordType =TypeOf(Unwrap($record));
+
+$udf = Python::MyFunc(
+ CallableType(0,
+ StreamType(
+ VariantType(TupleType($recordType, $recordType))
+ ),
+ StreamType($recordType)),
+ $udfScript
+);
+
+$i, $j = (PROCESS plato.Input USING $udf(TableRows()));
+
+select * from $i;
+
+select * from $j;
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.cfg b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.cfg
new file mode 100644
index 0000000000..00bbe745ac
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.cfg
@@ -0,0 +1,4 @@
+xfail
+in Input input0.txt
+res result.txt
+udf python2_udf
diff --git a/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.sql b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.sql
new file mode 100644
index 0000000000..fdd02d93e3
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_multi_out_bad_count_fail.sql
@@ -0,0 +1,24 @@
+/* syntax version 1 */
+/* postgres can not */
+$udfScript = @@
+def MyFunc(list):
+ return [(int(x.key) % 2, x) for x in list]
+@@;
+
+$record = (SELECT TableRow() FROM plato.Input);
+$recordType =TypeOf(Unwrap($record));
+
+$udf = Python::MyFunc(
+ CallableType(0,
+ StreamType(
+ VariantType(TupleType($recordType, $recordType))
+ ),
+ StreamType($recordType)),
+ $udfScript
+);
+
+$i, $j, $k = (PROCESS plato.Input USING $udf(TableRows()));
+
+select * from $i;
+select * from $j;
+select * from $k;
diff --git a/yql/essentials/tests/sql/suites/produce/process_pure_with_sort.sql b/yql/essentials/tests/sql/suites/produce/process_pure_with_sort.sql
new file mode 100644
index 0000000000..c03cc257ce
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_pure_with_sort.sql
@@ -0,0 +1,32 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$sorted = ($world, $input, $orderByColumns, $asc) -> {
+ $n = ListLength($orderByColumns);
+
+ $keySelector = LambdaCode(($row) -> {
+ $items = ListMap($orderByColumns,
+ ($x) -> {
+ RETURN FuncCode("Member", $row, AtomCode($x));
+ });
+ RETURN ListCode($items);
+ });
+
+ $sort = EvaluateCode(LambdaCode(($x) -> {
+ return FuncCode("Sort",
+ $x,
+ ListCode(ListReplicate(ReprCode($asc), $n)),
+ $keySelector)
+ }));
+
+ RETURN $sort($input($world));
+};
+
+DEFINE SUBQUERY $source() AS
+ PROCESS Input0;
+END DEFINE;
+
+PROCESS $sorted($source, AsList("key","subkey"), true);
+PROCESS $sorted($source, AsList("value"), true);
+PROCESS $sorted($source, ListCreate(TypeOf("")), true); \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/process_row_and_columns.sql b/yql/essentials/tests/sql/suites/produce/process_row_and_columns.sql
new file mode 100644
index 0000000000..8ce580f615
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_row_and_columns.sql
@@ -0,0 +1,19 @@
+/* syntax version 1 */
+/* postgres can not */
+$udfScript = @@
+def processRow(row, tag, separator):
+ return {"value":row.Name + separator + row.Value + separator + tag};
+@@;
+
+$udf = Python::processRow(
+ Callable<(Struct<Name:String, Tag:String, Value:String>, String, String)->Struct<value:String>>,
+ $udfScript
+);
+
+$data = (
+ SELECT key AS Name, value AS Value, subkey AS Tag FROM plato.Input0
+);
+
+$separator = "|";
+
+PROCESS $data USING $udf(TableRow(), Tag, $separator);
diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.cfg b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.cfg
new file mode 100644
index 0000000000..c1a5b06634
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.cfg
@@ -0,0 +1,3 @@
+in Input1 input1.txt
+udf python2_udf
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.sql b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.sql
new file mode 100644
index 0000000000..1af88830eb
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_rows_and_filter.sql
@@ -0,0 +1,23 @@
+/* syntax version 1 */
+/* postgres can not */
+$udfScript = @@
+def processRows(prefix, rowList, separator):
+ result = [];
+ for row in rowList:
+ result.append({"Data" : prefix + row.Name + separator + row.Value});
+
+ return result;
+@@;
+
+$udf = Python::processRows(
+ Callable<(String, Stream<Struct<Name:String, Value:String>>, String)->Stream<Struct<Data:String>>>,
+ $udfScript
+);
+
+$data = (
+ SELECT key AS Name, value AS Value FROM plato.Input1
+);
+
+$prefix = ">>";
+
+PROCESS $data USING $udf($prefix, TableRows(), "=") WHERE Name != "foo";
diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.cfg
new file mode 100644
index 0000000000..a1b36ede45
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.cfg
@@ -0,0 +1,2 @@
+res result.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.sql
new file mode 100644
index 0000000000..97fb0222b6
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_desc_multi_out.sql
@@ -0,0 +1,30 @@
+USE plato;
+
+$values = ListMap(
+ ListFromRange(0, 30),
+ ($x) -> (AsStruct($x as x))
+);
+
+INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x DESC;
+
+COMMIT;
+
+$splitter = ($rows) -> {
+ $recordType = StreamItemType(TypeOf($rows));
+ $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType));
+ RETURN Yql::OrderedMap($rows, ($row) -> {
+ RETURN CASE $row.x
+ WHEN 0 THEN Variant($row, "0", $varType)
+ WHEN 1 THEN Variant($row, "1", $varType)
+ WHEN 2 THEN Variant($row, "2", $varType)
+ ELSE Variant($row, "3", $varType)
+ END;
+ });
+};
+
+$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRows()));
+
+SELECT * FROM $a;
+SELECT * FROM $b;
+SELECT * FROM $c ORDER BY x DESC;
+SELECT * FROM $d ORDER BY x DESC; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.cfg
new file mode 100644
index 0000000000..a1b36ede45
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.cfg
@@ -0,0 +1,2 @@
+res result.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.sql
new file mode 100644
index 0000000000..17ecc1de2b
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_rows_sorted_multi_out.sql
@@ -0,0 +1,30 @@
+USE plato;
+
+$values = ListMap(
+ ListFromRange(0, 30),
+ ($x) -> (AsStruct($x as x))
+);
+
+INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x;
+
+COMMIT;
+
+$splitter = ($rows) -> {
+ $recordType = StreamItemType(TypeOf($rows));
+ $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType));
+ RETURN Yql::OrderedMap($rows, ($row) -> {
+ RETURN CASE $row.x
+ WHEN 0 THEN Variant($row, "0", $varType)
+ WHEN 1 THEN Variant($row, "1", $varType)
+ WHEN 2 THEN Variant($row, "2", $varType)
+ ELSE Variant($row, "3", $varType)
+ END;
+ });
+};
+
+$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRows()));
+
+SELECT * FROM $a;
+SELECT * FROM $b;
+SELECT * FROM $c ORDER BY x;
+SELECT * FROM $d ORDER BY x; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.cfg
new file mode 100644
index 0000000000..a1b36ede45
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.cfg
@@ -0,0 +1,2 @@
+res result.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.sql
new file mode 100644
index 0000000000..478eda3d2c
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_sorted_desc_multi_out.sql
@@ -0,0 +1,28 @@
+USE plato;
+
+$values = ListMap(
+ ListFromRange(0, 30),
+ ($x) -> (AsStruct($x as x))
+);
+
+INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x DESC;
+
+COMMIT;
+
+$splitter = ($row) -> {
+ $recordType = TypeOf($row);
+ $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType));
+ RETURN CASE $row.x
+ WHEN 0 THEN Variant($row, "0", $varType)
+ WHEN 1 THEN Variant($row, "1", $varType)
+ WHEN 2 THEN Variant($row, "2", $varType)
+ ELSE Variant($row, "3", $varType)
+ END
+};
+
+$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRow()));
+
+SELECT * FROM $a;
+SELECT * FROM $b;
+SELECT * FROM $c ORDER BY x DESC;
+SELECT * FROM $d ORDER BY x DESC; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.cfg
new file mode 100644
index 0000000000..a1b36ede45
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.cfg
@@ -0,0 +1,2 @@
+res result.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.sql b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.sql
new file mode 100644
index 0000000000..87a401a806
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_sorted_multi_out.sql
@@ -0,0 +1,28 @@
+USE plato;
+
+$values = ListMap(
+ ListFromRange(0, 30),
+ ($x) -> (AsStruct($x as x))
+);
+
+INSERT INTO @table SELECT * FROM AS_TABLE($values) ORDER BY x;
+
+COMMIT;
+
+$splitter = ($row) -> {
+ $recordType = TypeOf($row);
+ $varType = VariantType(TupleType($recordType, $recordType, $recordType, $recordType));
+ RETURN CASE $row.x
+ WHEN 0 THEN Variant($row, "0", $varType)
+ WHEN 1 THEN Variant($row, "1", $varType)
+ WHEN 2 THEN Variant($row, "2", $varType)
+ ELSE Variant($row, "3", $varType)
+ END
+};
+
+$a, $b, $c, $d = (PROCESS @table USING $splitter(TableRow()));
+
+SELECT * FROM $a;
+SELECT * FROM $b;
+SELECT * FROM $c ORDER BY x;
+SELECT * FROM $d ORDER BY x; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/process_streaming.sql b/yql/essentials/tests/sql/suites/produce/process_streaming.sql
new file mode 100644
index 0000000000..1ad7274c79
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_streaming.sql
@@ -0,0 +1,22 @@
+/* syntax version 1 */
+/* postgres can not */
+-- not supported on windows
+
+$input = (
+ SELECT String::JoinFromList(AsList(key, subkey, value), ",") AS Data FROM plato.Input1
+);
+
+$processed = (
+ PROCESS $input USING Streaming::Process(TableRows(), "grep", AsList("[14]"))
+);
+
+$list = (
+ SELECT String::SplitToList(Data, ',') AS DataList FROM $processed
+);
+
+SELECT
+ input.DataList[0] AS key,
+ input.DataList[1] AS subkey,
+ input.DataList[2] AS value
+FROM $list AS input;
+
diff --git a/yql/essentials/tests/sql/suites/produce/process_streaming_count.sql b/yql/essentials/tests/sql/suites/produce/process_streaming_count.sql
new file mode 100644
index 0000000000..c8111d385a
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_streaming_count.sql
@@ -0,0 +1,19 @@
+/* syntax version 1 */
+/* postgres can not */
+-- not supported on windows
+
+$input = (
+ SELECT String::JoinFromList(AsList(key, subkey, value), ",") AS Data FROM plato.Input1
+);
+
+$processed = (
+ PROCESS $input USING Streaming::Process(TableRows(), "grep", AsList("[14]"))
+);
+
+SELECT
+ *
+FROM $processed;
+
+SELECT
+ COUNT(*)
+FROM $processed;
diff --git a/yql/essentials/tests/sql/suites/produce/process_streaming_inline_bash.sql b/yql/essentials/tests/sql/suites/produce/process_streaming_inline_bash.sql
new file mode 100644
index 0000000000..5737cf807b
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_streaming_inline_bash.sql
@@ -0,0 +1,14 @@
+/* syntax version 1 */
+/* postgres can not */
+-- not supported on windows
+
+$script = @@
+#!/bin/bash
+cat - | grep $1 | head -n 3 | grep [234]
+@@;
+
+$input = (
+ SELECT String::JoinFromList(AsList(key, subkey, value), ",") AS Data FROM plato.Input1
+);
+
+PROCESS $input USING Streaming::ProcessInline(TableRows(), $script, AsList("bar"));
diff --git a/yql/essentials/tests/sql/suites/produce/process_trivial_as_struct.sql b/yql/essentials/tests/sql/suites/produce/process_trivial_as_struct.sql
new file mode 100644
index 0000000000..a948124640
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_trivial_as_struct.sql
@@ -0,0 +1,2 @@
+/* postgres can not */
+process plato.Input0 using SimpleUdf::Echo(value) as val; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_assume.cfg b/yql/essentials/tests/sql/suites/produce/process_with_assume.cfg
new file mode 100644
index 0000000000..66737248b8
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_assume.cfg
@@ -0,0 +1,2 @@
+in Input sorted.txt
+out Output output.txt
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_assume.sql b/yql/essentials/tests/sql/suites/produce/process_with_assume.sql
new file mode 100644
index 0000000000..ddb1353299
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_assume.sql
@@ -0,0 +1,12 @@
+/* postgres can not */
+/* multirun can not */
+/* syntax version 1 */
+use plato;
+
+$udf = YQL::@@(lambda '(x)
+(FlatMap x
+ (lambda '(y) (Just (AsStruct '('key (Concat (String '"0") (Member y 'key))) '('subkey (Member y 'subkey)) '('value (Member y 'value)))))
+))@@;
+
+insert into Output with truncate
+process plato.Input using $udf(TableRows()) assume order by key;
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_lambda.sql b/yql/essentials/tests/sql/suites/produce/process_with_lambda.sql
new file mode 100644
index 0000000000..e0886faaf8
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_lambda.sql
@@ -0,0 +1,8 @@
+/* syntax version 1 */
+/* postgres can not */
+$udf = YQL::@@(lambda '(x)
+(FlatMap x
+ (lambda '(y) (AsList y y))
+))@@;
+
+process plato.Input0 using $udf(TableRows());
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_lambda_outstream.sql b/yql/essentials/tests/sql/suites/produce/process_with_lambda_outstream.sql
new file mode 100644
index 0000000000..0c6c100764
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_lambda_outstream.sql
@@ -0,0 +1,27 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$f1 = ($r)->{
+ return $r;
+};
+
+PROCESS Input0 USING $f1(TableRow());
+
+$f2 = ($r)->{
+ return Just($r);
+};
+
+PROCESS Input0 USING $f2(TableRow());
+
+$f3 = ($r)->{
+ return AsList($r,$r);
+};
+
+PROCESS Input0 USING $f3(TableRow());
+
+$f4 = ($r)->{
+ return Yql::Iterator(AsList($r,$r));
+};
+
+PROCESS Input0 USING $f4(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python.sql b/yql/essentials/tests/sql/suites/produce/process_with_python.sql
new file mode 100644
index 0000000000..caeb8c2cb3
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_python.sql
@@ -0,0 +1,10 @@
+/* postgres can not */
+/* syntax version 1 */
+$udfScript = @@
+def Dup(s):
+ return [{"value":s},{"value":s}]
+@@;
+
+$udf = Python::Dup(Callable<(String)->List<Struct<value:String>>>, $udfScript);
+
+process plato.Input0 using $udf(value);
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_as_struct.sql b/yql/essentials/tests/sql/suites/produce/process_with_python_as_struct.sql
new file mode 100644
index 0000000000..237c2abef8
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_python_as_struct.sql
@@ -0,0 +1,10 @@
+/* postgres can not */
+/* syntax version 1 */
+$udfScript = @@
+def Dup(s):
+ return [s, s];
+@@;
+
+$udf = Python::Dup(Callable<(String)->List<String>>, $udfScript);
+
+process plato.Input0 using $udf(value) as val;
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_stream-empty.cfg b/yql/essentials/tests/sql/suites/produce/process_with_python_stream-empty.cfg
new file mode 100644
index 0000000000..837223c684
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_python_stream-empty.cfg
@@ -0,0 +1,2 @@
+in Input0 empty.txt
+udf python3_udf
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_stream.cfg b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.cfg
new file mode 100644
index 0000000000..e34e70ff55
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.cfg
@@ -0,0 +1,3 @@
+in Input0 input0.txt
+udf python3_udf
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_python_stream.sql b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.sql
new file mode 100644
index 0000000000..33a0b8f8f7
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_python_stream.sql
@@ -0,0 +1,23 @@
+/* syntax version 1 */
+/* kikimr can not */
+USE plato;
+
+$udfScript = @@
+def f(input,x):
+ for i in input:
+ yield {
+ 'key': i.key,
+ 'subkey': i.subkey,
+ 'value': i.value,
+ 'pass': x
+ }
+@@;
+
+$udf_stream = Python3::f(
+Callable<
+ (Stream<Struct<key:String,subkey:String,value:String>>,Int32)
+ ->
+ Stream<Struct<key:String,subkey:String,value:String,pass:Int32>>
+>, $udfScript);
+
+PROCESS Input0 using $udf_stream(TableRows(), 2);
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf.sql
new file mode 100644
index 0000000000..f8fb326983
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_udf.sql
@@ -0,0 +1,2 @@
+/* postgres can not */
+process plato.Input0 using Person::New(key, subkey, coalesce(cast(value as Uint32), 0)); \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf_rows.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf_rows.sql
new file mode 100644
index 0000000000..3df9f4df1b
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_udf_rows.sql
@@ -0,0 +1,17 @@
+/* postgres can not */
+$udf = Python::process(
+Callable<
+ ()->Stream<Struct<result:Int64>>
+>, @@
+def process():
+ for row in range(10):
+ result = row
+ yield locals()
+@@);
+
+$users = (
+ SELECT `key` AS age, `value` AS name FROM plato.Input0
+);
+
+PROCESS $users
+USING $udf();
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf_validate.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate.sql
new file mode 100644
index 0000000000..e5082fdd77
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate.sql
@@ -0,0 +1,7 @@
+/* postgres can not */
+$processed = (
+ process plato.Input0 using Person::New(key, subkey, coalesce(cast(value as Uint32), 0))
+);
+
+PRAGMA config.flags("ValidateUdf", "Lazy");
+SELECT * FROM $processed;
diff --git a/yql/essentials/tests/sql/suites/produce/process_with_udf_validate_ignore_broken.sql b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate_ignore_broken.sql
new file mode 100644
index 0000000000..de471f14f3
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/process_with_udf_validate_ignore_broken.sql
@@ -0,0 +1,8 @@
+/* postgres can not */
+
+$processed = (
+ process plato.Input0 using Person::New(key, subkey, Length(SimpleUdf::ReturnBrokenInt()))
+);
+
+PRAGMA config.flags("ValidateUdf", "None");
+SELECT * FROM $processed;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all.sql b/yql/essentials/tests/sql/suites/produce/reduce_all.sql
new file mode 100644
index 0000000000..89362a4454
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all.sql
@@ -0,0 +1,16 @@
+/* syntax version 1 */
+/* postgres can not */
+/* dqfile can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(stream):
+ sums = [functools.reduce(lambda x,y: x + int(y.value), pair[1], 0) for pair in stream]
+ return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
+@@;
+
+$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->Struct<sumByAllVal:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING ALL $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_expr.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_expr.sql
new file mode 100644
index 0000000000..d4e194802f
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_expr.sql
@@ -0,0 +1,16 @@
+/* postgres can not */
+/* syntax version 1 */
+/* dqfile can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(stream):
+ sums = [functools.reduce(lambda x,y: x + y, pair[1], 0) for pair in stream]
+ return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
+@@;
+
+$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Uint32>>>)->Struct<sumByAllVal:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING ALL $udf(cast(value as uint32) ?? 0);
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_list.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_list.sql
new file mode 100644
index 0000000000..ec08c7922d
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_list.sql
@@ -0,0 +1,18 @@
+/* syntax version 1 */
+/* postgres can not */
+/* dq can not */
+/* dqfile can not */
+USE plato;
+
+$udfScript = @@
+import functools
+
+def Len(stream):
+ sums = [functools.reduce(lambda x,y: x + int(y.value), pair[1], 0) for pair in stream]
+ return [{"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}]
+@@;
+
+$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->List<Struct<sumByAllVal:Uint32>>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING ALL $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_list_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_list_stream.sql
new file mode 100644
index 0000000000..5e154426c0
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_list_stream.sql
@@ -0,0 +1,25 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+from yql import TYieldIteration
+
+def Sum(stream):
+ def Gen(stream):
+ sums = []
+ for pair in stream:
+ if isinstance(pair, TYieldIteration):
+ yield pair
+ else:
+ sums.append(functools.reduce(lambda x,y: x + int(y.value), pair[1], 0))
+
+ yield {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
+ return Gen(stream)
+@@;
+
+$udf = Python3::Sum(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->Stream<Struct<sumByAllVal:Uint32>>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING ALL $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_multi_in.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_multi_in.sql
new file mode 100644
index 0000000000..6f6750898e
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_multi_in.sql
@@ -0,0 +1,15 @@
+/* syntax version 1 */
+/* postgres can not */
+/* dqfile can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(stream):
+ sums = [functools.reduce(lambda x,y: x + int(y[1].value), pair[1], 0) for pair in stream]
+ return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
+@@;
+
+$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Variant<Struct<key:String,subkey:String,value:String>,Struct<key:String,subkey:String,value:String>>>>>)->Struct<sumByAllVal:Uint32>>, $udfScript);
+
+REDUCE Input1, Input1 ON key USING ALL $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_opt.sql b/yql/essentials/tests/sql/suites/produce/reduce_all_opt.sql
new file mode 100644
index 0000000000..467be50696
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_opt.sql
@@ -0,0 +1,16 @@
+/* syntax version 1 */
+/* postgres can not */
+/* dqfile can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(stream):
+ sums = [functools.reduce(lambda x,y: x + int(y.value), pair[1], 0) for pair in stream]
+ return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
+@@;
+
+$udf = Python::Len(Callable<(Stream<Tuple<String,Stream<Struct<key:String,subkey:String,value:String>>>>)->Optional<Struct<sumByAllVal:Uint32>>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING ALL $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg
new file mode 100644
index 0000000000..c426164726
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg
@@ -0,0 +1,4 @@
+xfail
+in Input1 input1.txt
+udf python3_udf
+providers dq
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql
new file mode 100644
index 0000000000..82b3767cc5
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql
@@ -0,0 +1,14 @@
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(stream):
+ sums = [functools.reduce(lambda x,y: x + y, pair[1], 0) for pair in stream]
+ return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
+@@;
+
+$udf = Python3::Len(Callable<(Stream<Tuple<String,Stream<Uint32>>>)->Struct<sumByAllVal:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING ALL $udf(cast(value as uint32) ?? 0);
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg
new file mode 100644
index 0000000000..13bb8734c4
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg
@@ -0,0 +1,3 @@
+in Input1 input1.txt
+udf python3_udf
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_by_struct.sql b/yql/essentials/tests/sql/suites/produce/reduce_by_struct.sql
new file mode 100644
index 0000000000..a36223f7a4
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_by_struct.sql
@@ -0,0 +1,16 @@
+USE plato;
+
+INSERT INTO @tmp
+SELECT * FROM AS_TABLE([
+ <|key: <|field1: 1, field2: 1|>, value: 1|>,
+ <|key: <|field1: 1, field2: 1|>, value: 2|>,
+ ]);
+
+COMMIT;
+
+$reducer = ($_key, $stream) -> ($stream);
+
+REDUCE @tmp
+ON key
+USING $reducer(TableRow());
+
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda.sql
new file mode 100644
index 0000000000..74d1372d73
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda.sql
@@ -0,0 +1,11 @@
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item)))))
+))@@;
+
+--INSERT INTO Output
+$res = (REDUCE Input1 ON key USING $udf(cast(value as uint32) ?? 0));
+
+select * from $res order by key;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_mem.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_mem.sql
new file mode 100644
index 0000000000..4593e67ba3
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_mem.sql
@@ -0,0 +1,11 @@
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item)))))
+))@@;
+
+--INSERT INTO Output
+$res = (REDUCE (select AsList("foo") as key, "123" as value) ON key USING $udf(cast(value as uint32) ?? 0));
+
+select * from $res order by key;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_table.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_table.sql
new file mode 100644
index 0000000000..39aa5ec294
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_list_table.sql
@@ -0,0 +1,11 @@
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('summ (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item)))))
+))@@;
+
+--INSERT INTO Output
+$res = (REDUCE (select AsList(key) as key, value from Input1) ON key USING $udf(cast(value as uint32) ?? 0));
+
+select * from $res order by key;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin.sql
new file mode 100644
index 0000000000..ca37608c70
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin.sql
@@ -0,0 +1,13 @@
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('superstring (Collect (Condense stream (String '"") (lambda '(item state) (Bool 'False)) (lambda '(item state)
+ (Concat state (Concat (Member item 'char) (Member item 'num)))
+ ))))
+))@@;
+
+--INSERT INTO Output
+$res = (REDUCE Input1 PRESORT subkey, value desc ON key USING $udf(AsStruct(subkey as char, value as num)));
+
+select * from $res order by key;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin_list.sql b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin_list.sql
new file mode 100644
index 0000000000..65225b9ffa
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_lambda_presort_twin_list.sql
@@ -0,0 +1,13 @@
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('superstring (Collect (Condense stream (String '"") (lambda '(item state) (Bool 'False)) (lambda '(item state)
+ (Concat state (Concat (Member item 'char) (Member item 'num)))
+ ))))
+))@@;
+
+--INSERT INTO Output
+$res = (REDUCE Input1 PRESORT AsList(subkey), value desc ON key USING $udf(AsStruct(subkey as char, value as num)));
+
+select * from $res order by key;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in-empty.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-empty.cfg
new file mode 100644
index 0000000000..ce36bb4337
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-empty.cfg
@@ -0,0 +1,2 @@
+in Input empty.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in-sorted.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-sorted.cfg
new file mode 100644
index 0000000000..3f5bebb5a2
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in-sorted.cfg
@@ -0,0 +1,2 @@
+in Input sorted.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.sql
new file mode 100644
index 0000000000..c819fb4a58
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in.sql
@@ -0,0 +1,22 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$r = (REDUCE Input, Input ON key USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.sql
new file mode 100644
index 0000000000..b867fc4fdc
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype.sql
@@ -0,0 +1,22 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$r = (REDUCE Input, `Input{key,subkey}` ON key USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.cfg
new file mode 100644
index 0000000000..66737248b8
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.cfg
@@ -0,0 +1,2 @@
+in Input sorted.txt
+out Output output.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.sql
new file mode 100644
index 0000000000..159e80f995
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume.sql
@@ -0,0 +1,46 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$user_process = ($key, $t1, $t2, $t3) -> {
+ return AsStruct(
+ $key AS key,
+ COALESCE(cast($t1.subkey as Int32), 0) + COALESCE(cast($t2.subkey as Int32), 0) + COALESCE(cast($t3.subkey as Int32), 0) AS subkey
+ );
+};
+
+$reducer = ($key, $stream) -> {
+ $stream = YQL::OrderedMap($stream, ($item) -> {
+ return AsStruct(
+ YQL::Guess($item, AsAtom("0")).t1 AS t1,
+ YQL::Guess($item, AsAtom("1")).t2 AS t2,
+ YQL::Guess($item, AsAtom("2")).t3 AS t3,
+ );
+ });
+ $recs = YQL::Collect(YQL::Condense1(
+ $stream,
+ ($item) -> {return AsStruct(
+ $item.t1 AS t1,
+ $item.t2 AS t2,
+ $item.t3 AS t3,
+ );},
+ ($_item, $_state) -> {return false;},
+ ($item, $state) -> {return AsStruct(
+ COALESCE($state.t1, $item.t1) AS t1,
+ COALESCE($state.t2, $item.t2) AS t2,
+ COALESCE($state.t3, $item.t3) AS t3,
+ );},
+ ));
+ $rec = Ensure($recs, ListLength($recs) == 1ul)[0];
+ return $user_process($key, $rec.t1, $rec.t2, $rec.t3);
+};
+
+INSERT INTO Output WITH TRUNCATE
+REDUCE
+ (SELECT key, TableRow() AS t1 FROM Input),
+ (SELECT key, TableRow() AS t2 FROM Input),
+ (SELECT key, TableRow() AS t3 FROM Input)
+ON key
+USING $reducer(TableRow())
+ASSUME ORDER BY key;
+
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.cfg
new file mode 100644
index 0000000000..66737248b8
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.cfg
@@ -0,0 +1,2 @@
+in Input sorted.txt
+out Output output.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.sql
new file mode 100644
index 0000000000..e1f71c3953
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_difftype_assume_keytuple.sql
@@ -0,0 +1,45 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$user_process = ($key, $t1, $t2, $t3) -> {
+ return AsStruct(
+ $key.0 AS key,
+ COALESCE(cast($t1.subkey as Int32), 0) + COALESCE(cast($t2.subkey as Int32), 0) + COALESCE(cast($t3.subkey as Int32), 0) AS subkey
+ );
+};
+
+$reducer = ($key, $stream) -> {
+ $stream = YQL::OrderedMap($stream, ($item) -> {
+ return AsStruct(
+ YQL::Guess($item, AsAtom("0")).t1 AS t1,
+ YQL::Guess($item, AsAtom("1")).t2 AS t2,
+ YQL::Guess($item, AsAtom("2")).t3 AS t3,
+ );
+ });
+ $recs = YQL::Collect(YQL::Condense1(
+ $stream,
+ ($item) -> {return AsStruct(
+ $item.t1 AS t1,
+ $item.t2 AS t2,
+ $item.t3 AS t3,
+ );},
+ ($_item, $_state) -> {return false;},
+ ($item, $state) -> {return AsStruct(
+ COALESCE($state.t1, $item.t1) AS t1,
+ COALESCE($state.t2, $item.t2) AS t2,
+ COALESCE($state.t3, $item.t3) AS t3,
+ );},
+ ));
+ $rec = Ensure($recs, ListLength($recs) == 1ul)[0];
+ return $user_process($key, $rec.t1, $rec.t2, $rec.t3);
+};
+
+INSERT INTO Output WITH TRUNCATE
+REDUCE
+ (SELECT key, subkey, TableRow() AS t1 FROM Input),
+ (SELECT key, subkey, TableRow() AS t2 FROM Input),
+ (SELECT key, subkey, TableRow() AS t3 FROM Input)
+ON key, subkey
+USING $reducer(TableRow())
+ASSUME ORDER BY key, subkey;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.sql
new file mode 100644
index 0000000000..fe9bd349ae
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple.sql
@@ -0,0 +1,21 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key (Nth key '0)) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$r = (REDUCE Input, Input ON key,subkey USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.sql
new file mode 100644
index 0000000000..ea6a236d22
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_keytuple_difftype.sql
@@ -0,0 +1,22 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key (Nth key '0)) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$r = (REDUCE Input, `Input{key,subkey}` ON key,subkey USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.sql
new file mode 100644
index 0000000000..3712268059
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_presort.sql
@@ -0,0 +1,22 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$r = (REDUCE Input, Input PRESORT value, key || subkey ON key USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.sql
new file mode 100644
index 0000000000..9587d1a453
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_ref.sql
@@ -0,0 +1,24 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$src = (select * from plato.Input where key > "200");
+
+$r = (REDUCE Input, (select * from Input where key > "100"), $src ON key USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling-sorted.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling-sorted.cfg
new file mode 100644
index 0000000000..2334ceb124
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling-sorted.cfg
@@ -0,0 +1,2 @@
+in Input sorted1.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.cfg
new file mode 100644
index 0000000000..fb4fb16059
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.cfg
@@ -0,0 +1,2 @@
+in Input input2.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.sql
new file mode 100644
index 0000000000..e82a139d2e
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_sampling.sql
@@ -0,0 +1,23 @@
+/* syntax version 1 */
+/* postgres can not */
+/* custom check: len(yt_res_yson[0]['Write'][0]['Data']) < 16 */
+USE plato;
+
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$r = (REDUCE Input SAMPLE(0.1), Input SAMPLE(0.1) ON key USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.cfg
new file mode 100644
index 0000000000..4468d3ba29
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+res result.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.sql
new file mode 100644
index 0000000000..1e694e0280
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_in_stage_and_flatmap.sql
@@ -0,0 +1,24 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udf = YQL::@@
+(lambda '(key stream)
+ (PartitionByKey stream
+ (lambda '(item) (Way item))
+ (Void)
+ (Void)
+ (lambda '(listOfPairs)
+ (FlatMap listOfPairs
+ (lambda '(pair) (Just (AsStruct '('key key) '('src (Nth pair '0)) '('cnt (Length (ForwardList (Nth pair '1)))))))
+ )
+ )
+ )
+)
+@@;
+
+$r = (REDUCE Input, AS_TABLE(ListMap(ListFromRange(0,10), ($val) -> {
+ RETURN AsStruct(Cast($val AS String) AS key, Cast($val AS String) AS subkey, Cast($val AS String) AS value)
+})) ON key USING $udf(TableRow()));
+
+SELECT key, src, cnt FROM $r ORDER BY key, src, cnt; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_out.cfg b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.cfg
new file mode 100644
index 0000000000..f73dce605c
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.cfg
@@ -0,0 +1,4 @@
+in Input input1.txt
+res result.txt
+udf python2_udf
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_multi_out.sql b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.sql
new file mode 100644
index 0000000000..079ff55899
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_multi_out.sql
@@ -0,0 +1,18 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+
+def Len(key, input):
+ sumByValue = functools.reduce(lambda x,y: x + int(y.value), input, 0)
+ return (sumByValue % 2, {"sumByVal": sumByValue})
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<Struct<key:String,subkey:String,value:String>>)->Variant<Struct<sumByVal:Uint32>,Struct<sumByVal:Uint32>>>, $udfScript);
+
+$i, $j = (REDUCE Input ON key USING $udf(TableRow()));
+
+select * from $i order by sumByVal;
+select * from $j order by sumByVal;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_subfields-sorted.cfg b/yql/essentials/tests/sql/suites/produce/reduce_subfields-sorted.cfg
new file mode 100644
index 0000000000..a709300fac
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_subfields-sorted.cfg
@@ -0,0 +1,2 @@
+in Input sorted.txt
+udf python3_udf
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_subfields.cfg b/yql/essentials/tests/sql/suites/produce/reduce_subfields.cfg
new file mode 100644
index 0000000000..0245ac800c
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_subfields.cfg
@@ -0,0 +1,2 @@
+in Input input0.txt
+udf python3_udf
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_subfields.sql b/yql/essentials/tests/sql/suites/produce/reduce_subfields.sql
new file mode 100644
index 0000000000..3518cac97e
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_subfields.sql
@@ -0,0 +1,25 @@
+/* postgres can not */
+/* syntax version 1 */
+USE plato;
+
+$udfScript = @@
+def f(key, input):
+ for i in input:
+ yield {
+ 'key': i.key,
+ 'value': i.value,
+ 'pass': 10
+ }
+@@;
+
+$udf_stream = Python3::f(
+Callable<
+ (String,Stream<Struct<key:String,value:String>>)
+ ->
+ Stream<Struct<key:String,value:String,pass:Int32>>
+>, $udfScript);
+
+
+REDUCE Input
+ON key
+USING $udf_stream(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.cfg b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.cfg
new file mode 100644
index 0000000000..520f10d8f9
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.cfg
@@ -0,0 +1,3 @@
+in Input0 input0.txt
+udf simple_udf
+providers yt \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.sql b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.sql
new file mode 100644
index 0000000000..bebc744f63
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_typeinfo.sql
@@ -0,0 +1,18 @@
+/* postgres can not */
+/* syntax version 1 */
+/* ignore runonopt plan diff */
+use plato;
+
+pragma warning("disable", "4510");
+
+$r1 = REDUCE Input0 ON key USING ALL SimpleUdf::GenericAsStruct(TableRows());
+$r2 = REDUCE Input0 ON key USING SimpleUdf::GenericAsStruct(cast(TableRow().subkey as Int32));
+$r3 = REDUCE Input0 ON key USING ALL SimpleUdf::GenericAsStruct(TableRow().key);
+
+
+select * from (select * from $r1 flatten list by arg_0) flatten columns order by key, subkey;
+select arg_0 as key, ListSort(YQL::Collect(arg_1)) as values from $r2 order by key;
+
+
+select FormatType(TypeOf(TableRow())) from $r1 limit 1;
+select FormatType(TypeOf(TableRow())) from $r3 limit 1;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.cfg
new file mode 100644
index 0000000000..66737248b8
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.cfg
@@ -0,0 +1,2 @@
+in Input sorted.txt
+out Output output.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.sql
new file mode 100644
index 0000000000..3f51be0d12
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume.sql
@@ -0,0 +1,11 @@
+/* postgres can not */
+/* multirun can not */
+/* syntax version 1 */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('sum (Collect (Condense stream (Uint32 '0) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Add state item)))))
+))@@;
+
+INSERT INTO Output
+REDUCE Input ON key USING $udf(cast(subkey as uint32) ?? 0) ASSUME ORDER BY key;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.cfg
new file mode 100644
index 0000000000..144f0dd840
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.cfg
@@ -0,0 +1 @@
+in Input input2.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.sql
new file mode 100644
index 0000000000..3913353e12
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_assume_in_subquery.sql
@@ -0,0 +1,13 @@
+/* postgres can not */
+/* syntax version 1 */
+USE plato;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('summ (Collect (Condense stream (Nothing (OptionalType (DataType 'String))) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Coalesce state (Just item))))))
+))@@;
+
+$in = (SELECT * FROM Input ASSUME ORDER BY key, subkey);
+
+$res = (REDUCE $in ON key USING $udf(value));
+
+SELECT * FROM $res ORDER BY key;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_flat_lambda.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_lambda.sql
new file mode 100644
index 0000000000..b0e1e56200
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_lambda.sql
@@ -0,0 +1,9 @@
+/* syntax version 1 */
+/* kikimr can not */
+USE plato;
+
+$udf_stream = ($input)->{ return $input };
+
+$res = REDUCE Input0 ON key using all $udf_stream(TableRows());
+
+select * from $res order by value;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_flat_python_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_python_stream.sql
new file mode 100644
index 0000000000..8d164a65c9
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_flat_python_stream.sql
@@ -0,0 +1,30 @@
+/* syntax version 1 */
+/* kikimr can not */
+USE plato;
+
+$udfScript = @@
+def f(input):
+ s = []
+ last_key = None
+ for i in input:
+ if last_key is not None and last_key != i.key:
+ s = []
+ s.append(i.value)
+ last_key = i.key
+ yield {
+ 'key': i.key,
+ 'subkey1': i.subkey,
+ 'value': b''.join(s),
+ }
+@@;
+
+$udf_stream = Python3::f(
+Callable<
+ (Stream<Struct<key:String,subkey:String,value:String>>)
+ ->
+ Stream<Struct<key:String,subkey1:String,value:String>>
+>, $udfScript);
+
+$res = REDUCE Input0 PRESORT value ON key using all $udf_stream(TableRows()) ;
+
+select * from $res order by key, value; \ No newline at end of file
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.cfg
new file mode 100644
index 0000000000..1dc9a1e3a7
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.cfg
@@ -0,0 +1 @@
+in Input input0.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.sql
new file mode 100644
index 0000000000..9bdca4b7d9
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_presort_diff_order.sql
@@ -0,0 +1,32 @@
+USE plato;
+
+insert into @skv1v2
+select key, subkey, value as value1, value as value2 from Input order by subkey, key, value1, value2;
+
+insert into @skv2v1
+select key, subkey, value as value1, value as value2 from Input order by subkey, key, value2, value1;
+
+insert into @ksv1v2
+select key, subkey, value as value1, value as value2 from Input order by key, subkey, value1, value2;
+
+commit;
+
+$udf = YQL::@@(lambda '(key stream) (AsStruct
+ '('key key) '('summ (Collect (Condense stream (Nothing (OptionalType (DataType 'String))) (lambda '(item state) (Bool 'False)) (lambda '(item state) (Coalesce state (Just item))))))
+))@@;
+
+select * from (
+ reduce concat(@skv1v2, @skv1v2) presort value1, value2 on key, subkey using $udf(value1) --YtReduce
+) order by key, summ;
+
+select * from (
+ reduce @ksv1v2 presort value2, value1 on key, subkey using $udf(value1) --YtMapReduce
+) order by key, summ;
+
+select * from (
+ reduce concat(@skv1v2, @skv2v1) presort value1, value2 on key, subkey using $udf(value1) --YtMapReduce
+) order by key, summ;
+
+select * from (
+ reduce concat(@skv1v2, @ksv1v2) presort value1, value2 on key, subkey using $udf(value1) --YtMapReduce
+) order by key, summ;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python.sql
new file mode 100644
index 0000000000..6bdf0efb24
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python.sql
@@ -0,0 +1,16 @@
+/* postgres can not */
+/* syntax version 1 */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(key, input):
+ return {"value":functools.reduce(lambda x,y: x + 1, input, 0)}
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<String>)->Struct<value:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+$res = (REDUCE Input1 ON key USING $udf(value));
+
+select * from $res order by value;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys.sql
new file mode 100644
index 0000000000..346c977f16
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys.sql
@@ -0,0 +1,15 @@
+/* postgres can not */
+/* syntax version 1 */
+$udfScript = @@
+import functools
+def Len(val_key, input):
+ return {"zuza": {val_key[0] + b"-" + str(val_key[1]).encode('utf-8'): functools.reduce(lambda x,y: x + 1, input, 0)}}
+@@;
+
+$udf = Python::Len(Callable<(Tuple<String,Uint32>, Stream<String>)->Struct<zuza:Dict<String, Uint32>>>, $udfScript);
+
+$data = (select Cast(value as uint32) ?? 0 as kk, value as ss, key as val from plato.Input1);
+
+$res = (reduce $data on val, kk using $udf(ss));
+
+select * from $res order by Yql::ToOptional(Yql::DictKeys(zuza));
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys_stream.sql
new file mode 100644
index 0000000000..aac5ae32b1
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_few_keys_stream.sql
@@ -0,0 +1,19 @@
+/* postgres can not */
+/* syntax version 1 */
+use plato;
+
+$udfScript = @@
+import functools
+
+def Len(val_key, input):
+ return {"zuza": {val_key[0] + b"-" + str(val_key[1]).encode('utf-8'): functools.reduce(lambda x, y: x + 1, input, 0)}}
+@@;
+
+$udf = Python3::Len(Callable<(Tuple<String,Uint32>, Stream<String>)->Struct<zuza:Dict<String, Uint32>>>, $udfScript);
+
+$data = (select Cast(value as uint32) ?? 0 as kk, value as ss, key as val from Input1);
+
+--insert into Output
+$res = (reduce $data on val, kk using $udf(ss));
+
+select * from $res order by DictKeys(zuza);
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_filter_and_having.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_filter_and_having.sql
new file mode 100644
index 0000000000..dc3beb4025
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_filter_and_having.sql
@@ -0,0 +1,14 @@
+/* postgres can not */
+/* syntax version 1 */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(key, input):
+ return {"total":functools.reduce(lambda x,y: x + 1, input, 0)}
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<String>)->Struct<total:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING $udf(value) WHERE cast(value as int) > 1 HAVING total > 3;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_having.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_having.sql
new file mode 100644
index 0000000000..b226e6f399
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_having.sql
@@ -0,0 +1,14 @@
+/* postgres can not */
+/* syntax version 1 */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(key, input):
+ return {"count":functools.reduce(lambda x,y: x + 1, input, 0)}
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<String>)->Struct<count:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING $udf(value) HAVING count > 4;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream._sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream._sql
new file mode 100644
index 0000000000..f244cd5c0d
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream._sql
@@ -0,0 +1,14 @@
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(key, input):
+ return {"value":functools.reduce(lambda x,y: x + 1, input, 0)}
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<String>)->Struct<value:Uint32>>, $udfScript);
+
+$res = (REDUCE Input1 ON key USING $udf(value));
+
+select * from $res order by value;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream.cfg
new file mode 100644
index 0000000000..b16f832837
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_input_stream.cfg
@@ -0,0 +1,2 @@
+in Input1 input1.txt
+udf python3_udf
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort.sql
new file mode 100644
index 0000000000..fa55b8bce2
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort.sql
@@ -0,0 +1,15 @@
+/* postgres can not */
+/* syntax version 1 */
+USE plato;
+
+$udfScript = @@
+def Len(val_key, input):
+ return {"joined": {val_key: b", ".join(input)}}
+@@;
+
+$udf = Python3::Len(Callable<(String, Stream<String>)->Struct<joined:Dict<String, String>>>, $udfScript);
+
+--INSERT INTO Output
+$res = (REDUCE Input1 PRESORT value DESC ON key USING $udf(subkey));
+
+select * from $res order by Yql::ToOptional(Yql::DictKeys(joined));
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort_stream.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort_stream.sql
new file mode 100644
index 0000000000..276c70136c
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_presort_stream.sql
@@ -0,0 +1,15 @@
+/* postgres can not */
+/* syntax version 1 */
+USE plato;
+
+$udfScript = @@
+def Len(val_key, input):
+ return {"joined": {val_key: b", ".join(input)}}
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<String>)->Struct<joined:Dict<String, String>>>, $udfScript);
+
+--INSERT INTO Output
+$res = (REDUCE Input1 PRESORT value DESC ON key USING $udf(subkey));
+
+select * from $res order by DictKeys(joined);
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_row.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row.sql
new file mode 100644
index 0000000000..6d3371d649
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row.sql
@@ -0,0 +1,17 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+
+def Len(key, input):
+ return {"sumByVal": functools.reduce(lambda x,y: x + int(y.value), input, 0)}
+@@;
+
+$udf = Python3::Len(Callable<(String, Stream<Struct<key:String,subkey:String,value:String>>)->Struct<sumByVal:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+$res = (REDUCE Input1 ON key USING $udf(TableRow()));
+
+select * from $res order by sumByVal;
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_python_row_repack.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row_repack.sql
new file mode 100644
index 0000000000..1eba13ec1f
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_python_row_repack.sql
@@ -0,0 +1,16 @@
+/* syntax version 1 */
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(key, input):
+ return {"sumByValAndKeyLen":functools.reduce(lambda x,y: x + int(y.value) + len(y.key), input, 0)}
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<Struct<key:String,value:String>>)->Struct<sumByValAndKeyLen:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+$res = (REDUCE Input1 ON key USING $udf(AsStruct(TableRow().value as value, TableRow().subkey as key)));
+
+select * from $res order by sumByValAndKeyLen;
diff --git a/yql/essentials/tests/sql/suites/produce/sorted.txt b/yql/essentials/tests/sql/suites/produce/sorted.txt
new file mode 100644
index 0000000000..2ede97b886
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted.txt
@@ -0,0 +1,6 @@
+{"key"="023";"subkey"="3";"value"="aaa"};
+{"key"="037";"subkey"="5";"value"="ddd"};
+{"key"="075";"subkey"="1";"value"="abc"};
+{"key"="150";"subkey"="1";"value"="aaa"};
+{"key"="150";"subkey"="3";"value"="iii"};
+{"key"="150";"subkey"="8";"value"="zzz"};
diff --git a/yql/essentials/tests/sql/suites/produce/sorted.txt.attr b/yql/essentials/tests/sql/suites/produce/sorted.txt.attr
new file mode 100644
index 0000000000..391c1a05f6
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted.txt.attr
@@ -0,0 +1,11 @@
+{"_yql_row_spec"={
+ "Type"=["StructType";[
+ ["key";["DataType";"String"]];
+ ["subkey";["DataType";"String"]];
+ ["value";["DataType";"String"]]
+ ]];
+ "SortDirections"=[1;1;1;];
+ "SortedBy"=["key";"subkey";"value";];
+ "SortedByTypes"=[["DataType";"String";];["DataType";"String";];["DataType";"String";];];
+ "SortMembers"=["key";"subkey";"value";];
+}}
diff --git a/yql/essentials/tests/sql/suites/produce/sorted1.txt b/yql/essentials/tests/sql/suites/produce/sorted1.txt
new file mode 100644
index 0000000000..b214aab0d9
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted1.txt
@@ -0,0 +1,10 @@
+{"key"="023";"subkey"="3";"value"="aaa"};
+{"key"="037";"subkey"="5";"value"="ddd"};
+{"key"="075";"subkey"="1";"value"="abc"};
+{"key"="150";"subkey"="1";"value"="aaa"};
+{"key"="150";"subkey"="3";"value"="iii"};
+{"key"="150";"subkey"="8";"value"="zzz"};
+{"key"="200";"subkey"="7";"value"="qqq"};
+{"key"="527";"subkey"="4";"value"="bbb"};
+{"key"="761";"subkey"="6";"value"="ccc"};
+{"key"="911";"subkey"="2";"value"="kkk"};
diff --git a/yql/essentials/tests/sql/suites/produce/sorted1.txt.attr b/yql/essentials/tests/sql/suites/produce/sorted1.txt.attr
new file mode 100644
index 0000000000..391c1a05f6
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted1.txt.attr
@@ -0,0 +1,11 @@
+{"_yql_row_spec"={
+ "Type"=["StructType";[
+ ["key";["DataType";"String"]];
+ ["subkey";["DataType";"String"]];
+ ["value";["DataType";"String"]]
+ ]];
+ "SortDirections"=[1;1;1;];
+ "SortedBy"=["key";"subkey";"value";];
+ "SortedByTypes"=[["DataType";"String";];["DataType";"String";];["DataType";"String";];];
+ "SortMembers"=["key";"subkey";"value";];
+}}
diff --git a/yql/essentials/tests/sql/suites/produce/yql-10297.sql b/yql/essentials/tests/sql/suites/produce/yql-10297.sql
new file mode 100644
index 0000000000..9622221e15
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/yql-10297.sql
@@ -0,0 +1,30 @@
+/* syntax version 1 */
+/* postgres can not */
+DEFINE SUBQUERY $t() AS
+ select * from as_table([<|key:"0"|>, <|key:"1"|>]);
+END DEFINE;
+
+DEFINE SUBQUERY $split_formula_log($in) AS
+ $parition = ($row) -> {
+ $recordType = TypeOf($row);
+ $varType = VariantType(TupleType($recordType,
+ $recordType));
+ RETURN case
+ when $row.key = "0" then
+ Variant($row, "0", $varType)
+
+ when $row.key = "1" then
+ Variant($row, "1", $varType)
+ else null
+ end
+ ;
+ };
+
+ PROCESS $in() USING $parition(TableRow());
+END DEFINE;
+
+
+$a, $b = (PROCESS $split_formula_log($t));
+select * from $a;
+select * from $b;
+