1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# PROCESS
Преобразовать входную таблицу с помощью UDF или [лямбда функции](expressions.md#lambda), которая применяется последовательно к каждой строке входа и имеет возможность для каждой строки входа создать ноль, одну или несколько строк результата (аналог Map в терминах MapReduce).
Таблица по имени ищется в базе данных, заданной оператором [USE](use.md).
В параметрах вызова функции после ключевого слова `USING` явно указывается, значения из каких колонок и в каком порядке передавать для каждой строки входа.
Допустимы функции, которые возвращают результат одного из трех составных типов от `OutputType` (возможные варианты `OutputType` описаны ниже):
* `OutputType` — на каждую строку входа всегда должна быть строка выхода, схема которой определяется типом структуры.
* `OutputType?` — функции оставляет за собой право пропускать строки, возвращая пустые значения (`TUnboxedValue()` в C++, `None` в Python или `null` в JavaScript).
* `Stream<OutputType>` или `List<OutputType>` — возможность вернуть несколько строк.
Вне зависимости от того, какой вариант из перечисленных выше трех выбран, результат преобразовывается к плоской таблице с колонками, определяемыми типом `OutputType`.
В качестве `OutputType` может выступать один из типов:
* `Struct<...>` — у `PROCESS` будет ровно один выход с записями заданной структуры, представляющий собой плоскую таблицу с колонками соответствующими полям `Struct<...>`
* `Variant<Struct<...>,...>` — у `PROCESS` число выходов будет равно числу альтернатив в `Variant`. Записи каждого выхода представлены плоской таблицей с колонками по полям из соответствующей альтернативы. Ко множеству выходов `PROCESS` в этом случае можно обратиться как к кортежу (`Tuple`) списков, который можно распаковать в отдельные [именованные выражения](expressions.md#named-nodes) и использовать независимо.
В списке аргументов функции после ключевого слова `USING` можно передать одно из двух специальных именованных выражений:
* `TableRow()` — текущая строка целиком в виде структуры;
* `TableRows()` — ленивый итератор по строкам, с точки зрения типов — `Stream<Struct<...>>`. В этом случае выходным типом функции может быть только `Stream<OutputType>` или `List<OutputType>`.
{% note info "Примечание" %}
После выполнения `PROCESS` в рамках того же запроса по результирующей таблице (или таблицам) можно выполнить [SELECT](select/index.md), [REDUCE](reduce.md), ещё один `PROCESS` и так далее в зависимости от необходимого результата.
{% endnote %}
Ключевое слово `USING` и указание функции необязательны: если они не указаны, то возвращается исходная таблица. Это может быть удобно для применения [шаблона подзапроса](subquery.md).
В `PROCESS` можно передать несколько входов (под входом здесь подразумевается таблица, [диапазон таблиц](select/concat.md), подзапрос, [именованное выражение](expressions.md#named-nodes)), разделенных запятой. В функцию из `USING` в этом случае можно передать только специальные именованные выражения `TableRow()` или `TableRows()`, которые будут иметь следующий тип:
* `TableRow()` — альтернатива (`Variant`), где каждый элемент имеет тип структуры записи из соответствущего входа. Для каждой входной строки в альтернативе заполнен элемент, соответствущий номеру входа этой строки
* `TableRows()` — ленивый итератор по альтернативам, с точки зрения типов — `Stream<Variant<...>>`. Альтернатива имеет такую же семантику, что и для `TableRow()`
После `USING` в `PROCESS` можно опционально указать `ASSUME ORDER BY` со списком столбцов. Результат такого `PROCESS` будет считаться сортированным, но без выполнения фактической сортировки. Проверка сортированности осуществляется на этапе исполнения запроса. Поддерживается задание порядка сортировки с помощью ключевых слов `ASC` (по возрастанию) и `DESC` (по убыванию). Выражения в `ASSUME ORDER BY` не поддерживается.
## Примеры
```yql
PROCESS my_table
USING MyUdf::MyProcessor(value)
```
```yql
$udfScript = @@
def MyFunc(my_list):
return [(int(x.key) % 2, x) for x in my_list]
@@;
-- Функция возвращает итератор альтернатив
$udf = Python3::MyFunc(Callable<(Stream<Struct<...>>) -> Stream<Variant<Struct<...>, Struct<...>>>>,
$udfScript
);
-- На выходе из PROCESS получаем кортеж списков
$i, $j = (PROCESS my_table USING $udf(TableRows()));
SELECT * FROM $i;
SELECT * FROM $j;
```
```yql
$udfScript = @@
def MyFunc(stream):
for r in stream:
yield {"alt": r[0], "key": r[1].key}
@@;
-- Функция принимает на вход итератор альтернатив
$udf = Python::MyFunc(Callable<(Stream<Variant<Struct<...>, Struct<...>>>) -> Stream<Struct<...>>>,
$udfScript
);
PROCESS my_table1, my_table2 USING $udf(TableRows());
```
|