aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <31865255+qrort@users.noreply.github.com>2024-01-29 16:50:19 +0300
committerGitHub <noreply@github.com>2024-01-29 16:50:19 +0300
commitf5290a799aa007d25596748b6bbabe2d2399dda1 (patch)
tree6f9122b5f6a47b52671bf355c0f2fe13ec6fad2b
parent156cd635a6cb33112115f06b02edc62fc9847e32 (diff)
downloadydb-f5290a799aa007d25596748b6bbabe2d2399dda1.tar.gz
resolves #1346 (#1347)
-rw-r--r--ydb/library/yql/tools/mrrun/README.md34
-rw-r--r--ydb/library/yql/tools/mrrun/mrrun.cpp832
-rw-r--r--ydb/library/yql/tools/mrrun/mrrun.h10
-rw-r--r--ydb/library/yql/tools/mrrun/ya.make84
-rw-r--r--ydb/library/yql/tools/ya.make1
5 files changed, 961 insertions, 0 deletions
diff --git a/ydb/library/yql/tools/mrrun/README.md b/ydb/library/yql/tools/mrrun/README.md
new file mode 100644
index 0000000000..2ebe690763
--- /dev/null
+++ b/ydb/library/yql/tools/mrrun/README.md
@@ -0,0 +1,34 @@
+# mrrun - Utility for MR query debugging
+
+`mrrun` is a utility designed for local debugging of MapReduce queries. It allows you to execute YQL programs which involve creation of YT transactions.
+
+## Command-Line Options
+
+- `-p <file>`: _(required)_ specify a file with an SQL query or an s-expression (`-` for /dev/stdin).
+- `-s`: if specified, SQL is used; if not specified, the query plan execution specified in s-expression is used.
+- `-f <alias>@<path>`: attach a local file to the query
+- `--mrjob-bin <path>`: specify the path of mrjob binary to upload (the mrrun itself is used by default)
+
+## Example of Local Usage
+
+```bash
+mrrun -s -p query.sql -f file.txt@./some-file.txt
+```
+
+In this example, `mrrun` will use SQL from the file `query.sql`, with file `./some-file.txt` attached to the request with alias `file.txt`.
+
+The simple example of `query.sql`:
+
+```sql
+USE `hahn`;
+
+SELECT Length(name) FROM `home/yql/tutorial/users`;
+SELECT FileContent('file.txt');
+```
+## mrjob binary
+
+Note that default mrjob binary (mrrun itself) has size of ~2GB, so it will take a long time to upload it to YT.
+It is recommended to compile mrjob separately and strip it (final size is about 200MB), then pass it to the dedicated command-line parameter.
+```bash``
+mrrun -s -p query.sql --mrjob-bin ./ydb/library/yql/tools/mrjob/mrjob
+```
diff --git a/ydb/library/yql/tools/mrrun/mrrun.cpp b/ydb/library/yql/tools/mrrun/mrrun.cpp
new file mode 100644
index 0000000000..fb282e77d7
--- /dev/null
+++ b/ydb/library/yql/tools/mrrun/mrrun.cpp
@@ -0,0 +1,832 @@
+#include "mrrun.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs/codecs.h>
+
+#include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
+#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
+#include <ydb/library/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h>
+#include <ydb/library/yql/providers/yt/lib/config_clusters/config_clusters.h>
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
+#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
+#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
+#include <ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h>
+
+#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
+
+#include <ydb/core/util/pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+
+#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h>
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
+#include <ydb/library/yql/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
+#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
+#include <ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h>
+#include <ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h>
+#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h>
+#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
+#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_dq_transform.h>
+#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_factory.h>
+#include <ydb/library/yql/providers/pg/provider/yql_pg_provider.h>
+#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
+#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
+#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h>
+#include <ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h>
+#include <ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
+#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/core/yql_library_compiler.h>
+#include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h>
+#include <ydb/library/yql/core/facade/yql_facade.h>
+#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
+#include <ydb/library/yql/core/file_storage/http_download/http_download.h>
+#include <ydb/library/yql/core/file_storage/file_storage.h>
+#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
+#include <ydb/library/yql/core/services/yql_out_transformers.h>
+#include <ydb/library/yql/core/url_lister/url_lister_manager.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+#include <ydb/library/yql/utils/failure_injector/failure_injector.h>
+
+#include <yt/cpp/mapreduce/client/init.h>
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <library/cpp/malloc/api/malloc.h>
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/resource/resource.h>
+#include <library/cpp/logger/priority.h>
+#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/yson/writer.h>
+#include <library/cpp/sighandler/async_signals_handler.h>
+
+#include <google/protobuf/text_format.h>
+
+#include <util/generic/scope.h>
+#include <util/system/user.h>
+#include <util/system/env.h>
+#include <util/stream/length.h>
+#include <util/stream/str.h>
+#include <util/stream/null.h>
+#include <util/string/join.h>
+#include <util/string/strip.h>
+
+
+using namespace NKikimr;
+using namespace NYql;
+
+namespace {
+
+struct TRunOptions {
+ bool Sql = false;
+ TString User;
+ NYson::EYsonFormat ResultsFormat;
+ bool OptimizeOnly = false;
+ bool PeepholeOnly = false;
+ bool PrintExpr = false;
+ bool TraceOpt = false;
+ IOutputStream* TracePlanStream = nullptr;
+ bool ShowPropgress = false;
+ ui16 SyntaxVersion;
+ bool UseMetaFromGrpah = false;
+ bool DiscoveryMode = false;
+ bool LineageMode = false;
+ bool FullStatistics = false;
+ IOutputStream* StatisticsStream = nullptr;
+ THashSet<TString> SqlFlags;
+};
+
+class TOptPipelineConfigurator : public IPipelineConfigurator {
+public:
+ TOptPipelineConfigurator(TProgramPtr prg, IOutputStream* planStream)
+ : Program(std::move(prg))
+ , PlanStream(planStream)
+ {
+ }
+
+ void AfterCreate(TTransformationPipeline* pipeline) const final {
+ Y_UNUSED(pipeline);
+ }
+
+ void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
+ pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE),
+ "OptTrace", TIssuesIds::CORE, "OptTrace");
+ }
+
+ void AfterOptimize(TTransformationPipeline* pipeline) const final {
+ if (PlanStream) {
+ pipeline->Add(TPlanOutputTransformer::Sync(PlanStream, Program->GetPlanBuilder(), Program->GetOutputFormat()), "PlanOutput");
+ }
+ }
+private:
+ TProgramPtr Program;
+ IOutputStream* PlanStream;
+};
+
+class TPeepHolePipelineConfigurator : public IPipelineConfigurator {
+public:
+ TPeepHolePipelineConfigurator()
+ : Inner({}, nullptr)
+ {
+ }
+
+ void AfterCreate(TTransformationPipeline* pipeline) const final {
+ Y_UNUSED(pipeline);
+ }
+
+ void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
+ pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE),
+ "OptTrace", TIssuesIds::CORE, "OptTrace");
+ }
+
+ void AfterOptimize(TTransformationPipeline* pipeline) const final {
+ pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext(), &Inner), "PeepHole");
+ }
+
+ TOptPipelineConfigurator Inner;
+};
+
+void PatchGatewaysConfig(TGatewaysConfig* config, const TString& mrJobBin, const TString& mrJobUdfsDir,
+ size_t numThreads, bool keepTemp)
+{
+ auto ytConfig = config->MutableYt();
+ ytConfig->SetGatewayThreads(numThreads);
+ if (mrJobBin.empty()) {
+ ytConfig->ClearMrJobBin();
+ } else {
+ ytConfig->SetMrJobBin(mrJobBin);
+ ytConfig->SetMrJobBinMd5(MD5::File(mrJobBin));
+ }
+
+ if (mrJobUdfsDir.empty()) {
+ ytConfig->ClearMrJobUdfsDir();
+ } else {
+ ytConfig->SetMrJobUdfsDir(mrJobUdfsDir);
+ }
+ auto attr = ytConfig->MutableDefaultSettings()->Add();
+ attr->SetName("KeepTempTables");
+ attr->SetValue(keepTemp ? "yes" : "no");
+}
+
+void ReadGatewaysConfig(const TString& configFile, TGatewaysConfig* config)
+{
+ Y_ENSURE(!configFile.empty());
+ TString configData = TFileInput(configFile).ReadAll();
+
+ using ::google::protobuf::TextFormat;
+ if (!TextFormat::ParseFromString(configData, config)) {
+ ythrow yexception() << "Bad format of gateways configuration";
+ }
+}
+
+TFileStoragePtr CreateFS(const TString& paramsFile, const TString& defYtServer)
+{
+ TFileStorageConfig params;
+ Y_ENSURE(paramsFile);
+ LoadFsConfigFromFile(paramsFile, params);
+
+ return WithAsync(CreateFileStorage(params, {MakeYtDownloader(params, defYtServer)}));
+}
+
+void FillUsedFiles(
+ const TVector<TString>& filesMappingList,
+ TUserDataTable& filesMapping)
+{
+ for (auto& s : filesMappingList) {
+ TStringBuf fileName, filePath;
+ TStringBuf(s).Split('@', fileName, filePath);
+ if (fileName.empty() || filePath.empty()) {
+ ythrow yexception() << "Incorrect file mapping, expected form "
+ "name@path, e.g. MyFile@file.txt";
+ }
+
+ auto& entry = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + fileName)];
+ entry.Type = EUserDataType::PATH;
+ entry.Data = filePath;
+ }
+}
+
+bool FillUsedUrls(
+ const TVector<TString>& urlMappingList,
+ TUserDataTable& filesMapping)
+{
+ for (auto& s : urlMappingList) {
+ TStringBuf name, url;
+ TStringBuf(s).Split('@', name, url);
+ if (name.empty() || url.empty()) {
+ Cerr << "Incorrect url mapping, expected form name@url, "
+ "e.g. MyUrl@http://example.com/file" << Endl;
+ return false;
+ }
+
+ auto& entry = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + name)];
+ entry.Type = EUserDataType::URL;
+ entry.Data = url;
+ }
+ return true;
+}
+
+NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway) {
+ auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
+ RegisterDqPqReadActorFactory(*factory, driver, nullptr);
+ RegisterYdbReadActorFactory(*factory, driver, nullptr);
+ RegisterS3ReadActorFactory(*factory, nullptr, httpGateway);
+ RegisterS3WriteActorFactory(*factory, nullptr, httpGateway);
+ RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
+ RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
+ return factory;
+}
+
+int RunProgram(TProgramPtr program, const TRunOptions& options, const THashMap<TString, TString>& clusters)
+{
+ if (options.ShowPropgress) {
+ program->SetProgressWriter([](const TOperationProgress& progress) {
+ TStringBuilder remoteId;
+ if (progress.RemoteId) {
+ remoteId << ", remoteId: " << progress.RemoteId;
+ }
+ TStringBuilder counters;
+ if (progress.Counters) {
+ if (progress.Counters->Running) {
+ counters << ' ' << progress.Counters->Running;
+ }
+ if (progress.Counters->Total) {
+ counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)");
+ }
+ }
+ Cerr << "Operation: [" << progress.Category << "] " << progress.Id
+ << ", state: " << progress.State << remoteId << counters
+ << ", current stage: " << progress.Stage.first << Endl;
+ });
+ }
+ program->SetUseTableMetaFromGraph(options.UseMetaFromGrpah);
+
+ bool fail = true;
+ if (options.Sql) {
+ Cerr << "Parse SQL..." << Endl;
+ NSQLTranslation::TTranslationSettings sqlSettings;
+ sqlSettings.ClusterMapping = clusters;
+ sqlSettings.SyntaxVersion = options.SyntaxVersion;
+ sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Report;
+ if (options.DiscoveryMode) {
+ sqlSettings.Mode = NSQLTranslation::ESqlMode::DISCOVERY;
+ }
+ sqlSettings.Flags = options.SqlFlags;
+ fail = !program->ParseSql(sqlSettings);
+ } else {
+ Cerr << "Parse YQL..." << Endl;
+ fail = !program->ParseYql();
+ }
+ program->PrintErrorsTo(Cerr);
+ if (options.TraceOpt) {
+ if (auto ast = program->GetQueryAst()) {
+ Cerr << *ast << Endl;
+ }
+ }
+ if (fail) {
+ return 1;
+ }
+
+ Cerr << "Compile program..." << Endl;
+ fail = !program->Compile(options.User);
+ program->PrintErrorsTo(Cerr);
+ if (options.TraceOpt) {
+ program->Print(&Cerr, nullptr);
+ }
+ if (fail) {
+ return 1;
+ }
+
+ auto sigHandler = [program](int) {
+ Cerr << "Aborting..." << Endl;
+ try {
+ program->Abort().GetValueSync();
+ } catch (...) {
+ Cerr << CurrentExceptionMessage();
+ }
+ };
+ SetAsyncSignalFunction(SIGINT, sigHandler);
+ SetAsyncSignalFunction(SIGTERM, sigHandler);
+
+ TProgram::TStatus status = TProgram::TStatus::Error;
+ if (options.PeepholeOnly) {
+ Cerr << "Peephole..." << Endl;
+ auto config = TPeepHolePipelineConfigurator();
+ status = program->OptimizeWithConfig(options.User, config);
+ } else if (options.DiscoveryMode) {
+ Cerr << "Discover program..." << Endl;
+ status = program->Discover(options.User);
+ } else if (options.LineageMode) {
+ Cerr << "Calculating lineage in program..." << Endl;
+ auto config = TOptPipelineConfigurator(program, options.TracePlanStream);
+ status = program->LineageWithConfig(options.User, config);
+ } else if (options.OptimizeOnly) {
+ Cerr << "Optimize program..." << Endl;
+ auto config = TOptPipelineConfigurator(program, options.TracePlanStream);
+ status = program->OptimizeWithConfig(options.User, config);
+ } else {
+ Cerr << "Run program..." << Endl;
+ auto config = TOptPipelineConfigurator(program, options.TracePlanStream);
+ status = program->RunWithConfig(options.User, config);
+ }
+
+ auto dummySigHandler = [](int) { };
+ SetAsyncSignalFunction(SIGINT, dummySigHandler);
+ SetAsyncSignalFunction(SIGTERM, dummySigHandler);
+
+ program->PrintErrorsTo(Cerr);
+ if (status == TProgram::TStatus::Error) {
+ if (options.TraceOpt) {
+ program->Print(&Cerr, nullptr);
+ }
+ return 1;
+ }
+ if (options.PrintExpr) {
+ program->Print(&Cout, nullptr);
+ }
+
+ Cerr << "Getting results..." << Endl;
+ if (options.DiscoveryMode) {
+ if (auto data = program->GetDiscoveredData()) {
+ TStringInput in(*data);
+ NYson::ReformatYsonStream(&in, &Cout, options.ResultsFormat);
+ }
+ } else if (options.LineageMode) {
+ if (auto data = program->GetLineage()) {
+ TStringInput in(*data);
+ NYson::ReformatYsonStream(&in, &Cout, options.ResultsFormat);
+ }
+ } else if (program->HasResults()) {
+ NYson::TYsonWriter yson(&Cout, options.ResultsFormat);
+ yson.OnBeginList();
+ for (const auto& result: program->Results()) {
+ yson.OnListItem();
+ yson.OnRaw(result);
+ }
+ yson.OnEndList();
+ }
+
+ if (options.StatisticsStream) {
+ if (auto st = program->GetStatistics(!options.FullStatistics)) {
+ TStringInput in(*st);
+ NYson::ReformatYsonStream(&in, options.StatisticsStream, NYson::EYsonFormat::Pretty);
+ }
+ }
+
+ Cerr << Endl << "Done" << Endl;
+ return 0;
+}
+
+int RunMain(int argc, const char* argv[])
+{
+ TString gatewaysCfgFile;
+ TString progFile;
+ TString user = GetUsername();
+ TString format;
+ TVector<TString> filesMappingList;
+ TVector<TString> urlMappingList;
+ TString fileStorageCfg;
+ TVector<TString> udfsPaths;
+ TString udfsDir;
+ THashSet<TString> gatewayTypes;
+ THashSet<TString> defGatewayTypes = {
+ TString(YtProviderName),
+ TString(YdbProviderName),
+ TString(DqProviderName),
+ };
+ TString mountConfig;
+ TString udfResolver;
+ bool udfResolverFilterSyscalls = false;
+ TString mrJobBin;
+ TString mrJobUdfsDir;
+ TString statFile;
+ int verbosity = 3;
+ bool showLog = false;
+ bool tracePlan;
+ size_t numThreads = 1;
+ TString planFile;
+ TString paramsFile;
+ bool keepTemp = false;
+ TString token = GetEnv("YQL_TOKEN");
+ if (!token) {
+ TString home = GetEnv("HOME");
+ auto tokenPath = TFsPath(home) / ".yql" / "token";
+ if (tokenPath.Exists()) {
+ token = StripStringRight(TFileInput(tokenPath).ReadAll());
+ }
+ }
+ THashMap<TString, TString> customTokens;
+ THashMap<TString, std::pair<ui32, ui32>> failureInjections;
+
+ TRunOptions runOptions;
+
+ NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
+ opts.AddLongOption('p', "program", "Program to execute (use '-' to read "
+ "from stdin)")
+ .Required()
+ .RequiredArgument("FILE")
+ .StoreResult(&progFile);
+ opts.AddLongOption('s', "sql", "Program is SQL query")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.Sql);
+ opts.AddLongOption('u', "user", "MR user")
+ .Optional()
+ .RequiredArgument("USER")
+ .StoreResult(&user);
+ opts.AddLongOption("format", "results format, one of "
+ "{ binary | text | pretty }")
+ .Optional()
+ .RequiredArgument("STR")
+ .DefaultValue("text")
+ .StoreResult(&format);
+ opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList);
+ opts.AddLongOption("url", "name@url").AppendTo(&urlMappingList);
+ opts.AddLongOption("gateways-cfg", "gateways configuration file")
+ .Optional()
+ .DefaultValue("gateways.conf")
+ .RequiredArgument("FILE")
+ .StoreResult(&gatewaysCfgFile);
+ opts.AddLongOption("fs-cfg", "Path to file storage config")
+ .Optional()
+ .DefaultValue("fs.conf")
+ .StoreResult(&fileStorageCfg);
+ opts.AddLongOption("udf", "Load shared library with UDF by given path")
+ .AppendTo(&udfsPaths);
+ opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found"
+ " in given directory")
+ .StoreResult(&udfsDir);
+ opts.AddLongOption('O',"optimize", "optimize expression")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.OptimizeOnly);
+ opts.AddLongOption('D', "discover", "discover tables in the program")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.DiscoveryMode);
+ opts.AddLongOption("lineage", "calclulate data lineage in the program")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.LineageMode);
+ opts.AddLongOption("peephole", "make peephole optimization")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.PeepholeOnly);
+ opts.AddLongOption("print-expr", "print rebuild AST before execution")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.PrintExpr);
+ opts.AddLongOption('P',"trace-plan", "print plan before execution")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&tracePlan);
+ opts.AddLongOption("plan-file", "file path for plan output")
+ .Optional()
+ .StoreResult(&planFile);
+ opts.AddLongOption("params-file", "Query parameters values in YSON format").StoreResult(&paramsFile);
+ opts.AddLongOption("trace-opt", "print AST in the begin of each transformation")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.TraceOpt);
+ opts.AddLongOption('L', "show-log", "show transformation log")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&showLog);
+ opts.AddLongOption('G', "gateways", "used gateways")
+ .SplitHandler(&gatewayTypes, ',')
+ .DefaultValue(JoinSeq(",", defGatewayTypes));
+ opts.AddLongOption('m', "mounts", "Mount points config file.")
+ .StoreResult(&mountConfig);
+ opts.AddLongOption("udf-resolver", "Path to udf-resolver")
+ .Optional()
+ .RequiredArgument("PATH")
+ .StoreResult(&udfResolver);
+ opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&udfResolverFilterSyscalls);
+ opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
+ .Optional()
+ .StoreResult(&mrJobBin);
+ opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
+ .Optional()
+ .StoreResult(&mrJobUdfsDir);
+ opts.AddLongOption('v', "verbosity", "Log verbosity level")
+ .Optional()
+ .RequiredArgument("LEVEL")
+ .DefaultValue("6")
+ .StoreResult(&verbosity);
+ opts.AddLongOption("show-progress", "report operation progress")
+ .NoArgument()
+ .SetFlag(&runOptions.ShowPropgress);;
+ opts.AddLongOption("threads", "gateway threads")
+ .Optional()
+ .RequiredArgument("COUNT")
+ .StoreResult(&numThreads);
+ opts.AddLongOption("keep-temp", "keep temporary tables")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&keepTemp);
+ opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&runOptions.SyntaxVersion).DefaultValue(1);
+ opts.AddLongOption("token", "YQL token")
+ .Optional()
+ .RequiredArgument("VALUE")
+ .StoreResult(&token);
+ opts.AddLongOption("custom-tokens", "Custom tokens")
+ .Optional()
+ .RequiredArgument("NAME=VALUE or NAME=@PATH")
+ .KVHandler([&customTokens](TString key, TString value) {
+ if (value.StartsWith('@')) {
+ customTokens[key] = StripStringRight(TFileInput(value.substr(1)).ReadAll());
+ } else {
+ customTokens[key] = value;
+ }
+ });
+ opts.AddLongOption("use-graph-meta", "Use tables metadata from graph")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.UseMetaFromGrpah);
+ opts.AddLongOption("stat", "Print execution statistics")
+ .Optional()
+ .OptionalArgument("FILE")
+ .StoreResult(&statFile);
+ opts.AddLongOption("full-stat", "Output full execution statistics")
+ .Optional()
+ .NoArgument()
+ .SetFlag(&runOptions.FullStatistics);
+ opts.AddLongOption("failure-inject", "Activate failure injection")
+ .Optional()
+ .RequiredArgument("INJECTION_NAME=FAIL_COUNT or INJECTION_NAME=SKIP_COUNT/FAIL_COUNT")
+ .KVHandler([&failureInjections](TString key, TString value) {
+ TStringBuf fail = value;
+ TStringBuf skip;
+ if (TStringBuf(value).TrySplit('/', skip, fail)) {
+ failureInjections[key] = std::make_pair(FromString<ui32>(skip), FromString<ui32>(fail));
+ } else {
+ failureInjections[key] = std::make_pair(ui32(0), FromString<ui32>(fail));
+ }
+ });
+
+ opts.SetFreeArgsNum(0);
+
+ NLastGetopt::TOptsParseResult res(&opts, argc, argv);
+ Y_UNUSED(res);
+
+ // Reinit logger with new level
+ if (verbosity != LOG_DEF_PRIORITY) {
+ NYql::NLog::ELevel level = NYql::NLog::ELevelHelpers::FromInt(verbosity);
+ NYql::NLog::EComponentHelpers::ForEach([level](NYql::NLog::EComponent c) {
+ NYql::NLog::YqlLogger().SetComponentLevel(c, level);
+ });
+ }
+
+ if (runOptions.TraceOpt) {
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE);
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CoreEval, NYql::NLog::ELevel::TRACE);
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CorePeepHole, NYql::NLog::ELevel::TRACE);
+ } else if (showLog) {
+ NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::DEBUG);
+ }
+
+ runOptions.ResultsFormat =
+ (format == TStringBuf("binary")) ? NYson::EYsonFormat::Binary
+ : (format == TStringBuf("text")) ? NYson::EYsonFormat::Text
+ : NYson::EYsonFormat::Pretty;
+
+ runOptions.User = user;
+
+ for (auto& gateway: gatewayTypes) {
+ if (!defGatewayTypes.contains(gateway)) {
+ Cerr << "Unsupported gateway " << gateway.Quote() << Endl;
+ return 1;
+ }
+ }
+
+ if (!failureInjections.empty()) {
+ TFailureInjector::Activate();
+ for (auto& [name, count]: failureInjections) {
+ TFailureInjector::Set(name, count.first, count.second);
+ }
+ }
+
+ TUserDataTable dataTable;
+ FillUsedFiles(filesMappingList, dataTable);
+ FillUsedUrls(urlMappingList, dataTable);
+
+ NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths);
+ auto funcRegistry = NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths);
+
+ TGatewaysConfig gatewaysConfig;
+ ReadGatewaysConfig(gatewaysCfgFile, &gatewaysConfig);
+ PatchGatewaysConfig(&gatewaysConfig, mrJobBin, mrJobUdfsDir, numThreads, keepTemp);
+
+ TString defYtServer = gatewaysConfig.HasYt() ? NYql::TConfigClusters::GetDefaultYtServer(gatewaysConfig.GetYt()) : TString();
+ auto storage = CreateFS(fileStorageCfg, defYtServer);
+
+ auto httpGateway = IHTTPGateway::Make();
+ TVector<TIntrusivePtr<TThrRefBase>> gateways;
+ THashMap<TString, TString> clusters;
+ TVector<TDataProviderInitializer> dataProvidersInit;
+ clusters["pg_catalog"] = PgProviderName;
+ dataProvidersInit.push_back(GetPgDataProviderInitializer());
+
+ if (gatewayTypes.contains(YtProviderName) && gatewaysConfig.HasYt()) {
+ TYtNativeServices services;
+ services.FunctionRegistry = funcRegistry.Get();
+ services.FileStorage = storage;
+ services.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt());
+ auto ytNativeGateway = CreateYtNativeGateway(services);
+ gateways.emplace_back(ytNativeGateway);
+ FillClusterMapping(clusters, gatewaysConfig.GetYt(), TString{YtProviderName});
+ dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
+ }
+
+ if (gatewayTypes.contains(ClickHouseProviderName) && gatewaysConfig.HasClickHouse()) {
+ FillClusterMapping(clusters, gatewaysConfig.GetClickHouse(), TString{ClickHouseProviderName});
+ dataProvidersInit.push_back(GetClickHouseDataProviderInitializer(httpGateway));
+ }
+
+ const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"));
+ NYdb::TDriver driver(driverConfig);
+
+ Y_DEFER {
+ driver.Stop(true);
+ };
+
+ if (gatewayTypes.contains(YdbProviderName) && gatewaysConfig.HasYdb()) {
+ FillClusterMapping(clusters, gatewaysConfig.GetYdb(), TString{YdbProviderName});
+ dataProvidersInit.push_back(GetYdbDataProviderInitializer(driver));
+ }
+
+#ifndef _win_
+ if (gatewayTypes.contains(DqProviderName)) {
+ auto dqTaskTransformFactory = CreateCompositeTaskTransformFactory({
+ CreateYtDqTaskTransformFactory(),
+ CreateYdbDqTaskTransformFactory()
+ });
+
+ auto dqCompFactory = NMiniKQL::GetCompositeWithBuiltinFactory({
+ GetCommonDqFactory(),
+ GetDqYtFactory(),
+ GetDqYdbFactory(driver),
+ NMiniKQL::GetYqlFactory(),
+ GetPgFactory()
+ });
+
+ TDqTaskPreprocessorFactoryCollection dqTaskPreprocessorFactories = {
+ NDq::CreateYtDqTaskPreprocessorFactory(false, funcRegistry)
+ };
+
+ const bool enableSpilling = true;
+ auto dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories,
+ enableSpilling, CreateAsyncIoFactory(driver, httpGateway));
+ gateways.emplace_back(dqGateway);
+ dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));
+ }
+#endif
+
+ if (gatewaysConfig.HasSqlCore()) {
+ runOptions.SqlFlags.insert(gatewaysConfig.GetSqlCore().GetTranslationFlags().begin(), gatewaysConfig.GetSqlCore().GetTranslationFlags().end());
+ }
+
+ TExprContext ctx;
+ IModuleResolver::TPtr moduleResolver;
+ if (!mountConfig.empty()) {
+ TModulesTable modules;
+ NYqlMountConfig::TMountConfig mount;
+ Y_ABORT_UNLESS(NKikimr::ParsePBFromFile(mountConfig, &mount));
+ FillUserDataTableFromFileSystem(mount, dataTable);
+
+ if (!CompileLibraries(dataTable, ctx, modules)) {
+ Cerr << "Errors on compile libraries:" << Endl;
+ ctx.IssueManager.GetIssues().PrintTo(Cerr);
+ return 1;
+ }
+
+ moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, clusters, THashSet<TString>());
+ } else {
+ if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusters)) {
+ Cerr << "Errors loading default YQL libraries:" << Endl;
+ ctx.IssueManager.GetIssues().PrintTo(Cerr);
+ return 1;
+ }
+ }
+
+ TExprContext::TFreezeGuard freezeGuard(ctx);
+
+ TProgramFactory progFactory(false, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "mrrun");
+ progFactory.AddUserDataTable(std::move(dataTable));
+ progFactory.SetModules(moduleResolver);
+ if (udfResolver) {
+ progFactory.SetUdfResolver(NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage,
+ udfResolver, {}, {}, udfResolverFilterSyscalls, {}));
+ } else {
+ progFactory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage));
+ }
+ progFactory.SetFileStorage(storage);
+ progFactory.SetUrlPreprocessing(new TUrlPreprocessing(gatewaysConfig));
+ progFactory.SetGatewaysConfig(&gatewaysConfig);
+ TCredentials::TPtr creds = MakeIntrusive<TCredentials>();
+ if (token) {
+ creds->AddCredential("default_yt", TCredential("yt", "", token));
+ creds->AddCredential("default_ydb", TCredential("ydb", "", token));
+ creds->AddCredential("default_pq", TCredential("pq", "", token));
+ creds->AddCredential("default_solomon", TCredential("solomon", "", token));
+ }
+ if (!customTokens.empty()) {
+ for (auto& [key, value]: customTokens) {
+ creds->AddCredential(key, TCredential("custom", "", value));
+ }
+ }
+ progFactory.SetCredentials(creds);
+
+ progFactory.SetUrlListerManager(
+ MakeUrlListerManager(
+ {MakeYtUrlLister()}
+ )
+ );
+
+ TProgramPtr program;
+ if (progFile == TStringBuf("-")) {
+ program = progFactory.Create("-stdin-", Cin.ReadAll());
+ } else {
+ program = progFactory.Create(TFile(progFile, RdOnly));
+ program->SetQueryName(progFile);
+ }
+
+ if (paramsFile) {
+ TString parameters = TFileInput(paramsFile).ReadAll();
+ program->SetParametersYson(parameters);
+ }
+
+ program->EnableResultPosition();
+ THolder<IOutputStream> planStreamHolder;
+ if (tracePlan) {
+ if (!planFile.empty()) {
+ planStreamHolder = MakeHolder<TFileOutput>(planFile);
+ runOptions.TracePlanStream = planStreamHolder.Get();
+ } else {
+ runOptions.TracePlanStream = &Cerr;
+ }
+ } else if (runOptions.ShowPropgress) {
+ runOptions.TracePlanStream = &Cnull;
+ }
+
+ THolder<IOutputStream> statStreamHolder;
+ if (res.Has("stat")) {
+ if (statFile) {
+ statStreamHolder = MakeHolder<TFileOutput>(statFile);
+ runOptions.StatisticsStream = statStreamHolder.Get();
+ } else {
+ runOptions.StatisticsStream = &Cerr;
+ }
+ }
+
+ return RunProgram(std::move(program), runOptions, clusters);
+}
+
+} // namespace
+
+void FlushYtDebugLogOnSignal() {
+ if (!NMalloc::IsAllocatorCorrupted) {
+ NYql::FlushYtDebugLog();
+ }
+}
+
+int main(int argc, const char *argv[])
+{
+ Y_UNUSED(NUdf::GetStaticSymbols());
+ NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); });
+ NYql::NBacktrace::RegisterKikimrFatalActions();
+ NYql::NBacktrace::EnableKikimrSymbolize();
+
+ // Init MR/YT for proper work of embedded agent
+ NYT::Initialize(argc, argv);
+
+ NYql::NLog::YqlLoggerScope logger(&Cerr);
+ NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY);
+
+ YQL_LOG(INFO) << "mrrun ABI version: " << NUdf::CurrentAbiVersionStr();
+
+ if (NYT::TConfig::Get()->Prefix.empty()) {
+ NYT::TConfig::Get()->Prefix = "//";
+ }
+
+ try {
+ int res = RunMain(argc, argv);
+ if (0 == res) {
+ NYql::DropYtDebugLog();
+ }
+ return res;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ return 1;
+ }
+}
diff --git a/ydb/library/yql/tools/mrrun/mrrun.h b/ydb/library/yql/tools/mrrun/mrrun.h
new file mode 100644
index 0000000000..94348d5cb4
--- /dev/null
+++ b/ydb/library/yql/tools/mrrun/mrrun.h
@@ -0,0 +1,10 @@
+#pragma once
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+
+template <class TPbConfig>
+void FillClusterMapping(THashMap<TString, TString>& clusters, const TPbConfig& config, const TString& provider) {
+ for (auto& cluster: config.GetClusterMapping()) {
+ clusters.emplace(to_lower(cluster.GetName()), provider);
+ }
+}
diff --git a/ydb/library/yql/tools/mrrun/ya.make b/ydb/library/yql/tools/mrrun/ya.make
new file mode 100644
index 0000000000..d37afeb648
--- /dev/null
+++ b/ydb/library/yql/tools/mrrun/ya.make
@@ -0,0 +1,84 @@
+PROGRAM()
+
+ALLOCATOR(J)
+
+SRCS(
+ mrrun.h
+ mrrun.cpp
+)
+
+IF (OS_LINUX)
+ # prevent external python extensions to lookup protobuf symbols (and maybe
+ # other common stuff) in main binary
+ EXPORTS_SCRIPT(${ARCADIA_ROOT}/ydb/library/yql/tools/exports.symlist)
+ENDIF()
+
+PEERDIR(
+ contrib/libs/protobuf
+ ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs
+ library/cpp/digest/md5
+ library/cpp/getopt
+ library/cpp/logger
+ library/cpp/resource
+ library/cpp/yson
+ library/cpp/yson/node
+ library/cpp/sighandler
+ yt/cpp/mapreduce/client
+ yt/cpp/mapreduce/common
+ ydb/core/util
+ ydb/library/yql/sql/pg
+ ydb/library/yql/core
+ ydb/library/yql/core/facade
+ ydb/library/yql/core/file_storage/proto
+ ydb/library/yql/core/file_storage/http_download
+ ydb/library/yql/core/file_storage
+ ydb/library/yql/core/services/mounts
+ ydb/library/yql/core/url_lister
+ ydb/library/yql/dq/comp_nodes
+ ydb/library/yql/dq/integration/transform
+ ydb/library/yql/minikql/comp_nodes/llvm14
+ ydb/library/yql/minikql/invoke_builtins/llvm14
+ ydb/library/yql/protos
+ ydb/library/yql/providers/clickhouse/actors
+ ydb/library/yql/providers/clickhouse/provider
+ ydb/library/yql/providers/common/comp_nodes
+ ydb/library/yql/providers/common/http_gateway
+ ydb/library/yql/providers/common/proto
+ ydb/library/yql/providers/common/udf_resolve
+ ydb/library/yql/providers/dq/local_gateway
+ ydb/library/yql/providers/dq/provider
+ ydb/library/yql/providers/dq/provider/exec
+ ydb/library/yql/providers/pq/async_io
+ ydb/library/yql/providers/s3/actors
+ ydb/library/yql/providers/ydb/actors
+ ydb/library/yql/providers/ydb/comp_nodes
+ ydb/library/yql/providers/ydb/provider
+ ydb/library/yql/providers/pg/provider
+ ydb/library/yql/public/udf/service/terminate_policy
+ ydb/library/yql/utils/log
+ ydb/library/yql/utils/backtrace
+ ydb/library/yql/utils/failure_injector
+ ydb/public/sdk/cpp/client/ydb_driver
+ ydb/library/yql/core/url_preprocessing
+ ydb/library/yql/providers/yt/comp_nodes/dq
+ ydb/library/yql/providers/yt/comp_nodes/llvm14
+ ydb/library/yql/providers/yt/codec/codegen
+ ydb/library/yql/providers/yt/dq_task_preprocessor
+ ydb/library/yql/providers/yt/gateway/native
+ ydb/library/yql/providers/yt/lib/log
+ ydb/library/yql/providers/yt/mkql_dq
+ ydb/library/yql/providers/yt/lib/yt_download
+ ydb/library/yql/providers/yt/lib/yt_url_lister
+ ydb/library/yql/providers/yt/lib/config_clusters
+ ydb/library/yql/parser/pg_wrapper
+)
+
+IF (NOT OS_WINDOWS)
+ PEERDIR(
+ ydb/library/yql/providers/dq/global_worker_manager
+ )
+ENDIF()
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/tools/ya.make b/ydb/library/yql/tools/ya.make
index 88b6d225db..147e26ae74 100644
--- a/ydb/library/yql/tools/ya.make
+++ b/ydb/library/yql/tools/ya.make
@@ -3,6 +3,7 @@ RECURSE(
dqrun
dq
mrjob
+ mrrun
pgrun
pg-make-test
sql2yql