blob: aeefafd75f6997887f18f079106854ae05ee2d4c (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
(
# Reads table Input1 through the 'yt provider (cluster 'plato), wraps every row
# into one of two variant alternatives chosen by the parity of its 'key column
# parsed as Int32, then demultiplexes the result into Output1 and Output2.
(let yt_source (DataSource 'yt 'plato))
(let yt_sink (DataSink 'yt 'plato))

# Read! yields a pair: Left! is the updated world, Right! is the table content.
(let read_result (Read! world yt_source (Key '('table (String 'Input1))) (Void) '()))
(let world (Left! read_result))
(let input_rows (Right! read_result))

# Both variant alternatives carry the same row layout, so it is declared once
# and used for each slot of the tuple.
(let row_type (StructType
    '('key (DataType 'String))
    '('subkey (DataType 'String))
    '('value (DataType 'String))
))
(let split_type (VariantType (TupleType row_type row_type)))

# Per-row handler: alternative '0 when key % 2 == 0, alternative '1 otherwise.
(let tag_row (lambda '(row) (block '(
    (let key_number (FromString (Member row 'key) 'Int32))
    (let remainder (% key_number (Int32 '2)))
    # Coalesce substitutes false when the comparison yields no value
    # (presumably when 'key does not parse as Int32 - verify), which sends
    # such rows to alternative '1.
    (let is_even (Coalesce (Equal remainder (Int32 '0)) (Bool 'false)))
    (let tagged (If is_even
        (Variant row '0 split_type)
        (Variant row '1 split_type)
    ))
    (return (Just tagged))
))))

# Apply the handler to the whole input via LMap + FlatMap.
(let routed_rows (LMap input_rows (lambda '(stream) (FlatMap stream tag_row))))

# Demux result is indexed per variant alternative: '0 goes to Output1, '1 to Output2.
(let outputs (Demux routed_rows))
(let world (Write! world yt_sink (Key '('table (String 'Output1))) (Nth outputs '0) '('('mode 'renew))))
(let world (Write! world yt_sink (Key '('table (String 'Output2))) (Nth outputs '1) '('('mode 'renew))))
(let world (Commit! world yt_sink))
(return world)
)
|