summaryrefslogtreecommitdiffstats
path: root/yt/yql/tests/sql/suites/produce/reduce_with_flat_python_stream.yql
blob: 8d164a65c9a46c6a255daaecb9925b2a12d5bd78 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* syntax version 1 */
/* kikimr can not */
USE plato;

$udfScript = @@
def f(input):
   s = []
   last_key = None
   for i in input:
      if last_key is not None and last_key != i.key:
        s = []
      s.append(i.value)
      last_key = i.key
      yield {
        'key': i.key,
        'subkey1': i.subkey,
        'value': b''.join(s),
      }
@@;

$udf_stream = Python3::f(
Callable<
    (Stream<Struct<key:String,subkey:String,value:String>>)
    ->
    Stream<Struct<key:String,subkey1:String,value:String>>
>, $udfScript);

$res = REDUCE Input0 PRESORT value ON key using all $udf_stream(TableRows()) ;

select * from $res order by key, value;