aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/common/test_framework/test_file_common.py
blob: 37d008759f0805a3b4ea319807b3e5f21955f45d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import codecs
import os
import pytest
import re
import cyson

import yql.essentials.providers.common.proto.gateways_config_pb2 as gateways_config_pb2

from google.protobuf import text_format
from yql_utils import execute, get_supported_providers, get_tables, get_files, get_http_files, \
    get_pragmas, KSV_ATTR, is_xfail, get_param, YQLExecResult, yql_binary_path, do_custom_error_check
from yqlrun import YQLRun

from test_utils import get_parameters_json, replace_vars
# FIXME dq usage
from test_utils import DATA_PATH


def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is_hybrid=False, allow_llvm=True):
    config = None

    if http_files or force_blocks or is_hybrid or not allow_llvm:
        config_message = gateways_config_pb2.TGatewaysConfig()
        if http_files:
            schema = config_message.Fs.CustomSchemes.add()
            schema.Pattern = 'http_test://(.*)'
            schema.TargetUrl = yql_http_file_server.compose_http_link('$1')
        if force_blocks:
            config_message.SqlCore.TranslationFlags.extend(['EmitAggApply'])
            flags = config_message.YqlCore.Flags.add()
            flags.Name = 'UseBlocks'
        if is_hybrid:
            activate_hybrid = config_message.Yt.DefaultSettings.add()
            activate_hybrid.Name = "HybridDqExecution"
            activate_hybrid.Value = "1"
            deactivate_dq = config_message.Dq.DefaultSettings.add()
            deactivate_dq.Name = "AnalyzeQuery"
            deactivate_dq.Value = "0"
        if not allow_llvm:
            flags = config_message.YqlCore.Flags.add()
            flags.Name = 'LLVM_OFF'
        config = text_format.MessageToString(config_message)

    return config


def is_hybrid(provider):
    return provider == 'hybrid'


def check_provider(provider, config):
    if provider == 'pure':
        return

    if provider not in get_supported_providers(config):
        pytest.skip('%s provider is not supported here' % provider)


# FIXME make data_path required (dq usage)
def get_sql_query(provider, suite, case, config, data_path=None, template='.sql'):
    if data_path is None:
        data_path = DATA_PATH
    pragmas = get_pragmas(config)

    if get_param('TARGET_PLATFORM'):
        if "yson" in case or "regexp" in case or "match" in case:
            pytest.skip('yson/match/regexp is not supported on non-default target platform')

    if get_param('TARGET_PLATFORM') and is_xfail(config):
        pytest.skip('xfail is not supported on non-default target platform')

    program_sql = os.path.join(data_path, suite, '%s%s' % (case, template))

    with codecs.open(program_sql, encoding='utf-8') as program_file_descr:
        sql_query = program_file_descr.read()
        if get_param('TARGET_PLATFORM'):
            if "Yson::" in sql_query:
                pytest.skip('yson udf is not supported on non-default target platform')
        if (provider + 'file can not' in sql_query) or (is_hybrid(provider) and ('ytfile can not' in sql_query)):
            pytest.skip(provider + ' can not execute this')

    pragmas.append(sql_query)
    sql_query = ';\n'.join(pragmas)
    if provider != 'yt' and provider != 'pure' and 'Javascript' in sql_query:
        pytest.skip('ScriptUdf')

    assert 'UseBlocks' not in sql_query, 'UseBlocks should not be used directly, only via ForceBlocks'

    return sql_query


def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server,
                      yqlrun_binary=None, extra_args=[], force_blocks=False, allow_llvm=True, data_path=None,
                      run_sql=True):
    check_provider(provider, config)
    # FIXME dq usage
    if data_path is None:
        data_path = DATA_PATH

    sql_query = get_sql_query(provider, suite, case, config, data_path, template='.sql' if run_sql else '.yqls')
    sql_query = replace_vars(sql_query, "yqlrun_var")

    xfail = is_xfail(config)

    in_tables, out_tables = get_tables(suite, config, data_path, def_attr=KSV_ATTR) if provider != 'pure' else (None, None)
    files = get_files(suite, config, data_path)
    http_files = get_http_files(suite, config, data_path)
    http_files_urls = yql_http_file_server.register_files({}, http_files)

    if in_tables is not None:
        for table in in_tables:
            if cyson.loads(table.attr).get("type") == "document":
                content = table.content
            else:
                content = table.attr
            if provider != 'yt' and 'Javascript' in content:
                pytest.skip('ScriptUdf')

    parameters = get_parameters_json(suite, config, data_path)

    yqlrun = YQLRun(
        prov=provider,
        keep_temp=not re.search(r"yt\.ReleaseTempData", sql_query),
        binary=yqlrun_binary,
        gateway_config=get_gateways_config(http_files, yql_http_file_server, force_blocks=force_blocks, is_hybrid=is_hybrid(provider), allow_llvm=allow_llvm),
        extra_args=extra_args,
        udfs_dir=yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps')
    )

    res, tables_res = execute(
        yqlrun,
        program=sql_query,
        input_tables=in_tables,
        output_tables=out_tables,
        files=files,
        urls=http_files_urls,
        check_error=not xfail,
        run_sql=run_sql,
        verbose=True,
        parameters=parameters)

    fixed_stderr = res.std_err
    if xfail:
        assert res.execution_result.exit_code != 0
        do_custom_error_check(res, sql_query)
        fixed_stderr = None

    fixed_result = YQLExecResult(res.std_out,
                                 fixed_stderr,
                                 res.results,
                                 res.results_file,
                                 res.opt,
                                 res.opt_file,
                                 res.plan,
                                 res.plan_file,
                                 res.program,
                                 res.execution_result,
                                 None)

    return fixed_result, tables_res


def run_file(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None,
             extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, run_sql=True):
    if (suite, case, cfg) not in run_file.cache:
        run_file.cache[(suite, case, cfg)] = \
            run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server,
                              yqlrun_binary, extra_args, force_blocks=force_blocks, allow_llvm=allow_llvm,
                              data_path=data_path, run_sql=run_sql)

    return run_file.cache[(suite, case, cfg)]


run_file.cache = {}