diff options
6 files changed, 44 insertions, 1 deletions
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index d4b94fe4e3d..af74c7329bf 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1813,7 +1813,8 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con || TCoForwardList::Match(node.Get()) || TCoApply::Match(node.Get()) || TCoSwitch::Match(node.Get()) - || node->IsCallable("DqReplicate"); + || node->IsCallable("DqReplicate") + || TCoPartitionsByKeys::Match(node.Get()); } ); @@ -1851,6 +1852,11 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con return node; } } + } else if (TCoPartitionsByKeys::Match(candidate.Get())) { + const auto handlerChild = candidate->Child(TCoPartitionsByKeys::idx_ListHandlerLambda); + if (auto node = FindNonYieldTransparentNodeImpl(handlerChild->TailPtr(), udfSupportsYield, TNodeSet{&handlerChild->Head().Head()})) { + return node; + } } } return {}; diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg new file mode 100644 index 00000000000..c426164726c --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream-dq_fail.cfg @@ -0,0 +1,4 @@ +xfail +in Input1 input1.txt +udf python3_udf +providers dq diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql new file mode 100644 index 00000000000..82b3767cc50 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream._sql @@ -0,0 +1,14 @@ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(stream): + sums = [functools.reduce(lambda x,y: x + y, pair[1], 0) for pair in stream] + return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)} +@@; + +$udf = Python3::Len(Callable<(Stream<Tuple<String,Stream<Uint32>>>)->Struct<sumByAllVal:Uint32>>, $udfScript); + +--INSERT INTO Output +REDUCE Input1 ON key USING ALL $udf(cast(value as uint32) ?? 0); diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg new file mode 100644 index 00000000000..13bb8734c43 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/produce/reduce_all_with_python_input_stream.cfg @@ -0,0 +1,3 @@ +in Input1 input1.txt +udf python3_udf +providers yt diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream._sql b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream._sql new file mode 100644 index 00000000000..f244cd5c0d8 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream._sql @@ -0,0 +1,14 @@ +/* postgres can not */ +USE plato; + +$udfScript = @@ +import functools +def Len(key, input): + return {"value":functools.reduce(lambda x,y: x + 1, input, 0)} +@@; + +$udf = Python::Len(Callable<(String, Stream<String>)->Struct<value:Uint32>>, $udfScript); + +$res = (REDUCE Input1 ON key USING $udf(value)); + +select * from $res order by value; diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream.cfg b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream.cfg new file mode 100644 index 00000000000..b16f832837f --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/produce/reduce_with_python_input_stream.cfg @@ -0,0 +1,2 @@ +in Input1 input1.txt +udf python3_udf |
