summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/library/yql/core/yql_opt_utils.cpp8
-rw-r--r--ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg4
-rw-r--r--ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql14
-rw-r--r--ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg3
-rw-r--r--ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream._sql14
-rw-r--r--ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream.cfg2
6 files changed, 44 insertions, 1 deletions
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index d4b94fe4e3d..af74c7329bf 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1813,7 +1813,8 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con
|| TCoForwardList::Match(node.Get())
|| TCoApply::Match(node.Get())
|| TCoSwitch::Match(node.Get())
- || node->IsCallable("DqReplicate");
+ || node->IsCallable("DqReplicate")
+ || TCoPartitionsByKeys::Match(node.Get());
}
);
@@ -1851,6 +1852,11 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con
return node;
}
}
+ } else if (TCoPartitionsByKeys::Match(candidate.Get())) {
+ const auto handlerChild = candidate->Child(TCoPartitionsByKeys::idx_ListHandlerLambda);
+ if (auto node = FindNonYieldTransparentNodeImpl(handlerChild->TailPtr(), udfSupportsYield, TNodeSet{&handlerChild->Head().Head()})) {
+ return node;
+ }
}
}
return {};
diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg
new file mode 100644
index 00000000000..c426164726c
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg
@@ -0,0 +1,4 @@
+xfail
+in Input1 input1.txt
+udf python3_udf
+providers dq
diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql
new file mode 100644
index 00000000000..82b3767cc50
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql
@@ -0,0 +1,14 @@
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(stream):
+ sums = [functools.reduce(lambda x,y: x + y, pair[1], 0) for pair in stream]
+ return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
+@@;
+
+$udf = Python3::Len(Callable<(Stream<Tuple<String,Stream<Uint32>>>)->Struct<sumByAllVal:Uint32>>, $udfScript);
+
+--INSERT INTO Output
+REDUCE Input1 ON key USING ALL $udf(cast(value as uint32) ?? 0);
diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg
new file mode 100644
index 00000000000..13bb8734c43
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg
@@ -0,0 +1,3 @@
+in Input1 input1.txt
+udf python3_udf
+providers yt
diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream._sql b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream._sql
new file mode 100644
index 00000000000..f244cd5c0d8
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream._sql
@@ -0,0 +1,14 @@
+/* postgres can not */
+USE plato;
+
+$udfScript = @@
+import functools
+def Len(key, input):
+ return {"value":functools.reduce(lambda x,y: x + 1, input, 0)}
+@@;
+
+$udf = Python::Len(Callable<(String, Stream<String>)->Struct<value:Uint32>>, $udfScript);
+
+$res = (REDUCE Input1 ON key USING $udf(value));
+
+select * from $res order by value;
diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream.cfg b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream.cfg
new file mode 100644
index 00000000000..b16f832837f
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream.cfg
@@ -0,0 +1,2 @@
+in Input1 input1.txt
+udf python3_udf