blob: f6b50489a6b0fcef437be50c93bd7897aafef3fa (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
(
# Program overview: read two YT tables, left-join them on key, derive an
# IsJoined flag per joined row, count rows per IsJoined value, and write the
# grouped result to the result sink.
# Bring in the shared aggregation library; count_traits_factory is bound from it below.
(import aggregate_module '"/lib/yql/aggregate.yql")
# Set two attributes on the yt data source for all clusters.
# NOTE(review): presumably these steer the choice of merge-join strategy - confirm against the YT provider settings.
(let world (Configure! world (DataSource '"yt" '"$all") '"Attr" '"joinmergetableslimit" '"2"))
(let world (Configure! world (DataSource '"yt" '"$all") '"Attr" '"joinmergeunsortedfactor" '"0.8"))
# Main block: reads both inputs, builds the query, writes the result, returns the updated world.
(let world (block '(
# Read Input1 from yt cluster plato. The Read! result is split into its
# Left! part, rebound as world, and its Right! part, bound as table1.
# NOTE(review): the list holding key is presumably the set of columns to read - confirm.
(let x (Read! world (DataSource '"yt" '"plato") (MrTableConcat (Key '('table (String '"Input1")))) '('"key") '()))
(let world (Left! x))
(let table1 (Right! x))
# Same pattern for Input2; x is deliberately rebound, and the read is chained on the world from the first read.
(let x (Read! world (DataSource '"yt" '"plato") (MrTableConcat (Key '('table (String '"Input2")))) '('"key") '()))
(let world (Left! x))
(let table2 (Right! x))
(let output (block '(
(let select (block '(
# Left join: table1 labelled L with table2 labelled R, on L.key equal to R.key.
(let core (EquiJoin '(table1 '"L") '(table2 '"R") '('Left '"L" '"R" '('"L" '"key") '('"R" '"key") '()) '()))
# Project every joined row to a struct of L.key, R.key and IsJoined.
# IsJoined is computed as: Not Exists of the R key column, compared with != against true.
# NOTE(review): presumably this is true exactly when the right side matched - confirm Exists semantics for optional columns.
(let core (FlatMap core (lambda '(row) (AsList (AsStruct '('"L.key" (Member row '"L.key")) '('"R.key" (Member row '"R.key")) '('"IsJoined" ("!=" ("Not" ("Exists" (SqlColumn row '"key" '"R"))) (Bool '"true"))))))))
# Group by IsJoined. Count0 is built by applying count_traits_factory to the
# type of core and a lambda that extracts the L key column from each row.
(let core (Aggregate core '('"IsJoined") '('('Count0 (Apply (bind aggregate_module '"count_traits_factory") (TypeOf core) (lambda '(row) (SqlColumn row '"key" '"L")))))))
# Final projection: keep IsJoined and expose the Count0 member under the name column1.
(let core (FlatMap core (lambda '(row) (block '(
(let res (AsStruct '('"IsJoined" (Member row '"IsJoined")) '('"column1" (Member row 'Count0))))
(let res (AsList res))
(return res)
)))))
(return core)
)))
(return select)
)))
# Write output to the result sink with columns IsJoined and column1, then commit that sink.
(let world (block '(
(let result_sink (DataSink 'result))
(let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"IsJoined" '"column1")))))
(return (Commit! world result_sink))
)))
(return world)
)))
# Commit the yt plato sink after the main block.
(let world (block '(
(let plato_sink (DataSink '"yt" '"plato"))
(let world (Commit! world plato_sink))
(return world)
)))
(return world)
)
|