aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/tests/common/test_framework/test_file_common.py
blob: 3fc5cfe0256a3f46bb6d848f504cdac76c43b00b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import codecs
import os
import pytest
import re
import cyson

import yql.essentials.providers.common.proto.gateways_config_pb2 as gateways_config_pb2

from google.protobuf import text_format
from yql_utils import execute_sql, get_supported_providers, get_tables, get_files, get_http_files, \
    get_pragmas, KSV_ATTR, is_xfail, get_param, YQLExecResult, yql_binary_path, do_custom_error_check
from yqlrun import YQLRun

from test_utils import get_parameters_json, DATA_PATH, replace_vars


def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is_hybrid=False):
    config = None

    if http_files or force_blocks or is_hybrid:
        config_message = gateways_config_pb2.TGatewaysConfig()
        if http_files:
            schema = config_message.Fs.CustomSchemes.add()
            schema.Pattern = 'http_test://(.*)'
            schema.TargetUrl = yql_http_file_server.compose_http_link('$1')
        if force_blocks:
            config_message.SqlCore.TranslationFlags.extend(['EmitAggApply'])
            flags = config_message.YqlCore.Flags.add()
            flags.Name = 'UseBlocks'
        if is_hybrid:
            activate_hybrid = config_message.Yt.DefaultSettings.add()
            activate_hybrid.Name = "HybridDqExecution"
            activate_hybrid.Value = "1"
            deactivate_dq = config_message.Dq.DefaultSettings.add()
            deactivate_dq.Name = "AnalyzeQuery"
            deactivate_dq.Value = "0"
        config = text_format.MessageToString(config_message)

    return config


def is_hybrid(provider):
    return provider == 'hybrid'


def check_provider(provider, config):
    if provider not in get_supported_providers(config):
        pytest.skip('%s provider is not supported here' % provider)


def get_sql_query(provider, suite, case, config):
    pragmas = get_pragmas(config)

    if get_param('TARGET_PLATFORM'):
        if "yson" in case or "regexp" in case or "match" in case:
            pytest.skip('yson/match/regexp is not supported on non-default target platform')

    if get_param('TARGET_PLATFORM') and is_xfail(config):
        pytest.skip('xfail is not supported on non-default target platform')

    program_sql = os.path.join(DATA_PATH, suite, '%s.sql' % case)

    with codecs.open(program_sql, encoding='utf-8') as program_file_descr:
        sql_query = program_file_descr.read()
        if get_param('TARGET_PLATFORM'):
            if "Yson::" in sql_query:
                pytest.skip('yson udf is not supported on non-default target platform')
        if (provider + 'file can not' in sql_query) or (is_hybrid(provider) and ('ytfile can not' in sql_query)):
            pytest.skip(provider + ' can not execute this')

    pragmas.append(sql_query)
    sql_query = ';\n'.join(pragmas)
    if provider != 'yt' and 'Javascript' in sql_query:
        pytest.skip('ScriptUdf')

    assert 'UseBlocks' not in sql_query, 'UseBlocks should not be used directly, only via ForceBlocks'

    return sql_query


def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None, extra_args=[], force_blocks=False):
    check_provider(provider, config)

    sql_query = get_sql_query(provider, suite, case, config)
    sql_query = replace_vars(sql_query, "yqlrun_var")

    xfail = is_xfail(config)

    in_tables, out_tables = get_tables(suite, config, DATA_PATH, def_attr=KSV_ATTR)
    files = get_files(suite, config, DATA_PATH)
    http_files = get_http_files(suite, config, DATA_PATH)
    http_files_urls = yql_http_file_server.register_files({}, http_files)

    for table in in_tables:
        if cyson.loads(table.attr).get("type") == "document":
            content = table.content
        else:
            content = table.attr
        if provider != 'yt' and 'Javascript' in content:
            pytest.skip('ScriptUdf')

    parameters = get_parameters_json(suite, config)

    yqlrun = YQLRun(
        prov=provider,
        keep_temp=not re.search(r"yt\.ReleaseTempData", sql_query),
        binary=yqlrun_binary,
        gateway_config=get_gateways_config(http_files, yql_http_file_server, force_blocks=force_blocks, is_hybrid=is_hybrid(provider)),
        extra_args=extra_args,
        udfs_dir=yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps')
    )

    res, tables_res = execute_sql(
        yqlrun,
        program=sql_query,
        input_tables=in_tables,
        output_tables=out_tables,
        files=files,
        urls=http_files_urls,
        check_error=not xfail,
        verbose=True,
        parameters=parameters)

    fixed_stderr = res.std_err
    if xfail:
        assert res.execution_result.exit_code != 0
        do_custom_error_check(res, sql_query)
        fixed_stderr = None

    fixed_result = YQLExecResult(res.std_out,
                                 fixed_stderr,
                                 res.results,
                                 res.results_file,
                                 res.opt,
                                 res.opt_file,
                                 res.plan,
                                 res.plan_file,
                                 res.program,
                                 res.execution_result,
                                 None)

    return fixed_result, tables_res


def run_file(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None, extra_args=[], force_blocks=False):
    if (suite, case, cfg) not in run_file.cache:
        run_file.cache[(suite, case, cfg)] = run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary, extra_args, force_blocks=force_blocks)

    return run_file.cache[(suite, case, cfg)]


run_file.cache = {}