diff options
author | udovichenko-r <udovichenko-r@yandex-team.com> | 2024-12-13 14:38:21 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.com> | 2024-12-13 15:57:44 +0300 |
commit | 1402a032649d20ffb80a541f706afcd31cd897fa (patch) | |
tree | 2c7d158a5ab885a6301dd0da832b16fd5f39c711 /yql | |
parent | 933c5664c69e565f3ad5752f5862e0ba860a52fe (diff) | |
download | ydb-1402a032649d20ffb80a541f706afcd31cd897fa.tar.gz |
Facade runner lib
commit_hash:06b99da07b603d38b18615794ce645139fcec9d0
Diffstat (limited to 'yql')
-rw-r--r-- | yql/essentials/core/facade/yql_facade.cpp | 4 | ||||
-rw-r--r-- | yql/essentials/tests/common/test_framework/yqlrun.py | 2 | ||||
-rw-r--r-- | yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp | 24 | ||||
-rw-r--r-- | yql/essentials/tools/arrow_kernels_dump/ya.make | 23 | ||||
-rw-r--r-- | yql/essentials/tools/ya.make | 4 | ||||
-rw-r--r-- | yql/essentials/tools/yql_facade_run/ya.make | 50 | ||||
-rw-r--r-- | yql/essentials/tools/yql_facade_run/yql_facade_run.cpp | 779 | ||||
-rw-r--r-- | yql/essentials/tools/yql_facade_run/yql_facade_run.h | 213 | ||||
-rw-r--r-- | yql/tools/yqlrun/gateway_spec.cpp | 14 | ||||
-rw-r--r-- | yql/tools/yqlrun/gateway_spec.h | 8 | ||||
-rw-r--r-- | yql/tools/yqlrun/http/ya.make | 55 | ||||
-rw-r--r-- | yql/tools/yqlrun/http/yql_server.cpp | 4 | ||||
-rw-r--r-- | yql/tools/yqlrun/lib/ya.make | 23 | ||||
-rw-r--r-- | yql/tools/yqlrun/lib/yqlrun_lib.cpp | 128 | ||||
-rw-r--r-- | yql/tools/yqlrun/lib/yqlrun_lib.h | 21 | ||||
-rw-r--r-- | yql/tools/yqlrun/ya.make | 52 | ||||
-rw-r--r-- | yql/tools/yqlrun/yqlrun.cpp | 866 |
17 files changed, 1359 insertions, 911 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp index ee8e31eb35..a0b07e469f 100644 --- a/yql/essentials/core/facade/yql_facade.cpp +++ b/yql/essentials/core/facade/yql_facade.cpp @@ -1598,7 +1598,7 @@ TMaybe<TString> TProgram::GetStatistics(bool totalOnly, THashMap<TString, TStrin } TStringStream out; - NYson::TYsonWriter writer(&out); + NYson::TYsonWriter writer(&out, OutputFormat_); // Header writer.OnBeginMap(); writer.OnKeyedItem("ExecutionStatistics"); @@ -1677,7 +1677,7 @@ TMaybe<TString> TProgram::GetDiscoveredData() { } TStringStream out; - NYson::TYsonWriter writer(&out); + NYson::TYsonWriter writer(&out, OutputFormat_); writer.OnBeginMap(); for (auto& datasource: TypeCtx_->DataSources) { TStringStream providerOut; diff --git a/yql/essentials/tests/common/test_framework/yqlrun.py b/yql/essentials/tests/common/test_framework/yqlrun.py index 95aa794ec7..e23b81c92f 100644 --- a/yql/essentials/tests/common/test_framework/yqlrun.py +++ b/yql/essentials/tests/common/test_framework/yqlrun.py @@ -234,7 +234,7 @@ class YQLRun(object): for name in self.tables: cmd += '--table=yt.%s@%s ' % (name, self.tables[name].yqlrun_file) - if "--lineage" not in self.extra_args: + if "--lineage" not in self.extra_args and "--peephole" not in self.extra_args: if optimize_only: cmd += '-O ' else: diff --git a/yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp b/yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp new file mode 100644 index 0000000000..9c7778bfa7 --- /dev/null +++ b/yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp @@ -0,0 +1,24 @@ +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> +#include <yql/essentials/public/udf/udf_version.h> + +#include <util/stream/output.h> +#include <util/generic/algorithm.h> +#include <util/folder/path.h> + +int main(int argc, char **argv) { + Y_UNUSED(argc); + Cerr << TFsPath(argv[0]).GetName() << " ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl; + + auto builtins = NKikimr::NMiniKQL::CreateBuiltinRegistry(); + auto families = builtins->GetAllKernelFamilies(); + Sort(families, [](const auto& x, const auto& y) { return x.first < y.first; }); + ui64 totalKernels = 0; + for (const auto& f : families) { + auto numKernels = f.second->GetAllKernels().size(); + Cout << f.first << ": " << numKernels << " kernels" << Endl; + totalKernels += numKernels; + } + + Cout << "Total kernel families: " << families.size() << ", kernels: " << totalKernels << Endl; + return 0; +} diff --git a/yql/essentials/tools/arrow_kernels_dump/ya.make b/yql/essentials/tools/arrow_kernels_dump/ya.make new file mode 100644 index 0000000000..d33cfbd5b9 --- /dev/null +++ b/yql/essentials/tools/arrow_kernels_dump/ya.make @@ -0,0 +1,23 @@ +PROGRAM() + +SRCS( + arrow_kernels_dump.cpp +) + +IF (OS_LINUX) + # prevent external python extensions to lookup protobuf symbols (and maybe + # other common stuff) in main binary + EXPORTS_SCRIPT(${ARCADIA_ROOT}/yql/essentials/tools/exports.symlist) +ENDIF() + +PEERDIR( + yql/essentials/minikql/invoke_builtins + yql/essentials/minikql/invoke_builtins/llvm14 + yql/essentials/public/udf/service/terminate_policy + yql/essentials/public/udf + yql/essentials/parser/pg_wrapper +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yql/essentials/tools/ya.make b/yql/essentials/tools/ya.make index 89ea1d9d22..8d74153182 100644 --- a/yql/essentials/tools/ya.make +++ b/yql/essentials/tools/ya.make @@ -1,5 +1,6 @@ RECURSE( - astdiff + arrow_kernels_dump + astdiff pg_catalog_dump pg-make-test pgrun @@ -9,4 +10,5 @@ RECURSE( udf_dep_stub udf_probe udf_resolver + yql_facade_run ) diff --git a/yql/essentials/tools/yql_facade_run/ya.make b/yql/essentials/tools/yql_facade_run/ya.make new file mode 100644 index 0000000000..e9d93e235d --- /dev/null +++ b/yql/essentials/tools/yql_facade_run/ya.make @@ -0,0 +1,50 @@ +LIBRARY() + +SRCS( + yql_facade_run.cpp +) + +PEERDIR( + yql/essentials/providers/pg/provider + yql/essentials/providers/common/provider + yql/essentials/providers/common/proto + yql/essentials/providers/common/udf_resolve + yql/essentials/core/file_storage + yql/essentials/core/file_storage/proto + yql/essentials/core/file_storage/defs + yql/essentials/core/url_lister/interface + yql/essentials/core/services/mounts + yql/essentials/core/services + yql/essentials/core/credentials + yql/essentials/core/pg_ext + yql/essentials/core/facade + yql/essentials/core/url_lister + yql/essentials/core/url_preprocessing + yql/essentials/core/peephole_opt + yql/essentials/core + yql/essentials/minikql/invoke_builtins + yql/essentials/minikql + yql/essentials/ast + yql/essentials/parser/pg_wrapper/interface + yql/essentials/parser/pg_catalog + yql/essentials/public/udf + yql/essentials/public/result_format + yql/essentials/utils/failure_injector + yql/essentials/utils/backtrace + yql/essentials/utils/log + yql/essentials/protos + yql/essentials/sql/settings + yql/essentials/sql/v1/format + + library/cpp/resource + library/cpp/getopt + library/cpp/yson/node + library/cpp/yson + library/cpp/logger + + contrib/libs/protobuf +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp new file mode 100644 index 0000000000..91a3a3eaf8 --- /dev/null +++ b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp @@ -0,0 +1,779 @@ +#include "yql_facade_run.h" + +#include <yql/essentials/providers/pg/provider/yql_pg_provider.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> +#include <yql/essentials/providers/common/proto/gateways_config.pb.h> +#include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h> +#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h> +#include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h> +#include <yql/essentials/core/yql_user_data_storage.h> +#include <yql/essentials/core/yql_udf_resolver.h> +#include <yql/essentials/core/yql_udf_index.h> +#include <yql/essentials/core/yql_udf_index_package_set.h> +#include <yql/essentials/core/yql_library_compiler.h> +#include <yql/essentials/core/yql_type_annotation.h> +#include <yql/essentials/core/pg_ext/yql_pg_ext.h> +#include <yql/essentials/core/services/mounts/yql_mounts.h> +#include <yql/essentials/core/services/yql_out_transformers.h> +#include <yql/essentials/core/file_storage/file_storage.h> +#include <yql/essentials/core/file_storage/proto/file_storage.pb.h> +#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h> +#include <yql/essentials/core/facade/yql_facade.h> +#include <yql/essentials/core/url_lister/url_lister_manager.h> +#include <yql/essentials/core/url_preprocessing/url_preprocessing.h> +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> +#include <yql/essentials/minikql/mkql_function_registry.h> +#include <yql/essentials/ast/yql_expr.h> +#include <yql/essentials/parser/pg_wrapper/interface/parser.h> +#include <yql/essentials/parser/pg_catalog/catalog.h> +#include <yql/essentials/public/udf/udf_version.h> +#include <yql/essentials/public/udf/udf_registrator.h> +#include <yql/essentials/public/udf/udf_validate.h> +#include <yql/essentials/public/result_format/yql_result_format_response.h> +#include <yql/essentials/public/result_format/yql_result_format_type.h> +#include <yql/essentials/public/result_format/yql_result_format_data.h> +#include <yql/essentials/utils/failure_injector/failure_injector.h> +#include <yql/essentials/utils/backtrace/backtrace.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/protos/yql_mount.pb.h> +#include <yql/essentials/protos/pg_ext.pb.h> +#include <yql/essentials/sql/settings/translation_settings.h> +#include <yql/essentials/sql/v1/format/sql_format.h> + +#include <library/cpp/resource/resource.h> +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yson/writer.h> + +#include <google/protobuf/text_format.h> +#include <google/protobuf/arena.h> + +#include <util/stream/output.h> +#include <util/stream/file.h> +#include <util/stream/null.h> +#include <util/system/user.h> +#include <util/string/split.h> +#include <util/string/join.h> +#include <util/string/builder.h> +#include <util/generic/vector.h> +#include <util/generic/ptr.h> +#include <util/generic/yexception.h> +#include <util/datetime/base.h> + +#ifdef __unix__ +#include <sys/resource.h> +#endif + +namespace { + +const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote | + NYql::TAstPrintFlags::AdaptArbitraryContent; + +template <typename TMessage> +THolder<TMessage> ParseProtoConfig(const TString& cfgFile) { + auto config = MakeHolder<TMessage>(); + TString configData = TFileInput(cfgFile).ReadAll(); + + using ::google::protobuf::TextFormat; + if (!TextFormat::ParseFromString(configData, config.Get())) { + throw yexception() << "Bad format of config file " << cfgFile; + } + + return config; +} + +template <typename TMessage> +THolder<TMessage> ParseProtoFromResource(TStringBuf resourceName) { + if (!NResource::Has(resourceName)) { + return {}; + } + auto config = MakeHolder<TMessage>(); + TString configData = NResource::Find(resourceName); + + using ::google::protobuf::TextFormat; + if (!TextFormat::ParseFromString(configData, config.Get())) { + throw yexception() << "Bad format of config " << resourceName; + } + return config; +} + +class TOptPipelineConfigurator : public NYql::IPipelineConfigurator { +public: + TOptPipelineConfigurator(NYql::TProgramPtr prg, IOutputStream* planStream, IOutputStream* exprStream, bool withTypes) + : Program_(std::move(prg)) + , PlanStream_(planStream) + , ExprStream_(exprStream) + , WithTypes_(withTypes) + { + } + + void AfterCreate(NYql::TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final { + pipeline->Add(NYql::TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE), + "OptTrace", NYql::TIssuesIds::CORE, "OptTrace"); + } + + void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final { + if (ExprStream_) { + pipeline->Add(NYql::TExprOutputTransformer::Sync(Program_->ExprRoot(), ExprStream_, WithTypes_), "AstOutput"); + } + if (PlanStream_) { + pipeline->Add(NYql::TPlanOutputTransformer::Sync(PlanStream_, Program_->GetPlanBuilder(), Program_->GetOutputFormat()), "PlanOutput"); + } + } +private: + NYql::TProgramPtr Program_; + IOutputStream* PlanStream_; + IOutputStream* ExprStream_; + bool WithTypes_; +}; + +class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator { +public: + TPeepHolePipelineConfigurator() { + } + + void AfterCreate(NYql::TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final { + pipeline->Add(NYql::TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE), + "OptTrace", NYql::TIssuesIds::CORE, "OptTrace"); + } + + void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final { + pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); + } +}; + +} // unnamed + + +namespace NYql { + +TFacadeRunOptions::TFacadeRunOptions() { + User = GetUsername(); +} + +TFacadeRunOptions::~TFacadeRunOptions() { +} + +void TFacadeRunOptions::InitLogger() { + if (Verbosity != LOG_DEF_PRIORITY) { + NYql::NLog::ELevel level = NYql::NLog::ELevelHelpers::FromInt(Verbosity); + NYql::NLog::EComponentHelpers::ForEach([level](NYql::NLog::EComponent c) { + NYql::NLog::YqlLogger().SetComponentLevel(c, level); + }); + } + + if (TraceOptStream) { + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE); + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CoreEval, NYql::NLog::ELevel::TRACE); + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::CorePeepHole, NYql::NLog::ELevel::TRACE); + } else if (ShowLog) { + NYql::NLog::YqlLogger().SetComponentLevel(NYql::NLog::EComponent::Core, NYql::NLog::ELevel::DEBUG); + } +} + +void TFacadeRunOptions::PrintInfo(const TString& msg) { + if (!NoDebug && Verbosity >= TLOG_INFO) { + Cerr << msg << Endl; + } +} + +void TFacadeRunOptions::Parse(int argc, const char *argv[]) { + + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + + opts.AddHelpOption(); + opts.AddLongOption('p', "program", "Program file").Required().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + ProgramFile = file; + if (ProgramFile == "-") { + ProgramFile = "-stdin-"; + ProgramText = Cin.ReadAll(); + } else { + ProgramText = TFileInput(ProgramFile).ReadAll(); + } + User = GetUsername(); + }); + opts.AddLongOption('s', "sql", "Program is SQL query").NoArgument().StoreValue(&ProgramType, EProgramType::Sql); + if (PgSupport) { + opts.AddLongOption("pg", "Program has PG syntax").NoArgument().StoreValue(&ProgramType, EProgramType::Pg); + opts.AddLongOption("pg-ext", "pg extensions config file").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + PgExtConfig = ParseProtoConfig<NProto::TPgExtensions>(file); + }); + } + opts.AddLongOption('f', "file", "Additional files").RequiredArgument("name@path") + .KVHandler([this](TString name, TString path) { + if (name.empty() || path.empty()) { + throw yexception() << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt"; + } + + auto& entry = DataTable[NYql::TUserDataKey::File(NYql::GetDefaultFilePrefix() + name)]; + entry.Type = NYql::EUserDataType::PATH; + entry.Data = path; + }, '@'); + + opts.AddLongOption('U', "url", "Additional urls").RequiredArgument("name@path") + .KVHandler([this](TString name, TString url) { + if (name.empty() || url.empty()) { + throw yexception() << "url mapping, expected form name@url, e.g. MyUrl@http://example.com/file"; + } + + auto& entry = DataTable[NYql::TUserDataKey::File(NYql::GetDefaultFilePrefix() + name)]; + entry.Type = NYql::EUserDataType::URL; + entry.Data = url; + }, '@'); + + opts.AddLongOption('m', "mounts", "Mount points config file.").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + MountConfig = ParseProtoConfig<NYqlMountConfig::TMountConfig>(file); + }); + opts.AddLongOption("params-file", "Query parameters values in YSON format").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + Params = TFileInput(file).ReadAll(); + }); + opts.AddLongOption('G', "gateways", "Used gateways").DefaultValue(JoinSeq(",", SupportedGateways_)) + .Handler1T<TString>([this](const TString& gateways) { + ::StringSplitter(gateways).Split(',').Consume([&](const TStringBuf& val) { + if (!SupportedGateways_.contains(val)) { + throw yexception() << "Unsupported gateway \"" << val << '"'; + } + GatewayTypes.emplace(val); + }); + }); + opts.AddLongOption("gateways-cfg", "Gateways configuration file").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + GatewaysConfig = ParseProtoConfig<TGatewaysConfig>(file); + }); + opts.AddLongOption("fs-cfg", "Fs configuration file").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + FsConfig = MakeHolder<TFileStorageConfig>(); + LoadFsConfigFromFile(file, *FsConfig); + }); + opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&UdfsPaths); + opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") + .Handler1T<TString>([this](const TString& dir) { + NKikimr::NMiniKQL::FindUdfsInDir(dir, &UdfsPaths); + }); + opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&UdfResolverPath); + opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver").Optional().NoArgument().SetFlag(&UdfResolverFilterSyscalls); + opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument().SetFlag(&ScanUdfs); + + opts.AddLongOption("parse-only", "Exit after program has been parsed").NoArgument().StoreValue(&Mode, ERunMode::Parse); + opts.AddLongOption("compile-only", "Exit after program has been compiled").NoArgument().StoreValue(&Mode, ERunMode::Compile); + opts.AddLongOption("validate", "Exit after program has been validated").NoArgument().StoreValue(&Mode, ERunMode::Validate); + opts.AddLongOption("lineage", "Exit after data lineage has been calculated").NoArgument().StoreValue(&Mode, ERunMode::Lineage); + opts.AddLongOption('O',"optimize", "Optimize expression").NoArgument().StoreValue(&Mode, ERunMode::Optimize); + opts.AddLongOption('R',"run", "Run expression using input/output tables").NoArgument().StoreValue(&Mode, ERunMode::Run); + opts.AddLongOption('D', "discover", "Discover tables in the program").NoArgument().StoreValue(&Mode, ERunMode::Discover); + opts.AddLongOption("peephole", "Perform peephole stage of expression using input/output tables").NoArgument().StoreValue(&Mode, ERunMode::Peephole); + + opts.AddLongOption('L', "show-log", "Show transformation log").Optional().NoArgument().SetFlag(&ShowLog); + opts.AddLongOption('v', "verbosity", "Log verbosity level").Optional().RequiredArgument("LEVEL").StoreResult(&Verbosity); + opts.AddLongOption("print-ast", "Print AST after loading").NoArgument().SetFlag(&PrintAst); + opts.AddLongOption("print-expr", "Print rebuild AST before execution").NoArgument() + .Handler0([this]() { + if (!ExprStream) { + ExprStream = &Cout; + } + }); + opts.AddLongOption("with-types", "Print types annotation").NoArgument().SetFlag(&WithTypes); + opts.AddLongOption("trace-opt", "Print AST in the begin of each transformation").NoArgument() + .Handler0([this]() { + TraceOptStream = &Cerr; + }); + opts.AddLongOption("expr-file", "Print AST to that file instead of stdout").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + ExprStreamHolder = MakeHolder<TFixedBufferFileOutput>(file); + ExprStream = ExprStreamHolder.Get(); + }); + opts.AddLongOption("print-result", "Print program execution result to stdout").NoArgument() + .Handler0([this]() { + if (!ResultStream) { + ResultStream = &Cout; + } + }); + opts.AddLongOption("format", "Results format") + .Optional() + .RequiredArgument("STR") + .Choices(THashSet<TString>{"text", "binary", "pretty"}) + .Handler1T<TString>([this](const TString& val) { + if (val == "text") { + ResultsFormat = NYson::EYsonFormat::Text; + } else if (val == "binary") { + ResultsFormat = NYson::EYsonFormat::Binary; + } else if (val == "pretty") { + ResultsFormat = NYson::EYsonFormat::Pretty; + } else { + throw yexception() << "Unknown result format " << val; + } + }); + + opts.AddLongOption("result-file", "Print program execution result to file").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + ResultStreamHolder = MakeHolder<TFixedBufferFileOutput>(file); + ResultStream = ResultStreamHolder.Get(); + }); + opts.AddLongOption('P',"trace-plan", "Print plan before execution").NoArgument() + .Handler0([this]() { + if (!PlanStream) { + PlanStream = &Cerr; + } + }); + opts.AddLongOption("plan-file", "Print program plan to file").Optional().RequiredArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + PlanStreamHolder = MakeHolder<TFixedBufferFileOutput>(file); + PlanStream = PlanStreamHolder.Get(); + }); + opts.AddLongOption("err-file", "Print validate/optimize/runtime errors to file") + .Handler1T<TString>([this](const TString& file) { + ErrStreamHolder = MakeHolder<TFixedBufferFileOutput>(file); + ErrStream = ErrStreamHolder.Get(); + }); + opts.AddLongOption("full-expr", "Avoid buffering of expr/plan").NoArgument().SetFlag(&FullExpr); + opts.AddLongOption("mem-limit", "Set memory limit in megabytes") + .Handler1T<ui32>(0, [](ui32 memLimit) { + if (memLimit) { +#ifdef __unix__ + auto memLimitBytes = memLimit * 1024 * 1024; + + struct rlimit rl; + if (getrlimit(RLIMIT_AS, &rl)) { + throw TSystemError() << "Cannot getrlimit(RLIMIT_AS)"; + } + + rl.rlim_cur = memLimitBytes; + if (setrlimit(RLIMIT_AS, &rl)) { + throw TSystemError() << "Cannot setrlimit(RLIMIT_AS) to " << memLimitBytes << " bytes"; + } +#else + throw yexception() << "Memory limit can not be set on this platfrom"; +#endif + } + }); + + opts.AddLongOption("validate-mode", "Validate udf mode, available values: " + NUdf::ValidateModeAvailables()) + .DefaultValue(NUdf::ValidateModeAsStr(NUdf::EValidateMode::Greedy)) + .Handler1T<TString>([this](const TString& mode) { + ValidateMode = NUdf::ValidateModeByStr(mode); + }); + opts.AddLongOption("stat", "Print execution statistics").Optional().OptionalArgument("FILE") + .Handler1T<TString>([this](const TString& file) { + if (file) { + StatStreamHolder = MakeHolder<TFileOutput>(file); + StatStream = StatStreamHolder.Get(); + } else { + StatStream = &Cerr; + } + }); + opts.AddLongOption("full-stat", "Output full execution statistics").Optional().NoArgument().SetFlag(&FullStatistics); + + opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&SqlFlags, ','); + opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&SyntaxVersion).DefaultValue(1); + opts.AddLongOption("ansi-lexer", "Use ansi lexer").NoArgument().SetFlag(&AnsiLexer); + opts.AddLongOption("assume-ydb-on-slash", "Assume YDB provider if cluster name starts with '/'").NoArgument().SetFlag(&AssumeYdbOnClusterWithSlash); + opts.AddLongOption("test-antlr4", "Check antlr4 parser").NoArgument().SetFlag(&TestAntlr4); + + opts.AddLongOption("with-final-issues", "Include some final messages (like statistic) in issues").NoArgument().SetFlag(&WithFinalIssues); + if (FailureInjectionSupport) { + opts.AddLongOption("failure-inject", "Activate failure injection") + .Optional() + .RequiredArgument("INJECTION_NAME=FAIL_COUNT or INJECTION_NAME=SKIP_COUNT/FAIL_COUNT") + .KVHandler([](TString name, TString value) { + TFailureInjector::Activate(); + TStringBuf fail = value; + TStringBuf skip; + if (TStringBuf(value).TrySplit('/', skip, fail)) { + TFailureInjector::Set(name, FromString<ui32>(skip), FromString<ui32>(fail)); + } else { + TFailureInjector::Set(name, 0, FromString<ui32>(fail)); + } + }); + } + + opts.SetFreeArgsMax(0); + + for (auto& ext: OptExtenders_) { + ext(opts); + } + + auto res = NLastGetopt::TOptsParseResult(&opts, argc, argv); + + for (auto& handle: OptHandlers_) { + handle(res); + } + + if (Mode >= ERunMode::Validate && GatewayTypes.empty()) { + throw yexception() << "At least one gateway from the list " << JoinSeq(",", SupportedGateways_).Quote() << " must be specified"; + } + + if (!GatewaysConfig) { + GatewaysConfig = ParseProtoFromResource<TGatewaysConfig>("gateways.conf"); + } + + if (GatewaysConfig && GatewaysConfig->HasSqlCore()) { + SqlFlags.insert(GatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), GatewaysConfig->GetSqlCore().GetTranslationFlags().end()); + } + + if (!FsConfig) { + FsConfig = MakeHolder<TFileStorageConfig>(); + if (NResource::Has("fs.conf")) { + LoadFsConfigFromResource("fs.conf", *FsConfig); + } + } +} + +int TFacadeRunner::Main(int argc, const char *argv[]) { + NYql::NBacktrace::RegisterKikimrFatalActions(); + NYql::NBacktrace::EnableKikimrSymbolize(); + + NYql::NLog::YqlLoggerScope logger(&Cerr); + try { + return DoMain(argc, argv); + } + catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + return 1; + } +} + +int TFacadeRunner::DoMain(int argc, const char *argv[]) { + Y_UNUSED(NUdf::GetStaticSymbols()); + + RunOptions_.Parse(argc, argv); + + if (!RunOptions_.NoDebug) { + Cerr << Name_ << " ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl; + } + + RunOptions_.InitLogger(); + + if (RunOptions_.PgSupport) { + ClusterMapping_["pg_catalog"] = PgProviderName; + ClusterMapping_["information_schema"] = PgProviderName; + + NPg::SetSqlLanguageParser(NSQLTranslationPG::CreateSqlLanguageParser()); + NPg::LoadSystemFunctions(*NSQLTranslationPG::CreateSystemFunctionsParser()); + if (RunOptions_.PgExtConfig) { + TVector<NPg::TExtensionDesc> extensions; + PgExtensionsFromProto(*RunOptions_.PgExtConfig, extensions); + NPg::RegisterExtensions(extensions, false, + *NSQLTranslationPG::CreateExtensionSqlParser(), + NKikimr::NMiniKQL::CreateExtensionLoader().get()); + } + + NPg::GetSqlLanguageParser()->Freeze(); + } + + FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, + NKikimr::NMiniKQL::CreateBuiltinRegistry(), true, RunOptions_.UdfsPaths); + + TExprContext ctx; + if (RunOptions_.PgSupport) { + ctx.NextUniqueId = NPg::GetSqlLanguageParser()->GetContext().NextUniqueId; + } + IModuleResolver::TPtr moduleResolver; + if (RunOptions_.MountConfig) { + TModulesTable modules; + FillUserDataTableFromFileSystem(*RunOptions_.MountConfig, RunOptions_.DataTable); + + if (!CompileLibraries(RunOptions_.DataTable, ctx, modules)) { + *RunOptions_.ErrStream << "Errors on compile libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(*RunOptions_.ErrStream); + return -1; + } + + moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, ClusterMapping_, RunOptions_.SqlFlags, RunOptions_.Mode >= ERunMode::Validate); + } else { + if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, ClusterMapping_, RunOptions_.Mode >= ERunMode::Validate)) { + *RunOptions_.ErrStream << "Errors loading default YQL libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(*RunOptions_.ErrStream); + return -1; + } + } + + TExprContext::TFreezeGuard freezeGuard(ctx); + + if (RunOptions_.Mode >= ERunMode::Validate) { + std::vector<NFS::IDownloaderPtr> downloaders; + for (auto& factory: FsDownloadFactories_) { + downloaders.push_back(factory()); + } + + FileStorage_ = WithAsync(CreateFileStorage(*RunOptions_.FsConfig, downloaders)); + } + + IUdfResolver::TPtr udfResolver; + TUdfIndex::TPtr udfIndex; + if (FileStorage_ && RunOptions_.ScanUdfs) { + if (!RunOptions_.UdfResolverPath) { + Cerr << "udf-resolver path must be specified when use 'scan-udfs'"; + return -1; + } + + udfResolver = NCommon::CreateOutProcUdfResolver(FuncRegistry_.Get(), FileStorage_, RunOptions_.UdfResolverPath, {}, {}, RunOptions_.UdfResolverFilterSyscalls, {}); + + RunOptions_.PrintInfo(TStringBuilder() << TInstant::Now().ToStringLocalUpToSeconds() << " Udf scanning started for " << RunOptions_.UdfsPaths.size() << " udfs ..."); + udfIndex = new TUdfIndex(); + LoadRichMetadataToUdfIndex(*udfResolver, RunOptions_.UdfsPaths, false, TUdfIndex::EOverrideMode::RaiseError, *udfIndex); + RunOptions_.PrintInfo(TStringBuilder() << TInstant::Now().ToStringLocalUpToSeconds() << " UdfIndex done."); + + udfResolver = NCommon::CreateUdfResolverWithIndex(udfIndex, udfResolver, FileStorage_); + RunOptions_.PrintInfo(TStringBuilder() << TInstant::Now().ToStringLocalUpToSeconds() << " Udfs scanned"); + } else { + udfResolver = FileStorage_ && RunOptions_.UdfResolverPath + ? NCommon::CreateOutProcUdfResolver(FuncRegistry_.Get(), FileStorage_, RunOptions_.UdfResolverPath, {}, {}, RunOptions_.UdfResolverFilterSyscalls, {}) + : NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_, true); + } + + TVector<TDataProviderInitializer> dataProvidersInit; + if (RunOptions_.PgSupport) { + dataProvidersInit.push_back(GetPgDataProviderInitializer()); + } + for (auto& factory: ProviderFactories_) { + dataProvidersInit.push_back(factory()); + } + + TVector<IUrlListerPtr> urlListers; + for (auto& factory: UrlListerFactories_) { + urlListers.push_back(factory()); + } + + TProgramFactory factory(RunOptions_.UseRepeatableRandomAndTimeProviders, FuncRegistry_.Get(), ctx.NextUniqueId, dataProvidersInit, Name_); + factory.AddUserDataTable(RunOptions_.DataTable); + factory.SetModules(moduleResolver); + factory.SetFileStorage(FileStorage_); + if (RunOptions_.GatewaysConfig && RunOptions_.GatewaysConfig->HasFs()) { + factory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*RunOptions_.GatewaysConfig)); + } + factory.SetUdfIndex(udfIndex, new TUdfIndexPackageSet()); + factory.SetUdfResolver(udfResolver); + factory.SetGatewaysConfig(RunOptions_.GatewaysConfig.Get()); + factory.SetCredentials(Credentials_); + factory.EnableRangeComputeFor(); + if (!urlListers.empty()) { + factory.SetUrlListerManager(MakeUrlListerManager(urlListers)); + } + + return RunProgram(factory); +} + +int TFacadeRunner::RunProgram(TProgramFactory& factory) { + + TProgramPtr program = factory.Create(RunOptions_.ProgramFile, RunOptions_.ProgramText);; + if (RunOptions_.Params) { + program->SetParametersYson(RunOptions_.Params); + } + + if (RunOptions_.EnableResultPosition) { + program->EnableResultPosition(); + } + + if (ProgressWriter_) { + program->SetProgressWriter(ProgressWriter_); + } + program->SetUseTableMetaFromGraph(RunOptions_.UseMetaFromGrpah); + program->SetValidateOptions(RunOptions_.ValidateMode); + + bool fail = false; + if (RunOptions_.ProgramType != EProgramType::SExpr) { + RunOptions_.PrintInfo("Parse SQL..."); + google::protobuf::Arena arena; + NSQLTranslation::TTranslationSettings settings; + settings.Arena = &arena; + settings.PgParser = EProgramType::Pg == RunOptions_.ProgramType; + settings.ClusterMapping = ClusterMapping_; + settings.Flags = RunOptions_.SqlFlags; + settings.SyntaxVersion = RunOptions_.SyntaxVersion; + settings.AnsiLexer = RunOptions_.AnsiLexer; + settings.TestAntlr4 = RunOptions_.TestAntlr4; + settings.V0Behavior = NSQLTranslation::EV0Behavior::Report; + settings.AssumeYdbOnClusterWithSlash = RunOptions_.AssumeYdbOnClusterWithSlash; + if (ERunMode::Discover == RunOptions_.Mode) { + settings.Mode = NSQLTranslation::ESqlMode::DISCOVERY; + } + if (!program->ParseSql(settings)) { + program->PrintErrorsTo(*RunOptions_.ErrStream); + fail = true; + } + if (!fail && RunOptions_.TestSqlFormat && 1 == RunOptions_.SyntaxVersion) { + TString formattedProgramText; + NYql::TIssues issues; + auto formatter = NSQLFormat::MakeSqlFormatter(settings); + if (!formatter->Format(RunOptions_.ProgramText, formattedProgramText, issues)) { + *RunOptions_.ErrStream << "Format failed" << Endl; + issues.PrintTo(*RunOptions_.ErrStream); + return -1; + } + + auto frmProgram = factory.Create("formatted SQL", formattedProgramText); + if (!frmProgram->ParseSql(settings)) { + frmProgram->PrintErrorsTo(*RunOptions_.ErrStream); + return -1; + } + + TStringStream srcQuery, frmQuery; + + program->AstRoot()->PrettyPrintTo(srcQuery, PRETTY_FLAGS); + frmProgram->AstRoot()->PrettyPrintTo(frmQuery, PRETTY_FLAGS); + if (srcQuery.Str() != frmQuery.Str()) { + *RunOptions_.ErrStream << "source query's AST and formatted query's AST are not same" << Endl; + return -1; + } + } + } else { + RunOptions_.PrintInfo("Parse YQL..."); + if (!program->ParseYql()) { + program->PrintErrorsTo(*RunOptions_.ErrStream); + fail = true; + } + } + + if (RunOptions_.TraceOptStream) { + if (auto ast = program->GetQueryAst()) { + *RunOptions_.TraceOptStream << *ast << Endl; + } + } + if (fail) { + return -1; + } + + if (RunOptions_.PrintAst) { + program->AstRoot()->PrettyPrintTo(Cout, PRETTY_FLAGS); + } + + if (ERunMode::Parse == RunOptions_.Mode) { + return 0; + } + + RunOptions_.PrintInfo("Compile program..."); + if (!program->Compile(RunOptions_.User)) { + program->PrintErrorsTo(*RunOptions_.ErrStream); + fail = true; + } + + if (RunOptions_.TraceOptStream) { + program->Print(RunOptions_.TraceOptStream, nullptr); + } + if (fail) { + return -1; + } + + if (ERunMode::Compile == RunOptions_.Mode) { + if (RunOptions_.ExprStream) { + auto baseAst = ConvertToAst(*program->ExprRoot(), program->ExprCtx(), NYql::TExprAnnotationFlags::None, true); + baseAst.Root->PrettyPrintTo(*RunOptions_.ExprStream, PRETTY_FLAGS); + } + return 0; + } + + auto defOptConfig = TOptPipelineConfigurator(program, RunOptions_.FullExpr ? RunOptions_.PlanStream : nullptr, RunOptions_.FullExpr ? RunOptions_.ExprStream : nullptr, RunOptions_.WithTypes); + IPipelineConfigurator* optConfig = OptPipelineConfigurator_ ? OptPipelineConfigurator_ : &defOptConfig; + + TProgram::TStatus status = TProgram::TStatus::Ok; + if (ERunMode::Peephole == RunOptions_.Mode) { + RunOptions_.PrintInfo("Peephole..."); + auto defConfig = TPeepHolePipelineConfigurator(); + IPipelineConfigurator* config = PeepholePipelineConfigurator_ ? PeepholePipelineConfigurator_ : &defConfig; + status = program->OptimizeWithConfig(RunOptions_.User, *config); + + if (RunOptions_.ExprStream && program->ExprRoot()) { + auto ast = ConvertToAst(*program->ExprRoot(), program->ExprCtx(), RunOptions_.WithTypes ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true); + ui32 prettyFlags = TAstPrintFlags::ShortQuote; + if (!RunOptions_.WithTypes) { + prettyFlags |= TAstPrintFlags::PerLine; + } + ast.Root->PrettyPrintTo(*RunOptions_.ExprStream, prettyFlags); + } + + } else if (ERunMode::Run == RunOptions_.Mode) { + RunOptions_.PrintInfo("Run program..."); + status = program->RunWithConfig(RunOptions_.User, *optConfig); + } else if (ERunMode::Optimize == RunOptions_.Mode) { + RunOptions_.PrintInfo("Optimize program..."); + status = program->OptimizeWithConfig(RunOptions_.User, *optConfig); + } else if (ERunMode::Validate == RunOptions_.Mode) { + RunOptions_.PrintInfo("Validate program..."); + status = program->Validate(RunOptions_.User, RunOptions_.ExprStream, RunOptions_.WithTypes); + } else if (ERunMode::Discover == RunOptions_.Mode) { + RunOptions_.PrintInfo("Discover program..."); + status = program->Discover(RunOptions_.User); + } else if (ERunMode::Lineage == RunOptions_.Mode) { + RunOptions_.PrintInfo("Calculating lineage in program..."); + status = program->LineageWithConfig(RunOptions_.User, *optConfig); + } + + if (RunOptions_.WithFinalIssues) { + program->FinalizeIssues(); + } + program->PrintErrorsTo(*RunOptions_.ErrStream); + if (status == TProgram::TStatus::Error) { + if (RunOptions_.TraceOptStream) { + program->Print(RunOptions_.TraceOptStream, nullptr); + } + return -1; + } + + if (!RunOptions_.FullExpr && ERunMode::Peephole != RunOptions_.Mode) { + program->Print(RunOptions_.ExprStream, RunOptions_.PlanStream, /*cleanPlan*/true); + } + + program->ConfigureYsonResultFormat(RunOptions_.ResultsFormat); + + if (RunOptions_.ResultStream) { + RunOptions_.PrintInfo("Getting results..."); + if (ERunMode::Discover == RunOptions_.Mode) { + if (auto data = program->GetDiscoveredData()) { + *RunOptions_.ResultStream << *data; + } + } else if (ERunMode::Lineage == RunOptions_.Mode) { + if (auto data = program->GetLineage()) { + TStringInput in(*data); + NYson::ReformatYsonStream(&in, RunOptions_.ResultStream, RunOptions_.ResultsFormat); + } + } else if (program->HasResults()) { + if (RunOptions_.ValidateResultFormat) { + auto str = program->ResultsAsString(); + if (!str.empty()) { + auto node = NYT::NodeFromYsonString(str); + for (const auto& r : NResult::ParseResponse(node)) { + for (const auto& write : r.Writes) { + if (write.Type) { + NResult::TEmptyTypeVisitor visitor; + NResult::ParseType(*write.Type, visitor); + } + + if (write.Type && write.Data) { + NResult::TEmptyDataVisitor visitor; + NResult::ParseData(*write.Type, *write.Data, visitor); + } + } + } + } + + RunOptions_.ResultStream->Write(str.data(), str.size()); + } else { + *RunOptions_.ResultStream << program->ResultsAsString(); + } + } + } + + if (RunOptions_.StatStream) { + if (auto st = program->GetStatistics(!RunOptions_.FullStatistics)) { + *RunOptions_.StatStream << *st; + } + } + + RunOptions_.PrintInfo(""); + RunOptions_.PrintInfo("Done"); + + return 0; +} + +} // NYql diff --git a/yql/essentials/tools/yql_facade_run/yql_facade_run.h b/yql/essentials/tools/yql_facade_run/yql_facade_run.h new file mode 100644 index 0000000000..0b7e7a9e8d --- /dev/null +++ b/yql/essentials/tools/yql_facade_run/yql_facade_run.h @@ -0,0 +1,213 @@ +#pragma once + +#include <yql/essentials/core/file_storage/defs/downloader.h> +#include <yql/essentials/core/file_storage/file_storage.h> +#include <yql/essentials/core/credentials/yql_credentials.h> +#include <yql/essentials/core/url_lister/interface/url_lister.h> +#include <yql/essentials/core/yql_data_provider.h> +#include <yql/essentials/core/yql_user_data.h> + +#include <library/cpp/getopt/last_getopt.h> +#include <library/cpp/yson/public.h> +#include <library/cpp/logger/priority.h> + +#include <util/generic/hash_set.h> +#include <util/generic/hash.h> +#include <util/generic/strbuf.h> +#include <util/generic/string.h> + +#include <functional> + +namespace NKikimr::NMiniKQL { + class IFunctionRegistry; +} + +namespace NYql { + class TFileStorageConfig; + class TGatewaysConfig; + class TProgramFactory; +} + +namespace NYql::NProto { + class TPgExtensions; +} + +namespace NYqlMountConfig { + class TMountConfig; +} + +namespace NYql { + +enum class ERunMode { + Parse /* "parse" */, + Compile /* "compile" */, + Validate /* "validate" */, + Optimize /* "optimize" */, + Peephole /* "peephole" */, + Lineage /* "lineage" */, + Discover /* "discover" */, + Run /* "run" */, +}; + +enum class EProgramType { + SExpr /* "s-expr" */, + Sql /* "sql" */, + Pg /* "pg" */, +}; + +class TFacadeRunOptions { +public: + TFacadeRunOptions(); + ~TFacadeRunOptions(); + + EProgramType ProgramType = EProgramType::SExpr; + NYson::EYsonFormat ResultsFormat = NYson::EYsonFormat::Text; + ERunMode Mode = ERunMode::Run; + TString ProgramFile; + TString ProgramText; + TString User; + ui64 MemLimit = 0; + + THashSet<TString> SqlFlags; + ui16 SyntaxVersion = 1; + bool AnsiLexer = false; + bool TestAntlr4 = false; + bool AssumeYdbOnClusterWithSlash = false; + + bool PrintAst = false; + bool FullExpr = false; + bool WithTypes = false; + bool FullStatistics = false; + int Verbosity = TLOG_ERR; + bool ShowLog = false; + bool WithFinalIssues = false; + + IOutputStream* TraceOptStream = nullptr; + + IOutputStream* ErrStream = &Cerr; + THolder<IOutputStream> ErrStreamHolder; + IOutputStream* PlanStream = nullptr; + THolder<IOutputStream> PlanStreamHolder; + IOutputStream* ExprStream = nullptr; + THolder<IOutputStream> ExprStreamHolder; + IOutputStream* ResultStream = nullptr; + THolder<IOutputStream> ResultStreamHolder; + IOutputStream* StatStream = nullptr; + THolder<IOutputStream> StatStreamHolder; + + NYql::TUserDataTable DataTable; + TVector<TString> UdfsPaths; + TString Params; + NUdf::EValidateMode ValidateMode = NUdf::EValidateMode::Greedy; + + THashSet<TString> GatewayTypes; + TString UdfResolverPath; + bool UdfResolverFilterSyscalls = false; + bool ScanUdfs = false; + THolder<NYqlMountConfig::TMountConfig> MountConfig; + THolder<TGatewaysConfig> GatewaysConfig; + THolder<TFileStorageConfig> FsConfig; + THolder<NProto::TPgExtensions> PgExtConfig; + + // No command line options for these settings. Should be configured in the inherited class + bool NoDebug = false; + bool PgSupport = true; + bool FailureInjectionSupport = false; + bool UseRepeatableRandomAndTimeProviders = false; + bool UseMetaFromGrpah = false; + bool TestSqlFormat = false; + bool ValidateResultFormat = false; + bool EnableResultPosition = false; + + void Parse(int argc, const char *argv[]); + + void AddOptExtension(std::function<void(NLastGetopt::TOpts& opts)> optExtender) { + OptExtenders_.push_back(std::move(optExtender)); + } + void AddOptHandler(std::function<void(const NLastGetopt::TOptsParseResult& res)> optHandler) { + OptHandlers_.push_back(std::move(optHandler)); + } + void SetSupportedGateways(std::initializer_list<TString> gateways) { + SupportedGateways_.insert(gateways); + } + + void InitLogger(); + + void PrintInfo(const TString& msg); + +private: + std::vector<std::function<void(NLastGetopt::TOpts&)>> OptExtenders_; + std::vector<std::function<void(const NLastGetopt::TOptsParseResult&)>> OptHandlers_; + THashSet<TString> SupportedGateways_; +}; + +class TFacadeRunner { +public: + TFacadeRunner(TString name) + : Name_(std::move(name)) + { + } + int Main(int argc, const char *argv[]); + + void AddFsDownloadFactory(std::function<NFS::IDownloaderPtr()> factory) { + FsDownloadFactories_.push_back(std::move(factory)); + } + void AddProviderFactory(std::function<NYql::TDataProviderInitializer()> factory) { + ProviderFactories_.push_back(std::move(factory)); + } + void AddUrlListerFactory(std::function<IUrlListerPtr()> factory) { + UrlListerFactories_.push_back(std::move(factory)); + } + void AddClusterMapping(TString name, TString provider) { + ClusterMapping_[name] = std::move(provider); + } + template <class TPbConfig> + void FillClusterMapping(const TPbConfig& config, const TString& provider) { + for (auto& cluster: config.GetClusterMapping()) { + ClusterMapping_.emplace(to_lower(cluster.GetName()), provider); + } + } + void SetOperationProgressWriter(TOperationProgressWriter writer) { + ProgressWriter_ = std::move(writer); + } + void SetOptPipelineConfigurator(IPipelineConfigurator* configurator) { + OptPipelineConfigurator_ = configurator; + } + void SetPeepholePipelineConfigurator(IPipelineConfigurator* configurator) { + PeepholePipelineConfigurator_ = configurator; + } + + TFileStoragePtr GetFileStorage() const { + return FileStorage_; + } + TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> GetFuncRegistry() { + return FuncRegistry_; + } + TCredentials::TPtr GetCredentials() { + return Credentials_; + } + TFacadeRunOptions& GetRunOptions() { + return RunOptions_; + } + +private: + int DoMain(int argc, const char *argv[]); + int RunProgram(TProgramFactory& factory); + +private: + TString Name_; + std::vector<std::function<NFS::IDownloaderPtr()>> FsDownloadFactories_; + std::vector<std::function<TDataProviderInitializer()>> ProviderFactories_; + std::vector<std::function<IUrlListerPtr()>> UrlListerFactories_; + THashMap<TString, TString> ClusterMapping_; + THolder<TFileStorageConfig> FileStorageConfig_; + TFileStoragePtr FileStorage_; + TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistry_; + TCredentials::TPtr Credentials_ = MakeIntrusive<TCredentials>(); + TOperationProgressWriter ProgressWriter_; + IPipelineConfigurator* OptPipelineConfigurator_ = nullptr; + IPipelineConfigurator* PeepholePipelineConfigurator_ = nullptr; + TFacadeRunOptions RunOptions_; +}; + +} // NYql diff --git a/yql/tools/yqlrun/gateway_spec.cpp b/yql/tools/yqlrun/gateway_spec.cpp deleted file mode 100644 index b53b5ecce8..0000000000 --- a/yql/tools/yqlrun/gateway_spec.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include <yql/tools/yqlrun/gateway_spec.h> - -#include <library/cpp/getopt/last_getopt.h> - -using namespace NYql; -using namespace NKikimr::NMiniKQL; - -void ExtProviderSpecific(const IFunctionRegistry* funcRegistry, - TVector<TDataProviderInitializer>& dataProvidersInit, - const THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>>& rtmrTableAttributes) { - Y_UNUSED(funcRegistry); - Y_UNUSED(dataProvidersInit); - Y_UNUSED(rtmrTableAttributes); -} diff --git a/yql/tools/yqlrun/gateway_spec.h b/yql/tools/yqlrun/gateway_spec.h deleted file mode 100644 index 9ddf9b1e12..0000000000 --- a/yql/tools/yqlrun/gateway_spec.h +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -#include <yql/essentials/minikql/mkql_function_registry.h> -#include <yql/essentials/core/yql_data_provider.h> - -void ExtProviderSpecific(const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, - TVector<NYql::TDataProviderInitializer>& dataProvidersInit, - const THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>>& rtmrTableAttributes = {}); diff --git a/yql/tools/yqlrun/http/ya.make b/yql/tools/yqlrun/http/ya.make index 90ad34dfee..34dafb3fae 100644 --- a/yql/tools/yqlrun/http/ya.make +++ b/yql/tools/yqlrun/http/ya.make @@ -10,27 +10,48 @@ SRCS( ) PEERDIR( - library/cpp/charset - library/cpp/http/misc + yt/yql/providers/yt/common + yt/yql/providers/yt/gateway/file + yt/yql/providers/yt/provider + + yql/essentials/providers/common/proto + yql/essentials/providers/common/provider + yql/essentials/providers/common/comp_nodes + yql/essentials/providers/pg/provider + yql/essentials/providers/config + yql/essentials/providers/result/provider + + yql/essentials/public/issue + yql/essentials/core/facade + yql/essentials/core/url_preprocessing + yql/essentials/core/peephole_opt + yql/essentials/core/type_ann + yql/essentials/core/cbo/simple + yql/essentials/core/services + yql/essentials/ast + yql/essentials/core + yql/essentials/minikql + yql/essentials/minikql/comp_nodes + yql/essentials/parser/pg_wrapper/interface + yql/essentials/sql/v1/format + yql/essentials/utils/log + yql/essentials/utils + + library/cpp/http/io library/cpp/http/server - library/cpp/json - library/cpp/logger + library/cpp/http/misc library/cpp/mime/types - library/cpp/openssl/io - library/cpp/string_utils/quote library/cpp/uri - library/cpp/yson + library/cpp/logger library/cpp/yson/node - yql/essentials/core/cbo/simple - yql/essentials/core/facade - yql/essentials/core/type_ann - yql/essentials/providers/result/provider - yql/essentials/parser/pg_wrapper - yql/essentials/sql/v1/format - yt/yql/providers/yt/gateway/file - yt/yql/providers/yt/provider - yql/essentials/core/url_preprocessing - yql/essentials/providers/pg/provider + library/cpp/openssl/io + library/cpp/charset + library/cpp/yson + library/cpp/json + library/cpp/string_utils/quote + library/cpp/getopt + + contrib/libs/protobuf ) FILES( diff --git a/yql/tools/yqlrun/http/yql_server.cpp b/yql/tools/yqlrun/http/yql_server.cpp index 29547d8fcb..4cefa52396 100644 --- a/yql/tools/yqlrun/http/yql_server.cpp +++ b/yql/tools/yqlrun/http/yql_server.cpp @@ -1,7 +1,5 @@ #include "yql_server.h" -#include <yql/tools/yqlrun/gateway_spec.h> - #include <yql/essentials/core/cbo/simple/cbo_simple.h> #include <yql/essentials/providers/common/proto/gateways_config.pb.h> #include <yql/essentials/providers/common/provider/yql_provider_names.h> @@ -189,8 +187,6 @@ TProgramPtr MakeFileProgram(const TString& program, TYqlServer& yqlServer, dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {})); dataProvidersInit.push_back(GetPgDataProviderInitializer()); - ExtProviderSpecific(yqlServer.FunctionRegistry, dataProvidersInit, rtmrTableAttributes); - TProgramFactory programFactory( true, yqlServer.FunctionRegistry, diff --git a/yql/tools/yqlrun/lib/ya.make b/yql/tools/yqlrun/lib/ya.make new file mode 100644 index 0000000000..ab165f806e --- /dev/null +++ b/yql/tools/yqlrun/lib/ya.make @@ -0,0 +1,23 @@ +LIBRARY() + +SRCS( + yqlrun_lib.cpp +) + +PEERDIR( + yt/yql/providers/yt/provider + yt/yql/providers/yt/gateway/file + + yql/essentials/providers/common/provider + yql/essentials/core/cbo + yql/essentials/core/peephole_opt + yql/essentials/core/cbo/simple + yql/essentials/core/services + + yql/essentials/tools/yql_facade_run + +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.cpp b/yql/tools/yqlrun/lib/yqlrun_lib.cpp new file mode 100644 index 0000000000..f8b681a456 --- /dev/null +++ b/yql/tools/yqlrun/lib/yqlrun_lib.cpp @@ -0,0 +1,128 @@ +#include "yqlrun_lib.h" + +#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h> +#include <yt/yql/providers/yt/provider/yql_yt_provider.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> +#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h> +#include <yql/essentials/core/services/yql_transform_pipeline.h> +#include <yql/essentials/core/cbo/simple/cbo_simple.h> + +#include <util/generic/yexception.h> +#include <util/folder/iterator.h> +#include <util/folder/dirut.h> +#include <util/folder/path.h> +#include <util/stream/output.h> + +namespace { + +class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator { +public: + TPeepHolePipelineConfigurator() = default; + + void AfterCreate(NYql::TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final { + pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow"); + pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput"); + pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); + } +}; + +TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE; + +} // unnamed + +namespace NYql { + +TYqlRunTool::TYqlRunTool() + : TFacadeRunner("yqlrun") +{ + GetRunOptions().UseRepeatableRandomAndTimeProviders = true; + GetRunOptions().ResultsFormat = NYson::EYsonFormat::Pretty; + + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption('t', "table", "Table mapping").RequiredArgument("table@file") + .KVHandler([&](TString name, TString path) { + if (name.empty() || path.empty()) { + throw yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt"; + } + TablesMapping_[name] = path; + }, '@'); + + }); + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("tables-dir", "Table dirs mapping").RequiredArgument("cluster@dir") + .KVHandler([&](TString cluster, TString dir) { + if (cluster.empty() || dir.empty()) { + throw yexception() << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables"; + } + TablesDirMapping_[cluster] = dir; + for (const auto& entry: TDirIterator(TFsPath(dir))) { + if (auto entryPath = TFsPath(entry.fts_path); entryPath.IsFile() && entryPath.GetExtension() == "txt") { + auto tableName = TString(cluster).append('.').append(entryPath.RelativeTo(TFsPath(dir)).GetPath()); + tableName = tableName.substr(0, tableName.size() - 4); // remove .txt extension + TablesMapping_[tableName] = entryPath.GetPath(); + } + } + }, '@'); + + }); + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption('C', "cluster", "Cluster to service mapping").RequiredArgument("name@service") + .KVHandler([&](TString cluster, TString provider) { + if (cluster.empty() || provider.empty()) { + throw yexception() << "Incorrect service mapping, expected form cluster@provider, e.g. plato@yt"; + } + AddClusterMapping(std::move(cluster), std::move(provider)); + }, '@'); + + }); + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("ndebug", "Do not show debug info in error output").NoArgument().SetFlag(&GetRunOptions().NoDebug); + }); + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("keep-temp", "Keep temporary tables").NoArgument().SetFlag(&KeepTemp_); + }); + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("show-progress", "Report operation progress").NoArgument() + .Handler0([&]() { + SetOperationProgressWriter([](const TOperationProgress& progress) { + Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n"; + }); + }); + }); + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("tmp-dir", "Directory for temporary tables").RequiredArgument("DIR").StoreResult(&TmpDir_); + }); + + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("test-format", "Compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)").NoArgument().SetFlag(&GetRunOptions().TestSqlFormat); + }); + GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) { + opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument().SetFlag(&GetRunOptions().ValidateResultFormat); + }); + + GetRunOptions().SetSupportedGateways({TString{YtProviderName}}); + GetRunOptions().GatewayTypes.emplace(YtProviderName); + AddClusterMapping(TString{"plato"}, TString{YtProviderName}); + + AddProviderFactory([this]() -> NYql::TDataProviderInitializer { + auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_); + auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices); + return GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {}); + }); + + SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE); + +} + + +} // NYql diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.h b/yql/tools/yqlrun/lib/yqlrun_lib.h new file mode 100644 index 0000000000..bb0e360ade --- /dev/null +++ b/yql/tools/yqlrun/lib/yqlrun_lib.h @@ -0,0 +1,21 @@ +#pragma once + +#include <yql/essentials/tools/yql_facade_run/yql_facade_run.h> + +#include <util/generic/string.h> +#include <util/generic/hash.h> + +namespace NYql { + +class TYqlRunTool: public TFacadeRunner { +public: + TYqlRunTool(); + +private: + THashMap<TString, TString> TablesMapping_; + THashMap<TString, TString> TablesDirMapping_; + bool KeepTemp_ = false; + TString TmpDir_; +}; + +} // NYql diff --git a/yql/tools/yqlrun/ya.make b/yql/tools/yqlrun/ya.make index 71458d0874..e0483c0712 100644 --- a/yql/tools/yqlrun/ya.make +++ b/yql/tools/yqlrun/ya.make @@ -4,7 +4,6 @@ ALLOCATOR(J) SRCS( yqlrun.cpp - gateway_spec.cpp ) IF (OS_LINUX) @@ -14,36 +13,37 @@ IF (OS_LINUX) ENDIF() PEERDIR( - contrib/libs/protobuf - library/cpp/getopt - library/cpp/yson - library/cpp/svnversion - yql/essentials/sql/pg - yql/essentials/core/cbo/simple + yql/tools/yqlrun/http + yql/tools/yqlrun/lib + + yt/yql/providers/yt/comp_nodes/llvm14 + yt/yql/providers/yt/codec/codegen + + yql/essentials/providers/common/provider + yql/essentials/providers/common/udf_resolve + yql/essentials/minikql/invoke_builtins + yql/essentials/minikql/invoke_builtins/llvm14 + yql/essentials/minikql/comp_nodes/llvm14 + yql/essentials/parser/pg_wrapper + yql/essentials/parser/pg_catalog + yql/essentials/core/services/mounts yql/essentials/core/facade + yql/essentials/core/pg_ext yql/essentials/core/file_storage yql/essentials/core/file_storage/proto - yql/essentials/core/file_storage/http_download - yql/essentials/core/pg_ext - yql/essentials/core/services/mounts - yql/essentials/minikql/comp_nodes/llvm14 - yql/essentials/protos + yql/essentials/core yql/essentials/public/udf/service/exception_policy yql/essentials/utils/backtrace - yql/essentials/core - yql/essentials/sql/v1/format - yql/essentials/providers/common/codec - yql/essentials/providers/common/comp_nodes - yql/essentials/providers/common/proto - yql/essentials/providers/common/provider - yql/essentials/providers/common/udf_resolve - yt/yql/providers/yt/gateway/file - yt/yql/providers/yt/codec/codegen - yt/yql/providers/yt/comp_nodes/llvm14 - yql/essentials/core/url_preprocessing - yql/tools/yqlrun/http - yql/essentials/parser/pg_wrapper - yql/essentials/public/result_format + yql/essentials/utils/log + yql/essentials/minikql + yql/essentials/protos + yql/essentials/ast + yql/essentials/sql/pg + + library/cpp/getopt + library/cpp/logger + + contrib/libs/protobuf ) YQL_LAST_ABI_VERSION() diff --git a/yql/tools/yqlrun/yqlrun.cpp b/yql/tools/yqlrun/yqlrun.cpp index a1ae1fc6e0..827c9a13c3 100644 --- a/yql/tools/yqlrun/yqlrun.cpp +++ b/yql/tools/yqlrun/yqlrun.cpp @@ -1,320 +1,38 @@ -#include "gateway_spec.h" - +#include <yql/tools/yqlrun/lib/yqlrun_lib.h> #include <yql/tools/yqlrun/http/yql_server.h> -#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h> -#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h> -#include <yt/yql/providers/yt/provider/yql_yt_provider.h> -#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h> -#include <yql/essentials/core/cbo/simple/cbo_simple.h> -#include <yql/essentials/core/url_preprocessing/url_preprocessing.h> - -#include <yql/essentials/sql/v1/format/sql_format.h> - -#include <yql/essentials/providers/pg/provider/yql_pg_provider.h> -#include <yql/essentials/providers/common/codec/yql_codec.h> -#include <yql/essentials/providers/common/provider/yql_provider_names.h> -#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h> #include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h> +#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h> #include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h> -#include <yql/essentials/providers/common/proto/gateways_config.pb.h> -#include <yql/essentials/providers/common/comp_nodes/yql_factory.h> -#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> -#include <yql/essentials/minikql/computation/mkql_computation_node.h> -#include <yql/essentials/minikql/comp_nodes/mkql_factories.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> + #include <yql/essentials/minikql/mkql_function_registry.h> -#include <yql/essentials/minikql/mkql_utils.h> -#include <yql/essentials/protos/yql_mount.pb.h> +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> +#include <yql/essentials/utils/backtrace/backtrace.h> +#include <yql/essentials/utils/log/tls_backend.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/parser/pg_wrapper/interface/parser.h> +#include <yql/essentials/parser/pg_catalog/catalog.h> #include <yql/essentials/protos/pg_ext.pb.h> -#include <yql/essentials/core/yql_library_compiler.h> -#include <yql/essentials/core/facade/yql_facade.h> -#include <yql/essentials/core/pg_ext/yql_pg_ext.h> #include <yql/essentials/core/file_storage/file_storage.h> -#include <yql/essentials/core/file_storage/http_download/http_download.h> #include <yql/essentials/core/file_storage/proto/file_storage.pb.h> -#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h> #include <yql/essentials/core/services/mounts/yql_mounts.h> -#include <yql/essentials/utils/log/log.h> -#include <yql/essentials/utils/backtrace/backtrace.h> -#include <yql/essentials/utils/log/tls_backend.h> -#include <yql/essentials/public/udf/udf_validate.h> -#include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h> -#include <yql/essentials/parser/pg_wrapper/interface/parser.h> -#include <yql/essentials/public/result_format/yql_result_format_response.h> -#include <yql/essentials/public/result_format/yql_result_format_type.h> -#include <yql/essentials/public/result_format/yql_result_format_data.h> +#include <yql/essentials/core/facade/yql_facade.h> +#include <yql/essentials/core/pg_ext/yql_pg_ext.h> +#include <yql/essentials/core/yql_udf_resolver.h> +#include <yql/essentials/core/yql_udf_index.h> +#include <yql/essentials/core/yql_library_compiler.h> +#include <yql/essentials/ast/yql_expr.h> -#include <library/cpp/logger/stream.h> -#include <library/cpp/svnversion/svnversion.h> #include <library/cpp/getopt/last_getopt.h> - -#include <library/cpp/yson/public.h> -#include <library/cpp/yson/writer.h> +#include <library/cpp/logger/stream.h> #include <google/protobuf/text_format.h> -#include <util/stream/file.h> -#include <util/system/user.h> -#include <util/folder/iterator.h> -#include <util/folder/dirut.h> -#include <util/string/join.h> -#include <util/string/builder.h> - -#ifdef __unix__ -#include <sys/resource.h> -#endif - -const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote | - NYql::TAstPrintFlags::AdaptArbitraryContent; - -class TMultiProgs { -public: - TMultiProgs(NYql::TProgramFactory& factory, const TString& programFile, const TString& programText, size_t concurrentCount = 1) { - Infos.reserve(concurrentCount); - BaseProg = factory.Create(programFile, programText); - if (concurrentCount) { - factory.UnrepeatableRandom(); - } - for (auto i = concurrentCount; i > 0; --i) { - Infos.emplace_back(TProgInfo({factory.Create(programFile, programText), {}})); - } - } - - bool ParseYql() { - bool result = BaseProg->ParseYql(); - for (auto& info: Infos) { - info.Prog->ParseYql(); - } - - return result; - } - - bool ParseSql(const NSQLTranslation::TTranslationSettings& settings) { - bool result = BaseProg->ParseSql(settings); - for (auto& info: Infos) { - info.Prog->ParseSql(settings); - } - - return result; - } - - bool Compile(const TString& username) { - bool result = BaseProg->Compile(username); - for (auto& info: Infos) { - info.Prog->Compile(username); - } - - return result; - } - - template<class T> - bool CompareStreams(const TString& compareGoal, IOutputStream& out, const T& base, const T& concurrent) const { - const auto baseSize = base.Size(); - const auto concurentSize = concurrent.Size(); - if (baseSize == concurentSize && memcmp(base.Data(), concurrent.Data(), baseSize) == 0) { - return true; - } - out << "Difference " << compareGoal << " of cuncurrent mode is not the same as base run. Size base: " << baseSize << - ", size concurrent: " << concurentSize; - if (concurentSize) { - out << ", concurrent stream: " << Endl << concurrent.Data(); - } else { - out << ", base stream: " << Endl << base.Data(); - } - return false; - } - - void PrintExprTo(IOutputStream& out) { - TStringStream baseSS; - auto baseAst = ConvertToAst(*BaseProg->ExprRoot(), BaseProg->ExprCtx(), NYql::TExprAnnotationFlags::None, true); - baseAst.Root->PrettyPrintTo(baseSS, PRETTY_FLAGS); - for (auto& info: Infos) { - TStringStream ss; - auto ast = ConvertToAst(*info.Prog->ExprRoot(), BaseProg->ExprCtx(), NYql::TExprAnnotationFlags::None, true); - ast.Root->PrettyPrintTo(ss, PRETTY_FLAGS); - if (!CompareStreams("expr representation", out, baseSS, ss)) { - return; - } - } - out << baseSS.Data(); - } - - void FinalizeIssues() { - BaseProg->FinalizeIssues(); - for (auto& info : Infos) { - info.Prog->FinalizeIssues(); - } - } - - void PrintErrorsTo(IOutputStream& out) const { - TStringStream baseSS; - BaseProg->PrintErrorsTo(baseSS); - for (auto& info: Infos) { - TStringStream ss; - info.Prog->PrintErrorsTo(ss); - if (!CompareStreams("error", out, baseSS, ss)) { - return; - } - } - out << baseSS.Data(); - } - - void PrintAstTo(IOutputStream& out) const { - TStringStream baseSS; - BaseProg->AstRoot()->PrettyPrintTo(out, PRETTY_FLAGS); - for (auto& info: Infos) { - TStringStream ss; - info.Prog->AstRoot()->PrettyPrintTo(out, PRETTY_FLAGS); - if (!CompareStreams("AST", out, baseSS, ss)) { - return; - } - } - out << baseSS.Data(); - } - - void SetProgressWriter(NYql::TOperationProgressWriter writer) { - BaseProg->SetProgressWriter(writer); - for (auto& info: Infos) { - info.Prog->SetProgressWriter(writer); - } - } - - void SetValidateOptions(NKikimr::NUdf::EValidateMode validateMode) { - BaseProg->SetValidateOptions(validateMode); - for (auto& info: Infos) { - info.Prog->SetValidateOptions(validateMode); - } - } - - void SetParametersYson(const TString& parameters) { - BaseProg->SetParametersYson(parameters); - for (auto& info : Infos) { - info.Prog->SetParametersYson(parameters); - } - } - - void Print(IOutputStream* exprOut, IOutputStream* planOut) { - bool cleanPlan = true; - BaseProg->Print(exprOut, planOut, cleanPlan); - } - - void ResultsOut(IOutputStream& out) { - if (BaseProg->HasResults()) { - BaseProg->ConfigureYsonResultFormat(NYson::EYsonFormat::Pretty); - out << BaseProg->ResultsAsString(); - } - // Multirun results are ignored - } - - void DiscoveredDataOut(IOutputStream& out) { - if (auto data = BaseProg->GetDiscoveredData()) { - TStringInput in(*data); - NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty); - } - } - - void LineageOut(IOutputStream& out) { - if (auto data = BaseProg->GetLineage()) { - TStringInput in(*data); - NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty); - } - } - - NYql::TProgram::TStatus Run(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, bool withTypes) { - YQL_ENSURE(Infos.empty()); - return BaseProg->Run(username, traceOut, tracePlan, exprOut, withTypes); - } - - NYql::TProgram::TStatus Optimize(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, bool withTypes) { - YQL_ENSURE(Infos.empty()); - return BaseProg->Optimize(username, traceOut, tracePlan, exprOut, withTypes); - } - - NYql::TProgram::TStatus Validate(const TString& username, IOutputStream* exprOut, bool withTypes) { - YQL_ENSURE(Infos.empty()); - return BaseProg->Validate(username, exprOut, withTypes); - } - - NYql::TProgram::TStatus Discover(const TString& username) { - YQL_ENSURE(Infos.empty()); - return BaseProg->Discover(username); - } - - NYql::TProgram::TStatus Lineage(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) { - YQL_ENSURE(Infos.empty()); - return BaseProg->Lineage(username, traceOut, exprOut, withTypes); - } - - NYql::TProgram::TStatus Peephole(const TString& username, IOutputStream* exprOut, bool withTypes) { - YQL_ENSURE(Infos.empty()); - using namespace NYql; - - class TPeepHolePipelineConfigurator : public IPipelineConfigurator { - public: - TPeepHolePipelineConfigurator() = default; - - void AfterCreate(TTransformationPipeline* pipeline) const final { - Y_UNUSED(pipeline); - } - - void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { - Y_UNUSED(pipeline); - } - - void AfterOptimize(TTransformationPipeline* pipeline) const final { - pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow"); - pipeline->Add(CreateYtBlockInputTransformer(nullptr), "BlockInput"); - pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); - } - }; - - TPeepHolePipelineConfigurator config; - auto status = BaseProg->OptimizeWithConfig(username, config); - if (exprOut && BaseProg->ExprRoot()) { - auto ast = ConvertToAst(*BaseProg->ExprRoot(), BaseProg->ExprCtx(), withTypes ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true); - ui32 prettyFlags = TAstPrintFlags::ShortQuote; - if (!withTypes) { - prettyFlags |= TAstPrintFlags::PerLine; - } - ast.Root->PrettyPrintTo(*exprOut, prettyFlags); - } - return status; - } - - NYql::TProgram::TStatus RunAsyncAndWait(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, - bool withTypes, bool& emulateOutputForMultirun) { - NYql::TProgram::TStatus baseStatus = BaseProg->Run(username, traceOut, tracePlan, exprOut, withTypes); - // switch this flag only after base run - emulateOutputForMultirun = true; - for (auto& info: Infos) { - info.Future = info.Prog->RunAsync(username, nullptr, nullptr, nullptr, withTypes); - YQL_ENSURE(info.Future.Initialized()); - } - - for (bool wasAsync = true; wasAsync;) { - wasAsync = false; - for (auto& info: Infos) { - auto status = info.Future.GetValueSync(); - if (status == NYql::TProgram::TStatus::Async) { - wasAsync = true; - info.Future = info.Prog->ContinueAsync(); - } else if (status == NYql::TProgram::TStatus::Error) { - baseStatus = status; - } - } - } - return baseStatus; - } - -private: - struct TProgInfo { - NYql::TProgramPtr Prog; - NYql::TProgram::TFutureStatus Future; - }; - - NYql::TProgramPtr BaseProg; - TVector<TProgInfo> Infos; -}; +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/generic/hash.h> +#include <util/datetime/base.h> using namespace NYql; using namespace NKikimr::NMiniKQL; @@ -347,7 +65,7 @@ private: void CommonInit(const NLastGetopt::TOptsParseResult& res, const TString& udfResolverPath, bool filterSysCalls, const TVector<TString>& udfsPaths, TFileStoragePtr fileStorage, - IUdfResolver::TPtr& udfResolver, NKikimr::NMiniKQL::IFunctionRegistry::TPtr funcRegistry, TUdfIndex::TPtr& udfIndex) { + IUdfResolver::TPtr& udfResolver, IFunctionRegistry::TPtr funcRegistry, TUdfIndex::TPtr& udfIndex) { if (fileStorage && res.Has("scan-udfs")) { if (!udfResolverPath) { @@ -385,535 +103,14 @@ THolder<TMessage> ParseProtoConfig(const TString& cfgFile) { return config; } -int Main(int argc, const char *argv[]) -{ - Y_UNUSED(NUdf::GetStaticSymbols()); - using namespace NLastGetopt; - TOpts opts = TOpts::Default(); - TString programFile; - TVector<TString> tablesMappingList; - TVector<TString> tablesDirMappingList; - THashMap<TString, TString> tablesMapping; - THashMap<TString, TString> tablesDirMapping; - TVector<TString> filesMappingList; - TUserDataTable filesMapping; - TVector<TString> urlsMappingList; - TString exprFile; - TString resultFile; - TString planFile; - TString errFile; - TString tmpDir; - TVector<TString> udfsPaths; - TString udfsDir; - TString validateModeStr(NUdf::ValidateModeAsStr(NUdf::EValidateMode::Greedy)); - THashSet<TString> gatewayTypes; - TString mountConfig; - TString udfResolverPath; - bool udfResolverFilterSyscalls = false; - THashMap<TString, TString> clusterMapping; - THashSet<TString> sqlFlags; - clusterMapping["plato"] = YtProviderName; - clusterMapping["pg_catalog"] = PgProviderName; - clusterMapping["information_schema"] = PgProviderName; - ui32 progsConcurrentCount = 0; - TString paramsFile; - ui16 syntaxVersion; - ui64 memLimit; - TString gatewaysCfgFile; - TString fsCfgFile; - TString pgExtConfig; - - opts.AddHelpOption(); - opts.AddLongOption('p', "program", "program file").StoreResult<TString>(&programFile); - opts.AddLongOption('s', "sql", "program is SQL query").NoArgument(); - opts.AddLongOption("pg", "program has PG syntax").NoArgument(); - opts.AddLongOption('t', "table", "table@file").AppendTo(&tablesMappingList); - opts.AddLongOption("tables-dir", "cluster@dir").AppendTo(&tablesDirMappingList); - opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping)); - opts.AddLongOption("ndebug", "should be at first argument, do not show debug info in error output").NoArgument(); - opts.AddLongOption("parse-only", "exit after program has been parsed").NoArgument(); - opts.AddLongOption("print-ast", "print AST after loading").NoArgument(); - opts.AddLongOption("compile-only", "exit after program has been compiled").NoArgument(); - opts.AddLongOption("print-expr", "print rebuild AST before execution").NoArgument(); - opts.AddLongOption("with-types", "print types annotation").NoArgument(); - opts.AddLongOption("trace-opt", "print AST in the begin of each transformation").NoArgument(); - opts.AddLongOption("expr-file", "print AST to that file instead of stdout").StoreResult<TString>(&exprFile); - opts.AddLongOption("print-result", "print program execution result to stdout").NoArgument(); - opts.AddLongOption("result-file", "print program execution result to file").StoreResult<TString>(&resultFile); - opts.AddLongOption("plan-file", "print program plan to file").StoreResult<TString>(&planFile); - opts.AddLongOption("err-file", "print validate/optimize/runtime errors to file").StoreResult<TString>(&errFile); - opts.AddLongOption('P',"trace-plan", "print plan before execution").NoArgument(); - opts.AddLongOption('L', "show-log", "show logs").NoArgument(); - opts.AddLongOption('D', "discover", "discover tables in the program").NoArgument(); - opts.AddLongOption("validate", "exit after program has been validated").NoArgument(); - opts.AddLongOption("lineage", "exit after data lineage has been calculated").NoArgument(); - opts.AddLongOption('O',"optimize", "optimize expression").NoArgument(); - opts.AddLongOption('R',"run", "run expression using input/output tables").NoArgument(); - opts.AddLongOption("peephole", "perform peephole stage of expression using input/output tables").NoArgument(); - opts.AddLongOption('M',"multirun", "run expression in multi-evaluate (race) mode, as option set concurrent count").StoreResult(&progsConcurrentCount).DefaultValue(progsConcurrentCount); - opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList); - opts.AddLongOption('U', "url", "name@path").AppendTo(&urlsMappingList); - opts.AddLongOption('m', "mounts", "Mount points config file.").StoreResult(&mountConfig); - opts.AddLongOption("opt-collision", "provider optimize collision mode").NoArgument(); - opts.AddLongOption("keep-temp", "keep temporary tables").NoArgument(); - opts.AddLongOption("full-expr", "avoid buffering of expr/plan").NoArgument(); - opts.AddLongOption("show-progress", "report operation progress").NoArgument(); - opts.AddLongOption("tmp-dir", "directory for temporary tables").StoreResult<TString>(&tmpDir); - opts.AddLongOption("reverse-mrkey", "reverse keys for Map/Reduce opeations").NoArgument(); - opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths); - opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&udfsDir); - opts.AddLongOption("mem-info", "Print memory usage information").NoArgument(); - opts.AddLongOption("validate-mode", "validate udf mode, available values: " + NUdf::ValidateModeAvailables()).StoreResult<TString>(&validateModeStr).DefaultValue(validateModeStr); - opts.AddLongOption('G', "gateways", "used gateways").SplitHandler(&gatewayTypes, ',').DefaultValue(YtProviderName); - opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&udfResolverPath); - opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver") - .Optional() - .NoArgument() - .SetFlag(&udfResolverFilterSyscalls); - opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument(); - opts.AddLongOption("params-file", "Query parameters values in YSON format").StoreResult(¶msFile); - - opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&sqlFlags, ','); - opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&syntaxVersion).DefaultValue(1); - opts.AddLongOption("ansi-lexer", "Use ansi lexer").NoArgument(); - opts.AddLongOption("assume-ydb-on-slash", "Assume YDB provider if cluster name starts with '/'").NoArgument(); - opts.AddLongOption("mem-limit", "Set memory limit in megabytes").StoreResult(&memLimit).DefaultValue(0); - opts.AddLongOption("gateways-cfg", "gateways configuration file").Optional().RequiredArgument("FILE").StoreResult(&gatewaysCfgFile); - opts.AddLongOption("fs-cfg", "fs configuration file").Optional().RequiredArgument("FILE").StoreResult(&fsCfgFile); - opts.AddLongOption("test-format", "compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)").NoArgument(); - opts.AddLongOption("show-kernels", "show all Arrow kernel families").NoArgument(); - opts.AddLongOption("pg-ext", "pg extensions config file").StoreResult(&pgExtConfig); - opts.AddLongOption("with-final-issues", "Include some final messages (like statistic) in issues").NoArgument(); - opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument(); - opts.AddLongOption("test-antlr4", "check antlr4 parser").NoArgument(); - - opts.SetFreeArgsMax(0); - TOptsParseResult res(&opts, argc, argv); - auto builtins = CreateBuiltinRegistry(); - if (res.Has("show-kernels")) { - auto families = builtins->GetAllKernelFamilies(); - Sort(families, [](const auto& x, const auto& y) { return x.first < y.first; }); - ui64 totalKernels = 0; - for (const auto& f : families) { - auto numKernels = f.second->GetAllKernels().size(); - Cout << f.first << ": " << numKernels << " kernels\n"; - totalKernels += numKernels; - } - - Cout << "Total kernel families: " << families.size() << ", kernels: " << totalKernels << "\n"; - return 0; - } - - NPg::SetSqlLanguageParser(NSQLTranslationPG::CreateSqlLanguageParser()); - NPg::LoadSystemFunctions(*NSQLTranslationPG::CreateSystemFunctionsParser()); - if (!pgExtConfig.empty()) { - auto config = ParseProtoConfig<NProto::TPgExtensions>(pgExtConfig); - Y_ABORT_UNLESS(config); - TVector<NPg::TExtensionDesc> extensions; - PgExtensionsFromProto(*config, extensions); - NPg::RegisterExtensions(extensions, false, - *NSQLTranslationPG::CreateExtensionSqlParser(), - NKikimr::NMiniKQL::CreateExtensionLoader().get()); - } - - NPg::GetSqlLanguageParser()->Freeze(); - - const bool parseOnly = res.Has("parse-only"); - const bool compileOnly = res.Has("compile-only"); - const bool hasValidate = !parseOnly && !compileOnly; - if (hasValidate && !gatewayTypes.contains(YtProviderName)) { - Cerr << "At least one gateway from the list " << Join(",", YtProviderName).Quote() << " must be specified" << Endl; - return 1; - } - - for (auto& s: tablesMappingList) { - TStringBuf tableName, filePath; - TStringBuf(s).Split('@', tableName, filePath); - if (tableName.empty() || filePath.empty()) { - Cerr << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt" << Endl; - return 1; - } - tablesMapping[tableName] = filePath; - } - - for (auto& s : tablesDirMappingList) { - TStringBuf clusterName, dirPath; - TStringBuf(s).Split('@', clusterName, dirPath); - if (clusterName.empty() || dirPath.empty()) { - Cerr << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables" << Endl; - return 1; - } - tablesDirMapping[clusterName] = dirPath; - for (const auto& entry : TDirIterator(TFsPath(dirPath))) { - if (auto entryPath = TFsPath(entry.fts_path); entryPath.IsFile() && entryPath.GetExtension() == "txt") { - auto tableName = TString(clusterName).append('.').append(entryPath.RelativeTo(TFsPath(dirPath)).GetPath()); - tableName = tableName.substr(0, tableName.size() - 4); // remove .txt extension - tablesMapping[tableName] = entryPath.GetPath(); - } - } - } - - if (hasValidate) { - for (auto& s : filesMappingList) { - TStringBuf fileName, filePath; - TStringBuf(s).Split('@', fileName, filePath); - if (fileName.empty() || filePath.empty()) { - Cerr << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt" << Endl; - return 1; - } - - auto& file = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + fileName)]; - file.Type = EUserDataType::PATH; - file.Data = filePath; - } - - for (auto& s : urlsMappingList) { - TStringBuf name, path; - TStringBuf(s).Split('@', name, path); - if (name.empty() || path.empty()) { - Cerr << "Incorrect url mapping, expected form name@path, e.g. MyFile@sbr:123456" << Endl; - return 1; - } - - auto& block = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + name)]; - block.Type = EUserDataType::URL; - block.Data = path; - } - } - - if (memLimit) { -#ifdef __unix__ - memLimit *= 1024 * 1024; - - struct rlimit rl; - - if (getrlimit(RLIMIT_AS, &rl)) { - throw TSystemError() << "Cannot getrlimit(RLIMIT_AS)"; - } - - rl.rlim_cur = memLimit; - if (setrlimit(RLIMIT_AS, &rl)) { - throw TSystemError() << "Cannot setrlimit(RLIMIT_AS) to " << memLimit << " bytes"; - } -#else - Cerr << "Memory limit can not be set on this platfrom" << Endl; - return 1; -#endif - } - - IOutputStream* errStream = &Cerr; - THolder<TFixedBufferFileOutput> errFileHolder; - if (!errFile.empty()) { - errFileHolder.Reset(new TFixedBufferFileOutput(errFile)); - errStream = errFileHolder.Get(); - } - - TExprContext ctx; - ctx.NextUniqueId = NPg::GetSqlLanguageParser()->GetContext().NextUniqueId; - IModuleResolver::TPtr moduleResolver; - if (!mountConfig.empty()) { - TModulesTable modules; - auto mount = ParseProtoConfig<NYqlMountConfig::TMountConfig>(mountConfig); - Y_ABORT_UNLESS(mount); - FillUserDataTableFromFileSystem(*mount, filesMapping); - - if (!CompileLibraries(filesMapping, ctx, modules)) { - *errStream << "Errors on compile libraries:" << Endl; - ctx.IssueManager.GetIssues().PrintTo(*errStream); - return -1; - } - - moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, clusterMapping, sqlFlags, hasValidate); - } else { - if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping, hasValidate)) { - *errStream << "Errors loading default YQL libraries:" << Endl; - ctx.IssueManager.GetIssues().PrintTo(*errStream); - return -1; - } - } - - TExprContext::TFreezeGuard freezeGuard(ctx); - - TString programText; - if (programFile.empty()) { - Cerr << "Missing --program argument\n"; - return -1; - } - - if (programFile == TStringBuf("-")) { - programFile = TStringBuf("-stdin-"); - programText = Cin.ReadAll(); - } else { - programText = TFileInput(programFile).ReadAll(); - } - - THolder<TFileStorageConfig> fsConfig; - if (!fsCfgFile.empty()) { - fsConfig = ParseProtoConfig<TFileStorageConfig>(fsCfgFile); - if (!fsConfig) { - return 1; - } - } else { - fsConfig = MakeHolder<TFileStorageConfig>(); - } - - THolder<TGatewaysConfig> gatewaysConfig; - if (!gatewaysCfgFile.empty()) { - gatewaysConfig = ParseProtoConfig<TGatewaysConfig>(gatewaysCfgFile); - if (!gatewaysConfig) { - return 1; - } - if (gatewaysConfig->HasSqlCore()) { - sqlFlags.insert(gatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), gatewaysConfig->GetSqlCore().GetTranslationFlags().end()); - } - } - - TFileStoragePtr fileStorage; - if (hasValidate) { - NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths); - - fileStorage = WithAsync(CreateFileStorage(*fsConfig)); - } - - IUdfResolver::TPtr udfResolver; - auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), true, udfsPaths); - TUdfIndex::TPtr udfIndex; - CommonInit(res, udfResolverPath, udfResolverFilterSyscalls, udfsPaths, fileStorage, udfResolver, funcRegistry, udfIndex); - - TAutoPtr<IThreadPool> ytExecutionQueue; - TVector<TDataProviderInitializer> dataProvidersInit; - - dataProvidersInit.push_back(GetPgDataProviderInitializer()); - - bool emulateOutputForMultirun = false; - if (hasValidate) { - if (gatewayTypes.contains(YtProviderName) || res.Has("opt-collision")) { - auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp"), tablesDirMapping); - auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun); - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {})); - } - } - - if (hasValidate && res.Has("opt-collision")) { - ExtProviderSpecific(funcRegistry.Get(), dataProvidersInit); - } - - TProgramFactory factory(true, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "yqlrun"); - factory.AddUserDataTable(filesMapping); - factory.SetModules(moduleResolver); - factory.SetFileStorage(fileStorage); - if (gatewaysConfig && gatewaysConfig->HasFs()) { - factory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*gatewaysConfig)); - } - factory.SetUdfIndex(udfIndex, new TUdfIndexPackageSet()); - factory.SetUdfResolver(udfResolver); - factory.SetGatewaysConfig(gatewaysConfig.Get()); - factory.EnableRangeComputeFor(); - - auto program = MakeHolder<TMultiProgs>(factory, programFile, programText, progsConcurrentCount); - if (res.Has("show-progress")) { - program->SetProgressWriter([](const TOperationProgress& progress) { - Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n"; - }); - } - - if (paramsFile) { - TString parameters = TFileInput(paramsFile).ReadAll(); - program->SetParametersYson(parameters); - } - - if (res.Has("sql") || res.Has("pg")) { - google::protobuf::Arena arena; - NSQLTranslation::TTranslationSettings settings; - settings.Arena = &arena; - settings.PgParser = res.Has("pg"); - settings.ClusterMapping = clusterMapping; - settings.Flags = sqlFlags; - settings.SyntaxVersion = syntaxVersion; - settings.AnsiLexer = res.Has("ansi-lexer"); - settings.TestAntlr4 = res.Has("test-antlr4"); - settings.V0Behavior = NSQLTranslation::EV0Behavior::Report; - settings.AssumeYdbOnClusterWithSlash = res.Has("assume-ydb-on-slash"); - if (res.Has("discover")) { - settings.Mode = NSQLTranslation::ESqlMode::DISCOVERY; - } - if (!program->ParseSql(settings)) { - program->PrintErrorsTo(*errStream); - return 1; - } - if (res.Has("test-format") && syntaxVersion == 1) { - TString formattedProgramText; - NYql::TIssues issues; - auto formatter = NSQLFormat::MakeSqlFormatter(settings); - if (!formatter->Format(programText, formattedProgramText, issues)) { - Cerr << "Format failed: "; - issues.PrintTo(Cerr); - return 1; - } - - auto frmProgram = MakeHolder<TMultiProgs>(factory, "formatted SQL", formattedProgramText, progsConcurrentCount); - if (!frmProgram->ParseSql(settings)) { - frmProgram->PrintErrorsTo(*errStream); - return 1; - } - - TStringStream SrcQuery, FrmQuery; - - program->PrintAstTo(SrcQuery); - frmProgram->PrintAstTo(FrmQuery); - if (SrcQuery.Str() != FrmQuery.Str()) { - Cerr << "source query's AST and formatted query's AST are not same\n"; - return 1; - } - } - } else { - if (!program->ParseYql()) { - program->PrintErrorsTo(*errStream); - return 1; - } - } - - if (res.Has("print-ast")) { - program->PrintAstTo(Cout); - } - - - if (res.Has("parse-only")) - return 0; - - const TString username = GetUsername(); - bool withTypes = res.Has("with-types"); - IOutputStream* traceOut = res.Has("trace-opt") ? &Cerr : nullptr; - - IOutputStream* exprOut = nullptr; - THolder<TFixedBufferFileOutput> exprFileHolder; - if (res.Has("print-expr")) { - exprOut = &Cout; - } else if (!exprFile.empty()) { - exprFileHolder.Reset(new TFixedBufferFileOutput(exprFile)); - exprOut = exprFileHolder.Get(); - } - - IOutputStream* tracePlan = nullptr; - THolder<TFixedBufferFileOutput> planFileHolder; - if (res.Has("trace-plan")) { - tracePlan = &Cout; - } - else if (!planFile.empty()) { - planFileHolder.Reset(new TFixedBufferFileOutput(planFile)); - tracePlan = planFileHolder.Get(); - } - - if (res.Has("show-log")) { - using namespace ::NYql::NLog; - InitLogger(&Cerr); - NLog::EComponentHelpers::ForEach([](NLog::EComponent c) { - YqlLogger().SetComponentLevel(c, ELevel::DEBUG); - }); - } - if (res.Has("trace-opt")) { - NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::TRACE); - NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::TRACE); - NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::TRACE); - } - - program->SetValidateOptions(NUdf::ValidateModeByStr(validateModeStr)); - - TProgram::TStatus status = TProgram::TStatus::Ok; - const bool fullExpr = res.Has("full-expr"); - IOutputStream* fullTracePlan = fullExpr ? tracePlan : nullptr; - IOutputStream* fullExprOut = fullExpr ? exprOut : nullptr; - - if (!program->Compile(username)) { - program->PrintErrorsTo(*errStream); - return 1; - } - - if (res.Has("compile-only")) { - if (res.Has("print-expr")) { - program->PrintExprTo(Cout); - } - return 0; - } - - if (res.Has("multirun")) { - status = program->RunAsyncAndWait(username, traceOut, fullTracePlan, fullExprOut, withTypes, emulateOutputForMultirun); - } else if (res.Has("peephole")) { - status = program->Peephole(username, exprOut, withTypes); - } else if (res.Has("run")) { - status = program->Run(username, traceOut, fullTracePlan, fullExprOut, withTypes); - } else if (res.Has("optimize")) { - status = program->Optimize(username, traceOut, fullTracePlan, fullExprOut, withTypes); - } else if (res.Has("validate")) { - status = program->Validate(username, exprOut, withTypes); - } else if (res.Has("discover")) { - status = program->Discover(username); - } else if (res.Has("lineage")) { - status = program->Lineage(username, traceOut, exprOut, withTypes); - } - - if (res.Has("with-final-issues")) { - program->FinalizeIssues(); - } - program->PrintErrorsTo(*errStream); - if (status == TProgram::TStatus::Error) { - return 1; - } - - if (!fullExpr && !res.Has("peephole")) { - program->Print(exprOut, tracePlan); - } - - IOutputStream* resultOut = nullptr; - THolder<TFixedBufferFileOutput> resultFileHolder; - if (res.Has("print-result")) { - resultOut = &Cout; - } else if (!resultFile.empty()) { - resultFileHolder.Reset(new TFixedBufferFileOutput(resultFile)); - resultOut = resultFileHolder.Get(); - } - - if (resultOut) { - if (res.Has("discover")) { - program->DiscoveredDataOut(*resultOut); - } else if (res.Has("lineage")) { - program->LineageOut(*resultOut); - } else { - if (res.Has("validate-result-format")) { - TString str; - TStringOutput out(str); - program->ResultsOut(out); - if (!str.empty()) { - auto node = NYT::NodeFromYsonString(str); - for (const auto& r : NResult::ParseResponse(node)) { - for (const auto& write : r.Writes) { - if (write.Type) { - NResult::TEmptyTypeVisitor visitor; - NResult::ParseType(*write.Type, visitor); - } - - if (write.Type && write.Data) { - NResult::TEmptyDataVisitor visitor; - NResult::ParseData(*write.Type, *write.Data, visitor); - } - } - } - } - - resultOut->Write(str.data(), str.size()); - } else { - program->ResultsOut(*resultOut); - } - } - } - - NLog::CleanupLogger(); - - return 0; -} int RunUI(int argc, const char* argv[]) { + Cerr << "yqlrun ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl; + + NYql::NBacktrace::RegisterKikimrFatalActions(); + NYql::NBacktrace::EnableKikimrSymbolize(); + TVector<TString> udfsPaths; TString udfsDir; TString mountConfig; @@ -1064,18 +261,11 @@ int RunUI(int argc, const char* argv[]) } int main(int argc, const char *argv[]) { - if (argc > 1 && TString(argv[1]) != TStringBuf("--ndebug")) { - Cerr << "yqlrun ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl; - } - - NYql::NBacktrace::RegisterKikimrFatalActions(); - NYql::NBacktrace::EnableKikimrSymbolize(); - try { if (argc > 1 && TString(argv[1]) == TStringBuf("ui")) { return RunUI(argc, argv); } else { - return Main(argc, argv); + return NYql::TYqlRunTool().Main(argc, argv); } } catch (...) { |