aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/s-expressions/suites/InMem/StreamInCombineByKey.yql
blob: e58ebc251e5307eb0f29f2685a95709eb56bf6a8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(
#comment
(let config (DataSource 'config))
(let world (Configure! world config 'PureDataSource 'yt))

(let res_sink (DataSink 'result))
(let list (AsList '((Uint32 '1) (String 'a)) '((Uint32 '1) (String 'b)) '((Uint32 '2) (String 'c))))
(let premap (lambda '(x) (Iterator (AsList x))))
(let keyExtractor (lambda '(x) (Nth x '0)))
(let handlerInit (lambda '(key item) (Nth item '1)))
(let handlerUpdate (lambda '(key item state) (Concat (Nth item '1) state)))
(let handlerFinish (lambda '(key state) (Iterator (AsList '(key state)))))
(let world (Write! world res_sink (Key) (CombineByKey list premap keyExtractor handlerInit handlerUpdate handlerFinish) '('('type))))
(let world (Commit! world res_sink))
(return world)
)