blob: bcb27c7be3edf313ea75b95eebec8e672059195e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
(
# Reads table Input2 from the 'yt provider (cluster 'plato), partitions the rows
# by their 'key member, emits one summary row per group, and routes each summary
# row to table Output1 or Output2 depending on the parity of the key parsed as Int32.
(let mr_source (DataSource 'yt 'plato))
(let mr_sink (DataSink 'yt 'plato))

# Read! yields a pair: Left! is the updated world, Right! is the table contents.
(let readResult (Read! world mr_source (Key '('table (String 'Input2))) (Void) '()))
(let world (Left! readResult))
(let data (Right! readResult))

# Both variant alternatives carry the same three-string row layout.
(let rowType (StructType
    '('key (DataType 'String))
    '('subkey (DataType 'String))
    '('value (DataType 'String))
))
(let varTuple (VariantType (TupleType rowType rowType)))

# Partitioning key: the 'key member of each input row.
(let keySelector (lambda '(row) (Member row 'key)))

# Converts one group, taken as a tuple of (key, stream of rows), into a single
# variant-wrapped row:
#   key    - the group key, unchanged
#   subkey - the constant string "."
#   value  - the number of items in the group's stream, rendered as a string
(let groupToVariant (lambda '(group) (block '(
    (let key (Nth group '0))
    (let stream (Nth group '1))
    (let groupSize (ToString (Length (ForwardList stream))))
    (let withKey (AddMember (Struct) 'key key))
    (let withSubkey (AddMember withKey 'subkey (String '.)))
    (let outRow (AddMember withSubkey 'value groupSize))
    (let keyAsInt (FromString key 'Int32))
    # Coalesce falls back to false when the comparison produces no value,
    # so such keys are routed to alternative 1 together with the odd ones.
    (let isEven (Coalesce (Equal (% keyAsInt (Int32 '2)) (Int32 '0)) (Bool 'false)))
    (return (If isEven
        (Variant outRow '0 varTuple)
        (Variant outRow '1 varTuple)
    ))
))))

# Handler applied to the partitioned groups: map every group to its variant row.
(let listHandler (lambda '(groups) (Map groups groupToVariant)))

(let data (PartitionByKey data keySelector (Void) (Void) listHandler))

# Demux turns the variant-typed data into a tuple with one element per alternative.
(let dataTuple (Demux data))
(let data1 (Nth dataTuple '0))
(let data2 (Nth dataTuple '1))

# Alternative 0 goes to Output1, alternative 1 to Output2; both use mode 'renew.
(let world (Write! world mr_sink (Key '('table (String 'Output1))) data1 '('('mode 'renew))))
(let world (Write! world mr_sink (Key '('table (String 'Output2))) data2 '('('mode 'renew))))
(let world (Commit! world mr_sink))
(return world)
)
|