diff options
author | vvvv <vvvv@ydb.tech> | 2023-08-25 23:22:17 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-08-25 23:41:08 +0300 |
commit | 4f101706e390fb7029a8bf4a4ab587bf2e66a365 (patch) | |
tree | 58582db3f13d29ffa65b489f3e9c4944497fc25e | |
parent | fcd88645dec99a9d5bcac7869e9561e651fd9a29 (diff) | |
download | ydb-4f101706e390fb7029a8bf4a4ab587bf2e66a365.tar.gz |
Move yqlrun
30 files changed, 3163 insertions, 1 deletions
diff --git a/build/conf/project_specific/yql_udf.conf b/build/conf/project_specific/yql_udf.conf index 7c1256da47..9da1324b93 100644 --- a/build/conf/project_specific/yql_udf.conf +++ b/build/conf/project_specific/yql_udf.conf @@ -37,7 +37,7 @@ module YQL_UDF_TEST: PY3TEST_BIN { PEERDIR(yql/library/udf_test) DEPENDS(ydb/library/yql/tools/astdiff) - DEPENDS(yql/tools/yqlrun) + DEPENDS(ydb/library/yql/tools/yqlrun) DATA(arcadia/ydb/library/yql/mount) DATA(arcadia/ydb/library/yql/cfg/udf_test) } diff --git a/ydb/library/yql/tools/CMakeLists.txt b/ydb/library/yql/tools/CMakeLists.txt index 1bb9aec0d0..73d4896db4 100644 --- a/ydb/library/yql/tools/CMakeLists.txt +++ b/ydb/library/yql/tools/CMakeLists.txt @@ -8,3 +8,4 @@ add_subdirectory(astdiff) add_subdirectory(mrjob) +add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/ya.make b/ydb/library/yql/tools/ya.make index afc6bc7b1f..062df88349 100644 --- a/ydb/library/yql/tools/ya.make +++ b/ydb/library/yql/tools/ya.make @@ -2,4 +2,5 @@ RECURSE( astdiff mrjob mrjob/test + yqlrun ) diff --git a/ydb/library/yql/tools/yqlrun/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/yqlrun/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..d25eb2002d --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,59 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(http) + +add_executable(yqlrun) +target_compile_options(yqlrun PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yqlrun PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + contrib-libs-protobuf + library-cpp-getopt + library-cpp-yson + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + minikql-comp_nodes-llvm + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + library-yql-core + sql-v1-format + providers-common-codec + providers-common-comp_nodes + providers-common-proto + providers-common-provider + providers-common-udf_resolve + providers-dq-provider + yt-gateway-file + yql-core-url_preprocessing + tools-yqlrun-http + yql-parser-pg_wrapper +) +target_link_options(yqlrun PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(yqlrun PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/yqlrun.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/gateway_spec.cpp +) +target_allocator(yqlrun + cpp-malloc-jemalloc +) +vcs_info(yqlrun) diff --git a/ydb/library/yql/tools/yqlrun/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/yqlrun/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..87eca78368 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/CMakeLists.linux-aarch64.txt @@ -0,0 +1,63 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(http) + +add_executable(yqlrun) +target_compile_options(yqlrun PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yqlrun PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-getopt + library-cpp-yson + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + minikql-comp_nodes-llvm + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + library-yql-core + sql-v1-format + providers-common-codec + providers-common-comp_nodes + providers-common-proto + providers-common-provider + providers-common-udf_resolve + providers-dq-provider + yt-gateway-file + yql-core-url_preprocessing + tools-yqlrun-http + yql-parser-pg_wrapper +) +target_link_options(yqlrun PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(yqlrun PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/yqlrun.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/gateway_spec.cpp +) +target_allocator(yqlrun + cpp-malloc-jemalloc +) +vcs_info(yqlrun) diff --git a/ydb/library/yql/tools/yqlrun/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/yqlrun/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..bafc698219 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/CMakeLists.linux-x86_64.txt @@ -0,0 +1,64 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(http) + +add_executable(yqlrun) +target_compile_options(yqlrun PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yqlrun PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + contrib-libs-protobuf + library-cpp-getopt + library-cpp-yson + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + minikql-comp_nodes-llvm + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + library-yql-core + sql-v1-format + providers-common-codec + providers-common-comp_nodes + providers-common-proto + providers-common-provider + providers-common-udf_resolve + providers-dq-provider + yt-gateway-file + yql-core-url_preprocessing + tools-yqlrun-http + yql-parser-pg_wrapper +) +target_link_options(yqlrun PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(yqlrun PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/yqlrun.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/gateway_spec.cpp +) +target_allocator(yqlrun + cpp-malloc-jemalloc +) +vcs_info(yqlrun) diff --git a/ydb/library/yql/tools/yqlrun/CMakeLists.txt b/ydb/library/yql/tools/yqlrun/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/yqlrun/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/yqlrun/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..f5841dc63a --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/CMakeLists.windows-x86_64.txt @@ -0,0 +1,52 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(http) + +add_executable(yqlrun) +target_compile_options(yqlrun PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yqlrun PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + contrib-libs-protobuf + library-cpp-getopt + library-cpp-yson + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + minikql-comp_nodes-llvm + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + library-yql-core + sql-v1-format + providers-common-codec + providers-common-comp_nodes + providers-common-proto + providers-common-provider + providers-common-udf_resolve + providers-dq-provider + yt-gateway-file + yql-core-url_preprocessing + tools-yqlrun-http + yql-parser-pg_wrapper +) +target_sources(yqlrun PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/yqlrun.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/gateway_spec.cpp +) +target_allocator(yqlrun + system_allocator +) +vcs_info(yqlrun) diff --git a/ydb/library/yql/tools/yqlrun/gateway_spec.cpp b/ydb/library/yql/tools/yqlrun/gateway_spec.cpp new file mode 100644 index 0000000000..96f7d4dadd --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/gateway_spec.cpp @@ -0,0 +1,14 @@ +#include <ydb/library/yql/tools/yqlrun/gateway_spec.h> + +#include <library/cpp/getopt/last_getopt.h> + +using namespace NYql; +using namespace NKikimr::NMiniKQL; + +void ExtProviderSpecific(const IFunctionRegistry* funcRegistry, + TVector<TDataProviderInitializer>& dataProvidersInit, + const THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>>& rtmrTableAttributes) { + Y_UNUSED(funcRegistry); + Y_UNUSED(dataProvidersInit); + Y_UNUSED(rtmrTableAttributes); +} diff --git a/ydb/library/yql/tools/yqlrun/gateway_spec.h b/ydb/library/yql/tools/yqlrun/gateway_spec.h new file mode 100644 index 0000000000..784fc513db --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/gateway_spec.h @@ -0,0 +1,8 @@ +#pragma once + +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/core/yql_data_provider.h> + +void ExtProviderSpecific(const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, + TVector<NYql::TDataProviderInitializer>& dataProvidersInit, + const THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>>& rtmrTableAttributes = {}); diff --git a/ydb/library/yql/tools/yqlrun/http/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/yqlrun/http/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..23e04360a4 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,45 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tools-yqlrun-http) +target_compile_options(tools-yqlrun-http PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(tools-yqlrun-http PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-charset + cpp-http-misc + cpp-http-server + library-cpp-json + library-cpp-logger + cpp-mime-types + cpp-openssl-io + cpp-string_utils-quote + library-cpp-uri + library-cpp-yson + cpp-yson-node + yql-core-facade + yql-core-type_ann + providers-dq-provider + providers-result-provider + yql-parser-pg_wrapper + sql-v1-format + yt-gateway-file + providers-yt-provider + yql-core-url_preprocessing +) +target_sources(tools-yqlrun-http PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/assets_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/server.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_server.cpp +) diff --git a/ydb/library/yql/tools/yqlrun/http/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/yqlrun/http/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..66e95df361 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/CMakeLists.linux-aarch64.txt @@ -0,0 +1,46 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tools-yqlrun-http) +target_compile_options(tools-yqlrun-http PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(tools-yqlrun-http PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-charset + cpp-http-misc + cpp-http-server + library-cpp-json + library-cpp-logger + cpp-mime-types + cpp-openssl-io + cpp-string_utils-quote + library-cpp-uri + library-cpp-yson + cpp-yson-node + yql-core-facade + yql-core-type_ann + providers-dq-provider + providers-result-provider + yql-parser-pg_wrapper + sql-v1-format + yt-gateway-file + providers-yt-provider + yql-core-url_preprocessing +) +target_sources(tools-yqlrun-http PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/assets_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/server.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_server.cpp +) diff --git a/ydb/library/yql/tools/yqlrun/http/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/yqlrun/http/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..66e95df361 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/CMakeLists.linux-x86_64.txt @@ -0,0 +1,46 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tools-yqlrun-http) +target_compile_options(tools-yqlrun-http PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(tools-yqlrun-http PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-charset + cpp-http-misc + cpp-http-server + library-cpp-json + library-cpp-logger + cpp-mime-types + cpp-openssl-io + cpp-string_utils-quote + library-cpp-uri + library-cpp-yson + cpp-yson-node + yql-core-facade + yql-core-type_ann + providers-dq-provider + providers-result-provider + yql-parser-pg_wrapper + sql-v1-format + yt-gateway-file + providers-yt-provider + yql-core-url_preprocessing +) +target_sources(tools-yqlrun-http PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/assets_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/server.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_server.cpp +) diff --git a/ydb/library/yql/tools/yqlrun/http/CMakeLists.txt b/ydb/library/yql/tools/yqlrun/http/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/yqlrun/http/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/yqlrun/http/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..23e04360a4 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/CMakeLists.windows-x86_64.txt @@ -0,0 +1,45 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tools-yqlrun-http) +target_compile_options(tools-yqlrun-http PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(tools-yqlrun-http PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-charset + cpp-http-misc + cpp-http-server + library-cpp-json + library-cpp-logger + cpp-mime-types + cpp-openssl-io + cpp-string_utils-quote + library-cpp-uri + library-cpp-yson + cpp-yson-node + yql-core-facade + yql-core-type_ann + providers-dq-provider + providers-result-provider + yql-parser-pg_wrapper + sql-v1-format + yt-gateway-file + providers-yt-provider + yql-core-url_preprocessing +) +target_sources(tools-yqlrun-http PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/assets_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/server.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_servlet.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/yqlrun/http/yql_server.cpp +) diff --git a/ydb/library/yql/tools/yqlrun/http/assets_servlet.cpp b/ydb/library/yql/tools/yqlrun/http/assets_servlet.cpp new file mode 100644 index 0000000000..7e11e5cff0 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/assets_servlet.cpp @@ -0,0 +1,99 @@ +#include "assets_servlet.h" + +#include <library/cpp/http/misc/httpdate.h> +#include <library/cpp/mime/types/mime.h> +#include <library/cpp/uri/uri.h> + +#include <util/system/fstat.h> +#include <util/system/file.h> + + +namespace NYql { +namespace NHttp { + +/////////////////////////////////////////////////////////////////////////////// +// TAssetsServlet +/////////////////////////////////////////////////////////////////////////////// +TAssetsServlet::TAssetsServlet( + const TString& baseUrl, + const TString& baseDir, + const TString& indexFile) + : BaseUrl_(baseUrl) + , BaseDir_(baseDir) + , IndexFile_(indexFile) +{ +} + +void TAssetsServlet::DoGet(const TRequest& req, TResponse& resp) const +{ + TAsset asset; + LoadAsset(req, &asset); + + TString lastModified = FormatHttpDate(asset.LastModified); + resp.Headers.AddHeader("Last-Modified", lastModified); + if (asset.ContentType) { + resp.ContentType = asset.ContentType; + } + resp.Body = asset.Data; +} + +TString TAssetsServlet::SafeFilePath(const TString& basePath, TStringBuf reqPath) const +{ + Y_ENSURE_EX(reqPath.Head(BaseUrl_.size()) == BaseUrl_, + THttpError(HTTP_BAD_REQUEST) << "invalid url prefix"); + + size_t shift = BaseUrl_ == TStringBuf("/") ? 0 : BaseUrl_.size(); + TStringBuf skipped = reqPath.Skip(shift); + const char* p = skipped.data(); + size_t len = skipped.size(); + + if (p[0] != '/' || (len > 2 && p[1] == '.' && p[2] == '.')) + ythrow THttpError(HTTP_BAD_REQUEST); + + TString buf(skipped.data(), len); + char* b = buf.begin(); + char* e = b + len; + ::NUri::TUri::PathOperation(b, e, 1); + + return basePath + TStringBuf(b, e); +} + +bool TAssetsServlet::IsCachedOnClient(const TRequest& req, const TAsset& asset) const +{ + const TString* value = req.RD.HeaderIn("If-Modified-Since"); + if (value) { + time_t mt = parse_http_date(*value); + return (mt >= asset.LastModified); + } + + return false; +} + +void TAssetsServlet::LoadAsset(const TRequest& req, TAsset* asset) const +{ + TString filePath = SafeFilePath(BaseDir_, req.RD.ScriptName()); + + TFileStat stat(filePath); + if (stat.IsDir()) { + filePath += "/" + IndexFile_; + stat = TFileStat(filePath); + } + + if (!stat.IsFile()) { + ythrow THttpError(HTTP_NOT_FOUND) + << "File '" << filePath << "' not found"; + } + + asset->LastModified = stat.MTime; + asset->ContentType = mimetypeByExt(filePath.data()); + + if (IsCachedOnClient(req, *asset)) { + ythrow THttpError(HTTP_NOT_MODIFIED); + } + + TFile file(filePath, EOpenModeFlag::RdOnly); + asset->Data = TBlob::FromFile(file); +} + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/assets_servlet.h b/ydb/library/yql/tools/yqlrun/http/assets_servlet.h new file mode 100644 index 0000000000..688270769f --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/assets_servlet.h @@ -0,0 +1,41 @@ +#pragma once + +#include "servlet.h" + + +namespace NYql { +namespace NHttp { + +/////////////////////////////////////////////////////////////////////////////// +// TAssetsServlet +/////////////////////////////////////////////////////////////////////////////// +class TAssetsServlet: public IServlet +{ + struct TAsset { + TBlob Data; + time_t LastModified; + const char* ContentType; + }; + +public: + TAssetsServlet( + const TString& baseUrl, + const TString& baseDir, + const TString& indexFile); + + void DoGet(const TRequest& req, TResponse& resp) const override final; + +private: + TString SafeFilePath(const TString& basePath, TStringBuf reqPath) const; + bool IsCachedOnClient(const TRequest& req, const TAsset& asset) const; + void LoadAsset(const TRequest& req, TAsset* asset) const; + +private: + TString BaseUrl_; + TString BaseDir_; + TString IndexFile_; +}; + + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/server.cpp b/ydb/library/yql/tools/yqlrun/http/server.cpp new file mode 100644 index 0000000000..44626f405a --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/server.cpp @@ -0,0 +1,237 @@ +#include "server.h" + +#include <library/cpp/json/json_writer.h> + +#include <util/generic/map.h> +#include <util/datetime/base.h> +#include <util/datetime/cputimer.h> +#include <util/system/hostname.h> +#include <util/stream/buffer.h> +#include <util/generic/bt_exception.h> + + +using namespace NYql; +using namespace NHttp; + +namespace { + +TBlob ToErrorsJson(const TStringBuf& message) +{ + TBufferOutput output; + NJson::TJsonWriter writer(&output, false); + writer.OpenMap(); + + writer.Write(TStringBuf("errors")); + { + writer.OpenArray(); + writer.Write(message); + writer.CloseArray(); + } + + writer.CloseMap(); + writer.Flush(); + + return TBlob::FromBuffer(output.Buffer()); +} + +/////////////////////////////////////////////////////////////////////////////// +// THttpReplier +/////////////////////////////////////////////////////////////////////////////// +class THttpReplier: public THttpClientRequestEx +{ +public: + THttpReplier(const TServer* server) + : Server_(server) + { + } + +private: + TString GetRemoteAddr() const { + NAddr::TOpaqueAddr remoteAddr; + int rc = getpeername(Socket(), remoteAddr.MutableAddr(), remoteAddr.LenPtr()); + Y_ENSURE(rc == 0, "Can't get remote address"); + return PrintHost(remoteAddr); + } + + bool Reply(void*) final { + TSimpleTimer timer; + + if (!ProcessHeaders()) { + return true; + } + RD.Scan(); + TResponse resp; + + const IServlet* servlet = Server_->FindServlet(RD.ScriptName()); + if (servlet != nullptr) { + try { + TRequest req{Input(), RD, Buf}; + + TStringBuf method = TStringBuf(Input().FirstLine()).Before(' '); + if (method == TStringBuf("GET")) { + servlet->DoGet(req, resp); + } else if (method == TStringBuf("POST")) { + servlet->DoPost(req, resp); + } else { + resp.Code = HTTP_METHOD_NOT_ALLOWED; + } + } catch (const THttpError& e) { + resp.Code = e.GetCode(); + resp.ContentType = TStringBuf("application/json"); + if (resp.Code >= HTTP_BAD_REQUEST) { + Cerr << e.what() << Endl; + TStringBuf message = e.AsStrBuf().RNextTok(':'); + if (!message.empty()) { + resp.Body = ToErrorsJson(message); + } + } + } catch (const TWithBackTrace<yexception>& e) { + Cerr << e.what() << Endl; + const TBackTrace* bt = e.BackTrace(); + bt->PrintTo(Cerr); + resp.Code = HTTP_INTERNAL_SERVER_ERROR; + resp.ContentType = TStringBuf("application/json"); + TStringBuf message = e.AsStrBuf().RNextTok(':'); + if (!message.empty()) { + resp.Body = ToErrorsJson(message); + } + } catch (const yexception& e) { + Cerr << e.what() << Endl; + resp.Code = HTTP_INTERNAL_SERVER_ERROR; + resp.ContentType = TStringBuf("application/json"); + TStringBuf message = e.AsStrBuf().RNextTok(':'); + if (!message.empty()) { + resp.Body = ToErrorsJson(message); + } + } + } else { + resp.Code = HTTP_NOT_FOUND; + resp.Body = ToErrorsJson( + TString("Unknown path ") + RD.ScriptName()); + resp.ContentType = TStringBuf("application/json"); + } + + resp.Headers.AddHeader(THttpInputHeader("Content-Type", resp.ContentType)); + resp.OutTo(Output()); + + Cout << '[' << TInstant::Now().ToStringUpToSeconds() << "] " + << GetRemoteAddr() << ' ' + << Input().FirstLine() << ' ' << static_cast<int>(resp.Code) + << " took: " << timer.Get().MilliSeconds() << "ms" + << Endl; + return true; + } + +private: + const TServer* Server_; +}; + +THttpServerOptions CreateOptions(const TServerConfig& config) +{ + THttpServerOptions opts; + + if (config.IsBind(TBind::OnLocal)) { + opts.AddBindAddress("localhost", config.GetPort()); + } + + if (config.IsBind(TBind::OnRemote)) { + opts.AddBindAddress(HostName(), config.GetPort()); + } + + opts.SetPort(config.GetPort()); + opts.SetThreads(4); + opts.SetMaxQueueSize(40); + + return opts; +} + +} // namspace + + +namespace NYql { +namespace NHttp { + +void TServerConfig::InitCliOptions(NLastGetopt::TOpts& opts) +{ + opts.AddLongOption("port", "listening port") + .StoreResult<ui16>(&Port_) + .DefaultValue("3000"); + opts.AddLongOption("local", "bind on local interface").NoArgument(); + opts.AddLongOption("remote", "bind on remote interface").NoArgument(); + opts.AddLongOption("assets", "path to folder with web intrface files") + .StoreResult<TString>(&AssetsPath_).DefaultValue(AssetsPath_); +} +void TServerConfig::ParseFromCli(NLastGetopt::TOptsParseResult& res) +{ + if (res.Has("local")) { + Bind(TBind::OnLocal); + } + + if (res.Has("remote")) { + Bind(TBind::OnRemote); + } + + if (Bind_ == 0) { + Bind(TBind::OnLocal); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// TServer +/////////////////////////////////////////////////////////////////////////////// +TServer::TServer(const TServerConfig& config) + : THttpServer(this, CreateOptions(config)) +{ + Cout << "Start server listening on\n"; + if (config.IsBind(TBind::OnLocal)) { + Cout << "http://localhost:" << config.GetPort() << "/\n"; + } + if (config.IsBind(TBind::OnRemote)) { + Cout << "http://" << HostName() << ':' << config.GetPort() << "/\n"; + } + Cout << Endl; +} + +TServer::~TServer() +{ + for (auto& it: Servlets_) { + delete it.second; + } +} + +void TServer::RegisterServlet(const TString& path, TAutoPtr<IServlet> sp) +{ + Cout << "Registered servlet on " << path << Endl; + + IServlet* servlet = sp.Release(); + Servlets_.push_back(std::pair<TString, IServlet*>(path, servlet)); +} + +TClientRequest* TServer::CreateClient() +{ + return new THttpReplier(this); +} + +const IServlet* TServer::FindServlet(TStringBuf path) const +{ + for (const auto& servlet: Servlets_) { + const TString& urlPrefix = servlet.first; + if (path.StartsWith(urlPrefix)) + return servlet.second; + } + + return nullptr; +} + +TVector<TString> TServer::GetUrlMappings() const +{ + TVector<TString> urls; + urls.reserve(Servlets_.size()); + for (const auto& s: Servlets_) { + urls.push_back(s.first); + } + return urls; +} + +} // namspace NWeb +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/server.h b/ydb/library/yql/tools/yqlrun/http/server.h new file mode 100644 index 0000000000..5f4a118920 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/server.h @@ -0,0 +1,93 @@ +#pragma once + +#include "servlet.h" + +#include <library/cpp/http/server/http_ex.h> +#include <library/cpp/getopt/last_getopt.h> + +#include <util/generic/string.h> +#include <util/system/types.h> + + +namespace NYql { +namespace NHttp { + +/////////////////////////////////////////////////////////////////////////////// +// TServerConfig +/////////////////////////////////////////////////////////////////////////////// +struct TBind { + enum { + OnLocal = 0x01, + OnRemote = 0x02, + }; +}; + +class TServerConfig +{ +public: + TServerConfig() + : Bind_(0) + , Port_(0) + { + } + + inline TServerConfig& Bind(ui32 on) { + Bind_ |= on; + return *this; + } + + inline bool IsBind(ui32 on) const { + return Bind_ & on; + } + + inline TServerConfig& SetPort(ui16 port) { + Port_ = port; + return *this; + } + + inline ui16 GetPort() const { + return Port_; + } + + inline TServerConfig& SetAssetsPath(const TString& path) { + AssetsPath_ = path; + return *this; + } + + inline const TString& GetAssetsPath() const { + return AssetsPath_; + } + + void InitCliOptions(NLastGetopt::TOpts& opts); + void ParseFromCli(NLastGetopt::TOptsParseResult& res); + +private: + ui32 Bind_; + ui16 Port_; + TString AssetsPath_; +}; + +/////////////////////////////////////////////////////////////////////////////// +// TServer +/////////////////////////////////////////////////////////////////////////////// +class TServer: + public THttpServer, public THttpServer::ICallBack, + private TNonCopyable +{ +public: + TServer(const TServerConfig& config); + ~TServer(); + + void RegisterServlet(const TString& path, TAutoPtr<IServlet> sp); + const IServlet* FindServlet(TStringBuf path) const; + TVector<TString> GetUrlMappings() const; + +private: + TClientRequest* CreateClient() override final; + +private: + TVector<std::pair<TString, IServlet*>> Servlets_; +}; + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/servlet.cpp b/ydb/library/yql/tools/yqlrun/http/servlet.cpp new file mode 100644 index 0000000000..9b0e2f7d35 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/servlet.cpp @@ -0,0 +1,53 @@ +#include "yql_servlet.h" + + +namespace NYql { +namespace NHttp { + +/////////////////////////////////////////////////////////////////////////////// +// TResponse +/////////////////////////////////////////////////////////////////////////////// +void TResponse::OutTo(IOutputStream& out) const { + TVector<IOutputStream::TPart> parts; + const size_t FIRST_LINE_PARTS = 3; + const size_t HEADERS_PARTS = Headers.Count() * 4; + const size_t CONTENT_PARTS = 5; + parts.reserve(FIRST_LINE_PARTS + HEADERS_PARTS + CONTENT_PARTS); + + // first line + parts.push_back(IOutputStream::TPart(TStringBuf("HTTP/1.1 "))); + parts.push_back(IOutputStream::TPart(HttpCodeStrEx(Code))); + parts.push_back(IOutputStream::TPart::CrLf()); + + // headers + for (THttpHeaders::TConstIterator i = Headers.Begin(); i != Headers.End(); ++i) { + parts.push_back(IOutputStream::TPart(i->Name())); + parts.push_back(IOutputStream::TPart(TStringBuf(": "))); + parts.push_back(IOutputStream::TPart(i->Value())); + parts.push_back(IOutputStream::TPart::CrLf()); + } + + char buf[50]; + + if (!Body.Empty()) { + TMemoryOutput mo(buf, sizeof(buf)); + mo << Body.Size(); + + parts.push_back(IOutputStream::TPart(TStringBuf("Content-Length: "))); + parts.push_back(IOutputStream::TPart(buf, mo.Buf() - buf)); + parts.push_back(IOutputStream::TPart::CrLf()); + } + + // body + parts.push_back(IOutputStream::TPart::CrLf()); + + if (!Body.Empty()) { + parts.push_back(IOutputStream::TPart(Body.AsCharPtr(), Body.Size())); + } + + out.Write(parts.data(), parts.size()); +} + + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/servlet.h b/ydb/library/yql/tools/yqlrun/http/servlet.h new file mode 100644 index 0000000000..25b1e561d3 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/servlet.h @@ -0,0 +1,77 @@ +#pragma once + +#include <library/cpp/http/misc/httpcodes.h> +#include <library/cpp/http/misc/httpreqdata.h> +#include <library/cpp/http/io/stream.h> + +#include <util/memory/blob.h> + + +namespace NYql { +namespace NHttp { + +/////////////////////////////////////////////////////////////////////////////// +// THttpError +/////////////////////////////////////////////////////////////////////////////// +class THttpError: public yexception +{ +public: + inline THttpError(HttpCodes code) + : Code_(code) + { + } + + inline HttpCodes GetCode() const { + return Code_; + } + +private: + HttpCodes Code_; +}; + +/////////////////////////////////////////////////////////////////////////////// +// TRequest & TResponse +/////////////////////////////////////////////////////////////////////////////// +struct TRequest +{ + const THttpInput& Input; + const TServerRequestData& RD; + const TBlob& Body; +}; + +struct TResponse +{ + HttpCodes Code; + THttpHeaders Headers; + TBlob Body; + TString ContentType; + + inline TResponse() + : Code(HTTP_OK) + , ContentType(TStringBuf("text/plain")) + { + } + + void OutTo(IOutputStream& out) const; +}; + +/////////////////////////////////////////////////////////////////////////////// +// IServlet +/////////////////////////////////////////////////////////////////////////////// +class IServlet: private TNonCopyable +{ +public: + virtual ~IServlet() = default; + + virtual void DoGet(const TRequest&, TResponse& resp) const { + resp.Code = HttpCodes::HTTP_NOT_IMPLEMENTED; + } + + virtual void DoPost(const TRequest&, TResponse& resp) const { + resp.Code = HttpCodes::HTTP_NOT_IMPLEMENTED; + } +}; + +} // namspace NNttp +} // namspace NYql + diff --git a/ydb/library/yql/tools/yqlrun/http/ya.make b/ydb/library/yql/tools/yqlrun/http/ya.make new file mode 100644 index 0000000000..77c20460fa --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/ya.make @@ -0,0 +1,37 @@ +LIBRARY() + +SRCS( + assets_servlet.cpp + server.cpp + servlet.cpp + yql_functions_servlet.cpp + yql_servlet.cpp + yql_server.cpp +) + +PEERDIR( + library/cpp/charset + library/cpp/http/misc + library/cpp/http/server + library/cpp/json + library/cpp/logger + library/cpp/mime/types + library/cpp/openssl/io + library/cpp/string_utils/quote + library/cpp/uri + library/cpp/yson + library/cpp/yson/node + ydb/library/yql/core/facade + ydb/library/yql/core/type_ann + ydb/library/yql/providers/dq/provider + ydb/library/yql/providers/result/provider + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/sql/v1/format + ydb/library/yql/providers/yt/gateway/file + ydb/library/yql/providers/yt/provider + ydb/library/yql/core/url_preprocessing +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.cpp b/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.cpp new file mode 100644 index 0000000000..8a20dc44cf --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.cpp @@ -0,0 +1,59 @@ +#include "yql_functions_servlet.h" + +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> +#include <ydb/library/yql/providers/config/yql_config_provider.h> +#include <ydb/library/yql/providers/result/provider/yql_result_provider.h> +#include <ydb/library/yql/core/type_ann/type_ann_core.h> + + +namespace { + +void OutputKnownFunctions(TStringStream& out) +{ + using namespace NYql; + + out << + "window.yql = {};\n" + "window.yql.builtinFunctions = ["; + + const auto& builtinFuncs = GetBuiltinFunctions(); + for (const auto& func: builtinFuncs) { + out << '"' << func << "\","; + } + out << "];\n"; + + out << "window.yql.supportedFunctions = ["; + + const THashSet<TStringBuf>* functionsSet[] = { + &YtDataSourceFunctions(), + &YtDataSinkFunctions(), + &ResultProviderFunctions(), + &ConfigProviderFunctions() + }; + + for (const auto& funcs: functionsSet) { + for (const auto& func: *funcs) { + out << '"' << func << "\","; + } + } + + out << "];"; +} + +} // namspace + +namespace NYql { +namespace NHttp { + +void TYqlFunctoinsServlet::DoGet(const TRequest& req, TResponse& resp) const +{ + Y_UNUSED(req); + + TStringStream out; + OutputKnownFunctions(out); + resp.Body = TBlob::FromString(out.Str()); + resp.Headers.AddHeader(THttpInputHeader("Content-Type: text/javascript")); +} + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.h b/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.h new file mode 100644 index 0000000000..2dc0069e87 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/yql_functions_servlet.h @@ -0,0 +1,19 @@ +#pragma once + +#include "servlet.h" + + +namespace NYql { +namespace NHttp { + +/////////////////////////////////////////////////////////////////////////////// +// TYqlFunctoinsServlet +/////////////////////////////////////////////////////////////////////////////// +class TYqlFunctoinsServlet: public IServlet +{ +public: + void DoGet(const TRequest& req, TResponse& resp) const override final; +}; + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/yql_server.cpp b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp new file mode 100644 index 0000000000..6c7cbbff6e --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp @@ -0,0 +1,651 @@ +#include "yql_server.h" + +#include <ydb/library/yql/tools/yqlrun/gateway_spec.h> + +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h> +#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h> +#include <yql/providers/rtmr/gateway/yql_rtmr_dummy_gateway.h> +#include <yql/providers/rtmr/provider/yql_rtmr_provider.h> +#include <ydb/library/yql/providers/yt/common/yql_names.h> +#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h> +#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h> +#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h> +#include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h> +#include <ydb/library/yql/sql/v1/format/sql_format.h> + +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/log/tls_backend.h> +#include <ydb/library/yql/core/services/yql_out_transformers.h> +#include <ydb/library/yql/utils/utf8.h> + +#include <library/cpp/logger/stream.h> +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yson/node/node_builder.h> +#include <library/cpp/openssl/io/stream.h> +#include <library/cpp/charset/ci_string.h> +#include <library/cpp/yson/parser.h> +#include <library/cpp/string_utils/quote/quote.h> + +#include <google/protobuf/arena.h> + +#include <util/system/fstat.h> +#include <util/system/tempfile.h> +#include <util/string/escape.h> + +namespace NYql { +namespace NHttp { + +namespace { + +#ifdef _unix_ +static volatile sig_atomic_t Terminated = 0; + +void OnTerminate(int) +{ + Terminated = 1; +} +#endif + +class TTempLogRedirector: private NLog::TScopedBackend<TStreamLogBackend> { + using TBase = NLog::TScopedBackend<TStreamLogBackend>; + +public: + TTempLogRedirector(IOutputStream* redirectTo) + : TBase(redirectTo) + { + } +}; + +class TLogLevelPromouter { +public: + TLogLevelPromouter(bool promote) { + PrevLevelCore = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::Core); + PrevLevelEval = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::CoreEval); + PrevLevelPeepHole = NLog::YqlLogger().GetComponentLevel(NLog::EComponent::CorePeepHole); + + if (promote) { + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::TRACE); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::TRACE); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::TRACE); + } + } + + ~TLogLevelPromouter() { + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, PrevLevelCore); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, PrevLevelEval); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, PrevLevelPeepHole); + } + +private: + NLog::ELevel PrevLevelCore; + NLog::ELevel PrevLevelEval; + NLog::ELevel PrevLevelPeepHole; +}; + + +class TPeepHolePipelineConfigurator : public IPipelineConfigurator, TLogLevelPromouter { +public: + TPeepHolePipelineConfigurator(bool promote) + : TLogLevelPromouter(promote) + { + } + + void AfterCreate(TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { + pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NLog::EComponent::Core, NLog::ELevel::TRACE), + "OptTrace", TIssuesIds::CORE, "OptTrace"); + } + + void AfterOptimize(TTransformationPipeline* pipeline) const final { + pipeline->Add(CreateTYtWideFlowTransformer(nullptr), "WideFlow"); + pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); + } +}; + +class TOptPipelineConfigurator : public IPipelineConfigurator, TLogLevelPromouter { +public: + TOptPipelineConfigurator(TProgramPtr prg, IOutputStream* stream) + : TLogLevelPromouter(!!stream) + , Program(std::move(prg)) + , Stream(stream) + { + } + + void AfterCreate(TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { + if (Stream) { + pipeline->Add(TExprLogTransformer::Sync("OptimizedExpr", NLog::EComponent::Core, NLog::ELevel::TRACE), + "OptTrace", TIssuesIds::CORE, "OptTrace"); + } + } + + void AfterOptimize(TTransformationPipeline* pipeline) const final { + pipeline->Add(TPlanOutputTransformer::Sync(Stream, Program->GetPlanBuilder(), Program->GetOutputFormat()), "PlanOutput"); + } +private: + TProgramPtr Program; + IOutputStream* Stream; +}; + +NSQLTranslation::TTranslationSettings GetTranslationSettings(const THolder<TGatewaysConfig>& gatewaysConfig) { + static const THashMap<TString, TString> clusters = { + { "plato", TString(YtProviderName) }, + { "plato_rtmr", TString(RtmrProviderName) } + }; + + NSQLTranslation::TTranslationSettings settings; + settings.ClusterMapping = clusters; + settings.SyntaxVersion = 1; + settings.InferSyntaxVersion = true; + settings.V0Behavior = NSQLTranslation::EV0Behavior::Report; + if (gatewaysConfig && gatewaysConfig->HasSqlCore()) { + settings.Flags.insert(gatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), gatewaysConfig->GetSqlCore().GetTranslationFlags().end()); + } + return settings; +} + +void SetupProgram(TProgram& prg, const TString& program) { + Y_UNUSED(program); + prg.SetValidateOptions(NKikimr::NUdf::EValidateMode::Greedy); + prg.EnableResultPosition(); +} + +struct TTableFileHolder { + TTempFile Main; + TTempFile Attr; + + TTableFileHolder(const TString& path) + : Main(path) + , Attr(path + ".attr") + {} +}; + +TProgramPtr MakeFileProgram(const TString& program, TYqlServer& yqlServer, + const THashMap<TString, TString>& tables, const THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>>& rtmrTableAttributes) { + + TVector<TDataProviderInitializer> dataProvidersInit; + + auto ytNativeServices = NFile::TYtFileServices::Make(yqlServer.FunctionRegistry, tables, yqlServer.FileStorage); + auto ytNativeGateway = CreateYtFileGateway(ytNativeServices); + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + NKikimr::NMiniKQL::GetYqlFactory(), + GetPgFactory() + }); + + dataProvidersInit.push_back(GetDqDataProviderInitializer([](const TDqStatePtr&){ + return new TNullTransformer; + }, {}, dqCompFactory, {}, yqlServer.FileStorage)); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway)); + + ExtProviderSpecific(yqlServer.FunctionRegistry, dataProvidersInit, rtmrTableAttributes); + + TProgramFactory programFactory( + true, + yqlServer.FunctionRegistry, + yqlServer.NextUniqueId, + dataProvidersInit, + "yqlrun"); + + programFactory.AddUserDataTable(yqlServer.FilesMapping); + programFactory.SetModules(yqlServer.Modules); + programFactory.SetUdfResolver(yqlServer.UdfResolver); + programFactory.SetUdfIndex(yqlServer.UdfIndex, new TUdfIndexPackageSet()); + programFactory.SetFileStorage(yqlServer.FileStorage); + programFactory.EnableRangeComputeFor(); + programFactory.SetGatewaysConfig(yqlServer.GatewaysConfig.Get()); + if (yqlServer.GatewaysConfig && yqlServer.GatewaysConfig->HasFs()) { + programFactory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*yqlServer.GatewaysConfig)); + } + + auto prg = programFactory.Create("-stdin-", program); + SetupProgram(*prg, program); + return prg; +} + +TProgramPtr MakeFileProgram(const TString& program, const TString& input, const TString& attr, + TAutoPtr<TTableFileHolder>& inputFile, TTempFile& outputFile, TYqlServer& yqlServer) { + TString cluster = "plato"; + + THashMap<TString, TString> tables; + inputFile.Reset(new TTableFileHolder(MakeTempName())); + TFile mainFile(inputFile->Main.Name(), CreateAlways | RdWr); + TFile attrFile(inputFile->Attr.Name(), CreateAlways | RdWr); + mainFile.Write(input.data(), input.size()); + attrFile.Write(attr.data(), attr.size()); + mainFile.Close(); + attrFile.Close(); + tables[TString(YtProviderName).append('.').append(cluster).append(TStringBuf(".Input"))] = inputFile->Main.Name(); + tables[TString(YtProviderName).append('.').append(cluster).append(TStringBuf(".Output"))] = outputFile.Name(); + NFs::Remove(outputFile.Name()); + + THashMap<std::pair<TString, TString>, TVector<std::pair<TString, TString>>> rtmrTableAttributes; + auto node = NYT::NodeFromYsonString(attr); + if (node.IsMap() && node.HasKey(YqlRowSpecAttribute)) { + rtmrTableAttributes[std::make_pair("plato_rtmr", "Input")] = {{"_yql_row_spec", NYT::NodeToYsonString(node[YqlRowSpecAttribute])}}; + } + + return MakeFileProgram(program, yqlServer, tables, rtmrTableAttributes); +} + +YQL_ACTION(Paste) + void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) { + Y_UNUSED(input); + Y_UNUSED(attr); + Y_UNUSED(options); + Y_UNUSED(parameters); + + const static TString pasteHost(TStringBuf("paste.yandex-team.ru")); + + TSocket s(TNetworkAddress(pasteHost, 443)); + TSocketOutput so(s); + TSocketInput si(s); + TOpenSslClientIO ssl(&si, &so); + + { + THttpOutput output(&ssl); + + TStringBuf data = "syntax=yql&text="; + TString quotedProgram(program); + Quote(quotedProgram); + + output << TStringBuf("POST / HTTP/1.1\r\n") + << TStringBuf("Host: ") << pasteHost << TStringBuf("\r\n") + << TStringBuf("Content-Type: application/x-www-form-urlencoded\r\n") + << TStringBuf("Content-Length: ") << (data.size() + quotedProgram.size()) + << TStringBuf("\r\n\r\n") + << data << quotedProgram + << TStringBuf("\r\n"); + + output.Finish(); + } + + { + THttpInput input(&ssl); + unsigned httpCode = ParseHttpRetCode(input.FirstLine()); + Cout << "return code: " << httpCode << Endl; + + for (auto i = input.Headers().Begin(), e = input.Headers().End(); i != e; ++i) { + if (0 == TCiString::compare(i->Name(), TStringBuf("location"))) { + Writer.Write(TStringBuf("location"), i->Value()); + return; + } + } + } + + ythrow yexception() << "Unknown redirect location"; + } +}; + +YQL_ACTION(Format) + void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) { + Y_UNUSED(input); + Y_UNUSED(attr); + Y_UNUSED(options); + Y_UNUSED(parameters); + google::protobuf::Arena arena; + NSQLTranslation::TTranslationSettings settings; + settings.Arena = &arena; + auto formatter = NSQLFormat::MakeSqlFormatter(settings); + TString frm_query; + TString error; + NYql::TIssues issues; + if (!formatter->Format(program, frm_query, issues)) { + WriteStatus(false, issues); + } else { + Writer.Write(TStringBuf("sql"), frm_query); + } + } +}; + +/////////////////////////////////////////////////////////////////////////////// +// parse action +/////////////////////////////////////////////////////////////////////////////// +YQL_ACTION(Parse) + void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) { + Y_UNUSED(input); + Y_UNUSED(attr); + Y_UNUSED(parameters); + TProgramPtr prg = MakeFileProgram(program, YqlServer, {}, {}); + + bool parsed = (options & TYqlAction::YqlProgram) + ? prg->ParseYql() + : prg->ParseSql(GetTranslationSettings(YqlServer.GatewaysConfig)); + + if (parsed) { + ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote; + Writer.Write(TStringBuf("expr"), prg->AstRoot()->ToString(prettyFlg)); + + if (options & EOptions::PrintAst) { + Writer.Write(TStringBuf("ast")); + WriteAstTree(prg->AstRoot()); + } + } + + WriteStatus(parsed, prg->Issues()); + } +}; + +/////////////////////////////////////////////////////////////////////////////// +// compile action +/////////////////////////////////////////////////////////////////////////////// +YQL_ACTION(Compile) + void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) { + Y_UNUSED(input); + Y_UNUSED(attr); + TProgramPtr prg = MakeFileProgram(program, YqlServer, {}, {}); + prg->SetParametersYson(parameters); + + bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.GatewaysConfig)); + noError = noError && prg->Compile(GetUsername()); + + if (options & (EOptions::PrintAst | EOptions::PrintExpr)) { + if (prg->ExprRoot()) { + auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true); + + if (options & EOptions::PrintAst) { + Writer.Write(TStringBuf("ast")); + WriteAstTree(ast.Root); + } + + if (options & EOptions::PrintExpr) { + ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote; + Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg)); + } + } + } + + WriteStatus(noError, prg->Issues()); + } +}; + +/////////////////////////////////////////////////////////////////////////////// +// optimize, validate and peephole actions +/////////////////////////////////////////////////////////////////////////////// +YQL_ACTION(OptimizeOrValidateFile) + void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) { + TAutoPtr<TTableFileHolder> inputFile; + TTempFile outputFile(MakeTempName()); + TTempFile outputFileAttr(outputFile.Name() + ".attr"); + TProgramPtr prg = MakeFileProgram(program, input, attr, inputFile, outputFile, YqlServer); + + bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.GatewaysConfig)); + + prg->SetParametersYson(parameters); + prg->SetDiagnosticFormat(NYson::EYsonFormat::Pretty); + THolder<TStringStream> traceOut; + THolder<TTempLogRedirector> logRedirector; + if (options & EOptions::PrintTraceOpt) { + traceOut.Reset(new TStringStream); + logRedirector.Reset(new TTempLogRedirector(traceOut.Get())); + } + + noError = noError && prg->Compile(GetUsername()); + if (noError) { + TProgram::TStatus status = TProgram::TStatus::Error; + auto name = TStringBuf(Req.RD.ScriptName()); + if (name.Contains(TStringBuf("/optimize"))) { + auto config = TOptPipelineConfigurator(prg, traceOut.Get()); + status = prg->OptimizeWithConfig(GetUsername(), config); + } else if (name.Contains(TStringBuf("/validate"))) { + status = prg->Validate(GetUsername()); + } else if (name.Contains(TStringBuf("/peephole"))) { + auto config = TPeepHolePipelineConfigurator(options & EOptions::PrintTraceOpt); + status = prg->OptimizeWithConfig(GetUsername(), config); + } + noError = status == TProgram::TStatus::Ok; + } + + if (options & (EOptions::PrintAst | EOptions::PrintExpr)) { + if (prg->ExprRoot()) { + auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true); + + if (options & EOptions::PrintAst) { + Writer.Write(TStringBuf("ast")); + WriteAstTree(ast.Root); + } + + if (options & EOptions::PrintExpr) { + ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote; + Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg)); + } + } + } + + auto diagnostics = prg->GetDiagnostics(); + if (diagnostics) { + Cerr << *diagnostics; + } + + if (!!traceOut && !traceOut->Str().empty()) { + if (diagnostics) { + traceOut->Write(*diagnostics); + } + + Writer.Write(TStringBuf("opttrace"), traceOut->Str()); + } + WriteStatus(noError, prg->Issues()); + } +}; + + +/////////////////////////////////////////////////////////////////////////////// +// run actions +/////////////////////////////////////////////////////////////////////////////// +YQL_ACTION(FileRun) + void Perform(const TString& program, const TString& input, const TString& attr, ui32 options, const TString& parameters) { + auto name = TStringBuf(Req.RD.ScriptName()); + TAutoPtr<TTableFileHolder> inputFile; + TTempFile outputFile(MakeTempName()); + TTempFile outputFileAttr(outputFile.Name() + ".attr"); + TProgramPtr prg = MakeFileProgram(program, input, attr, inputFile, outputFile, YqlServer); + + bool noError = (options & TYqlAction::YqlProgram) ? prg->ParseYql() : prg->ParseSql(GetTranslationSettings(YqlServer.GatewaysConfig)); + + prg->SetDiagnosticFormat(NYson::EYsonFormat::Pretty); + prg->SetParametersYson(parameters); + THolder<TStringStream> traceOut; + THolder<TTempLogRedirector> logRedirector; + if (options & EOptions::PrintTraceOpt) { + traceOut.Reset(new TStringStream); + logRedirector.Reset(new TTempLogRedirector(traceOut.Get())); + } + + noError = noError && prg->Compile(GetUsername()); + TProgram::TStatus status = TProgram::TStatus::Error; + if (noError) { + auto config = TOptPipelineConfigurator(prg, traceOut.Get()); + if (name.Contains(TStringBuf("/lineage"))) { + status = prg->LineageWithConfig(GetUsername(), config); + } else { + status = prg->RunWithConfig(GetUsername(), config); + } + } + if (options & (EOptions::PrintAst | EOptions::PrintExpr)) { + if (prg->ExprRoot()) { + auto ast = ConvertToAst(*prg->ExprRoot(), prg->ExprCtx(), TExprAnnotationFlags::None, true); + + if (options & EOptions::PrintAst) { + Writer.Write(TStringBuf("ast")); + WriteAstTree(ast.Root); + } + + if (options & EOptions::PrintExpr) { + ui32 prettyFlg = TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote; + Writer.Write(TStringBuf("expr"), ast.Root->ToString(prettyFlg)); + } + } + } + + auto diagnostics = prg->GetDiagnostics(); + if (diagnostics) { + Cerr << *diagnostics; + } + + if (!!traceOut && !traceOut->Str().empty()) { + if (diagnostics) { + traceOut->Write(*diagnostics); + } + + Writer.Write(TStringBuf("opttrace"), traceOut->Str()); + } + + WriteStatus(status != TProgram::TStatus::Error, prg->Issues()); + + if (status != TProgram::TStatus::Error) { + // write output + Writer.Write(TStringBuf("output")); + Writer.OpenMap(); + if (TFileStat(outputFile.Name()).IsFile()) { + TFileInput fileInput(outputFile.Name()); + + NYT::TNode list = NYT::TNode::CreateList(); + NYT::TNodeBuilder builder(&list); + NYson::TYsonParser parser(&builder, &fileInput, ::NYson::EYsonType::ListFragment); + parser.Parse(); + + std::set<TString> headers; + for (auto& row: list.AsList()) { + for (auto& val: row.AsMap()) { + headers.insert(val.first); + } + } + { // headers + Writer.Write(TStringBuf("headers")); + Writer.OpenArray(); + for (const auto& header : headers) { + Writer.Write(header); + } + Writer.CloseArray(); + } + + { // rows + Writer.Write(TStringBuf("rows")); + Writer.OpenArray(); + for (auto& row: list.AsList()) { + Writer.OpenArray(); + for (const auto& header : headers) { + if (auto p = row.AsMap().FindPtr(header)) { + if (p->IsString()) { + const auto& str = p->AsString(); + Writer.Write(IsUtf8(str) ? str : EscapeC(str)); + } else { + Writer.Write(NYT::NodeToYsonString(*p, NYson::EYsonFormat::Text)); + } + } else { + Writer.Write(TString()); + } + } + Writer.CloseArray(); + } + Writer.CloseArray(); + } + } + + Writer.CloseMap(); + } + + if (name.Contains(TStringBuf("/lineage"))) { + if (auto data = prg->GetLineage()) { + TString str; + TStringOutput out(str); + TStringInput in(*data); + NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty); + Writer.Write(TStringBuf("results"), str); + } + } else { + Writer.Write(TStringBuf("results"), prg->ResultsAsString()); + } + } +}; + +} // namespace + +void TYqlServer::Start() +{ +#ifdef _unix_ + ShutdownOn(SIGINT); + ShutdownOn(SIGTERM); +#endif + + bool started = HttpServer.Start(); + if (!started) { + ythrow yexception() << "YqlServer not started. Error: " + << HttpServer.GetErrorCode() + << ": " << HttpServer.GetError(); + } +} + +void TYqlServer::ShutdownOn(int signal) +{ +#ifdef _unix_ + struct sigaction sa = {}; + sa.sa_handler = OnTerminate; + sigfillset(&sa.sa_mask); // block every signal during the handler + + if (sigaction(signal, &sa, nullptr) < 0) { + ythrow yexception() << "Error: cannot handle signal " << signal; + } +#else + Y_UNUSED(signal); +#endif +} + +void TYqlServer::Wait() +{ +#ifdef _unix_ + while (!Terminated) { + sleep(1); + } +#else + HttpServer.Wait(); +#endif +} + +TAutoPtr<TYqlServer> CreateYqlServer( + TServerConfig config, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + TUdfIndex::TPtr udfIndex, + ui64 nextUniqueId, + TUserDataTable filesMapping, + THolder<TGatewaysConfig>&& gatewaysConfig, + IModuleResolver::TPtr modules, + IUdfResolver::TPtr udfResolver, + TFileStoragePtr fileStorage) +{ + TAutoPtr<TYqlServer> server = new TYqlServer( + config, functionRegistry, udfIndex, nextUniqueId, + std::move(filesMapping), std::move(gatewaysConfig), modules, udfResolver, fileStorage); + + server->RegisterAction<TYqlActionPaste>("/api/yql/paste"); + server->RegisterAction<TYqlActionParse>("/api/yql/parse"); + server->RegisterAction<TYqlActionCompile>("/api/yql/compile"); + server->RegisterAction<TYqlActionFormat>("/api/yql/format"); + server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/validate"); + server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/optimize"); + server->RegisterAction<TYqlActionOptimizeOrValidateFile>("/api/yql/peephole"); + + server->RegisterServlet("/js/yql-functions.js", new TYqlFunctoinsServlet()); + + server->RegisterAction<TYqlActionFileRun>("/api/yql/lineage"); + server->RegisterAction<TYqlActionFileRun>("/api/yql/run"); + + server->RegisterServlet("/", + new TAssetsServlet("/", config.GetAssetsPath(), "file-index.html")); + + return server; +} + +} // namspace NHttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/yql_server.h b/ydb/library/yql/tools/yqlrun/http/yql_server.h new file mode 100644 index 0000000000..e90aebc1c0 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/yql_server.h @@ -0,0 +1,92 @@ +#pragma once + +#include "server.h" +#include "assets_servlet.h" +#include "yql_servlet.h" +#include "yql_functions_servlet.h" + +#include <ydb/library/yql/core/facade/yql_facade.h> +#include <ydb/library/yql/core/yql_csv.h> +#include <ydb/library/yql/core/yql_type_annotation.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> + +#include <util/stream/file.h> +#include <util/system/user.h> +#include <util/system/tempfile.h> + + +namespace NYql { +namespace NHttp { + +enum class EDataSource { + FILE, + YAMR, + YT +}; + +/////////////////////////////////////////////////////////////////////////////// +// TYqlServer +/////////////////////////////////////////////////////////////////////////////// +class TYqlServer: private TNonCopyable +{ +public: + inline TYqlServer( + const TServerConfig& config, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + TUdfIndex::TPtr udfIndex, + ui64 nextUniqueId, + TUserDataTable filesMapping, + THolder<TGatewaysConfig>&& gatewaysConfig, + IModuleResolver::TPtr modules, + IUdfResolver::TPtr udfResolver, + TFileStoragePtr fileStorage) + : HttpServer(config) + , FunctionRegistry(functionRegistry) + , UdfIndex(udfIndex) + , NextUniqueId(nextUniqueId) + , FilesMapping(std::move(filesMapping)) + , GatewaysConfig(std::move(gatewaysConfig)) + , Modules(modules) + , UdfResolver(udfResolver) + , FileStorage(fileStorage) + { + } + + template <typename TAction> + void RegisterAction(const TString& path) { + RegisterServlet(path, new TYqlServlet<TAction>(*this)); + } + + void RegisterServlet(const TString& path, TAutoPtr<IServlet> sp) { + HttpServer.RegisterServlet(path, sp); + } + + void ShutdownOn(int signal); + void Start(); + void Wait(); + +public: + TServer HttpServer; + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; + TUdfIndex::TPtr UdfIndex; + ui64 NextUniqueId; + TUserDataTable FilesMapping; + const THolder<TGatewaysConfig> GatewaysConfig; + IModuleResolver::TPtr Modules; + IUdfResolver::TPtr UdfResolver; + TFileStoragePtr FileStorage; +}; + +TAutoPtr<TYqlServer> CreateYqlServer( + TServerConfig config, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + TUdfIndex::TPtr udfIndex, + ui64 nextUniqueId, + TUserDataTable filesMapping, + THolder<TGatewaysConfig>&& gatewaysConfig, + IModuleResolver::TPtr modules = nullptr, + IUdfResolver::TPtr udfResolver = nullptr, + TFileStoragePtr fileStorage = nullptr); + +} // namspace NHttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/yql_servlet.cpp b/ydb/library/yql/tools/yqlrun/http/yql_servlet.cpp new file mode 100644 index 0000000000..0ba7ebefce --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/yql_servlet.cpp @@ -0,0 +1,69 @@ +#include "yql_servlet.h" + +#include <ydb/library/yql/public/issue/yql_issue.h> + +#include <util/string/builder.h> + + +namespace NYql { +namespace NHttp { + +void TYqlAction::WriteStatus(bool success, const TIssues& errors) const { + Writer.Write("succeeded", success); + static const TVector<TString> issueQueue = {"errors", "warnings", "infos"}; + static const THashMap<NYql::TSeverityIds::ESeverityId, TStringBuf> severityMap = { + {TSeverityIds::ESeverityId::TSeverityIds_ESeverityId_S_FATAL, "errors"}, + {TSeverityIds::ESeverityId::TSeverityIds_ESeverityId_S_ERROR, "errors"}, + {TSeverityIds::ESeverityId::TSeverityIds_ESeverityId_S_WARNING, "warnings"}, + {TSeverityIds::ESeverityId::TSeverityIds_ESeverityId_S_INFO, "infos"} + }; + + auto writeIssues = [this](const TString& severity, const TIssues& errors) { + Writer.Write(severity); + Writer.OpenArray(); + + for (const auto& topIssue: errors) { + if (severity != severityMap.at(topIssue.Severity)) { + continue; + } + WalkThroughIssues(topIssue, false, [this, severity](const TIssue& issue, ui16 level) { + TStringBuilder sb; + sb << TString(level, '>') << issue; + Cerr << sb << Endl; + Writer.Write(sb); + }); + } + + Writer.CloseArray(); + }; + + for (auto severety: issueQueue) { + writeIssues(severety, errors); + } +} + +void TYqlAction::WriteAstTree(const TAstNode* node) { + if (node == nullptr) return; + + Writer.OpenMap(); + if (node->IsAtom()) { + Writer.Write(TStringBuf("type"), TStringBuf("atom")); + Writer.Write(TStringBuf("content"), node->GetContent()); + Writer.Write(TStringBuf("flags"), node->GetFlags()); + } else if (node->IsList()) { + Writer.Write(TStringBuf("type"), TStringBuf("list")); + Writer.Write(TStringBuf("content"), TStringBuf("( )")); + if (node->GetChildrenCount() > 0) { + Writer.Write(TStringBuf("children")); + Writer.OpenArray(); + for (ui32 index = 0; index < node->GetChildrenCount(); ++index) { + WriteAstTree(node->GetChild(index)); + } + Writer.CloseArray(); + } + } + Writer.CloseMap(); +} + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/http/yql_servlet.h b/ydb/library/yql/tools/yqlrun/http/yql_servlet.h new file mode 100644 index 0000000000..d6e65fa31b --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/http/yql_servlet.h @@ -0,0 +1,127 @@ +#pragma once + +#include "servlet.h" + +#include <ydb/library/yql/ast/yql_errors.h> +#include <ydb/library/yql/ast/yql_ast.h> + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> + + +#define YQL_ACTION(action) \ + class TYqlAction##action: public ::NYql::NHttp::TYqlAction { \ + public: \ + TYqlAction##action( \ + ::NYql::NHttp::TYqlServer& yqlServer, \ + ::NJson::TJsonWriter& writer, \ + const ::NYql::NHttp::TRequest& req, \ + ::NYql::NHttp::TResponse& resp) \ + : ::NYql::NHttp::TYqlAction(yqlServer, writer, req, resp) {} \ + +namespace NYql { +namespace NHttp { + +class TYqlServer; + +/////////////////////////////////////////////////////////////////////////////// +// TYqlAction +/////////////////////////////////////////////////////////////////////////////// +class TYqlAction: private TNonCopyable +{ +public: + enum EOptions { + YqlProgram = 0x01, + SqlProgram = 0x02, + + PrintAst = 0x0100, + PrintExpr = 0x0200, + PrintTraceOpt = 0x0400, + }; + +public: + TYqlAction( + TYqlServer& yqlServer, + NJson::TJsonWriter& writer, + const TRequest& req, + TResponse& resp) + : YqlServer(yqlServer) + , Writer(writer) + , Req(req) + , Resp(resp) + { + } + +protected: + void WriteStatus(bool success, const TIssues& errors) const; + void WriteAstTree(const TAstNode* node); + +protected: + TYqlServer& YqlServer; + NJson::TJsonWriter& Writer; + const TRequest& Req; + TResponse& Resp; +}; + +/////////////////////////////////////////////////////////////////////////////// +// TYqlServlet +/////////////////////////////////////////////////////////////////////////////// +template <typename TAction> +class TYqlServlet: public IServlet +{ +public: + TYqlServlet(TYqlServer& yqlServer) + : YqlServer_(yqlServer) + { + } + + void DoPost(const TRequest& req, TResponse& resp) const override final { + NJson::TJsonValue value; + TStringBuf body(req.Body.AsCharPtr(), req.Body.Size()); + bool parsed = NJson::ReadJsonFastTree(body, &value, true); + Y_ENSURE_EX(parsed, THttpError(HTTP_BAD_REQUEST) << "can't parse json"); + + const TString& program = value[TStringBuf("program")].GetString(); + const TString& input = value[TStringBuf("tableInput")].GetString(); + const TString& attr = value[TStringBuf("tableAttr")].GetString(); + const TString& lang = value[TStringBuf("lang")].GetString(); + const TString& params = value[TStringBuf("parameters")].GetString(); + + ui32 options = 0; + if (req.RD.CgiParam.Has(TStringBuf("printAst"))) { + options |= TYqlAction::PrintAst; + } + + if (req.RD.CgiParam.Has(TStringBuf("printExpr"))) { + options |= TYqlAction::PrintExpr; + } + + if (req.RD.CgiParam.Has(TStringBuf("traceOpt"))) { + options |= TYqlAction::PrintTraceOpt; + } + + if (lang == TStringBuf("yql")) { + options |= TYqlAction::YqlProgram; + } else if (lang == TStringBuf("sql")) { + options |= TYqlAction::SqlProgram; + } + + TStringStream output; + NJson::TJsonWriter writer(&output, false); + writer.OpenMap(); + + TAction action(YqlServer_, writer, req, resp); + action.Perform(program, input, attr, options, params); + + writer.CloseMap(); + writer.Flush(); + resp.Body = TBlob::FromString(output.Str()); + resp.ContentType = TStringBuf("application/json"); + } + +private: + TYqlServer& YqlServer_; +}; + +} // namspace NNttp +} // namspace NYql diff --git a/ydb/library/yql/tools/yqlrun/ya.make b/ydb/library/yql/tools/yqlrun/ya.make new file mode 100644 index 0000000000..195c9dca81 --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/ya.make @@ -0,0 +1,46 @@ +PROGRAM(yqlrun) + +ALLOCATOR(J) + +SRCS( + yqlrun.cpp + gateway_spec.cpp +) + +IF (OS_LINUX) + # prevent external python extensions to lookup protobuf symbols (and maybe + # other common stuff) in main binary + EXPORTS_SCRIPT(${ARCADIA_ROOT}/ydb/library/yql/tools/exports.symlist) +ENDIF() + +PEERDIR( + contrib/libs/protobuf + library/cpp/getopt + library/cpp/yson + ydb/library/yql/sql/pg + ydb/library/yql/core/facade + ydb/library/yql/core/file_storage + ydb/library/yql/core/file_storage/proto + ydb/library/yql/core/file_storage/http_download + ydb/library/yql/core/services/mounts + ydb/library/yql/minikql/comp_nodes/llvm + ydb/library/yql/protos + ydb/library/yql/public/udf/service/exception_policy + ydb/library/yql/utils/backtrace + ydb/library/yql/core + ydb/library/yql/sql/v1/format + ydb/library/yql/providers/common/codec + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/common/proto + ydb/library/yql/providers/common/provider + ydb/library/yql/providers/common/udf_resolve + ydb/library/yql/providers/dq/provider + ydb/library/yql/providers/yt/gateway/file + ydb/library/yql/core/url_preprocessing + ydb/library/yql/tools/yqlrun/http + ydb/library/yql/parser/pg_wrapper +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/tools/yqlrun/yqlrun.cpp b/ydb/library/yql/tools/yqlrun/yqlrun.cpp new file mode 100644 index 0000000000..36d75b847f --- /dev/null +++ b/ydb/library/yql/tools/yqlrun/yqlrun.cpp @@ -0,0 +1,984 @@ +#include "gateway_spec.h" + +#include <ydb/library/yql/tools/yqlrun/http/yql_server.h> + +#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h> +#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h> +#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h> + +#include <ydb/library/yql/sql/v1/format/sql_format.h> + +#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h> +#include <ydb/library/yql/providers/common/codec/yql_codec.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> +#include <ydb/library/yql/providers/common/udf_resolve/yql_outproc_udf_resolver.h> +#include <ydb/library/yql/providers/common/udf_resolve/yql_udf_resolver_with_index.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/mkql_utils.h> +#include <ydb/library/yql/protos/yql_mount.pb.h> +#include <ydb/library/yql/core/yql_library_compiler.h> +#include <ydb/library/yql/core/facade/yql_facade.h> +#include <ydb/library/yql/core/file_storage/file_storage.h> +#include <ydb/library/yql/core/file_storage/http_download/http_download.h> +#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> +#include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h> +#include <ydb/library/yql/core/services/mounts/yql_mounts.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/backtrace/backtrace.h> +#include <ydb/library/yql/utils/log/tls_backend.h> +#include <ydb/library/yql/public/udf/udf_validate.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h> + +#include <ydb/core/util/pb.h> + +#include <library/cpp/logger/stream.h> +#include <library/cpp/svnversion/svnversion.h> +#include <library/cpp/getopt/last_getopt.h> + +#include <library/cpp/yson/public.h> +#include <library/cpp/yson/writer.h> + +#include <google/protobuf/text_format.h> + +#include <util/stream/file.h> +#include <util/system/user.h> +#include <util/folder/iterator.h> +#include <util/folder/dirut.h> +#include <util/string/join.h> +#include <util/string/builder.h> + +#ifdef __unix__ +#include <sys/resource.h> +#endif + +const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote | + NYql::TAstPrintFlags::AdaptArbitraryContent; + +class TMultiProgs { +public: + TMultiProgs(NYql::TProgramFactory& factory, const TString& programFile, const TString& programText, size_t concurrentCount = 1) { + Infos.reserve(concurrentCount); + BaseProg = factory.Create(programFile, programText); + if (concurrentCount) { + factory.UnrepeatableRandom(); + } + for (auto i = concurrentCount; i > 0; --i) { + Infos.emplace_back(TProgInfo({factory.Create(programFile, programText), {}})); + } + } + + bool ParseYql() { + bool result = BaseProg->ParseYql(); + for (auto& info: Infos) { + info.Prog->ParseYql(); + } + + return result; + } + + bool ParseSql(const NSQLTranslation::TTranslationSettings& settings) { + bool result = BaseProg->ParseSql(settings); + for (auto& info: Infos) { + info.Prog->ParseSql(settings); + } + + return result; + } + + bool Compile(const TString& username) { + bool result = BaseProg->Compile(username); + for (auto& info: Infos) { + info.Prog->Compile(username); + } + + return result; + } + + template<class T> + bool CompareStreams(const TString& compareGoal, IOutputStream& out, const T& base, const T& concurrent) const { + const auto baseSize = base.Size(); + const auto concurentSize = concurrent.Size(); + if (baseSize == concurentSize && memcmp(base.Data(), concurrent.Data(), baseSize) == 0) { + return true; + } + out << "Difference " << compareGoal << " of cuncurrent mode is not the same as base run. Size base: " << baseSize << + ", size concurrent: " << concurentSize; + if (concurentSize) { + out << ", concurrent stream: " << Endl << concurrent.Data(); + } else { + out << ", base stream: " << Endl << base.Data(); + } + return false; + } + + void PrintExprTo(IOutputStream& out) { + TStringStream baseSS; + auto baseAst = ConvertToAst(*BaseProg->ExprRoot(), BaseProg->ExprCtx(), NYql::TExprAnnotationFlags::None, true); + baseAst.Root->PrettyPrintTo(baseSS, PRETTY_FLAGS); + for (auto& info: Infos) { + TStringStream ss; + auto ast = ConvertToAst(*info.Prog->ExprRoot(), BaseProg->ExprCtx(), NYql::TExprAnnotationFlags::None, true); + ast.Root->PrettyPrintTo(ss, PRETTY_FLAGS); + if (!CompareStreams("expr representation", out, baseSS, ss)) { + return; + } + } + out << baseSS.Data(); + } + + void PrintErrorsTo(IOutputStream& out) const { + TStringStream baseSS; + BaseProg->PrintErrorsTo(baseSS); + for (auto& info: Infos) { + TStringStream ss; + info.Prog->PrintErrorsTo(ss); + if (!CompareStreams("error", out, baseSS, ss)) { + return; + } + } + out << baseSS.Data(); + } + + void PrintAstTo(IOutputStream& out) const { + TStringStream baseSS; + BaseProg->AstRoot()->PrettyPrintTo(out, PRETTY_FLAGS); + for (auto& info: Infos) { + TStringStream ss; + info.Prog->AstRoot()->PrettyPrintTo(out, PRETTY_FLAGS); + if (!CompareStreams("AST", out, baseSS, ss)) { + return; + } + } + out << baseSS.Data(); + } + + void SetProgressWriter(NYql::TOperationProgressWriter writer) { + BaseProg->SetProgressWriter(writer); + for (auto& info: Infos) { + info.Prog->SetProgressWriter(writer); + } + } + + void SetValidateOptions(NKikimr::NUdf::EValidateMode validateMode) { + BaseProg->SetValidateOptions(validateMode); + for (auto& info: Infos) { + info.Prog->SetValidateOptions(validateMode); + } + } + + void SetParametersYson(const TString& parameters) { + BaseProg->SetParametersYson(parameters); + for (auto& info : Infos) { + info.Prog->SetParametersYson(parameters); + } + } + + void Print(IOutputStream* exprOut, IOutputStream* planOut) { + bool cleanPlan = true; + BaseProg->Print(exprOut, planOut, cleanPlan); + } + + void ResultsOut(IOutputStream& out) { + if (BaseProg->HasResults()) { + BaseProg->ConfigureYsonResultFormat(NYson::EYsonFormat::Pretty); + out << BaseProg->ResultsAsString(); + } + // Multirun results are ignored + } + + void DiscoveredDataOut(IOutputStream& out) { + if (auto data = BaseProg->GetDiscoveredData()) { + TStringInput in(*data); + NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty); + } + } + + void LineageOut(IOutputStream& out) { + if (auto data = BaseProg->GetLineage()) { + TStringInput in(*data); + NYson::ReformatYsonStream(&in, &out, NYson::EYsonFormat::Pretty); + } + } + + NYql::TProgram::TStatus Run(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, bool withTypes) { + YQL_ENSURE(Infos.empty()); + return BaseProg->Run(username, traceOut, tracePlan, exprOut, withTypes); + } + + NYql::TProgram::TStatus Optimize(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, bool withTypes) { + YQL_ENSURE(Infos.empty()); + return BaseProg->Optimize(username, traceOut, tracePlan, exprOut, withTypes); + } + + NYql::TProgram::TStatus Validate(const TString& username, IOutputStream* exprOut, bool withTypes) { + YQL_ENSURE(Infos.empty()); + return BaseProg->Validate(username, exprOut, withTypes); + } + + NYql::TProgram::TStatus Discover(const TString& username) { + YQL_ENSURE(Infos.empty()); + return BaseProg->Discover(username); + } + + NYql::TProgram::TStatus Lineage(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) { + YQL_ENSURE(Infos.empty()); + return BaseProg->Lineage(username, traceOut, exprOut, withTypes); + } + + NYql::TProgram::TStatus Peephole(const TString& username, IOutputStream* exprOut, bool withTypes) { + YQL_ENSURE(Infos.empty()); + using namespace NYql; + + class TPeepHolePipelineConfigurator : public IPipelineConfigurator { + public: + TPeepHolePipelineConfigurator() = default; + + void AfterCreate(TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { + Y_UNUSED(pipeline); + } + + void AfterOptimize(TTransformationPipeline* pipeline) const final { + pipeline->Add(CreateTYtWideFlowTransformer(nullptr), "WideFlow"); + pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); + } + }; + + TPeepHolePipelineConfigurator config; + auto status = BaseProg->OptimizeWithConfig(username, config); + if (exprOut && BaseProg->ExprRoot()) { + auto ast = ConvertToAst(*BaseProg->ExprRoot(), BaseProg->ExprCtx(), withTypes ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true); + ui32 prettyFlags = TAstPrintFlags::ShortQuote; + if (!withTypes) { + prettyFlags |= TAstPrintFlags::PerLine; + } + ast.Root->PrettyPrintTo(*exprOut, prettyFlags); + } + return status; + } + + NYql::TProgram::TStatus RunAsyncAndWait(const TString& username, IOutputStream* traceOut, IOutputStream* tracePlan, IOutputStream* exprOut, + bool withTypes, bool& emulateOutputForMultirun) { + NYql::TProgram::TStatus baseStatus = BaseProg->Run(username, traceOut, tracePlan, exprOut, withTypes); + // switch this flag only after base run + emulateOutputForMultirun = true; + for (auto& info: Infos) { + info.Future = info.Prog->RunAsync(username, nullptr, nullptr, nullptr, withTypes); + YQL_ENSURE(info.Future.Initialized()); + } + + for (bool wasAsync = true; wasAsync;) { + wasAsync = false; + for (auto& info: Infos) { + auto status = info.Future.GetValueSync(); + if (status == NYql::TProgram::TStatus::Async) { + wasAsync = true; + info.Future = info.Prog->ContinueAsync(); + } else if (status == NYql::TProgram::TStatus::Error) { + baseStatus = status; + } + } + } + return baseStatus; + } + +private: + struct TProgInfo { + NYql::TProgramPtr Prog; + NYql::TProgram::TFutureStatus Future; + }; + + NYql::TProgramPtr BaseProg; + TVector<TProgInfo> Infos; +}; + +using namespace NYql; +using namespace NKikimr::NMiniKQL; +using namespace NYql::NHttp; + +namespace NMiniKQL = NKikimr::NMiniKQL; + +class TStoreMappingFunctor: public NLastGetopt::IOptHandler { +public: + TStoreMappingFunctor(THashMap<TString, TString>* target, char delim = '@') + : Target(target) + , Delim(delim) + { + } + + void HandleOpt(const NLastGetopt::TOptsParser* parser) final { + const TStringBuf val(parser->CurValOrDef()); + const auto service = TString(val.After(Delim)); + auto res = Target->emplace(TString(val.Before(Delim)), service); + if (!res.second) { + /// force replace already exist parametr + res.first->second = service; + } + } + +private: + THashMap<TString, TString>* Target; + char Delim; +}; + +void CommonInit(const NLastGetopt::TOptsParseResult& res, const TString& udfResolverPath, bool filterSysCalls, + const TVector<TString>& udfsPaths, TFileStoragePtr fileStorage, + IUdfResolver::TPtr& udfResolver, NKikimr::NMiniKQL::IFunctionRegistry::TPtr funcRegistry, TUdfIndex::TPtr& udfIndex) { + + if (fileStorage && res.Has("scan-udfs")) { + if (!udfResolverPath) { + ythrow yexception() << "udf-resolver path must be specified when use 'scan-udfs'"; + } + + udfResolver = NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), fileStorage, udfResolverPath, {}, {}, filterSysCalls, {}); + + Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " Udf scanning started for " << udfsPaths.size() << " udfs ..." << Endl; + udfIndex = new TUdfIndex(); + LoadRichMetadataToUdfIndex(*udfResolver, udfsPaths, false, TUdfIndex::EOverrideMode::RaiseError, *udfIndex); + Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " UdfIndex done." << Endl; + + udfResolver = NCommon::CreateUdfResolverWithIndex(udfIndex, udfResolver, fileStorage); + Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " Udfs scanned" << Endl; + return; + } + + udfResolver = fileStorage && udfResolverPath + ? NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), fileStorage, udfResolverPath, {}, {}, false, {}) + : NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), fileStorage, true); +} + +template <typename TMessage> +THolder<TMessage> ParseProtoConfig(const TString& cfgFile) { + auto config = MakeHolder<TMessage>(); + TString configData = TFileInput(cfgFile).ReadAll();; + + using ::google::protobuf::TextFormat; + if (!TextFormat::ParseFromString(configData, config.Get())) { + Cerr << "Bad format of gateways configuration"; + return {}; + } + + return config; +} + +int Main(int argc, const char *argv[]) +{ + Y_UNUSED(NUdf::GetStaticSymbols()); + using namespace NLastGetopt; + TOpts opts = TOpts::Default(); + TString programFile; + TVector<TString> tablesMappingList; + THashMap<TString, TString> tablesMapping; + TVector<TString> filesMappingList; + TUserDataTable filesMapping; + TVector<TString> urlsMappingList; + TString exprFile; + TString resultFile; + TString planFile; + TString errFile; + TString tmpDir; + TVector<TString> udfsPaths; + TString udfsDir; + TString validateModeStr(NUdf::ValidateModeAsStr(NUdf::EValidateMode::Greedy)); + THashSet<TString> gatewayTypes; + TString mountConfig; + TString udfResolverPath; + bool udfResolverFilterSyscalls = false; + THashMap<TString, TString> clusterMapping; + THashSet<TString> sqlFlags; + clusterMapping["plato"] = YtProviderName; + ui32 progsConcurrentCount = 0; + TString paramsFile; + ui16 syntaxVersion; + ui64 memLimit; + TString gatewaysCfgFile; + TString fsCfgFile; + + opts.AddHelpOption(); + opts.AddLongOption('p', "program", "program file").StoreResult<TString>(&programFile); + opts.AddLongOption('s', "sql", "program is SQL query").NoArgument(); + opts.AddLongOption('t', "table", "table@file").AppendTo(&tablesMappingList); + opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping)); + opts.AddLongOption("ndebug", "should be at first argument, do not show debug info in error output").NoArgument(); + opts.AddLongOption("parse-only", "exit after program has been parsed").NoArgument(); + opts.AddLongOption("print-ast", "print AST after loading").NoArgument(); + opts.AddLongOption("compile-only", "exit after program has been compiled").NoArgument(); + opts.AddLongOption("print-expr", "print rebuild AST before execution").NoArgument(); + opts.AddLongOption("with-types", "print types annotation").NoArgument(); + opts.AddLongOption("trace-opt", "print AST in the begin of each transformation").NoArgument(); + opts.AddLongOption("expr-file", "print AST to that file instead of stdout").StoreResult<TString>(&exprFile); + opts.AddLongOption("print-result", "print program execution result to stdout").NoArgument(); + opts.AddLongOption("result-file", "print program execution result to file").StoreResult<TString>(&resultFile); + opts.AddLongOption("plan-file", "print program plan to file").StoreResult<TString>(&planFile); + opts.AddLongOption("err-file", "print validate/optimize/runtime errors to file").StoreResult<TString>(&errFile); + opts.AddLongOption('P',"trace-plan", "print plan before execution").NoArgument(); + opts.AddLongOption('L', "show-log", "show logs").NoArgument(); + opts.AddLongOption('D', "discover", "discover tables in the program").NoArgument(); + opts.AddLongOption("validate", "exit after program has been validated").NoArgument(); + opts.AddLongOption("lineage", "exit after data lineage has been calculated").NoArgument(); + opts.AddLongOption('O',"optimize", "optimize expression").NoArgument(); + opts.AddLongOption('R',"run", "run expression using input/output tables").NoArgument(); + opts.AddLongOption("peephole", "perform peephole stage of expression using input/output tables").NoArgument(); + opts.AddLongOption('M',"multirun", "run expression in multi-evaluate (race) mode, as option set concurrent count").StoreResult(&progsConcurrentCount).DefaultValue(progsConcurrentCount); + opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList); + opts.AddLongOption('U', "url", "name@path").AppendTo(&urlsMappingList); + opts.AddLongOption('m', "mounts", "Mount points config file.").StoreResult(&mountConfig); + opts.AddLongOption("opt-collision", "provider optimize collision mode").NoArgument(); + opts.AddLongOption("keep-temp", "keep temporary tables").NoArgument(); + opts.AddLongOption("full-expr", "avoid buffering of expr/plan").NoArgument(); + opts.AddLongOption("show-progress", "report operation progress").NoArgument(); + opts.AddLongOption("tmp-dir", "directory for temporary tables").StoreResult<TString>(&tmpDir); + opts.AddLongOption("reverse-mrkey", "reverse keys for Map/Reduce opeations").NoArgument(); + opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths); + opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&udfsDir); + opts.AddLongOption("mem-info", "Print memory usage information").NoArgument(); + opts.AddLongOption("validate-mode", "validate udf mode, available values: " + NUdf::ValidateModeAvailables()).StoreResult<TString>(&validateModeStr).DefaultValue(validateModeStr); + opts.AddLongOption('G', "gateways", "used gateways").SplitHandler(&gatewayTypes, ',').DefaultValue(YtProviderName); + opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&udfResolverPath); + opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver") + .Optional() + .NoArgument() + .SetFlag(&udfResolverFilterSyscalls); + opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument(); + opts.AddLongOption("params-file", "Query parameters values in YSON format").StoreResult(¶msFile); + + opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&sqlFlags, ','); + opts.AddLongOption("syntax-version", "SQL syntax version").StoreResult(&syntaxVersion).DefaultValue(1); + opts.AddLongOption("ansi-lexer", "Use ansi lexer").NoArgument(); + opts.AddLongOption("assume-ydb-on-slash", "Assume YDB provider if cluster name starts with '/'").NoArgument(); + opts.AddLongOption("mem-limit", "Set memory limit in megabytes").StoreResult(&memLimit).DefaultValue(0); + opts.AddLongOption("gateways-cfg", "gateways configuration file").Optional().RequiredArgument("FILE").StoreResult(&gatewaysCfgFile); + opts.AddLongOption("fs-cfg", "fs configuration file").Optional().RequiredArgument("FILE").StoreResult(&fsCfgFile); + opts.AddLongOption("test-format", "compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)").NoArgument(); + opts.AddLongOption("show-kernels", "show all Arrow kernel families").NoArgument(); + + opts.SetFreeArgsMax(0); + TOptsParseResult res(&opts, argc, argv); + auto builtins = CreateBuiltinRegistry(); + if (res.Has("show-kernels")) { + auto families = builtins->GetAllKernelFamilies(); + Sort(families, [](const auto& x, const auto& y) { return x.first < y.first; }); + ui64 totalKernels = 0; + for (const auto& f : families) { + auto numKernels = f.second->GetAllKernels().size(); + Cout << f.first << ": " << numKernels << " kernels\n"; + totalKernels += numKernels; + } + + Cout << "Total kernel families: " << families.size() << ", kernels: " << totalKernels << "\n"; + return 0; + } + + const bool parseOnly = res.Has("parse-only"); + const bool compileOnly = res.Has("compile-only"); + const bool hasValidate = !parseOnly && !compileOnly; + if (hasValidate && !gatewayTypes.contains(YtProviderName)) { + Cerr << "At least one gateway from the list " << Join(",", YtProviderName).Quote() << " must be specified" << Endl; + return 1; + } + + for (auto& s: tablesMappingList) { + TStringBuf tableName, filePath; + TStringBuf(s).Split('@', tableName, filePath); + if (tableName.empty() || filePath.empty()) { + Cerr << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt" << Endl; + return 1; + } + tablesMapping[tableName] = filePath; + } + + if (hasValidate) { + for (auto& s : filesMappingList) { + TStringBuf fileName, filePath; + TStringBuf(s).Split('@', fileName, filePath); + if (fileName.empty() || filePath.empty()) { + Cerr << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt" << Endl; + return 1; + } + + auto& file = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + fileName)]; + file.Type = EUserDataType::PATH; + file.Data = filePath; + } + + for (auto& s : urlsMappingList) { + TStringBuf name, path; + TStringBuf(s).Split('@', name, path); + if (name.empty() || path.empty()) { + Cerr << "Incorrect url mapping, expected form name@path, e.g. MyFile@sbr:123456" << Endl; + return 1; + } + + auto& block = filesMapping[TUserDataKey::File(GetDefaultFilePrefix() + name)]; + block.Type = EUserDataType::URL; + block.Data = path; + } + } + + if (memLimit) { +#ifdef __unix__ + memLimit *= 1024 * 1024; + + struct rlimit rl; + + if (getrlimit(RLIMIT_AS, &rl)) { + throw TSystemError() << "Cannot getrlimit(RLIMIT_AS)"; + } + + rl.rlim_cur = memLimit; + if (setrlimit(RLIMIT_AS, &rl)) { + throw TSystemError() << "Cannot setrlimit(RLIMIT_AS) to " << memLimit << " bytes"; + } +#else + Cerr << "Memory limit can not be set on this platfrom" << Endl; + return 1; +#endif + } + + IOutputStream* errStream = &Cerr; + THolder<TFixedBufferFileOutput> errFileHolder; + if (!errFile.empty()) { + errFileHolder.Reset(new TFixedBufferFileOutput(errFile)); + errStream = errFileHolder.Get(); + } + + TExprContext ctx; + IModuleResolver::TPtr moduleResolver; + if (!mountConfig.empty()) { + TModulesTable modules; + NYqlMountConfig::TMountConfig mount; + Y_VERIFY(NKikimr::ParsePBFromFile(mountConfig, &mount)); + FillUserDataTableFromFileSystem(mount, filesMapping); + + if (!CompileLibraries(filesMapping, ctx, modules)) { + *errStream << "Errors on compile libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(*errStream); + return -1; + } + + moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, clusterMapping, sqlFlags, hasValidate); + } else { + if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping, hasValidate)) { + *errStream << "Errors loading default YQL libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(*errStream); + return -1; + } + } + + TExprContext::TFreezeGuard freezeGuard(ctx); + + TString programText; + if (programFile.empty()) { + Cerr << "Missing --program argument\n"; + return -1; + } + + if (programFile == TStringBuf("-")) { + programFile = TStringBuf("-stdin-"); + programText = Cin.ReadAll(); + } else { + programText = TFileInput(programFile).ReadAll(); + } + + THolder<TFileStorageConfig> fsConfig; + if (!fsCfgFile.empty()) { + fsConfig = ParseProtoConfig<TFileStorageConfig>(fsCfgFile); + if (!fsConfig) { + return 1; + } + } else { + fsConfig = MakeHolder<TFileStorageConfig>(); + } + + THolder<TGatewaysConfig> gatewaysConfig; + if (!gatewaysCfgFile.empty()) { + gatewaysConfig = ParseProtoConfig<TGatewaysConfig>(gatewaysCfgFile); + if (!gatewaysConfig) { + return 1; + } + if (gatewaysConfig->HasSqlCore()) { + sqlFlags.insert(gatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), gatewaysConfig->GetSqlCore().GetTranslationFlags().end()); + } + } + + TFileStoragePtr fileStorage; + if (hasValidate) { + NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths); + + fileStorage = WithAsync(CreateFileStorage(*fsConfig)); + } + + IUdfResolver::TPtr udfResolver; + auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), true, udfsPaths); + TUdfIndex::TPtr udfIndex; + CommonInit(res, udfResolverPath, udfResolverFilterSyscalls, udfsPaths, fileStorage, udfResolver, funcRegistry, udfIndex); + + TAutoPtr<IThreadPool> ytExecutionQueue; + TVector<TDataProviderInitializer> dataProvidersInit; + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + NKikimr::NMiniKQL::GetYqlFactory(), + NYql::GetPgFactory() + }); + + dataProvidersInit.push_back(GetDqDataProviderInitializer([](const TDqStatePtr&){ + return new TNullTransformer; + }, {}, dqCompFactory, {}, fileStorage)); + + bool emulateOutputForMultirun = false; + if (hasValidate) { + if (gatewayTypes.contains(YtProviderName) || res.Has("opt-collision")) { + auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp")); + auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway)); + } + } + + if (hasValidate && res.Has("opt-collision")) { + ExtProviderSpecific(funcRegistry.Get(), dataProvidersInit); + } + + TProgramFactory factory(true, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "yqlrun"); + factory.AddUserDataTable(filesMapping); + factory.SetModules(moduleResolver); + factory.SetFileStorage(fileStorage); + if (gatewaysConfig && gatewaysConfig->HasFs()) { + factory.SetUrlPreprocessing(new NYql::TUrlPreprocessing(*gatewaysConfig)); + } + factory.SetUdfIndex(udfIndex, new TUdfIndexPackageSet()); + factory.SetUdfResolver(udfResolver); + factory.SetGatewaysConfig(gatewaysConfig.Get()); + factory.EnableRangeComputeFor(); + + auto program = MakeHolder<TMultiProgs>(factory, programFile, programText, progsConcurrentCount); + if (res.Has("show-progress")) { + program->SetProgressWriter([](const TOperationProgress& progress) { + Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n"; + }); + } + + if (paramsFile) { + TString parameters = TFileInput(paramsFile).ReadAll(); + program->SetParametersYson(parameters); + } + + if (res.Has("sql")) { + google::protobuf::Arena arena; + NSQLTranslation::TTranslationSettings settings; + settings.Arena = &arena; + settings.ClusterMapping = clusterMapping; + settings.Flags = sqlFlags; + settings.SyntaxVersion = syntaxVersion; + settings.AnsiLexer = res.Has("ansi-lexer"); + settings.V0Behavior = NSQLTranslation::EV0Behavior::Report; + settings.AssumeYdbOnClusterWithSlash = res.Has("assume-ydb-on-slash"); + if (res.Has("discover")) { + settings.Mode = NSQLTranslation::ESqlMode::DISCOVERY; + } + if (!program->ParseSql(settings)) { + program->PrintErrorsTo(*errStream); + return 1; + } + if (res.Has("test-format") && syntaxVersion == 1) { + TString formattedProgramText; + NYql::TIssues issues; + auto formatter = NSQLFormat::MakeSqlFormatter(settings); + if (!formatter->Format(programText, formattedProgramText, issues)) { + Cerr << "Format failed: "; + issues.PrintTo(Cerr); + return 1; + } + + auto frmProgram = MakeHolder<TMultiProgs>(factory, "formatted SQL", formattedProgramText, progsConcurrentCount); + if (!frmProgram->ParseSql(settings)) { + frmProgram->PrintErrorsTo(*errStream); + return 1; + } + + TStringStream SrcQuery, FrmQuery; + + program->PrintAstTo(SrcQuery); + frmProgram->PrintAstTo(FrmQuery); + if (SrcQuery.Str() != FrmQuery.Str()) { + Cerr << "source query's AST and formatted query's AST are not same\n"; + return 1; + } + } + } else { + if (!program->ParseYql()) { + program->PrintErrorsTo(*errStream); + return 1; + } + } + + if (res.Has("print-ast")) { + program->PrintAstTo(Cout); + } + + + if (res.Has("parse-only")) + return 0; + + const TString username = GetUsername(); + bool withTypes = res.Has("with-types"); + IOutputStream* traceOut = res.Has("trace-opt") ? &Cerr : nullptr; + + IOutputStream* exprOut = nullptr; + THolder<TFixedBufferFileOutput> exprFileHolder; + if (res.Has("print-expr")) { + exprOut = &Cout; + } else if (!exprFile.empty()) { + exprFileHolder.Reset(new TFixedBufferFileOutput(exprFile)); + exprOut = exprFileHolder.Get(); + } + + IOutputStream* tracePlan = nullptr; + THolder<TFixedBufferFileOutput> planFileHolder; + if (res.Has("trace-plan")) { + tracePlan = &Cout; + } + else if (!planFile.empty()) { + planFileHolder.Reset(new TFixedBufferFileOutput(planFile)); + tracePlan = planFileHolder.Get(); + } + + if (res.Has("show-log")) { + using namespace ::NYql::NLog; + InitLogger(&Cerr); + NLog::EComponentHelpers::ForEach([](NLog::EComponent c) { + YqlLogger().SetComponentLevel(c, ELevel::DEBUG); + }); + } + if (res.Has("trace-opt")) { + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::TRACE); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::TRACE); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::TRACE); + } + + program->SetValidateOptions(NUdf::ValidateModeByStr(validateModeStr)); + + TProgram::TStatus status = TProgram::TStatus::Ok; + const bool fullExpr = res.Has("full-expr"); + IOutputStream* fullTracePlan = fullExpr ? tracePlan : nullptr; + IOutputStream* fullExprOut = fullExpr ? exprOut : nullptr; + + if (!program->Compile(username)) { + program->PrintErrorsTo(*errStream); + return 1; + } + + if (res.Has("compile-only")) { + if (res.Has("print-expr")) { + program->PrintExprTo(Cout); + } + return 0; + } + + if (res.Has("multirun")) { + status = program->RunAsyncAndWait(username, traceOut, fullTracePlan, fullExprOut, withTypes, emulateOutputForMultirun); + } else if (res.Has("peephole")) { + status = program->Peephole(username, exprOut, withTypes); + } else if (res.Has("run")) { + status = program->Run(username, traceOut, fullTracePlan, fullExprOut, withTypes); + } else if (res.Has("optimize")) { + status = program->Optimize(username, traceOut, fullTracePlan, fullExprOut, withTypes); + } else if (res.Has("validate")) { + status = program->Validate(username, exprOut, withTypes); + } else if (res.Has("discover")) { + status = program->Discover(username); + } else if (res.Has("lineage")) { + status = program->Lineage(username, traceOut, exprOut, withTypes); + } + + program->PrintErrorsTo(*errStream); + if (status == TProgram::TStatus::Error) { + return 1; + } + + if (!fullExpr && !res.Has("peephole")) { + program->Print(exprOut, tracePlan); + } + + IOutputStream* resultOut = nullptr; + THolder<TFixedBufferFileOutput> resultFileHolder; + if (res.Has("print-result")) { + resultOut = &Cout; + } else if (!resultFile.empty()) { + resultFileHolder.Reset(new TFixedBufferFileOutput(resultFile)); + resultOut = resultFileHolder.Get(); + } + + if (resultOut) { + if (res.Has("discover")) { + program->DiscoveredDataOut(*resultOut); + } else if (res.Has("lineage")) { + program->LineageOut(*resultOut); + } else { + program->ResultsOut(*resultOut); + } + } + + NLog::CleanupLogger(); + + return 0; +} + +int RunUI(int argc, const char* argv[]) +{ + TVector<TString> udfsPaths; + TString udfsDir; + TString mountConfig; + TVector<TString> filesMappingList; + TString udfResolverPath; + bool udfResolverFilterSyscalls = false; + TString gatewaysCfgFile; + TString fsCfgFile; + + THashMap<TString, TString> clusterMapping; + clusterMapping["plato"] = YtProviderName; + + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths); + opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&udfsDir); + opts.AddLongOption('m', "mounts", "Mount points config file.").StoreResult(&mountConfig); + opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList); + opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&udfResolverPath); + opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver") + .Optional() + .NoArgument() + .SetFlag(&udfResolverFilterSyscalls); + opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf resolver to use static function registry").NoArgument(); + opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping)); + opts.AddLongOption("gateways-cfg", "gateways configuration file").Optional().RequiredArgument("FILE").StoreResult(&gatewaysCfgFile); + opts.AddLongOption("fs-cfg", "fs configuration file").Optional().RequiredArgument("FILE").StoreResult(&fsCfgFile); + + TServerConfig config; + config.SetAssetsPath("http/www"); + config.InitCliOptions(opts); + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + config.ParseFromCli(res); + + TUserDataTable userData; + for (auto& s : filesMappingList) { + TStringBuf fileName, filePath; + TStringBuf(s).Split('@', fileName, filePath); + if (fileName.empty() || filePath.empty()) { + Cerr << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt" << Endl; + return 1; + } + + auto& file = userData[TUserDataKey::File(GetDefaultFilePrefix() + fileName)]; + file.Type = EUserDataType::PATH; + file.Data = filePath; + } + + NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths); + + THolder<TGatewaysConfig> gatewaysConfig; + if (!gatewaysCfgFile.empty()) { + gatewaysConfig = ParseProtoConfig<TGatewaysConfig>(gatewaysCfgFile); + if (!gatewaysConfig) { + return -1; + } + } + + THolder<TFileStorageConfig> fsConfig; + if (!fsCfgFile.empty()) { + fsConfig = ParseProtoConfig<TFileStorageConfig>(fsCfgFile); + if (!fsConfig) { + return 1; + } + } else { + fsConfig = MakeHolder<TFileStorageConfig>(); + } + + auto fileStorage = WithAsync(CreateFileStorage(*fsConfig)); + + IUdfResolver::TPtr udfResolver; + auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), false, udfsPaths); + TUdfIndex::TPtr udfIndex; + + CommonInit(res, udfResolverPath, udfResolverFilterSyscalls, udfsPaths, fileStorage, udfResolver, funcRegistry, udfIndex); + + TExprContext ctx; + IModuleResolver::TPtr moduleResolver; + if (!mountConfig.empty()) { + TModulesTable modules; + NYqlMountConfig::TMountConfig mount; + Y_VERIFY(NKikimr::ParsePBFromFile(mountConfig, &mount)); + FillUserDataTableFromFileSystem(mount, userData); + + if (!CompileLibraries(userData, ctx, modules)) { + Cerr << "Errors on compile libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(Cerr); + return -1; + } + + moduleResolver = std::make_shared<TModuleResolver>(std::move(modules), ctx.NextUniqueId, clusterMapping, THashSet<TString>()); + } else { + if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping)) { + Cerr << "Errors loading default YQL libraries:" << Endl; + ctx.IssueManager.GetIssues().PrintTo(Cerr); + return -1; + } + } + + TString fn = "pkg/a/b/c.sql"; + TString content0 = "$sqr = ($x) -> { return 2 * $x * $x; }; export $sqr;"; + TString content1 = "$sqr = ($x) -> { return 3 * $x * $x; }; export $sqr;"; + moduleResolver->RegisterPackage("a.b"); + if (!moduleResolver->AddFromMemory(fn, content0, ctx, 1, 0) || !moduleResolver->AddFromMemory(fn, content1, ctx, 1, 1)) { + Cerr << "Unable to compile SQL library" << Endl; + ctx.IssueManager.GetIssues().PrintTo(Cerr); + return -1; + } + + TExprContext::TFreezeGuard freezeGuard(ctx); + + NLog::YqlLoggerScope logger(new NLog::TTlsLogBackend(new TStreamLogBackend(&Cerr))); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::DEBUG); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::DEBUG); + NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::DEBUG); + + auto server = CreateYqlServer(config, + funcRegistry.Get(), udfIndex, ctx.NextUniqueId, + userData, + std::move(gatewaysConfig), + moduleResolver, udfResolver, fileStorage); + server->Start(); + server->Wait(); + + return 0; +} + +int main(int argc, const char *argv[]) { + if (argc > 1 && TString(argv[1]) != TStringBuf("--ndebug")) { + Cerr << "yqlrun ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl; + } + + NYql::NBacktrace::RegisterKikimrFatalActions(); + NYql::NBacktrace::EnableKikimrSymbolize(); + + try { + if (argc > 1 && TString(argv[1]) == TStringBuf("ui")) { + return RunUI(argc, argv); + } else { + return Main(argc, argv); + } + } + catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + return 1; + } +} |