summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2025-03-10 14:57:14 +0300
committeraneporada <[email protected]>2025-03-10 15:11:53 +0300
commit170ddbacc61d815df8eee981deb1316a689b8e3b (patch)
tree177fec16a466e5f19c575dad3318c3f2e7185e63
parent4fb6424a2c472a1d73cf6ca59f73021e06e8484a (diff)
Introduce yt.RuntimeCluster/Selection + test infrastructure changes
yt.RuntimeClusterSelection and yt.RuntimeCluster fix. Add default yt cluster for yt_file tests. Revert "Add plato + banach to yt_file/yt configs". This reverts commit 83b85e67d753abed32fc620991b0ddd7ea58dcee. Add plato + banach to yt_file/yt configs. Add MrCluster setting. Support yt.MrCluster. commit_hash:e046a70ebea88ca0adcd497ce2e66d75a3c339ff
-rw-r--r--yql/essentials/tests/common/test_framework/test_file_common.py15
-rw-r--r--yql/essentials/tests/common/test_framework/yql_utils.py15
-rw-r--r--yql/tools/yqlrun/lib/yqlrun_lib.cpp9
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.cpp10
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.h8
5 files changed, 49 insertions, 8 deletions
diff --git a/yql/essentials/tests/common/test_framework/test_file_common.py b/yql/essentials/tests/common/test_framework/test_file_common.py
index 69d5af2c7a6..240182e0056 100644
--- a/yql/essentials/tests/common/test_framework/test_file_common.py
+++ b/yql/essentials/tests/common/test_framework/test_file_common.py
@@ -14,10 +14,10 @@ from yqlrun import YQLRun
from test_utils import get_parameters_json, replace_vars
-def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is_hybrid=False, allow_llvm=True):
+def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is_hybrid=False, allow_llvm=True, postprocess_func=None):
config = None
- if http_files or force_blocks or is_hybrid or not allow_llvm:
+ if http_files or force_blocks or is_hybrid or not allow_llvm or postprocess_func is not None:
config_message = gateways_config_pb2.TGatewaysConfig()
if http_files:
schema = config_message.Fs.CustomSchemes.add()
@@ -37,6 +37,8 @@ def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is
if not allow_llvm:
flags = config_message.YqlCore.Flags.add()
flags.Name = 'LLVM_OFF'
+ if postprocess_func is not None:
+ postprocess_func(config_message)
config = text_format.MessageToString(config_message)
return config
@@ -86,7 +88,7 @@ def get_sql_query(provider, suite, case, config, data_path=None, template='.sql'
def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server,
yqlrun_binary=None, extra_args=[], force_blocks=False, allow_llvm=True, data_path=None,
- run_sql=True):
+ run_sql=True, cfg_postprocess=None):
check_provider(provider, config)
sql_query = get_sql_query(provider, suite, case, config, data_path, template='.sql' if run_sql else '.yqls')
@@ -114,7 +116,8 @@ def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server,
prov=provider,
keep_temp=not re.search(r"yt\.ReleaseTempData", sql_query),
binary=yqlrun_binary,
- gateway_config=get_gateways_config(http_files, yql_http_file_server, force_blocks=force_blocks, is_hybrid=is_hybrid(provider), allow_llvm=allow_llvm),
+ gateway_config=get_gateways_config(http_files, yql_http_file_server, force_blocks=force_blocks, is_hybrid=is_hybrid(provider), allow_llvm=allow_llvm,
+ postprocess_func=cfg_postprocess),
extra_args=extra_args,
udfs_dir=yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps')
)
@@ -153,12 +156,12 @@ def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server,
def run_file(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None,
- extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, run_sql=True):
+ extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, run_sql=True, cfg_postprocess=None):
if (suite, case, cfg) not in run_file.cache:
run_file.cache[(suite, case, cfg)] = \
run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server,
yqlrun_binary, extra_args, force_blocks=force_blocks, allow_llvm=allow_llvm,
- data_path=data_path, run_sql=run_sql)
+ data_path=data_path, run_sql=run_sql, cfg_postprocess=cfg_postprocess)
return run_file.cache[(suite, case, cfg)]
diff --git a/yql/essentials/tests/common/test_framework/yql_utils.py b/yql/essentials/tests/common/test_framework/yql_utils.py
index 2136729be9b..3e4a4afa3fe 100644
--- a/yql/essentials/tests/common/test_framework/yql_utils.py
+++ b/yql/essentials/tests/common/test_framework/yql_utils.py
@@ -151,13 +151,15 @@ Table = namedtuple('Table', (
'yqlrun_file',
'attr',
'format',
- 'exists'
+ 'exists',
+ 'cluster'
))
def new_table(full_name, file_path=None, yqlrun_file=None, content=None, res_dir=None,
attr=None, format_name='yson', def_attr=None, should_exist=False, src_file_alternative=None):
assert '.' in full_name, 'expected name like cedar.Input'
+ cluster = full_name.split('.')[0]
name = '.'.join(full_name.split('.')[1:])
if res_dir is None:
@@ -231,7 +233,8 @@ def new_table(full_name, file_path=None, yqlrun_file=None, content=None, res_dir
new_yqlrun_file,
attr,
format_name,
- exists
+ exists,
+ cluster
)
@@ -463,6 +466,14 @@ def get_tables(suite, cfg, data_path, def_attr=None):
return in_tables, out_tables
+def get_table_clusters(suite, cfg, data_path):
+ in_tables, out_tables = get_tables(suite, cfg, data_path)
+ clusters = set()
+ for t in in_tables + out_tables:
+ clusters.add(t.cluster)
+ return clusters
+
+
def get_supported_providers(cfg):
providers = 'yt', 'kikimr', 'dq', 'hybrid'
for item in cfg:
diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.cpp b/yql/tools/yqlrun/lib/yqlrun_lib.cpp
index fdb3d28457d..3dc35b4f13e 100644
--- a/yql/tools/yqlrun/lib/yqlrun_lib.cpp
+++ b/yql/tools/yqlrun/lib/yqlrun_lib.cpp
@@ -94,6 +94,15 @@ TYqlRunTool::TYqlRunTool()
opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument().SetFlag(&GetRunOptions().ValidateResultFormat);
});
+ GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
+ Y_UNUSED(res);
+
+ if (GetRunOptions().GatewaysConfig) {
+ auto ytConfig = GetRunOptions().GatewaysConfig->GetYt();
+ FillClusterMapping(ytConfig, TString{YtProviderName});
+ }
+ });
+
GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
GetRunOptions().GatewayTypes.emplace(YtProviderName);
AddClusterMapping(TString{"plato"}, TString{YtProviderName});
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp
index b022cb55ba5..aabc175e081 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp
@@ -531,6 +531,16 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
REGISTER_SETTING(*this, CompactForDistinct);
REGISTER_SETTING(*this, DropUnusedKeysFromKeyFilter);
REGISTER_SETTING(*this, ReportEquiJoinStats);
+ REGISTER_SETTING(*this, RuntimeCluster)
+ .Validator([this] (const TString& cluster, TString value) {
+ if (cluster != "$all") {
+ throw yexception() << "Per-cluster setting is not supported for RuntimeCluster";
+ }
+ if (!ValidClusters.contains(value)) {
+ throw yexception() << "Unknown cluster name: " << value;
+ }
+ });
+ REGISTER_SETTING(*this, RuntimeClusterSelection).Parser([](const TString& v) { return FromString<ERuntimeClusterSelectionMode>(v); });
}
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) {
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h
index 53dfcfa6ef1..02656ca051a 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.h
+++ b/yt/yql/providers/yt/common/yql_yt_settings.h
@@ -76,6 +76,12 @@ enum class EBlockOutputMode {
Force /* "force" */,
};
+enum class ERuntimeClusterSelectionMode {
+ Disable /* "disable" */,
+ Auto /* "auto" */,
+ Force /* "force" */,
+};
+
struct TYtSettings {
using TConstPtr = std::shared_ptr<const TYtSettings>;
@@ -115,6 +121,7 @@ struct TYtSettings {
NCommon::TConfSetting<EInferSchemaMode, false> InferSchemaMode;
NCommon::TConfSetting<ui32, false> BatchListFolderConcurrency;
NCommon::TConfSetting<bool, false> ForceTmpSecurity;
+ NCommon::TConfSetting<ERuntimeClusterSelectionMode, false> RuntimeClusterSelection;
// Job runtime
NCommon::TConfSetting<TString, true> Pool;
@@ -212,6 +219,7 @@ struct TYtSettings {
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes;
NCommon::TConfSetting<TSet<NUdf::EDataSlot>, true> BlockReaderSupportedDataTypes;
NCommon::TConfSetting<TString, true> _BinaryCacheFolder;
+ NCommon::TConfSetting<TString, true> RuntimeCluster;
// Optimizers
NCommon::TConfSetting<bool, true> _EnableDq;