aboutsummaryrefslogtreecommitdiffstats
path: root/yql
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.com>2024-12-13 14:38:21 +0300
committerudovichenko-r <udovichenko-r@yandex-team.com>2024-12-13 15:57:44 +0300
commit1402a032649d20ffb80a541f706afcd31cd897fa (patch)
tree2c7d158a5ab885a6301dd0da832b16fd5f39c711 /yql
parent933c5664c69e565f3ad5752f5862e0ba860a52fe (diff)
downloadydb-1402a032649d20ffb80a541f706afcd31cd897fa.tar.gz
Facade runner lib
commit_hash:06b99da07b603d38b18615794ce645139fcec9d0
Diffstat (limited to 'yql')
-rw-r--r--yql/essentials/core/facade/yql_facade.cpp4
-rw-r--r--yql/essentials/tests/common/test_framework/yqlrun.py2
-rw-r--r--yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp24
-rw-r--r--yql/essentials/tools/arrow_kernels_dump/ya.make23
-rw-r--r--yql/essentials/tools/ya.make4
-rw-r--r--yql/essentials/tools/yql_facade_run/ya.make50
-rw-r--r--yql/essentials/tools/yql_facade_run/yql_facade_run.cpp779
-rw-r--r--yql/essentials/tools/yql_facade_run/yql_facade_run.h213
-rw-r--r--yql/tools/yqlrun/gateway_spec.cpp14
-rw-r--r--yql/tools/yqlrun/gateway_spec.h8
-rw-r--r--yql/tools/yqlrun/http/ya.make55
-rw-r--r--yql/tools/yqlrun/http/yql_server.cpp4
-rw-r--r--yql/tools/yqlrun/lib/ya.make23
-rw-r--r--yql/tools/yqlrun/lib/yqlrun_lib.cpp128
-rw-r--r--yql/tools/yqlrun/lib/yqlrun_lib.h21
-rw-r--r--yql/tools/yqlrun/ya.make52
-rw-r--r--yql/tools/yqlrun/yqlrun.cpp866
17 files changed, 1359 insertions, 911 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp
index ee8e31eb35..a0b07e469f 100644
--- a/yql/essentials/core/facade/yql_facade.cpp
+++ b/yql/essentials/core/facade/yql_facade.cpp
@@ -1598,7 +1598,7 @@ TMaybe<TString> TProgram::GetStatistics(bool totalOnly, THashMap<TString, TStrin
}
TStringStream out;
- NYson::TYsonWriter writer(&out);
+ NYson::TYsonWriter writer(&out, OutputFormat_);
// Header
writer.OnBeginMap();
writer.OnKeyedItem("ExecutionStatistics");
@@ -1677,7 +1677,7 @@ TMaybe<TString> TProgram::GetDiscoveredData() {
}
TStringStream out;
- NYson::TYsonWriter writer(&out);
+ NYson::TYsonWriter writer(&out, OutputFormat_);
writer.OnBeginMap();
for (auto& datasource: TypeCtx_->DataSources) {
TStringStream providerOut;
diff --git a/yql/essentials/tests/common/test_framework/yqlrun.py b/yql/essentials/tests/common/test_framework/yqlrun.py
index 95aa794ec7..e23b81c92f 100644
--- a/yql/essentials/tests/common/test_framework/yqlrun.py
+++ b/yql/essentials/tests/common/test_framework/yqlrun.py
@@ -234,7 +234,7 @@ class YQLRun(object):
for name in self.tables:
cmd += '--table=yt.%s@%s ' % (name, self.tables[name].yqlrun_file)
- if "--lineage" not in self.extra_args:
+ if "--lineage" not in self.extra_args and "--peephole" not in self.extra_args:
if optimize_only:
cmd += '-O '
else:
diff --git a/yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp b/yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp
new file mode 100644
index 0000000000..9c7778bfa7
--- /dev/null
+++ b/yql/essentials/tools/arrow_kernels_dump/arrow_kernels_dump.cpp
@@ -0,0 +1,24 @@
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+#include <yql/essentials/public/udf/udf_version.h>
+
+#include <util/stream/output.h>
+#include <util/generic/algorithm.h>
+#include <util/folder/path.h>
+
+// Dumps the Arrow kernel inventory of the builtin registry: one line per kernel
+// family (ordered by the family key) with its kernel count, then the overall totals.
+// The tool name and ABI version banner goes to stderr, the inventory to stdout.
+int main(int argc, char **argv) {
+    Y_UNUSED(argc);
+    Cerr << TFsPath(argv[0]).GetName() << " ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
+
+    auto registry = NKikimr::NMiniKQL::CreateBuiltinRegistry();
+    auto kernelFamilies = registry->GetAllKernelFamilies();
+    // Sort by key so that the output is stable between runs.
+    Sort(kernelFamilies, [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });
+
+    ui64 kernelsInTotal = 0;
+    for (const auto& [familyName, family] : kernelFamilies) {
+        const auto kernelCount = family->GetAllKernels().size();
+        Cout << familyName << ": " << kernelCount << " kernels" << Endl;
+        kernelsInTotal += kernelCount;
+    }
+
+    Cout << "Total kernel families: " << kernelFamilies.size() << ", kernels: " << kernelsInTotal << Endl;
+    return 0;
+}
diff --git a/yql/essentials/tools/arrow_kernels_dump/ya.make b/yql/essentials/tools/arrow_kernels_dump/ya.make
new file mode 100644
index 0000000000..d33cfbd5b9
--- /dev/null
+++ b/yql/essentials/tools/arrow_kernels_dump/ya.make
@@ -0,0 +1,23 @@
+PROGRAM()
+
+SRCS(
+ arrow_kernels_dump.cpp
+)
+
+IF (OS_LINUX)
+ # prevent external python extensions from looking up protobuf symbols (and maybe
+ # other common stuff) in the main binary
+ EXPORTS_SCRIPT(${ARCADIA_ROOT}/yql/essentials/tools/exports.symlist)
+ENDIF()
+
+PEERDIR(
+ yql/essentials/minikql/invoke_builtins
+ yql/essentials/minikql/invoke_builtins/llvm14
+ yql/essentials/public/udf/service/terminate_policy
+ yql/essentials/public/udf
+ yql/essentials/parser/pg_wrapper
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yql/essentials/tools/ya.make b/yql/essentials/tools/ya.make
index 89ea1d9d22..8d74153182 100644
--- a/yql/essentials/tools/ya.make
+++ b/yql/essentials/tools/ya.make
@@ -1,5 +1,6 @@
RECURSE(
- astdiff
+ arrow_kernels_dump
+ astdiff
pg_catalog_dump
pg-make-test
pgrun
@@ -9,4 +10,5 @@ RECURSE(
udf_dep_stub
udf_probe
udf_resolver
+ yql_facade_run
)
diff --git a/yql/essentials/tools/yql_facade_run/ya.make b/yql/essentials/tools/yql_facade_run/ya.make
new file mode 100644
index 0000000000..e9d93e235d
--- /dev/null
+++ b/yql/essentials/tools/yql_facade_run/ya.make
@@ -0,0 +1,50 @@
+LIBRARY()
+
+SRCS(
+ yql_facade_run.cpp
+)
+
+PEERDIR(
+ yql/essentials/providers/pg/provider
+ yql/essentials/providers/common/provider
+ yql/essentials/providers/common/proto
+ yql/essentials/providers/common/udf_resolve
+ yql/essentials/core/file_storage
+ yql/essentials/core/file_storage/proto
+ yql/essentials/core/file_storage/defs
+ yql/essentials/core/url_lister/interface
+ yql/essentials/core/services/mounts
+ yql/essentials/core/services
+ yql/essentials/core/credentials
+ yql/essentials/core/pg_ext
+ yql/essentials/core/facade
+ yql/essentials/core/url_lister
+ yql/essentials/core/url_preprocessing
+ yql/essentials/core/peephole_opt
+ yql/essentials/core
+ yql/essentials/minikql/invoke_builtins
+ yql/essentials/minikql
+ yql/essentials/ast
+ yql/essentials/parser/pg_wrapper/interface
+ yql/essentials/parser/pg_catalog
+ yql/essentials/public/udf
+ yql/essentials/public/result_format
+ yql/essentials/utils/failure_injector
+ yql/essentials/utils/backtrace
+ yql/essentials/utils/log
+ yql/essentials/protos
+ yql/essentials/sql/settings
+ yql/essentials/sql/v1/format
+
+ library/cpp/resource
+ library/cpp/getopt
+ library/cpp/yson/node
+ library/cpp/yson
+ library/cpp/logger
+
+ contrib/libs/protobuf
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp
new file mode 100644
index 0000000000..91a3a3eaf8
--- /dev/null
+++ b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp
@@ -0,0 +1,779 @@
+#include "yql_facade_run.h"
+
+#include <yql/essentials/providers/pg/provider/yql_pg_provider.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
+#include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
+#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
+#include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h>
+#include <yql/essentials/core/yql_user_data_storage.h>
+#include <yql/essentials/core/yql_udf_resolver.h>
+#include <yql/essentials/core/yql_udf_index.h>
+#include <yql/essentials/core/yql_udf_index_package_set.h>
+#include <yql/essentials/core/yql_library_compiler.h>
+#include <yql/essentials/core/yql_type_annotation.h>
+#include <yql/essentials/core/pg_ext/yql_pg_ext.h>
+#include <yql/essentials/core/services/mounts/yql_mounts.h>
+#include <yql/essentials/core/services/yql_out_transformers.h>
+#include <yql/essentials/core/file_storage/file_storage.h>
+#include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
+#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
+#include <yql/essentials/core/facade/yql_facade.h>
+#include <yql/essentials/core/url_lister/url_lister_manager.h>
+#include <yql/essentials/core/url_preprocessing/url_preprocessing.h>
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+#include <yql/essentials/ast/yql_expr.h>
+#include <yql/essentials/parser/pg_wrapper/interface/parser.h>
+#include <yql/essentials/parser/pg_catalog/catalog.h>
+#include <yql/essentials/public/udf/udf_version.h>
+#include <yql/essentials/public/udf/udf_registrator.h>
+#include <yql/essentials/public/udf/udf_validate.h>
+#include <yql/essentials/public/result_format/yql_result_format_response.h>
+#include <yql/essentials/public/result_format/yql_result_format_type.h>
+#include <yql/essentials/public/result_format/yql_result_format_data.h>
+#include <yql/essentials/utils/failure_injector/failure_injector.h>
+#include <yql/essentials/utils/backtrace/backtrace.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/protos/yql_mount.pb.h>
+#include <yql/essentials/protos/pg_ext.pb.h>
+#include <yql/essentials/sql/settings/translation_settings.h>
+#include <yql/essentials/sql/v1/format/sql_format.h>
+
+#include <library/cpp/resource/resource.h>
+#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yson/writer.h>
+
+#include <google/protobuf/text_format.h>
+#include <google/protobuf/arena.h>
+
+#include <util/stream/output.h>
+#include <util/stream/file.h>
+#include <util/stream/null.h>
+#include <util/system/user.h>
+#include <util/string/split.h>
+#include <util/string/join.h>
+#include <util/string/builder.h>
+#include <util/generic/vector.h>
+#include <util/generic/ptr.h>
+#include <util/generic/yexception.h>
+#include <util/datetime/base.h>
+
+#ifdef __unix__
+#include <sys/resource.h>
+#endif
+
+namespace {
+
+// AST print flags shared by the PrettyPrintTo() calls further down in this file.
+const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote |
+ NYql::TAstPrintFlags::AdaptArbitraryContent;
+
+// Reads the whole file `cfgFile` and parses it as a protobuf text-format message of
+// type TMessage. Throws yexception when the text cannot be parsed.
+template <typename TMessage>
+THolder<TMessage> ParseProtoConfig(const TString& cfgFile) {
+    auto message = MakeHolder<TMessage>();
+    const TString text = TFileInput(cfgFile).ReadAll();
+
+    const bool parsed = ::google::protobuf::TextFormat::ParseFromString(text, message.Get());
+    if (!parsed) {
+        throw yexception() << "Bad format of config file " << cfgFile;
+    }
+
+    return message;
+}
+
+// Parses the embedded resource `resourceName` as a protobuf text-format message of
+// type TMessage. Returns an empty holder when no such resource exists and throws
+// yexception when the resource text cannot be parsed.
+template <typename TMessage>
+THolder<TMessage> ParseProtoFromResource(TStringBuf resourceName) {
+    if (!NResource::Has(resourceName)) {
+        return {};
+    }
+
+    auto message = MakeHolder<TMessage>();
+    const TString text = NResource::Find(resourceName);
+
+    const bool parsed = ::google::protobuf::TextFormat::ParseFromString(text, message.Get());
+    if (!parsed) {
+        throw yexception() << "Bad format of config " << resourceName;
+    }
+    return message;
+}
+
+// Pipeline configurator used for the regular (non-peephole) modes. It adds an
+// "OptTrace" logging stage after type annotation and, after optimization,
+// optional stages that write the expression and the plan to the given streams.
+class TOptPipelineConfigurator : public NYql::IPipelineConfigurator {
+public:
+    // A null planStream / exprStream disables the corresponding output stage.
+    TOptPipelineConfigurator(NYql::TProgramPtr prg, IOutputStream* planStream, IOutputStream* exprStream, bool withTypes)
+        : Program_(std::move(prg))
+        , PlanStream_(planStream)
+        , ExprStream_(exprStream)
+        , WithTypes_(withTypes)
+    {
+    }
+
+    // No extra stages are added right after pipeline creation.
+    void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const final {
+    }
+
+    // Logs the expression at TRACE level of the Core component.
+    void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
+        pipeline->Add(
+            NYql::TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE),
+            "OptTrace", NYql::TIssuesIds::CORE, "OptTrace");
+    }
+
+    // Expression output (if requested) is registered before plan output (if requested).
+    void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
+        if (ExprStream_ != nullptr) {
+            pipeline->Add(
+                NYql::TExprOutputTransformer::Sync(Program_->ExprRoot(), ExprStream_, WithTypes_),
+                "AstOutput");
+        }
+        if (PlanStream_ != nullptr) {
+            pipeline->Add(
+                NYql::TPlanOutputTransformer::Sync(PlanStream_, Program_->GetPlanBuilder(), Program_->GetOutputFormat()),
+                "PlanOutput");
+        }
+    }
+
+private:
+    NYql::TProgramPtr Program_;
+    IOutputStream* PlanStream_;
+    IOutputStream* ExprStream_;
+    bool WithTypes_;
+};
+
+// Pipeline configurator used for the peephole mode: the same "OptTrace" logging
+// stage after type annotation, plus a peephole optimization stage after optimization.
+class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
+public:
+    TPeepHolePipelineConfigurator() = default;
+
+    // No extra stages are added right after pipeline creation.
+    void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const final {
+    }
+
+    // Logs the expression at TRACE level of the Core component.
+    void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
+        pipeline->Add(
+            NYql::TExprLogTransformer::Sync("OptimizedExpr", NYql::NLog::EComponent::Core, NYql::NLog::ELevel::TRACE),
+            "OptTrace", NYql::TIssuesIds::CORE, "OptTrace");
+    }
+
+    // Appends the peephole stage built from the pipeline's own type annotation context.
+    void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
+        pipeline->Add(
+            NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()),
+            "PeepHole");
+    }
+};
+
+} // unnamed
+
+
+namespace NYql {
+
+// The user name defaults to the name of the user running the process.
+TFacadeRunOptions::TFacadeRunOptions() {
+    User = GetUsername();
+}
+
+// Defined out of line; presumably so that holder members can be destroyed where
+// their pointee types are complete (verify against the header).
+TFacadeRunOptions::~TFacadeRunOptions() = default;
+
+// Applies the parsed logging options to the global YQL logger:
+//  * a non-default Verbosity is set as the level of every component;
+//  * trace-opt raises Core, CoreEval and CorePeepHole to TRACE;
+//  * otherwise show-log raises Core to DEBUG.
+void TFacadeRunOptions::InitLogger() {
+    if (Verbosity != LOG_DEF_PRIORITY) {
+        const NLog::ELevel requested = NLog::ELevelHelpers::FromInt(Verbosity);
+        NLog::EComponentHelpers::ForEach([requested](NLog::EComponent component) {
+            NLog::YqlLogger().SetComponentLevel(component, requested);
+        });
+    }
+
+    if (TraceOptStream) {
+        for (const auto component : {NLog::EComponent::Core, NLog::EComponent::CoreEval, NLog::EComponent::CorePeepHole}) {
+            NLog::YqlLogger().SetComponentLevel(component, NLog::ELevel::TRACE);
+        }
+        return;
+    }
+
+    if (ShowLog) {
+        NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::DEBUG);
+    }
+}
+
+// Writes an informational message to stderr unless debug output is disabled
+// (NoDebug) or the verbosity is below TLOG_INFO.
+void TFacadeRunOptions::PrintInfo(const TString& msg) {
+    if (NoDebug || Verbosity < TLOG_INFO) {
+        return;
+    }
+    Cerr << msg << Endl;
+}
+
+// Declares the command-line options (the built-in ones plus whatever OptExtenders_
+// contribute), parses argv, passes the parse result to OptHandlers_ and then fills
+// in defaults: gateways config from the "gateways.conf" resource, SQL translation
+// flags from that config, and file storage config from the "fs.conf" resource.
+//
+// Option handlers throw yexception on malformed values; --mem-limit throws
+// TSystemError when the resource limit cannot be read or set. A yexception is also
+// thrown when the selected mode is Validate or later and no gateway was chosen.
+void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
+
+    NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
+
+    opts.AddHelpOption();
+
+    // ---- program source and syntax ----
+    opts.AddLongOption('p', "program", "Program file").Required().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            ProgramFile = file;
+            if (ProgramFile == "-") {
+                // "-" means: read the program text from standard input.
+                ProgramFile = "-stdin-";
+                ProgramText = Cin.ReadAll();
+            } else {
+                ProgramText = TFileInput(ProgramFile).ReadAll();
+            }
+            User = GetUsername();
+        });
+    opts.AddLongOption('s', "sql", "Program is SQL query").NoArgument().StoreValue(&ProgramType, EProgramType::Sql);
+    if (PgSupport) {
+        opts.AddLongOption("pg", "Program has PG syntax").NoArgument().StoreValue(&ProgramType, EProgramType::Pg);
+        opts.AddLongOption("pg-ext", "pg extensions config file").Optional().RequiredArgument("FILE")
+            .Handler1T<TString>([this](const TString& file) {
+                PgExtConfig = ParseProtoConfig<NProto::TPgExtensions>(file);
+            });
+    }
+
+    // ---- user data: files, urls, mounts, parameters ----
+    opts.AddLongOption('f', "file", "Additional files").RequiredArgument("name@path")
+        .KVHandler([this](TString name, TString path) {
+            if (name.empty() || path.empty()) {
+                throw yexception() << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt";
+            }
+
+            auto& entry = DataTable[NYql::TUserDataKey::File(NYql::GetDefaultFilePrefix() + name)];
+            entry.Type = NYql::EUserDataType::PATH;
+            entry.Data = path;
+        }, '@');
+
+    opts.AddLongOption('U', "url", "Additional urls").RequiredArgument("name@path")
+        .KVHandler([this](TString name, TString url) {
+            if (name.empty() || url.empty()) {
+                throw yexception() << "Incorrect url mapping, expected form name@url, e.g. MyUrl@http://example.com/file";
+            }
+
+            auto& entry = DataTable[NYql::TUserDataKey::File(NYql::GetDefaultFilePrefix() + name)];
+            entry.Type = NYql::EUserDataType::URL;
+            entry.Data = url;
+        }, '@');
+
+    opts.AddLongOption('m', "mounts", "Mount points config file.").Optional().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            MountConfig = ParseProtoConfig<NYqlMountConfig::TMountConfig>(file);
+        });
+    opts.AddLongOption("params-file", "Query parameters values in YSON format").Optional().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            Params = TFileInput(file).ReadAll();
+        });
+
+    // ---- gateways and storage configuration ----
+    opts.AddLongOption('G', "gateways", "Used gateways").DefaultValue(JoinSeq(",", SupportedGateways_))
+        .Handler1T<TString>([this](const TString& gateways) {
+            ::StringSplitter(gateways).Split(',').Consume([&](const TStringBuf& val) {
+                if (!SupportedGateways_.contains(val)) {
+                    throw yexception() << "Unsupported gateway \"" << val << '"';
+                }
+                GatewayTypes.emplace(val);
+            });
+        });
+    opts.AddLongOption("gateways-cfg", "Gateways configuration file").Optional().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            GatewaysConfig = ParseProtoConfig<TGatewaysConfig>(file);
+        });
+    opts.AddLongOption("fs-cfg", "Fs configuration file").Optional().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            FsConfig = MakeHolder<TFileStorageConfig>();
+            LoadFsConfigFromFile(file, *FsConfig);
+        });
+
+    // ---- UDFs ----
+    opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&UdfsPaths);
+    opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory")
+        .Handler1T<TString>([this](const TString& dir) {
+            NKikimr::NMiniKQL::FindUdfsInDir(dir, &UdfsPaths);
+        });
+    opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&UdfResolverPath);
+    opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver").Optional().NoArgument().SetFlag(&UdfResolverFilterSyscalls);
+    opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument().SetFlag(&ScanUdfs);
+
+    // ---- run mode (the last mode option on the command line wins) ----
+    opts.AddLongOption("parse-only", "Exit after program has been parsed").NoArgument().StoreValue(&Mode, ERunMode::Parse);
+    opts.AddLongOption("compile-only", "Exit after program has been compiled").NoArgument().StoreValue(&Mode, ERunMode::Compile);
+    opts.AddLongOption("validate", "Exit after program has been validated").NoArgument().StoreValue(&Mode, ERunMode::Validate);
+    opts.AddLongOption("lineage", "Exit after data lineage has been calculated").NoArgument().StoreValue(&Mode, ERunMode::Lineage);
+    opts.AddLongOption('O', "optimize", "Optimize expression").NoArgument().StoreValue(&Mode, ERunMode::Optimize);
+    opts.AddLongOption('R', "run", "Run expression using input/output tables").NoArgument().StoreValue(&Mode, ERunMode::Run);
+    opts.AddLongOption('D', "discover", "Discover tables in the program").NoArgument().StoreValue(&Mode, ERunMode::Discover);
+    opts.AddLongOption("peephole", "Perform peephole stage of expression using input/output tables").NoArgument().StoreValue(&Mode, ERunMode::Peephole);
+
+    // ---- logging and output streams ----
+    opts.AddLongOption('L', "show-log", "Show transformation log").Optional().NoArgument().SetFlag(&ShowLog);
+    opts.AddLongOption('v', "verbosity", "Log verbosity level").Optional().RequiredArgument("LEVEL").StoreResult(&Verbosity);
+    opts.AddLongOption("print-ast", "Print AST after loading").NoArgument().SetFlag(&PrintAst);
+    opts.AddLongOption("print-expr", "Print rebuild AST before execution").NoArgument()
+        .Handler0([this]() {
+            // Do not override a stream already selected by --expr-file.
+            if (!ExprStream) {
+                ExprStream = &Cout;
+            }
+        });
+    opts.AddLongOption("with-types", "Print types annotation").NoArgument().SetFlag(&WithTypes);
+    opts.AddLongOption("trace-opt", "Print AST in the begin of each transformation").NoArgument()
+        .Handler0([this]() {
+            TraceOptStream = &Cerr;
+        });
+    opts.AddLongOption("expr-file", "Print AST to that file instead of stdout").Optional().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            ExprStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
+            ExprStream = ExprStreamHolder.Get();
+        });
+    opts.AddLongOption("print-result", "Print program execution result to stdout").NoArgument()
+        .Handler0([this]() {
+            // Do not override a stream already selected by --result-file.
+            if (!ResultStream) {
+                ResultStream = &Cout;
+            }
+        });
+    opts.AddLongOption("format", "Results format")
+        .Optional()
+        .RequiredArgument("STR")
+        .Choices(THashSet<TString>{"text", "binary", "pretty"})
+        .Handler1T<TString>([this](const TString& val) {
+            if (val == "text") {
+                ResultsFormat = NYson::EYsonFormat::Text;
+            } else if (val == "binary") {
+                ResultsFormat = NYson::EYsonFormat::Binary;
+            } else if (val == "pretty") {
+                ResultsFormat = NYson::EYsonFormat::Pretty;
+            } else {
+                throw yexception() << "Unknown result format " << val;
+            }
+        });
+
+    opts.AddLongOption("result-file", "Print program execution result to file").Optional().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            ResultStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
+            ResultStream = ResultStreamHolder.Get();
+        });
+    opts.AddLongOption('P', "trace-plan", "Print plan before execution").NoArgument()
+        .Handler0([this]() {
+            // Do not override a stream already selected by --plan-file.
+            if (!PlanStream) {
+                PlanStream = &Cerr;
+            }
+        });
+    opts.AddLongOption("plan-file", "Print program plan to file").Optional().RequiredArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            PlanStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
+            PlanStream = PlanStreamHolder.Get();
+        });
+    opts.AddLongOption("err-file", "Print validate/optimize/runtime errors to file")
+        .Handler1T<TString>([this](const TString& file) {
+            ErrStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
+            ErrStream = ErrStreamHolder.Get();
+        });
+    opts.AddLongOption("full-expr", "Avoid buffering of expr/plan").NoArgument().SetFlag(&FullExpr);
+
+    // ---- process limits ----
+    opts.AddLongOption("mem-limit", "Set memory limit in megabytes")
+        .Handler1T<ui32>(0, [](ui32 memLimit) {
+            if (memLimit) {
+#ifdef __unix__
+                // Widen before multiplying: in 32-bit arithmetic any limit of
+                // 4096 MB or more would wrap around and set a far smaller limit.
+                const ui64 memLimitBytes = static_cast<ui64>(memLimit) * 1024 * 1024;
+
+                struct rlimit rl;
+                if (getrlimit(RLIMIT_AS, &rl)) {
+                    throw TSystemError() << "Cannot getrlimit(RLIMIT_AS)";
+                }
+
+                // Only the soft limit is changed; the hard limit is kept as read.
+                rl.rlim_cur = memLimitBytes;
+                if (setrlimit(RLIMIT_AS, &rl)) {
+                    throw TSystemError() << "Cannot setrlimit(RLIMIT_AS) to " << memLimitBytes << " bytes";
+                }
+#else
+                throw yexception() << "Memory limit can not be set on this platform";
+#endif
+            }
+        });
+
+    // ---- validation and statistics ----
+    opts.AddLongOption("validate-mode", "Validate udf mode, available values: " + NUdf::ValidateModeAvailables())
+        .DefaultValue(NUdf::ValidateModeAsStr(NUdf::EValidateMode::Greedy))
+        .Handler1T<TString>([this](const TString& mode) {
+            ValidateMode = NUdf::ValidateModeByStr(mode);
+        });
+    opts.AddLongOption("stat", "Print execution statistics").Optional().OptionalArgument("FILE")
+        .Handler1T<TString>([this](const TString& file) {
+            // Without a file argument the statistics go to stderr.
+            if (file) {
+                StatStreamHolder = MakeHolder<TFileOutput>(file);
+                StatStream = StatStreamHolder.Get();
+            } else {
+                StatStream = &Cerr;
+            }
+        });
+    opts.AddLongOption("full-stat", "Output full execution statistics").Optional().NoArgument().SetFlag(&FullStatistics);
+
+    // ---- SQL translation ----
+    opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&SqlFlags, ',');
+    opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&SyntaxVersion).DefaultValue(1);
+    opts.AddLongOption("ansi-lexer", "Use ansi lexer").NoArgument().SetFlag(&AnsiLexer);
+    opts.AddLongOption("assume-ydb-on-slash", "Assume YDB provider if cluster name starts with '/'").NoArgument().SetFlag(&AssumeYdbOnClusterWithSlash);
+    opts.AddLongOption("test-antlr4", "Check antlr4 parser").NoArgument().SetFlag(&TestAntlr4);
+
+    opts.AddLongOption("with-final-issues", "Include some final messages (like statistic) in issues").NoArgument().SetFlag(&WithFinalIssues);
+    if (FailureInjectionSupport) {
+        opts.AddLongOption("failure-inject", "Activate failure injection")
+            .Optional()
+            .RequiredArgument("INJECTION_NAME=FAIL_COUNT or INJECTION_NAME=SKIP_COUNT/FAIL_COUNT")
+            .KVHandler([](TString name, TString value) {
+                TFailureInjector::Activate();
+                // "SKIP/FAIL" sets both counters; a bare "FAIL" means skip nothing.
+                TStringBuf fail = value;
+                TStringBuf skip;
+                if (TStringBuf(value).TrySplit('/', skip, fail)) {
+                    TFailureInjector::Set(name, FromString<ui32>(skip), FromString<ui32>(fail));
+                } else {
+                    TFailureInjector::Set(name, 0, FromString<ui32>(fail));
+                }
+            });
+    }
+
+    opts.SetFreeArgsMax(0);
+
+    // Let the registered extenders add their own options before parsing.
+    for (auto& ext: OptExtenders_) {
+        ext(opts);
+    }
+
+    auto res = NLastGetopt::TOptsParseResult(&opts, argc, argv);
+
+    // Registered handlers get to inspect the parse result.
+    for (auto& handle: OptHandlers_) {
+        handle(res);
+    }
+
+    if (Mode >= ERunMode::Validate && GatewayTypes.empty()) {
+        throw yexception() << "At least one gateway from the list " << JoinSeq(",", SupportedGateways_).Quote() << " must be specified";
+    }
+
+    // Fall back to the embedded gateways config when none was given explicitly.
+    if (!GatewaysConfig) {
+        GatewaysConfig = ParseProtoFromResource<TGatewaysConfig>("gateways.conf");
+    }
+
+    if (GatewaysConfig && GatewaysConfig->HasSqlCore()) {
+        SqlFlags.insert(GatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), GatewaysConfig->GetSqlCore().GetTranslationFlags().end());
+    }
+
+    // The file storage config always exists after parsing; it stays default-constructed
+    // when neither --fs-cfg nor the embedded "fs.conf" resource is available.
+    if (!FsConfig) {
+        FsConfig = MakeHolder<TFileStorageConfig>();
+        if (NResource::Has("fs.conf")) {
+            LoadFsConfigFromResource("fs.conf", *FsConfig);
+        }
+    }
+}
+
+// Entry point of the runner: registers the backtrace handlers, sets up a logger
+// scope writing to stderr and delegates to DoMain().
+// Returns DoMain()'s result, or 1 after printing the message of any exception
+// that escapes it.
+int TFacadeRunner::Main(int argc, const char *argv[]) {
+    NYql::NBacktrace::RegisterKikimrFatalActions();
+    NYql::NBacktrace::EnableKikimrSymbolize();
+
+    NYql::NLog::YqlLoggerScope logger(&Cerr);
+
+    // Stays at 1 unless DoMain() returns normally.
+    int exitCode = 1;
+    try {
+        exitCode = DoMain(argc, argv);
+    } catch (...) {
+        Cerr << CurrentExceptionMessage() << Endl;
+    }
+    return exitCode;
+}
+
+// Parses the command line, prepares everything a TProgramFactory needs (PG parser
+// state, function registry, module resolver, file storage, UDF resolver, data
+// providers, url listers) and hands the configured factory to RunProgram().
+//
+// Returns -1 when libraries fail to compile/load or when 'scan-udfs' is requested
+// without an udf-resolver path; otherwise returns the result of RunProgram().
+// Exceptions from option parsing and from the called services propagate to the caller.
+int TFacadeRunner::DoMain(int argc, const char *argv[]) {
+    Y_UNUSED(NUdf::GetStaticSymbols());
+
+    RunOptions_.Parse(argc, argv);
+
+    if (!RunOptions_.NoDebug) {
+        Cerr << Name_ << " ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
+    }
+
+    RunOptions_.InitLogger();
+
+    if (RunOptions_.PgSupport) {
+        // PG system schemas are served by the PG provider.
+        ClusterMapping_["pg_catalog"] = PgProviderName;
+        ClusterMapping_["information_schema"] = PgProviderName;
+
+        NPg::SetSqlLanguageParser(NSQLTranslationPG::CreateSqlLanguageParser());
+        NPg::LoadSystemFunctions(*NSQLTranslationPG::CreateSystemFunctionsParser());
+        if (RunOptions_.PgExtConfig) {
+            TVector<NPg::TExtensionDesc> extensions;
+            PgExtensionsFromProto(*RunOptions_.PgExtConfig, extensions);
+            NPg::RegisterExtensions(extensions, false,
+                *NSQLTranslationPG::CreateExtensionSqlParser(),
+                NKikimr::NMiniKQL::CreateExtensionLoader().get());
+        }
+
+        NPg::GetSqlLanguageParser()->Freeze();
+    }
+
+    FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace,
+        NKikimr::NMiniKQL::CreateBuiltinRegistry(), true, RunOptions_.UdfsPaths);
+
+    TExprContext ctx;
+    if (RunOptions_.PgSupport) {
+        // Continue node id numbering from where the PG parser context stopped.
+        ctx.NextUniqueId = NPg::GetSqlLanguageParser()->GetContext().NextUniqueId;
+    }
+
+    // Module resolver: user-supplied mounts if given, default YQL libraries otherwise.
+    IModuleResolver::TPtr moduleResolver;
+    if (RunOptions_.MountConfig) {
+        TModulesTable modules;
+        FillUserDataTableFromFileSystem(*RunOptions_.MountConfig, RunOptions_.DataTable);
+
+        if (!CompileLibraries(RunOptions_.DataTable, ctx, modules)) {
+            *RunOptions_.ErrStream << "Errors on compile libraries:" << Endl;
+            ctx.IssueManager.GetIssues().PrintTo(*RunOptions_.ErrStream);
+            return -1;
+        }
+
+        moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, ClusterMapping_, RunOptions_.SqlFlags, RunOptions_.Mode >= ERunMode::Validate);
+    } else {
+        if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, ClusterMapping_, RunOptions_.Mode >= ERunMode::Validate)) {
+            *RunOptions_.ErrStream << "Errors loading default YQL libraries:" << Endl;
+            ctx.IssueManager.GetIssues().PrintTo(*RunOptions_.ErrStream);
+            return -1;
+        }
+    }
+
+    TExprContext::TFreezeGuard freezeGuard(ctx);
+
+    // File storage is only created for modes from Validate onwards.
+    if (RunOptions_.Mode >= ERunMode::Validate) {
+        std::vector<NFS::IDownloaderPtr> downloaders;
+        for (auto& factory: FsDownloadFactories_) {
+            downloaders.push_back(factory());
+        }
+
+        FileStorage_ = WithAsync(CreateFileStorage(*RunOptions_.FsConfig, downloaders));
+    }
+
+    IUdfResolver::TPtr udfResolver;
+    TUdfIndex::TPtr udfIndex;
+    if (FileStorage_ && RunOptions_.ScanUdfs) {
+        if (!RunOptions_.UdfResolverPath) {
+            // Terminate the message with a newline like every other diagnostic here.
+            Cerr << "udf-resolver path must be specified when use 'scan-udfs'" << Endl;
+            return -1;
+        }
+
+        // Scan the UDFs with the out-of-process resolver and build an index from the result.
+        udfResolver = NCommon::CreateOutProcUdfResolver(FuncRegistry_.Get(), FileStorage_, RunOptions_.UdfResolverPath, {}, {}, RunOptions_.UdfResolverFilterSyscalls, {});
+
+        RunOptions_.PrintInfo(TStringBuilder() << TInstant::Now().ToStringLocalUpToSeconds() << " Udf scanning started for " << RunOptions_.UdfsPaths.size() << " udfs ...");
+        udfIndex = new TUdfIndex();
+        LoadRichMetadataToUdfIndex(*udfResolver, RunOptions_.UdfsPaths, false, TUdfIndex::EOverrideMode::RaiseError, *udfIndex);
+        RunOptions_.PrintInfo(TStringBuilder() << TInstant::Now().ToStringLocalUpToSeconds() << " UdfIndex done.");
+
+        // From here on the resolver is the index-backed one wrapping the out-of-process resolver.
+        udfResolver = NCommon::CreateUdfResolverWithIndex(udfIndex, udfResolver, FileStorage_);
+        RunOptions_.PrintInfo(TStringBuilder() << TInstant::Now().ToStringLocalUpToSeconds() << " Udfs scanned");
+    } else {
+        // Out-of-process resolver when both file storage and a resolver path exist,
+        // the simple resolver otherwise.
+        udfResolver = FileStorage_ && RunOptions_.UdfResolverPath
+            ? NCommon::CreateOutProcUdfResolver(FuncRegistry_.Get(), FileStorage_, RunOptions_.UdfResolverPath, {}, {}, RunOptions_.UdfResolverFilterSyscalls, {})
+            : NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_, true);
+    }
+
+    // The PG provider (when enabled) is registered before the providers from ProviderFactories_.
+    TVector<TDataProviderInitializer> dataProvidersInit;
+    if (RunOptions_.PgSupport) {
+        dataProvidersInit.push_back(GetPgDataProviderInitializer());
+    }
+    for (auto& factory: ProviderFactories_) {
+        dataProvidersInit.push_back(factory());
+    }
+
+    TVector<IUrlListerPtr> urlListers;
+    for (auto& factory: UrlListerFactories_) {
+        urlListers.push_back(factory());
+    }
+
+    TProgramFactory factory(RunOptions_.UseRepeatableRandomAndTimeProviders, FuncRegistry_.Get(), ctx.NextUniqueId, dataProvidersInit, Name_);
+    factory.AddUserDataTable(RunOptions_.DataTable);
+    factory.SetModules(moduleResolver);
+    factory.SetFileStorage(FileStorage_);
+    if (RunOptions_.GatewaysConfig && RunOptions_.GatewaysConfig->HasFs()) {
+        factory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*RunOptions_.GatewaysConfig));
+    }
+    factory.SetUdfIndex(udfIndex, new TUdfIndexPackageSet());
+    factory.SetUdfResolver(udfResolver);
+    factory.SetGatewaysConfig(RunOptions_.GatewaysConfig.Get());
+    factory.SetCredentials(Credentials_);
+    factory.EnableRangeComputeFor();
+    if (!urlListers.empty()) {
+        factory.SetUrlListerManager(MakeUrlListerManager(urlListers));
+    }
+
+    return RunProgram(factory);
+}
+
+int TFacadeRunner::RunProgram(TProgramFactory& factory) {
+ /*
+  * Drives a single program through the stages selected by RunOptions_.Mode:
+  * parse (SQL/PG text or s-expression) -> compile -> mode-specific execution ->
+  * output of expr/plan, results and statistics to the configured streams.
+  * Returns 0 on success (including the early exits of the Parse and Compile modes)
+  * and -1 on any parse, format-check, compile or execution error.
+  */
+ TProgramPtr program = factory.Create(RunOptions_.ProgramFile, RunOptions_.ProgramText);; // NOTE: the doubled ';' is a harmless empty statement
+ if (RunOptions_.Params) {
+ program->SetParametersYson(RunOptions_.Params);
+ }
+
+ if (RunOptions_.EnableResultPosition) {
+ program->EnableResultPosition();
+ }
+
+ if (ProgressWriter_) {
+ program->SetProgressWriter(ProgressWriter_);
+ }
+ program->SetUseTableMetaFromGraph(RunOptions_.UseMetaFromGrpah);
+ program->SetValidateOptions(RunOptions_.ValidateMode);
+
+ bool fail = false; // parse/compile errors are recorded here rather than returned at once, so the trace output below is still written
+ if (RunOptions_.ProgramType != EProgramType::SExpr) {
+ RunOptions_.PrintInfo("Parse SQL...");
+ google::protobuf::Arena arena; // referenced by settings.Arena; scoped to this branch
+ NSQLTranslation::TTranslationSettings settings;
+ settings.Arena = &arena;
+ settings.PgParser = EProgramType::Pg == RunOptions_.ProgramType;
+ settings.ClusterMapping = ClusterMapping_;
+ settings.Flags = RunOptions_.SqlFlags;
+ settings.SyntaxVersion = RunOptions_.SyntaxVersion;
+ settings.AnsiLexer = RunOptions_.AnsiLexer;
+ settings.TestAntlr4 = RunOptions_.TestAntlr4;
+ settings.V0Behavior = NSQLTranslation::EV0Behavior::Report;
+ settings.AssumeYdbOnClusterWithSlash = RunOptions_.AssumeYdbOnClusterWithSlash;
+ if (ERunMode::Discover == RunOptions_.Mode) {
+ settings.Mode = NSQLTranslation::ESqlMode::DISCOVERY;
+ }
+ if (!program->ParseSql(settings)) {
+ program->PrintErrorsTo(*RunOptions_.ErrStream);
+ fail = true;
+ }
+ if (!fail && RunOptions_.TestSqlFormat && 1 == RunOptions_.SyntaxVersion) { // round-trip check: format the text, re-parse it, require identical printed ASTs
+ TString formattedProgramText;
+ NYql::TIssues issues;
+ auto formatter = NSQLFormat::MakeSqlFormatter(settings);
+ if (!formatter->Format(RunOptions_.ProgramText, formattedProgramText, issues)) {
+ *RunOptions_.ErrStream << "Format failed" << Endl;
+ issues.PrintTo(*RunOptions_.ErrStream);
+ return -1; // format-check failures return immediately, skipping the trace output below
+ }
+
+ auto frmProgram = factory.Create("formatted SQL", formattedProgramText);
+ if (!frmProgram->ParseSql(settings)) {
+ frmProgram->PrintErrorsTo(*RunOptions_.ErrStream);
+ return -1;
+ }
+
+ TStringStream srcQuery, frmQuery;
+
+ program->AstRoot()->PrettyPrintTo(srcQuery, PRETTY_FLAGS);
+ frmProgram->AstRoot()->PrettyPrintTo(frmQuery, PRETTY_FLAGS);
+ if (srcQuery.Str() != frmQuery.Str()) {
+ *RunOptions_.ErrStream << "source query's AST and formatted query's AST are not same" << Endl;
+ return -1;
+ }
+ }
+ } else {
+ RunOptions_.PrintInfo("Parse YQL...");
+ if (!program->ParseYql()) {
+ program->PrintErrorsTo(*RunOptions_.ErrStream);
+ fail = true;
+ }
+ }
+
+ if (RunOptions_.TraceOptStream) { // the query AST (when available) is traced even if parsing failed
+ if (auto ast = program->GetQueryAst()) {
+ *RunOptions_.TraceOptStream << *ast << Endl;
+ }
+ }
+ if (fail) {
+ return -1;
+ }
+
+ if (RunOptions_.PrintAst) {
+ program->AstRoot()->PrettyPrintTo(Cout, PRETTY_FLAGS); // always goes to Cout, not to one of the configurable streams
+ }
+
+ if (ERunMode::Parse == RunOptions_.Mode) {
+ return 0;
+ }
+
+ RunOptions_.PrintInfo("Compile program...");
+ if (!program->Compile(RunOptions_.User)) {
+ program->PrintErrorsTo(*RunOptions_.ErrStream);
+ fail = true;
+ }
+
+ if (RunOptions_.TraceOptStream) { // traced before the failure check, so it is written for failed compilations too
+ program->Print(RunOptions_.TraceOptStream, nullptr);
+ }
+ if (fail) {
+ return -1;
+ }
+
+ if (ERunMode::Compile == RunOptions_.Mode) {
+ if (RunOptions_.ExprStream) {
+ auto baseAst = ConvertToAst(*program->ExprRoot(), program->ExprCtx(), NYql::TExprAnnotationFlags::None, true);
+ baseAst.Root->PrettyPrintTo(*RunOptions_.ExprStream, PRETTY_FLAGS);
+ }
+ return 0;
+ }
+
+ auto defOptConfig = TOptPipelineConfigurator(program, RunOptions_.FullExpr ? RunOptions_.PlanStream : nullptr, RunOptions_.FullExpr ? RunOptions_.ExprStream : nullptr, RunOptions_.WithTypes); // plan/expr streams are handed over only with FullExpr; otherwise they are printed after execution (see program->Print below)
+ IPipelineConfigurator* optConfig = OptPipelineConfigurator_ ? OptPipelineConfigurator_ : &defOptConfig; // a configurator installed via SetOptPipelineConfigurator takes precedence over the default
+
+ TProgram::TStatus status = TProgram::TStatus::Ok; // kept as Ok only if none of the mode branches below assigns it
+ if (ERunMode::Peephole == RunOptions_.Mode) {
+ RunOptions_.PrintInfo("Peephole...");
+ auto defConfig = TPeepHolePipelineConfigurator();
+ IPipelineConfigurator* config = PeepholePipelineConfigurator_ ? PeepholePipelineConfigurator_ : &defConfig;
+ status = program->OptimizeWithConfig(RunOptions_.User, *config);
+
+ if (RunOptions_.ExprStream && program->ExprRoot()) {
+ auto ast = ConvertToAst(*program->ExprRoot(), program->ExprCtx(), RunOptions_.WithTypes ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true);
+ ui32 prettyFlags = TAstPrintFlags::ShortQuote;
+ if (!RunOptions_.WithTypes) { // per-line layout is requested only when type annotations are not printed
+ prettyFlags |= TAstPrintFlags::PerLine;
+ }
+ ast.Root->PrettyPrintTo(*RunOptions_.ExprStream, prettyFlags);
+ }
+
+ } else if (ERunMode::Run == RunOptions_.Mode) {
+ RunOptions_.PrintInfo("Run program...");
+ status = program->RunWithConfig(RunOptions_.User, *optConfig);
+ } else if (ERunMode::Optimize == RunOptions_.Mode) {
+ RunOptions_.PrintInfo("Optimize program...");
+ status = program->OptimizeWithConfig(RunOptions_.User, *optConfig);
+ } else if (ERunMode::Validate == RunOptions_.Mode) {
+ RunOptions_.PrintInfo("Validate program...");
+ status = program->Validate(RunOptions_.User, RunOptions_.ExprStream, RunOptions_.WithTypes);
+ } else if (ERunMode::Discover == RunOptions_.Mode) {
+ RunOptions_.PrintInfo("Discover program...");
+ status = program->Discover(RunOptions_.User);
+ } else if (ERunMode::Lineage == RunOptions_.Mode) {
+ RunOptions_.PrintInfo("Calculating lineage in program...");
+ status = program->LineageWithConfig(RunOptions_.User, *optConfig);
+ }
+
+ if (RunOptions_.WithFinalIssues) {
+ program->FinalizeIssues();
+ }
+ program->PrintErrorsTo(*RunOptions_.ErrStream); // called for every status, not only on Error
+ if (status == TProgram::TStatus::Error) {
+ if (RunOptions_.TraceOptStream) {
+ program->Print(RunOptions_.TraceOptStream, nullptr);
+ }
+ return -1;
+ }
+
+ if (!RunOptions_.FullExpr && ERunMode::Peephole != RunOptions_.Mode) { // with FullExpr the streams went to the default configurator; Peephole printed its expr above
+ program->Print(RunOptions_.ExprStream, RunOptions_.PlanStream, /*cleanPlan*/true);
+ }
+
+ program->ConfigureYsonResultFormat(RunOptions_.ResultsFormat);
+
+ if (RunOptions_.ResultStream) {
+ RunOptions_.PrintInfo("Getting results...");
+ if (ERunMode::Discover == RunOptions_.Mode) {
+ if (auto data = program->GetDiscoveredData()) {
+ *RunOptions_.ResultStream << *data;
+ }
+ } else if (ERunMode::Lineage == RunOptions_.Mode) {
+ if (auto data = program->GetLineage()) {
+ TStringInput in(*data);
+ NYson::ReformatYsonStream(&in, RunOptions_.ResultStream, RunOptions_.ResultsFormat);
+ }
+ } else if (program->HasResults()) {
+ if (RunOptions_.ValidateResultFormat) { // parse the serialized results with no-op visitors before writing them out
+ auto str = program->ResultsAsString();
+ if (!str.empty()) {
+ auto node = NYT::NodeFromYsonString(str); // NOTE(review): malformed output presumably surfaces as an exception from the parsing calls - confirm
+ for (const auto& r : NResult::ParseResponse(node)) {
+ for (const auto& write : r.Writes) {
+ if (write.Type) {
+ NResult::TEmptyTypeVisitor visitor;
+ NResult::ParseType(*write.Type, visitor);
+ }
+
+ if (write.Type && write.Data) {
+ NResult::TEmptyDataVisitor visitor;
+ NResult::ParseData(*write.Type, *write.Data, visitor);
+ }
+ }
+ }
+ }
+
+ RunOptions_.ResultStream->Write(str.data(), str.size());
+ } else {
+ *RunOptions_.ResultStream << program->ResultsAsString();
+ }
+ }
+ }
+
+ if (RunOptions_.StatStream) {
+ if (auto st = program->GetStatistics(!RunOptions_.FullStatistics)) { // the argument is GetStatistics' totalOnly flag
+ *RunOptions_.StatStream << *st;
+ }
+ }
+
+ RunOptions_.PrintInfo("");
+ RunOptions_.PrintInfo("Done");
+
+ return 0;
+}
+
+} // NYql
diff --git a/yql/essentials/tools/yql_facade_run/yql_facade_run.h b/yql/essentials/tools/yql_facade_run/yql_facade_run.h
new file mode 100644
index 0000000000..0b7e7a9e8d
--- /dev/null
+++ b/yql/essentials/tools/yql_facade_run/yql_facade_run.h
@@ -0,0 +1,213 @@
+#pragma once
+
+#include <yql/essentials/core/file_storage/defs/downloader.h>
+#include <yql/essentials/core/file_storage/file_storage.h>
+#include <yql/essentials/core/credentials/yql_credentials.h>
+#include <yql/essentials/core/url_lister/interface/url_lister.h>
+#include <yql/essentials/core/yql_data_provider.h>
+#include <yql/essentials/core/yql_user_data.h>
+
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/yson/public.h>
+#include <library/cpp/logger/priority.h>
+
+#include <util/generic/hash_set.h>
+#include <util/generic/hash.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/string.h>
+
+#include <functional>
+
+namespace NKikimr::NMiniKQL {
+ class IFunctionRegistry;
+}
+
+namespace NYql {
+ class TFileStorageConfig;
+ class TGatewaysConfig;
+ class TProgramFactory;
+}
+
+namespace NYql::NProto {
+ class TPgExtensions;
+}
+
+namespace NYqlMountConfig {
+ class TMountConfig;
+}
+
+namespace NYql {
+/**
+ * Operation performed by TFacadeRunner::RunProgram after the program is created.
+ *   Parse    - stop right after parsing.
+ *   Compile  - stop after compilation (the compiled expression may be dumped).
+ *   Validate - TProgram::Validate.
+ *   Optimize - TProgram::OptimizeWithConfig with the optimization configurator.
+ *   Peephole - TProgram::OptimizeWithConfig with the peephole configurator.
+ *   Lineage  - TProgram::LineageWithConfig.
+ *   Discover - TProgram::Discover; SQL is parsed in DISCOVERY mode.
+ *   Run      - TProgram::RunWithConfig.
+ * NOTE(review): the quoted inline comments next to each enumerator look like the string
+ * names used by generated enum (de)serialization - confirm before editing them.
+ */
+enum class ERunMode {
+ Parse /* "parse" */,
+ Compile /* "compile" */,
+ Validate /* "validate" */,
+ Optimize /* "optimize" */,
+ Peephole /* "peephole" */,
+ Lineage /* "lineage" */,
+ Discover /* "discover" */,
+ Run /* "run" */,
+};
+/**
+ * Input language of the program text.
+ *   SExpr - parsed with TProgram::ParseYql.
+ *   Sql   - parsed with TProgram::ParseSql.
+ *   Pg    - parsed with TProgram::ParseSql with the PgParser translation setting enabled.
+ * NOTE(review): the quoted inline comments look like string names used by generated enum
+ * (de)serialization - confirm before editing them.
+ */
+enum class EProgramType {
+ SExpr /* "s-expr" */,
+ Sql /* "sql" */,
+ Pg /* "pg" */,
+};
+/**
+ * Settings consumed by TFacadeRunner. Public data members are read directly by the
+ * runner; tools can register additional command-line options and post-parse handlers
+ * through AddOptExtension / AddOptHandler.
+ * For each output there is a raw IOutputStream pointer and a THolder next to it;
+ * NOTE(review): presumably the holder owns a stream opened during option parsing while
+ * the raw pointer is what gets written to - confirm in Parse().
+ */
+class TFacadeRunOptions {
+public:
+ TFacadeRunOptions();
+ ~TFacadeRunOptions();
+
+ EProgramType ProgramType = EProgramType::SExpr;
+ NYson::EYsonFormat ResultsFormat = NYson::EYsonFormat::Text; // used for result output and for reformatting lineage data
+ ERunMode Mode = ERunMode::Run;
+ TString ProgramFile;
+ TString ProgramText;
+ TString User; // passed to Compile/Run/Optimize/Validate/Discover/Lineage calls
+ ui64 MemLimit = 0;
+
+ THashSet<TString> SqlFlags; // this group is copied into NSQLTranslation::TTranslationSettings when parsing SQL
+ ui16 SyntaxVersion = 1;
+ bool AnsiLexer = false;
+ bool TestAntlr4 = false;
+ bool AssumeYdbOnClusterWithSlash = false;
+
+ bool PrintAst = false; // print the parsed AST to Cout
+ bool FullExpr = false; // hand plan/expr streams to the default optimization configurator instead of printing them after execution
+ bool WithTypes = false; // include type annotations in expression output
+ bool FullStatistics = false; // when false, GetStatistics is called with totalOnly = true
+ int Verbosity = TLOG_ERR;
+ bool ShowLog = false;
+ bool WithFinalIssues = false; // call TProgram::FinalizeIssues before printing errors
+
+ IOutputStream* TraceOptStream = nullptr; // optional trace output: query AST after parsing, program dump after compilation and on execution error
+
+ IOutputStream* ErrStream = &Cerr;
+ THolder<IOutputStream> ErrStreamHolder;
+ IOutputStream* PlanStream = nullptr;
+ THolder<IOutputStream> PlanStreamHolder;
+ IOutputStream* ExprStream = nullptr;
+ THolder<IOutputStream> ExprStreamHolder;
+ IOutputStream* ResultStream = nullptr;
+ THolder<IOutputStream> ResultStreamHolder;
+ IOutputStream* StatStream = nullptr;
+ THolder<IOutputStream> StatStreamHolder;
+
+ NYql::TUserDataTable DataTable;
+ TVector<TString> UdfsPaths;
+ TString Params; // when non-empty, passed to TProgram::SetParametersYson
+ NUdf::EValidateMode ValidateMode = NUdf::EValidateMode::Greedy; // passed to TProgram::SetValidateOptions
+
+ THashSet<TString> GatewayTypes;
+ TString UdfResolverPath;
+ bool UdfResolverFilterSyscalls = false;
+ bool ScanUdfs = false;
+ THolder<NYqlMountConfig::TMountConfig> MountConfig;
+ THolder<TGatewaysConfig> GatewaysConfig;
+ THolder<TFileStorageConfig> FsConfig;
+ THolder<NProto::TPgExtensions> PgExtConfig;
+
+ // No command line options for these settings. Should be configured in the inherited class
+ bool NoDebug = false;
+ bool PgSupport = true;
+ bool FailureInjectionSupport = false;
+ bool UseRepeatableRandomAndTimeProviders = false;
+ bool UseMetaFromGrpah = false; // NOTE: "Grpah" is a typo for "Graph"; kept because the name is part of the public interface. Passed to TProgram::SetUseTableMetaFromGraph
+ bool TestSqlFormat = false; // enables the format/re-parse AST comparison (only applied when SyntaxVersion == 1)
+ bool ValidateResultFormat = false; // parse the serialized results with no-op visitors before writing them
+ bool EnableResultPosition = false; // call TProgram::EnableResultPosition
+
+ void Parse(int argc, const char *argv[]);
+
+ void AddOptExtension(std::function<void(NLastGetopt::TOpts& opts)> optExtender) { // NOTE(review): presumably invoked by Parse() to register extra options - confirm
+ OptExtenders_.push_back(std::move(optExtender));
+ }
+ void AddOptHandler(std::function<void(const NLastGetopt::TOptsParseResult& res)> optHandler) { // NOTE(review): presumably invoked by Parse() with the parse result - confirm
+ OptHandlers_.push_back(std::move(optHandler));
+ }
+ void SetSupportedGateways(std::initializer_list<TString> gateways) { // adds to the set; previously registered gateways are kept
+ SupportedGateways_.insert(gateways);
+ }
+
+ void InitLogger();
+
+ void PrintInfo(const TString& msg);
+
+private:
+ std::vector<std::function<void(NLastGetopt::TOpts&)>> OptExtenders_;
+ std::vector<std::function<void(const NLastGetopt::TOptsParseResult&)>> OptHandlers_;
+ THashSet<TString> SupportedGateways_;
+};
+/**
+ * Base for command-line tools that execute a program through the TProgram facade.
+ * A tool customises the runner by adjusting GetRunOptions() and by registering
+ * factories, cluster mappings, a progress writer and pipeline configurators, then
+ * calls Main(). RunProgram() implements the parse/compile/execute sequence.
+ */
+class TFacadeRunner {
+public:
+ TFacadeRunner(TString name)
+ : Name_(std::move(name))
+ {
+ }
+ int Main(int argc, const char *argv[]);
+
+ void AddFsDownloadFactory(std::function<NFS::IDownloaderPtr()> factory) { // the factory is stored, not called here
+ FsDownloadFactories_.push_back(std::move(factory));
+ }
+ void AddProviderFactory(std::function<NYql::TDataProviderInitializer()> factory) { // the factory is stored, not called here
+ ProviderFactories_.push_back(std::move(factory));
+ }
+ void AddUrlListerFactory(std::function<IUrlListerPtr()> factory) { // the factory is stored, not called here
+ UrlListerFactories_.push_back(std::move(factory));
+ }
+ void AddClusterMapping(TString name, TString provider) { // assigns through operator[]; the name is used as given (no lower-casing)
+ ClusterMapping_[name] = std::move(provider);
+ }
+ template <class TPbConfig>
+ void FillClusterMapping(const TPbConfig& config, const TString& provider) { // registers every cluster of the config under its lower-cased name
+ for (auto& cluster: config.GetClusterMapping()) {
+ ClusterMapping_.emplace(to_lower(cluster.GetName()), provider); // NOTE(review): emplace presumably keeps an already present entry, unlike AddClusterMapping - confirm
+ }
+ }
+ void SetOperationProgressWriter(TOperationProgressWriter writer) { // when set, installed on the program via SetProgressWriter
+ ProgressWriter_ = std::move(writer);
+ }
+ void SetOptPipelineConfigurator(IPipelineConfigurator* configurator) { // raw pointer is stored, no ownership is taken: the object must stay alive while the runner uses it
+ OptPipelineConfigurator_ = configurator;
+ }
+ void SetPeepholePipelineConfigurator(IPipelineConfigurator* configurator) { // raw pointer is stored, no ownership is taken: the object must stay alive while the runner uses it
+ PeepholePipelineConfigurator_ = configurator;
+ }
+
+ TFileStoragePtr GetFileStorage() const {
+ return FileStorage_;
+ }
+ TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> GetFuncRegistry() {
+ return FuncRegistry_;
+ }
+ TCredentials::TPtr GetCredentials() {
+ return Credentials_;
+ }
+ TFacadeRunOptions& GetRunOptions() {
+ return RunOptions_;
+ }
+
+private:
+ int DoMain(int argc, const char *argv[]);
+ int RunProgram(TProgramFactory& factory); // returns 0 on success, -1 on failure
+
+private:
+ TString Name_;
+ std::vector<std::function<NFS::IDownloaderPtr()>> FsDownloadFactories_;
+ std::vector<std::function<TDataProviderInitializer()>> ProviderFactories_;
+ std::vector<std::function<IUrlListerPtr()>> UrlListerFactories_;
+ THashMap<TString, TString> ClusterMapping_; // cluster name -> provider; copied into the SQL translation settings
+ THolder<TFileStorageConfig> FileStorageConfig_;
+ TFileStoragePtr FileStorage_;
+ TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistry_;
+ TCredentials::TPtr Credentials_ = MakeIntrusive<TCredentials>();
+ TOperationProgressWriter ProgressWriter_;
+ IPipelineConfigurator* OptPipelineConfigurator_ = nullptr; // not owned; nullptr selects the built-in default configurator
+ IPipelineConfigurator* PeepholePipelineConfigurator_ = nullptr; // not owned; nullptr selects the built-in default configurator
+ TFacadeRunOptions RunOptions_;
+};
+
+} // NYql
diff --git a/yql/tools/yqlrun/gateway_spec.cpp b/yql/tools/yqlrun/gateway_spec.cpp
deleted file mode 100644
index b53b5ecce8..0000000000
--- a/yql/tools/yqlrun/gateway_spec.cpp
+++ /dev/null
@@ -1,14 +0,0 @@
-#include <yql/tools/yqlrun/gateway_spec.h>
-
-#include <library/cpp/getopt/last_getopt.h>
-
-using namespace NYql;
-using namespace NKikimr::NMiniKQL;
-
-void ExtProviderSpecific(const IFunctionRegistry* funcRegistry,
- TVector<TDataProviderInitializer>& dataProvidersInit,
- const THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>>& rtmrTableAttributes) {
- Y_UNUSED(funcRegistry);
- Y_UNUSED(dataProvidersInit);
- Y_UNUSED(rtmrTableAttributes);
-}
diff --git a/yql/tools/yqlrun/gateway_spec.h b/yql/tools/yqlrun/gateway_spec.h
deleted file mode 100644
index 9ddf9b1e12..0000000000
--- a/yql/tools/yqlrun/gateway_spec.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#pragma once
-
-#include <yql/essentials/minikql/mkql_function_registry.h>
-#include <yql/essentials/core/yql_data_provider.h>
-
-void ExtProviderSpecific(const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry,
- TVector<NYql::TDataProviderInitializer>& dataProvidersInit,
- const THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>>& rtmrTableAttributes = {});
diff --git a/yql/tools/yqlrun/http/ya.make b/yql/tools/yqlrun/http/ya.make
index 90ad34dfee..34dafb3fae 100644
--- a/yql/tools/yqlrun/http/ya.make
+++ b/yql/tools/yqlrun/http/ya.make
@@ -10,27 +10,48 @@ SRCS(
)
PEERDIR(
- library/cpp/charset
- library/cpp/http/misc
+ yt/yql/providers/yt/common
+ yt/yql/providers/yt/gateway/file
+ yt/yql/providers/yt/provider
+
+ yql/essentials/providers/common/proto
+ yql/essentials/providers/common/provider
+ yql/essentials/providers/common/comp_nodes
+ yql/essentials/providers/pg/provider
+ yql/essentials/providers/config
+ yql/essentials/providers/result/provider
+
+ yql/essentials/public/issue
+ yql/essentials/core/facade
+ yql/essentials/core/url_preprocessing
+ yql/essentials/core/peephole_opt
+ yql/essentials/core/type_ann
+ yql/essentials/core/cbo/simple
+ yql/essentials/core/services
+ yql/essentials/ast
+ yql/essentials/core
+ yql/essentials/minikql
+ yql/essentials/minikql/comp_nodes
+ yql/essentials/parser/pg_wrapper/interface
+ yql/essentials/sql/v1/format
+ yql/essentials/utils/log
+ yql/essentials/utils
+
+ library/cpp/http/io
library/cpp/http/server
- library/cpp/json
- library/cpp/logger
+ library/cpp/http/misc
library/cpp/mime/types
- library/cpp/openssl/io
- library/cpp/string_utils/quote
library/cpp/uri
- library/cpp/yson
+ library/cpp/logger
library/cpp/yson/node
- yql/essentials/core/cbo/simple
- yql/essentials/core/facade
- yql/essentials/core/type_ann
- yql/essentials/providers/result/provider
- yql/essentials/parser/pg_wrapper
- yql/essentials/sql/v1/format
- yt/yql/providers/yt/gateway/file
- yt/yql/providers/yt/provider
- yql/essentials/core/url_preprocessing
- yql/essentials/providers/pg/provider
+ library/cpp/openssl/io
+ library/cpp/charset
+ library/cpp/yson
+ library/cpp/json
+ library/cpp/string_utils/quote
+ library/cpp/getopt
+
+ contrib/libs/protobuf
)
FILES(
diff --git a/yql/tools/yqlrun/http/yql_server.cpp b/yql/tools/yqlrun/http/yql_server.cpp
index 29547d8fcb..4cefa52396 100644
--- a/yql/tools/yqlrun/http/yql_server.cpp
+++ b/yql/tools/yqlrun/http/yql_server.cpp
@@ -1,7 +1,5 @@
#include "yql_server.h"
-#include <yql/tools/yqlrun/gateway_spec.h>
-
#include <yql/essentials/core/cbo/simple/cbo_simple.h>
#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
@@ -189,8 +187,6 @@ TProgramPtr MakeFileProgram(const TString& program, TYqlServer& yqlServer,
dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {}));
dataProvidersInit.push_back(GetPgDataProviderInitializer());
- ExtProviderSpecific(yqlServer.FunctionRegistry, dataProvidersInit, rtmrTableAttributes);
-
TProgramFactory programFactory(
true,
yqlServer.FunctionRegistry,
diff --git a/yql/tools/yqlrun/lib/ya.make b/yql/tools/yqlrun/lib/ya.make
new file mode 100644
index 0000000000..ab165f806e
--- /dev/null
+++ b/yql/tools/yqlrun/lib/ya.make
@@ -0,0 +1,23 @@
+LIBRARY()
+
+SRCS(
+ yqlrun_lib.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/provider
+ yt/yql/providers/yt/gateway/file
+
+ yql/essentials/providers/common/provider
+ yql/essentials/core/cbo
+ yql/essentials/core/peephole_opt
+ yql/essentials/core/cbo/simple
+ yql/essentials/core/services
+
+ yql/essentials/tools/yql_facade_run
+
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.cpp b/yql/tools/yqlrun/lib/yqlrun_lib.cpp
new file mode 100644
index 0000000000..f8b681a456
--- /dev/null
+++ b/yql/tools/yqlrun/lib/yqlrun_lib.cpp
@@ -0,0 +1,128 @@
+#include "yqlrun_lib.h"
+
+#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
+#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
+#include <yql/essentials/core/services/yql_transform_pipeline.h>
+#include <yql/essentials/core/cbo/simple/cbo_simple.h>
+
+#include <util/generic/yexception.h>
+#include <util/folder/iterator.h>
+#include <util/folder/dirut.h>
+#include <util/folder/path.h>
+#include <util/stream/output.h>
+
+namespace {
+
+// Pipeline hooks for yqlrun's peephole mode. Nothing is added after pipeline creation or
+// type annotation; after optimization the YT wide-flow and block-input transformers are
+// appended, followed by the generic peephole optimization.
+class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
+public:
+    TPeepHolePipelineConfigurator() = default;
+
+    // Intentionally a no-op.
+    void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const final {
+    }
+
+    // Intentionally a no-op.
+    void AfterTypeAnnotation(NYql::TTransformationPipeline* /*pipeline*/) const final {
+    }
+
+    // Appends the three peephole-related stages in a fixed order.
+    void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
+        auto& stages = *pipeline;
+        stages.Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
+        stages.Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
+        stages.Add(NYql::MakePeepholeOptimization(stages.GetTypeAnnotationContext()), "PeepHole");
+    }
+};
+
+TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
+
+} // unnamed
+
+namespace NYql {
+
+// Configures the facade runner for the yqlrun tool: repeatable random/time providers,
+// pretty-printed YSON results, the yqlrun-specific command-line options, the YT file
+// gateway as the only supported gateway, and the YT-aware peephole configurator.
+//
+// Option handlers are stored inside the NLastGetopt::TOpts object and are invoked later,
+// during command-line parsing, i.e. after the enclosing extender lambda has returned.
+// They therefore capture `this` explicitly instead of using a blanket [&] capture, so a
+// future edit cannot accidentally capture a local of the extender by reference.
+TYqlRunTool::TYqlRunTool()
+    : TFacadeRunner("yqlrun")
+{
+    GetRunOptions().UseRepeatableRandomAndTimeProviders = true;
+    GetRunOptions().ResultsFormat = NYson::EYsonFormat::Pretty;
+
+    // -t/--table table@file: explicit table -> file mapping.
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption('t', "table", "Table mapping").RequiredArgument("table@file")
+            .KVHandler([this](TString name, TString path) {
+                if (name.empty() || path.empty()) {
+                    throw yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt";
+                }
+                TablesMapping_[name] = path;
+            }, '@');
+
+    });
+    // --tables-dir cluster@dir: remembers the directory and registers every *.txt file
+    // found under it as table "<cluster>.<path relative to dir, without .txt>".
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption("tables-dir", "Table dirs mapping").RequiredArgument("cluster@dir")
+            .KVHandler([this](TString cluster, TString dir) {
+                if (cluster.empty() || dir.empty()) {
+                    throw yexception() << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables";
+                }
+                TablesDirMapping_[cluster] = dir;
+                const TFsPath root(dir); // loop-invariant, built once
+                for (const auto& entry: TDirIterator(root)) {
+                    if (auto entryPath = TFsPath(entry.fts_path); entryPath.IsFile() && entryPath.GetExtension() == "txt") {
+                        auto tableName = TString(cluster).append('.').append(entryPath.RelativeTo(root).GetPath());
+                        tableName = tableName.substr(0, tableName.size() - 4); // remove .txt extension
+                        TablesMapping_[tableName] = entryPath.GetPath();
+                    }
+                }
+            }, '@');
+
+    });
+    // -C/--cluster name@service: cluster -> provider mapping.
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption('C', "cluster", "Cluster to service mapping").RequiredArgument("name@service")
+            .KVHandler([this](TString cluster, TString provider) {
+                if (cluster.empty() || provider.empty()) {
+                    throw yexception() << "Incorrect service mapping, expected form cluster@provider, e.g. plato@yt";
+                }
+                AddClusterMapping(std::move(cluster), std::move(provider));
+            }, '@');
+
+    });
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption("ndebug", "Do not show debug info in error output").NoArgument().SetFlag(&GetRunOptions().NoDebug);
+    });
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption("keep-temp", "Keep temporary tables").NoArgument().SetFlag(&KeepTemp_);
+    });
+    // --show-progress: installs a writer that reports every operation state change to Cerr.
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
+            .Handler0([this]() {
+                SetOperationProgressWriter([](const TOperationProgress& progress) {
+                    Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n";
+                });
+            });
+    });
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption("tmp-dir", "Directory for temporary tables").RequiredArgument("DIR").StoreResult(&TmpDir_);
+    });
+
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption("test-format", "Compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)").NoArgument().SetFlag(&GetRunOptions().TestSqlFormat);
+    });
+    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
+        opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument().SetFlag(&GetRunOptions().ValidateResultFormat);
+    });
+
+    GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
+    GetRunOptions().GatewayTypes.emplace(YtProviderName);
+    AddClusterMapping(TString{"plato"}, TString{YtProviderName});
+
+    // The factory runs later, so it reads the function registry, file storage and the
+    // option-filled members at call time rather than at construction time.
+    AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
+        auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_);
+        auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices);
+        return GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {});
+    });
+
+    SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
+
+}
+
+
+} // NYql
diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.h b/yql/tools/yqlrun/lib/yqlrun_lib.h
new file mode 100644
index 0000000000..bb0e360ade
--- /dev/null
+++ b/yql/tools/yqlrun/lib/yqlrun_lib.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <yql/essentials/tools/yql_facade_run/yql_facade_run.h>
+
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+
+namespace NYql {
+/**
+ * TFacadeRunner specialisation for the yqlrun tool. The constructor registers the
+ * yqlrun-specific command-line options, the YT file gateway provider and the peephole
+ * configurator; the members below hold the values filled in by those options.
+ */
+class TYqlRunTool: public TFacadeRunner {
+public:
+ TYqlRunTool();
+
+private:
+ THashMap<TString, TString> TablesMapping_; // table name -> file path; filled by --table and by the --tables-dir scan
+ THashMap<TString, TString> TablesDirMapping_; // cluster -> directory; filled by --tables-dir
+ bool KeepTemp_ = false; // set by --keep-temp
+ TString TmpDir_; // set by --tmp-dir
+};
+
+} // NYql
diff --git a/yql/tools/yqlrun/ya.make b/yql/tools/yqlrun/ya.make
index 71458d0874..e0483c0712 100644
--- a/yql/tools/yqlrun/ya.make
+++ b/yql/tools/yqlrun/ya.make
@@ -4,7 +4,6 @@ ALLOCATOR(J)
SRCS(
yqlrun.cpp
- gateway_spec.cpp
)
IF (OS_LINUX)
@@ -14,36 +13,37 @@ IF (OS_LINUX)
ENDIF()
PEERDIR(
- contrib/libs/protobuf
- library/cpp/getopt
- library/cpp/yson
- library/cpp/svnversion
- yql/essentials/sql/pg
- yql/essentials/core/cbo/simple
+ yql/tools/yqlrun/http
+ yql/tools/yqlrun/lib
+
+ yt/yql/providers/yt/comp_nodes/llvm14
+ yt/yql/providers/yt/codec/codegen
+
+ yql/essentials/providers/common/provider
+ yql/essentials/providers/common/udf_resolve
+ yql/essentials/minikql/invoke_builtins
+ yql/essentials/minikql/invoke_builtins/llvm14
+ yql/essentials/minikql/comp_nodes/llvm14
+ yql/essentials/parser/pg_wrapper
+ yql/essentials/parser/pg_catalog
+ yql/essentials/core/services/mounts
yql/essentials/core/facade
+ yql/essentials/core/pg_ext
yql/essentials/core/file_storage
yql/essentials/core/file_storage/proto
- yql/essentials/core/file_storage/http_download
- yql/essentials/core/pg_ext
- yql/essentials/core/services/mounts
- yql/essentials/minikql/comp_nodes/llvm14
- yql/essentials/protos
+ yql/essentials/core
yql/essentials/public/udf/service/exception_policy
yql/essentials/utils/backtrace
- yql/essentials/core
- yql/essentials/sql/v1/format
- yql/essentials/providers/common/codec
- yql/essentials/providers/common/comp_nodes
- yql/essentials/providers/common/proto
- yql/essentials/providers/common/provider
- yql/essentials/providers/common/udf_resolve
- yt/yql/providers/yt/gateway/file
- yt/yql/providers/yt/codec/codegen
- yt/yql/providers/yt/comp_nodes/llvm14
- yql/essentials/core/url_preprocessing
- yql/tools/yqlrun/http
- yql/essentials/parser/pg_wrapper
- yql/essentials/public/result_format
+ yql/essentials/utils/log
+ yql/essentials/minikql
+ yql/essentials/protos
+ yql/essentials/ast
+ yql/essentials/sql/pg
+
+ library/cpp/getopt
+ library/cpp/logger
+
+ contrib/libs/protobuf
)
YQL_LAST_ABI_VERSION()
diff --git a/yql/tools/yqlrun/yqlrun.cpp b/yql/tools/yqlrun/yqlrun.cpp
index a1ae1fc6e0..827c9a13c3 100644
--- a/yql/tools/yqlrun/yqlrun.cpp
+++ b/yql/tools/yqlrun/yqlrun.cpp
@@ -1,320 +1,38 @@
-#include "gateway_spec.h"
-
+#include <yql/tools/yqlrun/lib/yqlrun_lib.h>
#include <yql/tools/yqlrun/http/yql_server.h>
-#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
-#include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
-#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
-#include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
-#include <yql/essentials/core/cbo/simple/cbo_simple.h>
-#include <yql/essentials/core/url_preprocessing/url_preprocessing.h>
-
-#include <yql/essentials/sql/v1/format/sql_format.h>
-
-#include <yql/essentials/providers/pg/provider/yql_pg_provider.h>
-#include <yql/essentials/providers/common/codec/yql_codec.h>
-#include <yql/essentials/providers/common/provider/yql_provider_names.h>
-#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
#include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
+#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
#include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h>
-#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
-#include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
-#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
-#include <yql/essentials/minikql/computation/mkql_computation_node.h>
-#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+
#include <yql/essentials/minikql/mkql_function_registry.h>
-#include <yql/essentials/minikql/mkql_utils.h>
-#include <yql/essentials/protos/yql_mount.pb.h>
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+#include <yql/essentials/utils/backtrace/backtrace.h>
+#include <yql/essentials/utils/log/tls_backend.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/parser/pg_wrapper/interface/parser.h>
+#include <yql/essentials/parser/pg_catalog/catalog.h>
#include <yql/essentials/protos/pg_ext.pb.h>
-#include <yql/essentials/core/yql_library_compiler.h>
-#include <yql/essentials/core/facade/yql_facade.h>
-#include <yql/essentials/core/pg_ext/yql_pg_ext.h>
#include <yql/essentials/core/file_storage/file_storage.h>
-#include <yql/essentials/core/file_storage/http_download/http_download.h>
#include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
-#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
#include <yql/essentials/core/services/mounts/yql_mounts.h>
-#include <yql/essentials/utils/log/log.h>
-#include <yql/essentials/utils/backtrace/backtrace.h>
-#include <yql/essentials/utils/log/tls_backend.h>
-#include <yql/essentials/public/udf/udf_validate.h>
-#include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h>
-#include <yql/essentials/parser/pg_wrapper/interface/parser.h>
-#include <yql/essentials/public/result_format/yql_result_format_response.h>
-#include <yql/essentials/public/result_format/yql_result_format_type.h>
-#include <yql/essentials/public/result_format/yql_result_format_data.h>
+#include <yql/essentials/core/facade/yql_facade.h>
+#include <yql/essentials/core/pg_ext/yql_pg_ext.h>
+#include <yql/essentials/core/yql_udf_resolver.h>
+#include <yql/essentials/core/yql_udf_index.h>
+#include <yql/essentials/core/yql_library_compiler.h>
+#include <yql/essentials/ast/yql_expr.h>
-#include <library/cpp/logger/stream.h>
-#include <library/cpp/svnversion/svnversion.h>
#include <library/cpp/getopt/last_getopt.h>
-
-#include <library/cpp/yson/public.h>
-#include <library/cpp/yson/writer.h>
+#include <library/cpp/logger/stream.h>
#include <google/protobuf/text_format.h>
-#include <util/stream/file.h>
-#include <util/system/user.h>
-#include <util/folder/iterator.h>
-#include <util/folder/dirut.h>
-#include <util/string/join.h>
-#include <util/string/builder.h>
-
-#ifdef __unix__
-#include <sys/resource.h>
-#endif
-
-const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote |
- NYql::TAstPrintFlags::AdaptArbitraryContent;
-
-class TMultiProgs {
-public:
- TMultiProgs(NYql::TProgramFactory& factory, const TString& programFile, const TString& programText, size_t concurrentCount = 1) {
- Infos.reserve(concurrentCount);
- BaseProg = factory.Create(programFile, programText);
- if (concurrentCount) {
- factory.UnrepeatableRandom();
- }
- for (auto i = concurrentCount; i > 0; --i) {
- Infos.emplace_back(TProgInfo({factory.Create(programFile, programText), {}}));
- }
- }
-
- bool ParseYql() {
- bool result = BaseProg->ParseYql();
- for (auto& info: Infos) {
- info.Prog->ParseYql();
- }
-
- return result;
- }
-
- bool ParseSql(const NSQLTranslation::TTranslationSettings& settings) {
- bool result = BaseProg->ParseSql(settings);
- for (auto& info: Infos) {
- info.Prog->ParseSql(settings);
- }
-
- return result;
- }
-
- bool Compile(const TString& username) {
- bool result = BaseProg->Compile(username);
- for (auto& info: Infos) {
- info.Prog->Compile(username);
- }
-
- return result;
- }
-
- template<class T>
- bool CompareStreams(const TString& compareGoal, IOutputStream& out, const T& base, const T& concurrent) const {
- const auto baseSize = base.Size();
- const auto concurentSize = concurrent.Size();
- if (baseSize == concurentSize && memcmp(base.Data(), concurrent.Data(), baseSize) == 0) {
- return true;
- }
- out << "Difference " << compareGoal << " of cuncurrent mode is not the same as base run. Size base: " << baseSize <<
- ", size concurrent: " << concurentSize;
- if (concurentSize) {
- out << ", concurrent stream: " << Endl << concurrent.Data();
- } else {
- out << ", base stream: " << Endl << base.Data();
- }
- return false;
- }
-
- void PrintExprTo(IOutputStream& out) {
- TStringStream baseSS;
- auto baseAst = ConvertToAst(*BaseProg->ExprRoot(), BaseProg->ExprCtx(), NYql::TExprAnnotationFlags::None, true);
- baseAst.Root->PrettyPrintTo(baseSS, PRETTY_FLAGS);
- for (auto& info: Infos) {
- TStringStream ss;
- auto ast = ConvertToAst(*info.Prog->ExprRoot(), BaseProg->ExprCtx(), NYql::TExprAnnotationFlags::None, true);
- ast.Root->PrettyPrintTo(ss, PRETTY_FLAGS);
- if (!CompareStreams("expr representation", out, baseSS, ss)) {
- return;
- }
- }
- out << baseSS.Data();
- }
-
- void FinalizeIssues() {
- BaseProg->FinalizeIssues();
- for (auto& info : Infos) {
- info.Prog->FinalizeIssues();
- }
- }
-
- void PrintErrorsTo(IOutputStream& out) const {
- TStringStream baseSS;
- BaseProg->PrintErrorsTo(baseSS);
- for (auto& info: Infos) {
- TStringStream ss;
- info.Prog->PrintErrorsTo(ss);
- if (!CompareStreams("error", out, baseSS, ss)) {
- return;
- }
- }
- out << baseSS.Data();
- }
-
- void PrintAstTo(IOutputStream& out) const {
- TStringStream baseSS;
- BaseProg->AstRoot()->PrettyPrintTo(out, PRETTY_FLAGS);
- for (auto& info: Infos) {
- TStringStream ss;
- info.Prog->AstRoot()->PrettyPrintTo(out, PRETTY_FLAGS);
- if (!CompareStreams("AST", out, baseSS, ss)) {
- return;
- }
- }
- out << baseSS.Data();
- }
-
- void SetProgressWriter(NYql::TOperationProgressWriter writer) {
- BaseProg->SetProgressWriter(writer);
- for (auto& info: Infos) {
- info.Prog->SetProgressWriter(writer);
- }
- }
-
- void SetValidateOptions(NKikimr::NUdf::EValidateMode validateMode) {
- BaseProg->SetValidateOptions(validateMode);
- for (auto& info: Infos) {
- info.Prog->SetValidateOptions(validateMode);
- }
- }
-
- void SetParametersYson(const TString& parameters) {
- BaseProg->SetParametersYson(parameters);
- for (auto& info : Infos) {
- info.Prog->SetParametersYson(parameters);
- }
- }
-
- void Print(IOutputStream* exprOut, IOutputStream* planOut) {
- bool cleanPlan = true;
- BaseProg->Print(exprOut, planOut, cleanPlan);
- }
-
- void ResultsOut(IOutputStream& out) {
- if (BaseProg->HasResults()) {
- BaseProg->ConfigureYsonResultFormat(NYson::EYsonFormat::Pretty);
- out << BaseProg->ResultsAsString();
- }
- // Multirun results are ignored
- }
-
- void DiscoveredDataOut(IOutputStream& out) {
- if (auto data = BaseProg->GetDiscoveredData()) {
- TStringInput in(*data);
- NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty);
- }
- }
-
- void LineageOut(IOutputStream& out) {
- if (auto data = BaseProg->GetLineage()) {
- TStringInput in(*data);
- NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty);
- }
- }
-
- NYql::TProgram::TStatus Run(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, bool withTypes) {
- YQL_ENSURE(Infos.empty());
- return BaseProg->Run(username, traceOut, tracePlan, exprOut, withTypes);
- }
-
- NYql::TProgram::TStatus Optimize(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, bool withTypes) {
- YQL_ENSURE(Infos.empty());
- return BaseProg->Optimize(username, traceOut, tracePlan, exprOut, withTypes);
- }
-
- NYql::TProgram::TStatus Validate(const TString& username, IOutputStream* exprOut, bool withTypes) {
- YQL_ENSURE(Infos.empty());
- return BaseProg->Validate(username, exprOut, withTypes);
- }
-
- NYql::TProgram::TStatus Discover(const TString& username) {
- YQL_ENSURE(Infos.empty());
- return BaseProg->Discover(username);
- }
-
- NYql::TProgram::TStatus Lineage(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) {
- YQL_ENSURE(Infos.empty());
- return BaseProg->Lineage(username, traceOut, exprOut, withTypes);
- }
-
- NYql::TProgram::TStatus Peephole(const TString& username, IOutputStream* exprOut, bool withTypes) {
- YQL_ENSURE(Infos.empty());
- using namespace NYql;
-
- class TPeepHolePipelineConfigurator : public IPipelineConfigurator {
- public:
- TPeepHolePipelineConfigurator() = default;
-
- void AfterCreate(TTransformationPipeline* pipeline) const final {
- Y_UNUSED(pipeline);
- }
-
- void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
- Y_UNUSED(pipeline);
- }
-
- void AfterOptimize(TTransformationPipeline* pipeline) const final {
- pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow");
- pipeline->Add(CreateYtBlockInputTransformer(nullptr), "BlockInput");
- pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
- }
- };
-
- TPeepHolePipelineConfigurator config;
- auto status = BaseProg->OptimizeWithConfig(username, config);
- if (exprOut && BaseProg->ExprRoot()) {
- auto ast = ConvertToAst(*BaseProg->ExprRoot(), BaseProg->ExprCtx(), withTypes ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true);
- ui32 prettyFlags = TAstPrintFlags::ShortQuote;
- if (!withTypes) {
- prettyFlags |= TAstPrintFlags::PerLine;
- }
- ast.Root->PrettyPrintTo(*exprOut, prettyFlags);
- }
- return status;
- }
-
- NYql::TProgram::TStatus RunAsyncAndWait(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut,
- bool withTypes, bool& emulateOutputForMultirun) {
- NYql::TProgram::TStatus baseStatus = BaseProg->Run(username, traceOut, tracePlan, exprOut, withTypes);
- // switch this flag only after base run
- emulateOutputForMultirun = true;
- for (auto& info: Infos) {
- info.Future = info.Prog->RunAsync(username, nullptr, nullptr, nullptr, withTypes);
- YQL_ENSURE(info.Future.Initialized());
- }
-
- for (bool wasAsync = true; wasAsync;) {
- wasAsync = false;
- for (auto& info: Infos) {
- auto status = info.Future.GetValueSync();
- if (status == NYql::TProgram::TStatus::Async) {
- wasAsync = true;
- info.Future = info.Prog->ContinueAsync();
- } else if (status == NYql::TProgram::TStatus::Error) {
- baseStatus = status;
- }
- }
- }
- return baseStatus;
- }
-
-private:
- struct TProgInfo {
- NYql::TProgramPtr Prog;
- NYql::TProgram::TFutureStatus Future;
- };
-
- NYql::TProgramPtr BaseProg;
- TVector<TProgInfo> Infos;
-};
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/generic/hash.h>
+#include <util/datetime/base.h>
using namespace NYql;
using namespace NKikimr::NMiniKQL;
@@ -347,7 +65,7 @@ private:
void CommonInit(const NLastGetopt::TOptsParseResult& res, const TString& udfResolverPath, bool filterSysCalls,
const TVector<TString>& udfsPaths, TFileStoragePtr fileStorage,
- IUdfResolver::TPtr& udfResolver, NKikimr::NMiniKQL::IFunctionRegistry::TPtr funcRegistry, TUdfIndex::TPtr& udfIndex) {
+ IUdfResolver::TPtr& udfResolver, IFunctionRegistry::TPtr funcRegistry, TUdfIndex::TPtr& udfIndex) {
if (fileStorage && res.Has("scan-udfs")) {
if (!udfResolverPath) {
@@ -385,535 +103,14 @@ THolder<TMessage> ParseProtoConfig(const TString& cfgFile) {
return config;
}
-int Main(int argc, const char *argv[])
-{
- Y_UNUSED(NUdf::GetStaticSymbols());
- using namespace NLastGetopt;
- TOpts opts = TOpts::Default();
- TString programFile;
- TVector<TString> tablesMappingList;
- TVector<TString> tablesDirMappingList;
- THashMap<TString, TString> tablesMapping;
- THashMap<TString, TString> tablesDirMapping;
- TVector<TString> filesMappingList;
- TUserDataTable filesMapping;
- TVector<TString> urlsMappingList;
- TString exprFile;
- TString resultFile;
- TString planFile;
- TString errFile;
- TString tmpDir;
- TVector<TString> udfsPaths;
- TString udfsDir;
- TString validateModeStr(NUdf::ValidateModeAsStr(NUdf::EValidateMode::Greedy));
- THashSet<TString> gatewayTypes;
- TString mountConfig;
- TString udfResolverPath;
- bool udfResolverFilterSyscalls = false;
- THashMap<TString, TString> clusterMapping;
- THashSet<TString> sqlFlags;
- clusterMapping["plato"] = YtProviderName;
- clusterMapping["pg_catalog"] = PgProviderName;
- clusterMapping["information_schema"] = PgProviderName;
- ui32 progsConcurrentCount = 0;
- TString paramsFile;
- ui16 syntaxVersion;
- ui64 memLimit;
- TString gatewaysCfgFile;
- TString fsCfgFile;
- TString pgExtConfig;
-
- opts.AddHelpOption();
- opts.AddLongOption('p', "program", "program file").StoreResult<TString>(&programFile);
- opts.AddLongOption('s', "sql", "program is SQL query").NoArgument();
- opts.AddLongOption("pg", "program has PG syntax").NoArgument();
- opts.AddLongOption('t', "table", "table@file").AppendTo(&tablesMappingList);
- opts.AddLongOption("tables-dir", "cluster@dir").AppendTo(&tablesDirMappingList);
- opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping));
- opts.AddLongOption("ndebug", "should be at first argument, do not show debug info in error output").NoArgument();
- opts.AddLongOption("parse-only", "exit after program has been parsed").NoArgument();
- opts.AddLongOption("print-ast", "print AST after loading").NoArgument();
- opts.AddLongOption("compile-only", "exit after program has been compiled").NoArgument();
- opts.AddLongOption("print-expr", "print rebuild AST before execution").NoArgument();
- opts.AddLongOption("with-types", "print types annotation").NoArgument();
- opts.AddLongOption("trace-opt", "print AST in the begin of each transformation").NoArgument();
- opts.AddLongOption("expr-file", "print AST to that file instead of stdout").StoreResult<TString>(&exprFile);
- opts.AddLongOption("print-result", "print program execution result to stdout").NoArgument();
- opts.AddLongOption("result-file", "print program execution result to file").StoreResult<TString>(&resultFile);
- opts.AddLongOption("plan-file", "print program plan to file").StoreResult<TString>(&planFile);
- opts.AddLongOption("err-file", "print validate/optimize/runtime errors to file").StoreResult<TString>(&errFile);
- opts.AddLongOption('P',"trace-plan", "print plan before execution").NoArgument();
- opts.AddLongOption('L', "show-log", "show logs").NoArgument();
- opts.AddLongOption('D', "discover", "discover tables in the program").NoArgument();
- opts.AddLongOption("validate", "exit after program has been validated").NoArgument();
- opts.AddLongOption("lineage", "exit after data lineage has been calculated").NoArgument();
- opts.AddLongOption('O',"optimize", "optimize expression").NoArgument();
- opts.AddLongOption('R',"run", "run expression using input/output tables").NoArgument();
- opts.AddLongOption("peephole", "perform peephole stage of expression using input/output tables").NoArgument();
- opts.AddLongOption('M',"multirun", "run expression in multi-evaluate (race) mode, as option set concurrent count").StoreResult(&progsConcurrentCount).DefaultValue(progsConcurrentCount);
- opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList);
- opts.AddLongOption('U', "url", "name@path").AppendTo(&urlsMappingList);
- opts.AddLongOption('m', "mounts", "Mount points config file.").StoreResult(&mountConfig);
- opts.AddLongOption("opt-collision", "provider optimize collision mode").NoArgument();
- opts.AddLongOption("keep-temp", "keep temporary tables").NoArgument();
- opts.AddLongOption("full-expr", "avoid buffering of expr/plan").NoArgument();
- opts.AddLongOption("show-progress", "report operation progress").NoArgument();
- opts.AddLongOption("tmp-dir", "directory for temporary tables").StoreResult<TString>(&tmpDir);
- opts.AddLongOption("reverse-mrkey", "reverse keys for Map/Reduce opeations").NoArgument();
- opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths);
- opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&udfsDir);
- opts.AddLongOption("mem-info", "Print memory usage information").NoArgument();
- opts.AddLongOption("validate-mode", "validate udf mode, available values: " + NUdf::ValidateModeAvailables()).StoreResult<TString>(&validateModeStr).DefaultValue(validateModeStr);
- opts.AddLongOption('G', "gateways", "used gateways").SplitHandler(&gatewayTypes, ',').DefaultValue(YtProviderName);
- opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&udfResolverPath);
- opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver")
- .Optional()
- .NoArgument()
- .SetFlag(&udfResolverFilterSyscalls);
- opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument();
- opts.AddLongOption("params-file", "Query parameters values in YSON format").StoreResult(&paramsFile);
-
- opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&sqlFlags, ',');
- opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&syntaxVersion).DefaultValue(1);
- opts.AddLongOption("ansi-lexer", "Use ansi lexer").NoArgument();
- opts.AddLongOption("assume-ydb-on-slash", "Assume YDB provider if cluster name starts with '/'").NoArgument();
- opts.AddLongOption("mem-limit", "Set memory limit in megabytes").StoreResult(&memLimit).DefaultValue(0);
- opts.AddLongOption("gateways-cfg", "gateways configuration file").Optional().RequiredArgument("FILE").StoreResult(&gatewaysCfgFile);
- opts.AddLongOption("fs-cfg", "fs configuration file").Optional().RequiredArgument("FILE").StoreResult(&fsCfgFile);
- opts.AddLongOption("test-format", "compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)").NoArgument();
- opts.AddLongOption("show-kernels", "show all Arrow kernel families").NoArgument();
- opts.AddLongOption("pg-ext", "pg extensions config file").StoreResult(&pgExtConfig);
- opts.AddLongOption("with-final-issues", "Include some final messages (like statistic) in issues").NoArgument();
- opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument();
- opts.AddLongOption("test-antlr4", "check antlr4 parser").NoArgument();
-
- opts.SetFreeArgsMax(0);
- TOptsParseResult res(&opts, argc, argv);
- auto builtins = CreateBuiltinRegistry();
- if (res.Has("show-kernels")) {
- auto families = builtins->GetAllKernelFamilies();
- Sort(families, [](const auto& x, const auto& y) { return x.first < y.first; });
- ui64 totalKernels = 0;
- for (const auto& f : families) {
- auto numKernels = f.second->GetAllKernels().size();
- Cout << f.first << ": " << numKernels << " kernels\n";
- totalKernels += numKernels;
- }
-
- Cout << "Total kernel families: " << families.size() << ", kernels: " << totalKernels << "\n";
- return 0;
- }
-
- NPg::SetSqlLanguageParser(NSQLTranslationPG::CreateSqlLanguageParser());
- NPg::LoadSystemFunctions(*NSQLTranslationPG::CreateSystemFunctionsParser());
- if (!pgExtConfig.empty()) {
- auto config = ParseProtoConfig<NProto::TPgExtensions>(pgExtConfig);
- Y_ABORT_UNLESS(config);
- TVector<NPg::TExtensionDesc> extensions;
- PgExtensionsFromProto(*config, extensions);
- NPg::RegisterExtensions(extensions, false,
- *NSQLTranslationPG::CreateExtensionSqlParser(),
- NKikimr::NMiniKQL::CreateExtensionLoader().get());
- }
-
- NPg::GetSqlLanguageParser()->Freeze();
-
- const bool parseOnly = res.Has("parse-only");
- const bool compileOnly = res.Has("compile-only");
- const bool hasValidate = !parseOnly && !compileOnly;
- if (hasValidate && !gatewayTypes.contains(YtProviderName)) {
- Cerr << "At least one gateway from the list " << Join(",", YtProviderName).Quote() << " must be specified" << Endl;
- return 1;
- }
-
- for (auto& s: tablesMappingList) {
- TStringBuf tableName, filePath;
- TStringBuf(s).Split('@', tableName, filePath);
- if (tableName.empty() || filePath.empty()) {
- Cerr << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt" << Endl;
- return 1;
- }
- tablesMapping[tableName] = filePath;
- }
-
- for (auto& s : tablesDirMappingList) {
- TStringBuf clusterName, dirPath;
- TStringBuf(s).Split('@', clusterName, dirPath);
- if (clusterName.empty() || dirPath.empty()) {
- Cerr << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables" << Endl;
- return 1;
- }
- tablesDirMapping[clusterName] = dirPath;
- for (const auto& entry : TDirIterator(TFsPath(dirPath))) {
- if (auto entryPath = TFsPath(entry.fts_path); entryPath.IsFile() && entryPath.GetExtension() == "txt") {
- auto tableName = TString(clusterName).append('.').append(entryPath.RelativeTo(TFsPath(dirPath)).GetPath());
- tableName = tableName.substr(0, tableName.size() - 4); // remove .txt extension
- tablesMapping[tableName] = entryPath.GetPath();
- }
- }
- }
-
- if (hasValidate) {
- for (auto& s : filesMappingList) {
- TStringBuf fileName, filePath;
- TStringBuf(s).Split('@', fileName, filePath);
- if (fileName.empty() || filePath.empty()) {
- Cerr << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt" << Endl;
- return 1;
- }
-
- auto& file = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + fileName)];
- file.Type = EUserDataType::PATH;
- file.Data = filePath;
- }
-
- for (auto& s : urlsMappingList) {
- TStringBuf name, path;
- TStringBuf(s).Split('@', name, path);
- if (name.empty() || path.empty()) {
- Cerr << "Incorrect url mapping, expected form name@path, e.g. MyFile@sbr:123456" << Endl;
- return 1;
- }
-
- auto& block = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + name)];
- block.Type = EUserDataType::URL;
- block.Data = path;
- }
- }
-
- if (memLimit) {
-#ifdef __unix__
- memLimit *= 1024 * 1024;
-
- struct rlimit rl;
-
- if (getrlimit(RLIMIT_AS, &rl)) {
- throw TSystemError() << "Cannot getrlimit(RLIMIT_AS)";
- }
-
- rl.rlim_cur = memLimit;
- if (setrlimit(RLIMIT_AS, &rl)) {
- throw TSystemError() << "Cannot setrlimit(RLIMIT_AS) to " << memLimit << " bytes";
- }
-#else
- Cerr << "Memory limit can not be set on this platfrom" << Endl;
- return 1;
-#endif
- }
-
- IOutputStream* errStream = &Cerr;
- THolder<TFixedBufferFileOutput> errFileHolder;
- if (!errFile.empty()) {
- errFileHolder.Reset(new TFixedBufferFileOutput(errFile));
- errStream = errFileHolder.Get();
- }
-
- TExprContext ctx;
- ctx.NextUniqueId = NPg::GetSqlLanguageParser()->GetContext().NextUniqueId;
- IModuleResolver::TPtr moduleResolver;
- if (!mountConfig.empty()) {
- TModulesTable modules;
- auto mount = ParseProtoConfig<NYqlMountConfig::TMountConfig>(mountConfig);
- Y_ABORT_UNLESS(mount);
- FillUserDataTableFromFileSystem(*mount, filesMapping);
-
- if (!CompileLibraries(filesMapping, ctx, modules)) {
- *errStream << "Errors on compile libraries:" << Endl;
- ctx.IssueManager.GetIssues().PrintTo(*errStream);
- return -1;
- }
-
- moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, clusterMapping, sqlFlags, hasValidate);
- } else {
- if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping, hasValidate)) {
- *errStream << "Errors loading default YQL libraries:" << Endl;
- ctx.IssueManager.GetIssues().PrintTo(*errStream);
- return -1;
- }
- }
-
- TExprContext::TFreezeGuard freezeGuard(ctx);
-
- TString programText;
- if (programFile.empty()) {
- Cerr << "Missing --program argument\n";
- return -1;
- }
-
- if (programFile == TStringBuf("-")) {
- programFile = TStringBuf("-stdin-");
- programText = Cin.ReadAll();
- } else {
- programText = TFileInput(programFile).ReadAll();
- }
-
- THolder<TFileStorageConfig> fsConfig;
- if (!fsCfgFile.empty()) {
- fsConfig = ParseProtoConfig<TFileStorageConfig>(fsCfgFile);
- if (!fsConfig) {
- return 1;
- }
- } else {
- fsConfig = MakeHolder<TFileStorageConfig>();
- }
-
- THolder<TGatewaysConfig> gatewaysConfig;
- if (!gatewaysCfgFile.empty()) {
- gatewaysConfig = ParseProtoConfig<TGatewaysConfig>(gatewaysCfgFile);
- if (!gatewaysConfig) {
- return 1;
- }
- if (gatewaysConfig->HasSqlCore()) {
- sqlFlags.insert(gatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), gatewaysConfig->GetSqlCore().GetTranslationFlags().end());
- }
- }
-
- TFileStoragePtr fileStorage;
- if (hasValidate) {
- NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths);
-
- fileStorage = WithAsync(CreateFileStorage(*fsConfig));
- }
-
- IUdfResolver::TPtr udfResolver;
- auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), true, udfsPaths);
- TUdfIndex::TPtr udfIndex;
- CommonInit(res, udfResolverPath, udfResolverFilterSyscalls, udfsPaths, fileStorage, udfResolver, funcRegistry, udfIndex);
-
- TAutoPtr<IThreadPool> ytExecutionQueue;
- TVector<TDataProviderInitializer> dataProvidersInit;
-
- dataProvidersInit.push_back(GetPgDataProviderInitializer());
-
- bool emulateOutputForMultirun = false;
- if (hasValidate) {
- if (gatewayTypes.contains(YtProviderName) || res.Has("opt-collision")) {
- auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp"), tablesDirMapping);
- auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun);
- dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {}));
- }
- }
-
- if (hasValidate && res.Has("opt-collision")) {
- ExtProviderSpecific(funcRegistry.Get(), dataProvidersInit);
- }
-
- TProgramFactory factory(true, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "yqlrun");
- factory.AddUserDataTable(filesMapping);
- factory.SetModules(moduleResolver);
- factory.SetFileStorage(fileStorage);
- if (gatewaysConfig && gatewaysConfig->HasFs()) {
- factory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*gatewaysConfig));
- }
- factory.SetUdfIndex(udfIndex, new TUdfIndexPackageSet());
- factory.SetUdfResolver(udfResolver);
- factory.SetGatewaysConfig(gatewaysConfig.Get());
- factory.EnableRangeComputeFor();
-
- auto program = MakeHolder<TMultiProgs>(factory, programFile, programText, progsConcurrentCount);
- if (res.Has("show-progress")) {
- program->SetProgressWriter([](const TOperationProgress& progress) {
- Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n";
- });
- }
-
- if (paramsFile) {
- TString parameters = TFileInput(paramsFile).ReadAll();
- program->SetParametersYson(parameters);
- }
-
- if (res.Has("sql") || res.Has("pg")) {
- google::protobuf::Arena arena;
- NSQLTranslation::TTranslationSettings settings;
- settings.Arena = &arena;
- settings.PgParser = res.Has("pg");
- settings.ClusterMapping = clusterMapping;
- settings.Flags = sqlFlags;
- settings.SyntaxVersion = syntaxVersion;
- settings.AnsiLexer = res.Has("ansi-lexer");
- settings.TestAntlr4 = res.Has("test-antlr4");
- settings.V0Behavior = NSQLTranslation::EV0Behavior::Report;
- settings.AssumeYdbOnClusterWithSlash = res.Has("assume-ydb-on-slash");
- if (res.Has("discover")) {
- settings.Mode = NSQLTranslation::ESqlMode::DISCOVERY;
- }
- if (!program->ParseSql(settings)) {
- program->PrintErrorsTo(*errStream);
- return 1;
- }
- if (res.Has("test-format") && syntaxVersion == 1) {
- TString formattedProgramText;
- NYql::TIssues issues;
- auto formatter = NSQLFormat::MakeSqlFormatter(settings);
- if (!formatter->Format(programText, formattedProgramText, issues)) {
- Cerr << "Format failed: ";
- issues.PrintTo(Cerr);
- return 1;
- }
-
- auto frmProgram = MakeHolder<TMultiProgs>(factory, "formatted SQL", formattedProgramText, progsConcurrentCount);
- if (!frmProgram->ParseSql(settings)) {
- frmProgram->PrintErrorsTo(*errStream);
- return 1;
- }
-
- TStringStream SrcQuery, FrmQuery;
-
- program->PrintAstTo(SrcQuery);
- frmProgram->PrintAstTo(FrmQuery);
- if (SrcQuery.Str() != FrmQuery.Str()) {
- Cerr << "source query's AST and formatted query's AST are not same\n";
- return 1;
- }
- }
- } else {
- if (!program->ParseYql()) {
- program->PrintErrorsTo(*errStream);
- return 1;
- }
- }
-
- if (res.Has("print-ast")) {
- program->PrintAstTo(Cout);
- }
-
-
- if (res.Has("parse-only"))
- return 0;
-
- const TString username = GetUsername();
- bool withTypes = res.Has("with-types");
- IOutputStream* traceOut = res.Has("trace-opt") ? &Cerr : nullptr;
-
- IOutputStream* exprOut = nullptr;
- THolder<TFixedBufferFileOutput> exprFileHolder;
- if (res.Has("print-expr")) {
- exprOut = &Cout;
- } else if (!exprFile.empty()) {
- exprFileHolder.Reset(new TFixedBufferFileOutput(exprFile));
- exprOut = exprFileHolder.Get();
- }
-
- IOutputStream* tracePlan = nullptr;
- THolder<TFixedBufferFileOutput> planFileHolder;
- if (res.Has("trace-plan")) {
- tracePlan = &Cout;
- }
- else if (!planFile.empty()) {
- planFileHolder.Reset(new TFixedBufferFileOutput(planFile));
- tracePlan = planFileHolder.Get();
- }
-
- if (res.Has("show-log")) {
- using namespace ::NYql::NLog;
- InitLogger(&Cerr);
- NLog::EComponentHelpers::ForEach([](NLog::EComponent c) {
- YqlLogger().SetComponentLevel(c, ELevel::DEBUG);
- });
- }
- if (res.Has("trace-opt")) {
- NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::TRACE);
- NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::TRACE);
- NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::TRACE);
- }
-
- program->SetValidateOptions(NUdf::ValidateModeByStr(validateModeStr));
-
- TProgram::TStatus status = TProgram::TStatus::Ok;
- const bool fullExpr = res.Has("full-expr");
- IOutputStream* fullTracePlan = fullExpr ? tracePlan : nullptr;
- IOutputStream* fullExprOut = fullExpr ? exprOut : nullptr;
-
- if (!program->Compile(username)) {
- program->PrintErrorsTo(*errStream);
- return 1;
- }
-
- if (res.Has("compile-only")) {
- if (res.Has("print-expr")) {
- program->PrintExprTo(Cout);
- }
- return 0;
- }
-
- if (res.Has("multirun")) {
- status = program->RunAsyncAndWait(username, traceOut, fullTracePlan, fullExprOut, withTypes, emulateOutputForMultirun);
- } else if (res.Has("peephole")) {
- status = program->Peephole(username, exprOut, withTypes);
- } else if (res.Has("run")) {
- status = program->Run(username, traceOut, fullTracePlan, fullExprOut, withTypes);
- } else if (res.Has("optimize")) {
- status = program->Optimize(username, traceOut, fullTracePlan, fullExprOut, withTypes);
- } else if (res.Has("validate")) {
- status = program->Validate(username, exprOut, withTypes);
- } else if (res.Has("discover")) {
- status = program->Discover(username);
- } else if (res.Has("lineage")) {
- status = program->Lineage(username, traceOut, exprOut, withTypes);
- }
-
- if (res.Has("with-final-issues")) {
- program->FinalizeIssues();
- }
- program->PrintErrorsTo(*errStream);
- if (status == TProgram::TStatus::Error) {
- return 1;
- }
-
- if (!fullExpr && !res.Has("peephole")) {
- program->Print(exprOut, tracePlan);
- }
-
- IOutputStream* resultOut = nullptr;
- THolder<TFixedBufferFileOutput> resultFileHolder;
- if (res.Has("print-result")) {
- resultOut = &Cout;
- } else if (!resultFile.empty()) {
- resultFileHolder.Reset(new TFixedBufferFileOutput(resultFile));
- resultOut = resultFileHolder.Get();
- }
-
- if (resultOut) {
- if (res.Has("discover")) {
- program->DiscoveredDataOut(*resultOut);
- } else if (res.Has("lineage")) {
- program->LineageOut(*resultOut);
- } else {
- if (res.Has("validate-result-format")) {
- TString str;
- TStringOutput out(str);
- program->ResultsOut(out);
- if (!str.empty()) {
- auto node = NYT::NodeFromYsonString(str);
- for (const auto& r : NResult::ParseResponse(node)) {
- for (const auto& write : r.Writes) {
- if (write.Type) {
- NResult::TEmptyTypeVisitor visitor;
- NResult::ParseType(*write.Type, visitor);
- }
-
- if (write.Type && write.Data) {
- NResult::TEmptyDataVisitor visitor;
- NResult::ParseData(*write.Type, *write.Data, visitor);
- }
- }
- }
- }
-
- resultOut->Write(str.data(), str.size());
- } else {
- program->ResultsOut(*resultOut);
- }
- }
- }
-
- NLog::CleanupLogger();
-
- return 0;
-}
int RunUI(int argc, const char* argv[])
{
+ Cerr << "yqlrun ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
+
+ NYql::NBacktrace::RegisterKikimrFatalActions();
+ NYql::NBacktrace::EnableKikimrSymbolize();
+
TVector<TString> udfsPaths;
TString udfsDir;
TString mountConfig;
@@ -1064,18 +261,11 @@ int RunUI(int argc, const char* argv[])
}
int main(int argc, const char *argv[]) {
- if (argc > 1 && TString(argv[1]) != TStringBuf("--ndebug")) {
- Cerr << "yqlrun ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
- }
-
- NYql::NBacktrace::RegisterKikimrFatalActions();
- NYql::NBacktrace::EnableKikimrSymbolize();
-
try {
if (argc > 1 && TString(argv[1]) == TStringBuf("ui")) {
return RunUI(argc, argv);
} else {
- return Main(argc, argv);
+ return NYql::TYqlRunTool().Main(argc, argv);
}
}
catch (...) {