blob: 5395d3e9f861cf3e577b27e5fcf90df34aec0354 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
(
(let structType (StructType '('"key" (DataType 'String)) '('"subkey" (DataType 'String)) '('"value" (DataType 'String))))
#comment
(let udfType (CallableType '() '((StreamType structType)) '((StreamType structType))))
(let script '@@
def f(input):
for i in input:
yield {
'key': i.key,
'subkey': i.subkey,
'value': i.value
}
@@)
(let udf (ScriptUdf 'Python3 '"f" udfType (String script)))
(let res (LMap (List (ListType structType)) (lambda '(stream) (block '(
(return (Apply udf stream))
)))))
(let mr_sink (DataSink 'yt (quote plato)))
(let world (Write! world mr_sink (Key '('table (String 'Output))) res '('('mode 'append))))
(let world (Commit! world mr_sink))
(return world)
)
|