diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-11-20 11:14:58 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-11-20 11:14:58 +0000 |
commit | 31773f157bf8164364649b5f470f52dece0a4317 (patch) | |
tree | 33d0f7eef45303ab68cf08ab381ce5e5e36c5240 /yql/essentials/tests/common/test_framework/yqlrun.py | |
parent | 2c7938962d8689e175574fc1e817c05049f27905 (diff) | |
parent | eff600952d5dfe17942f38f510a8ac2b203bb3a5 (diff) | |
download | ydb-31773f157bf8164364649b5f470f52dece0a4317.tar.gz |
Merge branch 'rightlib' into mergelibs-241120-1113
Diffstat (limited to 'yql/essentials/tests/common/test_framework/yqlrun.py')
-rw-r--r-- | yql/essentials/tests/common/test_framework/yqlrun.py | 346 |
1 files changed, 346 insertions, 0 deletions
diff --git a/yql/essentials/tests/common/test_framework/yqlrun.py b/yql/essentials/tests/common/test_framework/yqlrun.py new file mode 100644 index 0000000000..b96641912a --- /dev/null +++ b/yql/essentials/tests/common/test_framework/yqlrun.py @@ -0,0 +1,346 @@ +import os +import shutil +import yatest.common +import yql_utils +import cyson as yson +import yql.essentials.providers.common.proto.gateways_config_pb2 as gateways_config_pb2 +import yql.essentials.core.file_storage.proto.file_storage_pb2 as file_storage_pb2 + +import six + +from google.protobuf import text_format + +ARCADIA_PREFIX = 'arcadia/' +ARCADIA_TESTS_DATA_PREFIX = 'arcadia_tests_data/' + +VAR_CHAR_PREFIX = '$' +FIX_DIR_PREFIXES = { + 'SOURCE': yatest.common.source_path, + 'BUILD': yatest.common.build_path, + 'TEST_SOURCE': yatest.common.test_source_path, + 'DATA': yatest.common.data_path, + 'BINARY': yatest.common.binary_path, +} + + +class YQLRun(object): + + def __init__(self, udfs_dir=None, prov='yt', use_sql2yql=False, keep_temp=True, binary=None, gateway_config=None, fs_config=None, extra_args=[], cfg_dir=None, support_udfs=True): + if binary is None: + self.yqlrun_binary = yql_utils.yql_binary_path(os.getenv('YQL_YQLRUN_PATH') or 'contrib/ydb/library/yql/tools/yqlrun/yqlrun') + else: + self.yqlrun_binary = binary + self.extra_args = extra_args + + try: + self.sql2yql_binary = yql_utils.yql_binary_path(os.getenv('YQL_SQL2YQL_PATH') or 'yql/essentials/tools/sql2yql/sql2yql') + except BaseException: + self.sql2yql_binary = None + + try: + self.udf_resolver_binary = yql_utils.yql_binary_path(os.getenv('YQL_UDFRESOLVER_PATH') or 'yql/essentials/tools/udf_resolver/udf_resolver') + except Exception: + self.udf_resolver_binary = None + + if support_udfs: + if udfs_dir is None: + self.udfs_path = yql_utils.get_udfs_path() + else: + self.udfs_path = udfs_dir + else: + self.udfs_path = None + + res_dir = yql_utils.get_yql_dir(prefix='yqlrun_') + self.res_dir = res_dir + self.tables = {} + self.prov = prov + self.use_sql2yql = use_sql2yql + self.keep_temp = keep_temp + + self.gateway_config = gateways_config_pb2.TGatewaysConfig() + if gateway_config is not None: + text_format.Merge(gateway_config, self.gateway_config) + + yql_utils.merge_default_gateway_cfg(cfg_dir or 'yql/essentials/cfg/tests', self.gateway_config) + + self.fs_config = file_storage_pb2.TFileStorageConfig() + + with open(yql_utils.yql_source_path(os.path.join(cfg_dir or 'yql/essentials/cfg/tests', 'fs.conf'))) as f: + text_format.Merge(f.read(), self.fs_config) + + if fs_config is not None: + text_format.Merge(fs_config, self.fs_config) + + if yql_utils.get_param('USE_NATIVE_YT_TYPES'): + attr = self.gateway_config.Yt.DefaultSettings.add() + attr.Name = 'UseNativeYtTypes' + attr.Value = 'true' + + if yql_utils.get_param('SQL_FLAGS'): + flags = yql_utils.get_param('SQL_FLAGS').split(',') + self.gateway_config.SqlCore.TranslationFlags.extend(flags) + + def yql_exec(self, program=None, program_file=None, files=None, urls=None, + run_sql=False, verbose=False, check_error=True, tables=None, pretty_plan=True, + wait=True, parameters={}, extra_env={}, require_udf_resolver=False, scan_udfs=True): + del pretty_plan + + res_dir = self.res_dir + + def res_file_path(name): + return os.path.join(res_dir, name) + + opt_file = res_file_path('opt.yql') + results_file = res_file_path('results.txt') + plan_file = res_file_path('plan.txt') + err_file = res_file_path('err.txt') + + udfs_dir = self.udfs_path + prov = self.prov + + program, program_file = yql_utils.prepare_program(program, program_file, res_dir, + ext='sql' if run_sql else 'yql') + + syntax_version = yql_utils.get_syntax_version(program) + ansi_lexer = yql_utils.ansi_lexer_enabled(program) + + if run_sql and self.use_sql2yql: + orig_sql = program_file + '.orig_sql' + shutil.copy2(program_file, orig_sql) + cmd = [ + self.sql2yql_binary, + orig_sql, + '--yql', + '--output=' + program_file, + '--syntax-version=%d' % syntax_version + ] + if ansi_lexer: + cmd.append('--ansi-lexer') + env = {'YQL_DETERMINISTIC_MODE': '1'} + env.update(extra_env) + for var in [ + 'LLVM_PROFILE_FILE', + 'GO_COVERAGE_PREFIX', + 'PYTHON_COVERAGE_PREFIX', + 'NLG_COVERAGE_FILENAME', + 'YQL_EXPORT_PG_FUNCTIONS_DIR', + 'YQL_ALLOW_ALL_PG_FUNCTIONS', + ]: + if var in os.environ: + env[var] = os.environ[var] + yatest.common.process.execute(cmd, cwd=res_dir, env=env) + + with open(program_file) as f: + yql_program = f.read() + with open(program_file, 'w') as f: + f.write(yql_program) + + gateways_cfg_file = res_file_path('gateways.conf') + with open(gateways_cfg_file, 'w') as f: + f.write(str(self.gateway_config)) + + fs_cfg_file = res_file_path('fs.conf') + with open(fs_cfg_file, 'w') as f: + f.write(str(self.fs_config)) + + cmd = self.yqlrun_binary + ' ' + + if yql_utils.get_param('TRACE_OPT'): + cmd += '--trace-opt ' + + cmd += '-L ' \ + '--program=%(program_file)s ' \ + '--expr-file=%(opt_file)s ' \ + '--result-file=%(results_file)s ' \ + '--plan-file=%(plan_file)s ' \ + '--err-file=%(err_file)s ' \ + '--gateways=%(prov)s ' \ + '--syntax-version=%(syntax_version)d ' \ + '--tmp-dir=%(res_dir)s ' \ + '--gateways-cfg=%(gateways_cfg_file)s ' \ + '--fs-cfg=%(fs_cfg_file)s ' % locals() + + if self.udfs_path is not None: + cmd += '--udfs-dir=%(udfs_dir)s ' % locals() + + if ansi_lexer: + cmd += '--ansi-lexer ' + + if self.keep_temp: + cmd += '--keep-temp ' + + if self.extra_args: + cmd += " ".join(self.extra_args) + " " + + cmd += '--mounts=' + yql_utils.get_mount_config_file() + ' ' + cmd += '--validate-result-format ' + + if files: + for f in files: + if files[f].startswith(ARCADIA_PREFIX): # how does it work with folders? and does it? + files[f] = yatest.common.source_path(files[f][len(ARCADIA_PREFIX):]) + continue + if files[f].startswith(ARCADIA_TESTS_DATA_PREFIX): + files[f] = yatest.common.data_path(files[f][len(ARCADIA_TESTS_DATA_PREFIX):]) + continue + + if files[f].startswith(VAR_CHAR_PREFIX): + for prefix, func in six.iteritems(FIX_DIR_PREFIXES): + if files[f].startswith(VAR_CHAR_PREFIX + prefix): + real_path = func(files[f][len(prefix) + 2:]) # $ + prefix + / + break + else: + raise Exception("unknown prefix in file path %s" % (files[f],)) + copy_dest = os.path.join(res_dir, f) + if not os.path.exists(os.path.dirname(copy_dest)): + os.makedirs(os.path.dirname(copy_dest)) + shutil.copy2( + real_path, + copy_dest, + ) + files[f] = f + continue + + if not files[f].startswith('/'): # why do we check files[f] instead of f here? + path_to_copy = os.path.join( + yatest.common.work_path(), + files[f] + ) + if '/' in files[f]: + copy_dest = os.path.join( + res_dir, + os.path.dirname(files[f]) + ) + if not os.path.exists(copy_dest): + os.makedirs(copy_dest) + else: + copy_dest = res_dir + files[f] = os.path.basename(files[f]) + shutil.copy2(path_to_copy, copy_dest) + else: + shutil.copy2(files[f], res_dir) + files[f] = os.path.basename(files[f]) + cmd += yql_utils.get_cmd_for_files('--file', files) + + if urls: + cmd += yql_utils.get_cmd_for_files('--url', urls) + + optimize_only = False + if tables: + for table in tables: + self.tables[table.full_name] = table + if table.format != 'yson': + optimize_only = True + for name in self.tables: + cmd += '--table=yt.%s@%s ' % (name, self.tables[name].yqlrun_file) + + if "--lineage" not in self.extra_args: + if optimize_only: + cmd += '-O ' + else: + cmd += '--run ' + + if yql_utils.get_param('UDF_RESOLVER') or require_udf_resolver: + assert self.udf_resolver_binary, "Missing udf_resolver binary" + cmd += '--udf-resolver=' + self.udf_resolver_binary + ' ' + if scan_udfs: + cmd += '--scan-udfs ' + if not yatest.common.context.sanitize: + cmd += '--udf-resolver-filter-syscalls ' + + if run_sql and not self.use_sql2yql: + cmd += '--sql ' + + if parameters: + parameters_file = res_file_path('params.yson') + with open(parameters_file, 'w') as f: + f.write(six.ensure_str(yson.dumps(parameters))) + cmd += '--params-file=%s ' % parameters_file + + if verbose: + yql_utils.log('prov is ' + self.prov) + + env = {'YQL_DETERMINISTIC_MODE': '1'} + env.update(extra_env) + for var in [ + 'LLVM_PROFILE_FILE', + 'GO_COVERAGE_PREFIX', + 'PYTHON_COVERAGE_PREFIX', + 'NLG_COVERAGE_FILENAME', + 'YQL_EXPORT_PG_FUNCTIONS_DIR', + 'YQL_ALLOW_ALL_PG_FUNCTIONS', + ]: + if var in os.environ: + env[var] = os.environ[var] + if yql_utils.get_param('STDERR'): + debug_udfs_dir = os.path.join(os.path.abspath('.'), '..', '..', '..') + env_setters = ";".join("{}={}".format(k, v) for k, v in six.iteritems(env)) + yql_utils.log('GDB launch command:') + yql_utils.log('(cd "%s" && %s ya tool gdb --args %s)' % (res_dir, env_setters, cmd.replace(udfs_dir, debug_udfs_dir))) + + proc_result = yatest.common.process.execute(cmd.strip().split(), check_exit_code=False, cwd=res_dir, env=env) + if proc_result.exit_code != 0 and check_error: + with open(err_file, 'r') as f: + err_file_text = f.read() + assert 0, \ + 'Command\n%(command)s\n finished with exit code %(code)d, stderr:\n\n%(stderr)s\n\nerror file:\n%(err_file)s' % { + 'command': cmd, + 'code': proc_result.exit_code, + 'stderr': proc_result.std_err, + 'err_file': err_file_text + } + + if os.path.exists(results_file) and os.stat(results_file).st_size == 0: + os.unlink(results_file) # kikimr yql-exec compatibility + + results, log_results = yql_utils.read_res_file(results_file) + plan, log_plan = yql_utils.read_res_file(plan_file) + opt, log_opt = yql_utils.read_res_file(opt_file) + err, log_err = yql_utils.read_res_file(err_file) + + if verbose: + yql_utils.log('PROGRAM:') + yql_utils.log(program) + yql_utils.log('OPT:') + yql_utils.log(log_opt) + yql_utils.log('PLAN:') + yql_utils.log(log_plan) + yql_utils.log('RESULTS:') + yql_utils.log(log_results) + yql_utils.log('ERROR:') + yql_utils.log(log_err) + + return yql_utils.YQLExecResult( + proc_result.std_out, + yql_utils.normalize_source_code_path(err.replace(res_dir, '<tmp_path>')), + results, + results_file, + opt, + opt_file, + plan, + plan_file, + program, + proc_result, + None + ) + + def create_empty_tables(self, tables): + pass + + def write_tables(self, tables): + pass + + def get_tables(self, tables): + res = {} + for table in tables: + # recreate table after yql program was executed + res[table.full_name] = yql_utils.new_table( + table.full_name, + yqlrun_file=self.tables[table.full_name].yqlrun_file, + res_dir=self.res_dir + ) + + yql_utils.log('YQLRun table ' + table.full_name) + yql_utils.log(res[table.full_name].content) + + return res |