diff options
author | aneporada <[email protected]> | 2025-03-10 14:57:14 +0300 |
---|---|---|
committer | aneporada <[email protected]> | 2025-03-10 15:11:53 +0300 |
commit | 170ddbacc61d815df8eee981deb1316a689b8e3b (patch) | |
tree | 177fec16a466e5f19c575dad3318c3f2e7185e63 | |
parent | 4fb6424a2c472a1d73cf6ca59f73021e06e8484a (diff) |
Introduce yt.RuntimeCluster/Selection + test infrastructure changes
yt.RuntimeClusterSelection and yt.RuntimeClster
fix
Add default yt cluster for yt_file tests
Revert "Add plato + banach to yt_file/yt configs"
This reverts commit 83b85e67d753abed32fc620991b0ddd7ea58dcee.
Add plato + banach to yt_file/yt configs
add MrCluster setting
Support yt.MrCluster
commit_hash:e046a70ebea88ca0adcd497ce2e66d75a3c339ff
-rw-r--r-- | yql/essentials/tests/common/test_framework/test_file_common.py | 15 | ||||
-rw-r--r-- | yql/essentials/tests/common/test_framework/yql_utils.py | 15 | ||||
-rw-r--r-- | yql/tools/yqlrun/lib/yqlrun_lib.cpp | 9 | ||||
-rw-r--r-- | yt/yql/providers/yt/common/yql_yt_settings.cpp | 10 | ||||
-rw-r--r-- | yt/yql/providers/yt/common/yql_yt_settings.h | 8 |
5 files changed, 49 insertions, 8 deletions
diff --git a/yql/essentials/tests/common/test_framework/test_file_common.py b/yql/essentials/tests/common/test_framework/test_file_common.py index 69d5af2c7a6..240182e0056 100644 --- a/yql/essentials/tests/common/test_framework/test_file_common.py +++ b/yql/essentials/tests/common/test_framework/test_file_common.py @@ -14,10 +14,10 @@ from yqlrun import YQLRun from test_utils import get_parameters_json, replace_vars -def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is_hybrid=False, allow_llvm=True): +def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is_hybrid=False, allow_llvm=True, postprocess_func=None): config = None - if http_files or force_blocks or is_hybrid or not allow_llvm: + if http_files or force_blocks or is_hybrid or not allow_llvm or postprocess_func is not None: config_message = gateways_config_pb2.TGatewaysConfig() if http_files: schema = config_message.Fs.CustomSchemes.add() @@ -37,6 +37,8 @@ def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is if not allow_llvm: flags = config_message.YqlCore.Flags.add() flags.Name = 'LLVM_OFF' + if postprocess_func is not None: + postprocess_func(config_message) config = text_format.MessageToString(config_message) return config @@ -86,7 +88,7 @@ def get_sql_query(provider, suite, case, config, data_path=None, template='.sql' def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None, extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, - run_sql=True): + run_sql=True, cfg_postprocess=None): check_provider(provider, config) sql_query = get_sql_query(provider, suite, case, config, data_path, template='.sql' if run_sql else '.yqls') @@ -114,7 +116,8 @@ def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, prov=provider, keep_temp=not re.search(r"yt\.ReleaseTempData", sql_query), binary=yqlrun_binary, - gateway_config=get_gateways_config(http_files, yql_http_file_server, force_blocks=force_blocks, is_hybrid=is_hybrid(provider), allow_llvm=allow_llvm), + gateway_config=get_gateways_config(http_files, yql_http_file_server, force_blocks=force_blocks, is_hybrid=is_hybrid(provider), allow_llvm=allow_llvm, + postprocess_func=cfg_postprocess), extra_args=extra_args, udfs_dir=yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps') ) @@ -153,12 +156,12 @@ def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, def run_file(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None, - extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, run_sql=True): + extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, run_sql=True, cfg_postprocess=None): if (suite, case, cfg) not in run_file.cache: run_file.cache[(suite, case, cfg)] = \ run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary, extra_args, force_blocks=force_blocks, allow_llvm=allow_llvm, - data_path=data_path, run_sql=run_sql) + data_path=data_path, run_sql=run_sql, cfg_postprocess=cfg_postprocess) return run_file.cache[(suite, case, cfg)] diff --git a/yql/essentials/tests/common/test_framework/yql_utils.py b/yql/essentials/tests/common/test_framework/yql_utils.py index 2136729be9b..3e4a4afa3fe 100644 --- a/yql/essentials/tests/common/test_framework/yql_utils.py +++ b/yql/essentials/tests/common/test_framework/yql_utils.py @@ -151,13 +151,15 @@ Table = namedtuple('Table', ( 'yqlrun_file', 'attr', 'format', - 'exists' + 'exists', + 'cluster' )) def new_table(full_name, file_path=None, yqlrun_file=None, content=None, res_dir=None, attr=None, format_name='yson', def_attr=None, should_exist=False, src_file_alternative=None): assert '.' in full_name, 'expected name like cedar.Input' + cluster = full_name.split('.')[0] name = '.'.join(full_name.split('.')[1:]) if res_dir is None: @@ -231,7 +233,8 @@ def new_table(full_name, file_path=None, yqlrun_file=None, content=None, res_dir new_yqlrun_file, attr, format_name, - exists + exists, + cluster ) @@ -463,6 +466,14 @@ def get_tables(suite, cfg, data_path, def_attr=None): return in_tables, out_tables +def get_table_clusters(suite, cfg, data_path): + in_tables, out_tables = get_tables(suite, cfg, data_path) + clusters = set() + for t in in_tables + out_tables: + clusters.add(t.cluster) + return clusters + + def get_supported_providers(cfg): providers = 'yt', 'kikimr', 'dq', 'hybrid' for item in cfg: diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.cpp b/yql/tools/yqlrun/lib/yqlrun_lib.cpp index fdb3d28457d..3dc35b4f13e 100644 --- a/yql/tools/yqlrun/lib/yqlrun_lib.cpp +++ b/yql/tools/yqlrun/lib/yqlrun_lib.cpp @@ -94,6 +94,15 @@ TYqlRunTool::TYqlRunTool() opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument().SetFlag(&GetRunOptions().ValidateResultFormat); }); + GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) { + Y_UNUSED(res); + + if (GetRunOptions().GatewaysConfig) { + auto ytConfig = GetRunOptions().GatewaysConfig->GetYt(); + FillClusterMapping(ytConfig, TString{YtProviderName}); + } + }); + GetRunOptions().SetSupportedGateways({TString{YtProviderName}}); GetRunOptions().GatewayTypes.emplace(YtProviderName); AddClusterMapping(TString{"plato"}, TString{YtProviderName}); diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp index b022cb55ba5..aabc175e081 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.cpp +++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp @@ -531,6 +531,16 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx) REGISTER_SETTING(*this, CompactForDistinct); REGISTER_SETTING(*this, DropUnusedKeysFromKeyFilter); REGISTER_SETTING(*this, ReportEquiJoinStats); + REGISTER_SETTING(*this, RuntimeCluster) + .Validator([this] (const TString& cluster, TString value) { + if (cluster != "$all") { + throw yexception() << "Per-cluster setting is not supported for RuntimeCluster"; + } + if (!ValidClusters.contains(value)) { + throw yexception() << "Unknown cluster name: " << value; + } + }); + REGISTER_SETTING(*this, RuntimeClusterSelection).Parser([](const TString& v) { return FromString<ERuntimeClusterSelectionMode>(v); }); } EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) { diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h index 53dfcfa6ef1..02656ca051a 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.h +++ b/yt/yql/providers/yt/common/yql_yt_settings.h @@ -76,6 +76,12 @@ enum class EBlockOutputMode { Force /* "force" */, }; +enum class ERuntimeClusterSelectionMode { + Disable /* "disable" */, + Auto /* "auto" */, + Force /* "force" */, +}; + struct TYtSettings { using TConstPtr = std::shared_ptr<const TYtSettings>; @@ -115,6 +121,7 @@ struct TYtSettings { NCommon::TConfSetting<EInferSchemaMode, false> InferSchemaMode; NCommon::TConfSetting<ui32, false> BatchListFolderConcurrency; NCommon::TConfSetting<bool, false> ForceTmpSecurity; + NCommon::TConfSetting<ERuntimeClusterSelectionMode, false> RuntimeClusterSelection; // Job runtime NCommon::TConfSetting<TString, true> Pool; @@ -212,6 +219,7 @@ struct TYtSettings { NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes; NCommon::TConfSetting<TSet<NUdf::EDataSlot>, true> BlockReaderSupportedDataTypes; NCommon::TConfSetting<TString, true> _BinaryCacheFolder; + NCommon::TConfSetting<TString, true> RuntimeCluster; // Optimizers NCommon::TConfSetting<bool, true> _EnableDq; |