blob: 90c0f8a2a7372979dd8b77f1cc703639e1705ada (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
/* syntax version 1 */
-- not supported on windows
/* postgres can not */
$udfScript = @@
import collections;
def processRows(prefix, rowList, separator):
ResultRow = collections.namedtuple("ResultRow", ["Result"]);
result = [];
for row in rowList:
resultValue = prefix + row.Name + separator + row.Value;
resultItem = ResultRow(Result=resultValue);
result.append(resultItem);
return result;
@@;
$udf = Python::processRows(
Callable<(String, List<Struct<Name:String, Value:String>>, String)->List<Struct<Result:String>>>,
$udfScript
);
$data = (
SELECT key AS Name, value AS Value FROM plato.Input0
);
$prefix = ">>";
$p1 = (
PROCESS $data USING $udf($prefix, TableRows(), "=") WHERE Name != "foo"
);
$p2 = (
SELECT Result AS Data FROM $p1
);
$p3 = (
PROCESS $p2 USING Streaming::Process(TableRows(), "grep", AsList("180"))
);
$p4 = (
SELECT Data AS FinalResult FROM $p3
);
SELECT Avg(Length(FinalResult)) AS AvgResultLength FROM $p4;
|