blob: 0a15c6095a7783a61887ce51248215a517b39d19 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
(
# Reads columns key, subkey, value from table Input, aggregates the value
# column per key through the Stat.TDigest_* UDFs in two stages
# (CombineByKey, then PartitionByKey), and appends one row per key to table
# Output. The output value is the 0.75 and 0.999 percentiles joined by a space.
# NOTE(review): the UDF semantics are inferred from their names - confirm
# against the Stat UDF module.

# Handles to the UDFs used below.
(let tdCreate (Udf 'Stat.TDigest_Create))
(let tdAddValue (Udf 'Stat.TDigest_AddValue))
(let tdGetPercentile (Udf 'Stat.TDigest_GetPercentile))
(let tdSerialize (Udf 'Stat.TDigest_Serialize))
(let tdDeserialize (Udf 'Stat.TDigest_Deserialize))
(let tdMerge (Udf 'Stat.TDigest_Merge))

# Input table.
(let source (DataSource 'yt 'plato))
(let readResult (Read! world source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
(let world (Left! readResult))
(let inputRows (Right! readResult))

# Shared helpers.
# rowKey: grouping key of a row.
(let rowKey (lambda '(row) (Member row 'key)))
# rowNumber: the value column parsed as Double; Unwrap fails on unparsable input.
(let rowNumber (lambda '(row) (Unwrap (FromString (Member row 'value) 'Double))))
# makeRow: output struct with the given key and value and a constant subkey.
(let makeRow (lambda '(k v)
    (AddMember (AddMember (AddMember (Struct) 'key k) 'subkey (String '.)) 'value v)))

# Stage 1: build a state per key from the parsed values and emit it serialized.
(let keepRow (lambda '(row) (Just row)))
(let startDigest (lambda '(k row) (Apply tdCreate (Apply rowNumber row))))
(let growDigest (lambda '(k row digest) (Apply tdAddValue digest (Apply rowNumber row))))
(let emitDigest (lambda '(k digest) (Just (Apply makeRow k (Apply tdSerialize digest)))))
(let partialDigests (CombineByKey inputRows keepRow rowKey startDigest growDigest emitDigest))

# Stage 2: for each (key, rows) pair, deserialize and merge the stage-1 states,
# then render the two percentiles as text.
(let mergeGroups (lambda '(groups) (FlatMap groups (lambda '(group) (block '(
    (let groupKey (Nth group '0))
    (let groupRows (ForwardList (Nth group '1)))
    (let loadDigest (lambda '(row) (Apply tdDeserialize (Member row 'value))))
    (let absorb (lambda '(row acc) (Apply tdMerge (Apply loadDigest row) acc)))
    # Fold1 seeds the accumulator from the first row; its optional result is unwrapped.
    (let merged (Unwrap (Fold1 groupRows loadDigest absorb)))
    (let p75 (ToString (Apply tdGetPercentile merged (Double '0.75))))
    (let p999 (ToString (Apply tdGetPercentile merged (Double '0.999))))
    (let text (Concat (Concat p75 (String '" ")) p999))
    (return (Just (Apply makeRow groupKey text)))
))))))
(let percentiles (PartitionByKey partialDigests rowKey (Void) (Void) mergeGroups))

# Append the result to the Output table and commit.
(let sink (DataSink 'yt 'plato))
(let world (Write! world sink (Key '('table (String 'Output))) percentiles '('('mode 'append))))
(let world (Commit! world sink))
(return world)
)
|