diff options
author | vvvv <vvvv@ydb.tech> | 2023-08-01 17:56:49 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-08-01 17:56:49 +0300 |
commit | bbd8e0e8a955abfb33f933a5653d97d3eef57285 (patch) | |
tree | 9f033a6ed845793a60cfcd91fea5e2a76aa3fce9 | |
parent | 20efd2f292b02a79125f61b2b5790a89625d674a (diff) | |
download | ydb-bbd8e0e8a955abfb33f933a5653d97d3eef57285.tar.gz |
Move embedded to public
-rw-r--r-- | ydb/library/yql/public/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt | 48 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt | 49 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt | 49 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/CMakeLists.txt | 17 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt | 48 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/ya.make | 41 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/yql_embedded.cpp | 556 | ||||
-rw-r--r-- | ydb/library/yql/public/embedded/yql_embedded.h | 83 | ||||
-rw-r--r-- | ydb/library/yql/public/ya.make | 1 |
10 files changed, 893 insertions, 0 deletions
diff --git a/ydb/library/yql/public/CMakeLists.txt b/ydb/library/yql/public/CMakeLists.txt index 812b435b79..25984d54c8 100644 --- a/ydb/library/yql/public/CMakeLists.txt +++ b/ydb/library/yql/public/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(decimal) +add_subdirectory(embedded) add_subdirectory(fastcheck) add_subdirectory(issue) add_subdirectory(purecalc) diff --git a/ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..a50913ea6b --- /dev/null +++ b/ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,48 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-public-embedded) +target_compile_options(yql-public-embedded PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yql-public-embedded PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-resource + library-cpp-yson + cpp-yson-node + cpp-mapreduce-client + cpp-mapreduce-common + library-yql-ast + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-defs + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + yql-core-user_data + library-yql-minikql + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + yql-utils-log + yql-parser-pg_wrapper + providers-common-proto + providers-common-udf_resolve + yql-core-url_preprocessing + yt-gateway-native + yt-lib-log + yt-lib-yt_download + providers-yt-provider +) +target_sources(yql-public-embedded PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp +) diff --git a/ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..fb20043f75 --- /dev/null +++ b/ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt @@ -0,0 +1,49 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-public-embedded) +target_compile_options(yql-public-embedded PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yql-public-embedded PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-resource + library-cpp-yson + cpp-yson-node + cpp-mapreduce-client + cpp-mapreduce-common + library-yql-ast + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-defs + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + yql-core-user_data + library-yql-minikql + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + yql-utils-log + yql-parser-pg_wrapper + providers-common-proto + providers-common-udf_resolve + yql-core-url_preprocessing + yt-gateway-native + yt-lib-log + yt-lib-yt_download + providers-yt-provider +) +target_sources(yql-public-embedded PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp +) diff --git a/ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt b/ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..fb20043f75 --- /dev/null +++ b/ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt @@ -0,0 +1,49 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-public-embedded) +target_compile_options(yql-public-embedded PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yql-public-embedded PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-resource + library-cpp-yson + cpp-yson-node + cpp-mapreduce-client + cpp-mapreduce-common + library-yql-ast + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-defs + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + yql-core-user_data + library-yql-minikql + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + yql-utils-log + yql-parser-pg_wrapper + providers-common-proto + providers-common-udf_resolve + yql-core-url_preprocessing + yt-gateway-native + yt-lib-log + yt-lib-yt_download + providers-yt-provider +) +target_sources(yql-public-embedded PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp +) diff --git a/ydb/library/yql/public/embedded/CMakeLists.txt b/ydb/library/yql/public/embedded/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/public/embedded/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt b/ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..a50913ea6b --- /dev/null +++ b/ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt @@ -0,0 +1,48 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(yql-public-embedded) +target_compile_options(yql-public-embedded PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(yql-public-embedded PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf + library-cpp-resource + library-cpp-yson + cpp-yson-node + cpp-mapreduce-client + cpp-mapreduce-common + library-yql-ast + yql-sql-pg + yql-core-facade + yql-core-file_storage + core-file_storage-defs + core-file_storage-proto + core-file_storage-http_download + core-services-mounts + yql-core-user_data + library-yql-minikql + library-yql-protos + udf-service-exception_policy + yql-utils-backtrace + yql-utils-log + yql-parser-pg_wrapper + providers-common-proto + providers-common-udf_resolve + yql-core-url_preprocessing + yt-gateway-native + yt-lib-log + yt-lib-yt_download + providers-yt-provider +) +target_sources(yql-public-embedded PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp +) diff --git a/ydb/library/yql/public/embedded/ya.make b/ydb/library/yql/public/embedded/ya.make new file mode 100644 index 0000000000..b740e1f1dc --- /dev/null +++ b/ydb/library/yql/public/embedded/ya.make @@ -0,0 +1,41 @@ +LIBRARY() + +SRCS( + yql_embedded.cpp + yql_embedded.h +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/resource + library/cpp/yson + library/cpp/yson/node + yt/cpp/mapreduce/client + yt/cpp/mapreduce/common + ydb/library/yql/ast + ydb/library/yql/sql/pg + ydb/library/yql/core/facade + ydb/library/yql/core/file_storage + ydb/library/yql/core/file_storage/defs + ydb/library/yql/core/file_storage/proto + ydb/library/yql/core/file_storage/http_download + ydb/library/yql/core/services/mounts + ydb/library/yql/core/user_data + ydb/library/yql/minikql + ydb/library/yql/protos + ydb/library/yql/public/udf/service/exception_policy + ydb/library/yql/utils/backtrace + ydb/library/yql/utils/log + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/providers/common/proto + ydb/library/yql/providers/common/udf_resolve + ydb/library/yql/core/url_preprocessing + ydb/library/yql/providers/yt/gateway/native + ydb/library/yql/providers/yt/lib/log + ydb/library/yql/providers/yt/lib/yt_download + ydb/library/yql/providers/yt/provider +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/public/embedded/yql_embedded.cpp b/ydb/library/yql/public/embedded/yql_embedded.cpp new file mode 100644 index 0000000000..102f302269 --- /dev/null +++ b/ydb/library/yql/public/embedded/yql_embedded.cpp @@ -0,0 +1,556 @@ +#include "yql_embedded.h" + +#include <ydb/library/yql/providers/yt/lib/log/yt_logger.h> +#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h> +#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> + +#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h> + +#include <ydb/library/yql/providers/common/udf_resolve/yql_outproc_udf_resolver.h> +#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> + +#include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/core/facade/yql_facade.h> +#include <ydb/library/yql/core/file_storage/file_storage.h> +#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> +#include <ydb/library/yql/core/file_storage/http_download/http_download.h> +#include <ydb/library/yql/core/services/mounts/yql_mounts.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/backtrace/backtrace.h> + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <library/cpp/yson/parser.h> +#include <library/cpp/json/json_writer.h> + +#include <library/cpp/resource/resource.h> +#include <google/protobuf/text_format.h> +#include <library/cpp/digest/md5/md5.h> + +#include <util/folder/dirut.h> +#include <util/folder/path.h> +#include <util/stream/file.h> +#include <util/string/builder.h> +#include <util/string/subst.h> +#include <util/system/fs.h> +#include <util/system/user.h> +#include <util/system/env.h> + +namespace NYql { + namespace NEmbedded { + namespace { + void ThrowNotSupported() { + ythrow yexception() << "Yson element is not supported"; + } + + class TJsonConsumer : public NYson::TYsonConsumerBase { + public: + TJsonConsumer(NJson::TJsonWriter& writer) + : Writer(writer) + {} + + void OnStringScalar(TStringBuf value) override { + Writer.Write(value); + } + + void OnInt64Scalar(i64 value) override { + Y_UNUSED(value); + ThrowNotSupported(); + } + + void OnUint64Scalar(ui64 value) override { + Y_UNUSED(value); + ThrowNotSupported(); + } + + void OnDoubleScalar(double value) override { + Y_UNUSED(value); + ThrowNotSupported(); + } + + void OnBooleanScalar(bool value) override { + Writer.Write(value); + } + + void OnEntity() override { + Writer.WriteNull(); + } + + void OnBeginList() override { + Writer.OpenArray(); + } + + void OnListItem() override { + } + + void OnEndList() override { + Writer.CloseArray(); + } + + void OnBeginMap() override { + Writer.OpenMap(); + } + + void OnKeyedItem(TStringBuf key) override { + Writer.WriteKey(key); + } + + void OnEndMap() override { + Writer.CloseMap(); + } + + void OnBeginAttributes() override { + ThrowNotSupported(); + } + + void OnEndAttributes() override { + ThrowNotSupported(); + } + + + private: + NJson::TJsonWriter& Writer; + }; + + TString Yson2Json(const TString& yson) { + TString jsonString; + + TStringOutput jsonStream{ jsonString }; + NJson::TJsonWriter jsonWriter{ &jsonStream, false }; + TJsonConsumer jsonConsumer{ jsonWriter }; + NYson::ParseYsonStringBuffer(yson, &jsonConsumer); + jsonWriter.Flush(); + return jsonString; + } + } + + class TOperation: public IOperation { + public: + TOperation(const TString& result, const TString& plan, const TString& statistics, const TString& taskInfo) + : Result_(result) + , Plan_(plan) + , Statistics_(statistics) + , TaskInfo_(taskInfo) + { + } + + const TString& YsonResult() const override { + return Result_; + } + + const TString& Plan() const override { + return Plan_; + } + + const TString& Statistics() const override { + return Statistics_; + } + + const TString& TaskInfo() const override { + return TaskInfo_; + } + + private: + const TString Result_; + const TString Plan_; + const TString Statistics_; + const TString TaskInfo_; + }; + + class TOperationFactory: public IOperationFactory { + public: + TOperationFactory(const TOperationFactoryOptions& options, + const TString& configData, + std::function<NFS::IDownloaderPtr(const TFileStorageConfig&)> arcDownloaderFactory) + : Logger(&Cerr) + , Options_(options) + { + auto& logger = NLog::YqlLogger(); + logger.SetDefaultPriority(Options_.LogLevel_); + for (int i = 0; i < NLog::EComponentHelpers::ToInt(NLog::EComponent::MaxValue); ++i) { + logger.SetComponentLevel((NLog::EComponent)i, (NLog::ELevel)Options_.LogLevel_); + } + + NYql::SetYtLoggerGlobalBackend(Options_.YtLogLevel_); + if (NYT::TConfig::Get()->Prefix.empty()) { + NYT::TConfig::Get()->Prefix = "//"; + } + + if (GetEnv("YT_FORCE_IPV6").empty()) { + NYT::TConfig::Get()->ForceIpV6 = true; + } + + const bool useStaticLinking = Options_.MrJobBinary_.empty(); + if (Options_.MrJobBinary_) { + EnsureBinary(Options_.MrJobBinary_, "MrJobBinary"); + } + + if (!::google::protobuf::TextFormat::ParseFromString(configData, &GatewaysConfig_)) { + ythrow yexception() << "Bad format of gateways configuration"; + } + auto yqlCoreFlags = GatewaysConfig_.GetYqlCore().GetFlags(); + GatewaysConfig_.MutableYqlCore()->ClearFlags(); + for (auto flag : yqlCoreFlags) { + if (flag.GetName() != "GeobaseDownloadUrl") { + *GatewaysConfig_.MutableYqlCore()->AddFlags() = flag; + } + } + + auto ytConfig = GatewaysConfig_.MutableYt(); + if (!ytConfig->HasExecuteUdfLocallyIfPossible()) { + ytConfig->SetExecuteUdfLocallyIfPossible(true); + } + ytConfig->SetYtLogLevel(static_cast<NYql::EYtLogLevel>(Options_.YtLogLevel_)); + if (useStaticLinking) { + ytConfig->ClearMrJobBin(); + } else { + ytConfig->SetMrJobBin(Options_.MrJobBinary_); + ytConfig->SetMrJobBinMd5(MD5::File(Options_.MrJobBinary_)); + } + + ytConfig->ClearMrJobUdfsDir(); + if (Options_.LocalChainTest_) { + ytConfig->SetLocalChainTest(true); + ytConfig->SetLocalChainFile(Options_.LocalChainFile_); + } + + for (const auto& cluster : Options_.YtClusters_) { + auto clusterMapping = ytConfig->AddClusterMapping(); + clusterMapping->SetName(cluster.Name_); + clusterMapping->SetCluster(cluster.Cluster_); + } + for (size_t index = 0; index < ytConfig->ClusterMappingSize(); ++index) { + auto cluster = ytConfig->MutableClusterMapping(index); + auto settings = cluster->MutableSettings(); + bool hasOwners = false; + for (int settingsIndex = 0; settingsIndex < settings->size(); ++settingsIndex) { + auto attr = settings->Mutable(settingsIndex); + if (attr->GetName() == "Owners") { + hasOwners = true; + if (!Options_.YtOwners_.empty()) { + attr->SetValue(Options_.YtOwners_); + } + } + } + + if (!hasOwners && !Options_.YtOwners_.empty()) { + auto newSetting = settings->Add(); + newSetting->SetName("Owners"); + newSetting->SetValue(Options_.YtOwners_); + } + + Clusters_.insert({cluster->GetName(), TString(YtProviderName)}); + } + + TFileStorageConfig fileStorageConfig; + fileStorageConfig.SetMaxSizeMb(1 << 14); + + std::vector<NFS::IDownloaderPtr> downloaders; + downloaders.push_back(MakeYtDownloader(fileStorageConfig)); + auto arcDownloader = arcDownloaderFactory(fileStorageConfig); + if (arcDownloader) { + downloaders.push_back(arcDownloader); + } + + FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, downloaders)); + + NResource::TResources libs; + const TStringBuf prefix = "resfs/file/yql_libs/"; + NResource::FindMatch(prefix, &libs); + for (auto x : libs) { + auto libName = x.Key; + libName.SkipPrefix(prefix); + NUserData::TUserData d{ NUserData::EType::LIBRARY, + NUserData::EDisposition::RESOURCE_FILE, + TString("yql_libs/") + libName, + TString(x.Key) }; + Options_.UserData_.push_back(d); + } + + NUserData::TUserData::UserDataToLibraries(Options_.UserData_, Modules_); + + FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone(); + + const NKikimr::NMiniKQL::TUdfModuleRemappings emptyRemappings; + if (!useStaticLinking && !Options_.UdfsDir_.empty()) { + FuncRegistry_->SetBackTraceCallback(&NYql::NBacktrace::KikimrBackTrace); + + NKikimr::NMiniKQL::TUdfModulePathsMap systemModules; + if (!Options_.UdfResolverBinary_.empty()) { + NCommon::LoadSystemModulePaths( + Options_.UdfResolverBinary_, + Options_.UdfsDir_, + &systemModules); + + if (Options_.PreloadUdfs_) { + for (const auto& p : systemModules) { + FuncRegistry_->LoadUdfs(p.second, emptyRemappings, 0); + } + } + } else { + TVector<TString> udfPaths; + NKikimr::NMiniKQL::FindUdfsInDir(Options_.UdfsDir_, &udfPaths); + for (const auto& path : udfPaths) { + Cerr << path << "\n"; + FuncRegistry_->LoadUdfs(path, emptyRemappings, 0); + } + + for (auto& m : FuncRegistry_->GetAllModuleNames()) { + TMaybe<TString> path = FuncRegistry_->FindUdfPath(m); + if (!path) { + // should not happen + ythrow yexception() << "Unable to detect UDF path for module " << m; + } + systemModules.emplace(m, *path); + } + } + + FuncRegistry_->SetSystemModulePaths(systemModules); + } + + if (useStaticLinking) { + NKikimr::NMiniKQL::FillStaticModules(*FuncRegistry_); + } + + TUserDataTable userDataTable = GetYqlModuleResolver(ExprContext_, ModuleResolver_, Options_.UserData_, Clusters_, {}); + + if (!userDataTable) { + TStringStream err; + ExprContext_.IssueManager.GetIssues().PrintTo(err); + ythrow yexception() << "Failed to compile modules:\n" + << err.Str(); + } + + TVector<TDataProviderInitializer> dataProvidersInit; + + TYtNativeServices ytServices; + ytServices.FunctionRegistry = FuncRegistry_.Get(); + ytServices.FileStorage = FileStorage_; + ytServices.Config = std::make_shared<TYtGatewayConfig>(*ytConfig); + auto ytNativeGateway = CreateYtNativeGateway(ytServices); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway)); + + ProgramFactory_ = MakeHolder<TProgramFactory>( + false, FuncRegistry_.Get(), ExprContext_.NextUniqueId, dataProvidersInit, "embedded"); + auto credentials = MakeIntrusive<TCredentials>(); + if (!Options_.YtToken_.empty()) { + credentials->AddCredential("default_yt", TCredential("yt", "", Options_.YtToken_)); + } + if (!Options_.StatToken_.empty()) { + credentials->AddCredential("default_statface", TCredential("statface", "", Options_.StatToken_)); + } + for (const auto& [name, value] : Options_.CustomTokens_) { + credentials->AddCredential(name, TCredential("custom", "", value)); + } + + ProgramFactory_->AddUserDataTable(userDataTable); + ProgramFactory_->SetCredentials(credentials); + ProgramFactory_->SetModules(ModuleResolver_); + ProgramFactory_->SetUdfResolver((useStaticLinking || Options_.UdfResolverBinary_.empty()) ? NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_) : + NCommon::CreateOutProcUdfResolver(FuncRegistry_.Get(), FileStorage_, Options_.UdfResolverBinary_, {}, {}, false, {})); + ProgramFactory_->SetGatewaysConfig(&GatewaysConfig_); + ProgramFactory_->SetFileStorage(FileStorage_); + ProgramFactory_->SetUrlPreprocessing(MakeIntrusive<TUrlPreprocessing>(GatewaysConfig_)); + } + + THolder<IOperation> Run(const TString& queryText, const TOperationOptions& options) const override { + TProgramPtr program = ProgramFactory_->Create("-memory-", queryText); + if (options.Title) { + program->SetOperationTitle(*options.Title); + } + + if (options.Attributes) { + program->SetOperationAttrsYson(*options.Attributes); + } + + if (options.Parameters) { + program->SetParametersYson(*options.Parameters); + } + + NSQLTranslation::TTranslationSettings sqlSettings; + sqlSettings.ClusterMapping = Clusters_; + sqlSettings.ModuleMapping = Modules_; + sqlSettings.SyntaxVersion = options.SyntaxVersion; + sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable; + for (const auto& item : Options_.UserData_) { + if (item.Type_ == NUserData::EType::LIBRARY) { + sqlSettings.Libraries.emplace(item.Disposition_ == NUserData::EDisposition::RESOURCE_FILE ? + item.Name_ : item.Content_); + } + } + + if (!program->ParseSql(sqlSettings)) { + TStringStream err; + program->PrintErrorsTo(err); + ythrow yexception() << "Failed to parse SQL: " << err.Str(); + } + + if (!program->Compile(GetUsername())) { + TStringStream err; + program->PrintErrorsTo(err); + ythrow yexception() << "Failed to compile: " << err.Str(); + } + + TProgram::TStatus status = TProgram::TStatus::Error; + switch (options.Mode) { + case EExecuteMode::Run: + status = program->Run(GetUsername(), nullptr, nullptr, nullptr); + break; + case EExecuteMode::Optimize: + status = program->Optimize(GetUsername(), nullptr, nullptr, nullptr); + break; + case EExecuteMode::Validate: + status = program->Validate(GetUsername(), nullptr); + break; + case EExecuteMode::Lineage: + status = program->Lineage(GetUsername(), nullptr, nullptr); + break; + } + + if (status == TProgram::TStatus::Error) { + TStringStream err; + program->PrintErrorsTo(err); + ythrow yexception() << "Failed to run: " << err.Str(); + } + + TStringStream result; + if (options.Mode == EExecuteMode::Lineage) { + if (auto data = program->GetLineage()) { + TStringInput in(*data); + NYson::ReformatYsonStream(&in, &result, Options_.ResultFormat_); + } + } else if (program->HasResults()) { + NYson::TYsonWriter yson(&result, Options_.ResultFormat_); + yson.OnBeginList(); + for (const auto& result : program->Results()) { + yson.OnListItem(); + yson.OnRaw(result); + } + yson.OnEndList(); + } + + auto plan = program->GetQueryPlan(TPlanSettings().SetLimitInputPins(std::nullopt).SetLimitOutputPins(std::nullopt)).GetOrElse(""); + auto taskInfo = program->GetTasksInfo().GetOrElse(""); + + auto statistics = program->GetStatistics().GetOrElse(""); + if (statistics) { + TStringStream strInput(statistics); + TStringStream strFormatted; + NYson::ReformatYsonStream(&strInput, &strFormatted, NYson::EYsonFormat::Pretty); + statistics = strFormatted.Str(); + } + + if (taskInfo) { + TStringStream strInput(taskInfo); + TStringStream strFormatted; + NYson::ReformatYsonStream(&strInput, &strFormatted, NYson::EYsonFormat::Pretty); + taskInfo = strFormatted.Str(); + } + + return MakeHolder<TOperation>(result.Str(), plan, statistics, taskInfo); + } + + void Save(const TString& queryText, const TOperationOptions& options, const TString& destinationFolder) const override { + using namespace NUserData; + TString finalQueryText; + + TStringBuilder cmdLine; + cmdLine << "#!/usr/bin/env bash\nset -eux\nya yql -i main.sql"; + cmdLine << " --syntax-version=" << options.SyntaxVersion; + if (options.Title) { + cmdLine << " --title=" << options.Title.Get()->Quote(); + } + + if (options.Parameters) { + TFileOutput paramFile(TFsPath(destinationFolder) / "params.json"); + paramFile.Write(Yson2Json(*options.Parameters)); + cmdLine << " --parameters-file=params.json"; + } + + ui32 fileIndex = 0; + for (const auto& item : Options_.UserData_) { + switch (item.Disposition_) { + case EDisposition::INLINE: { + auto path = "files" + ToString(++fileIndex); + TFileOutput dataFile(TFsPath(destinationFolder) / path); + dataFile.Write(item.Content_); + cmdLine << " -F " << item.Name_.Quote() << "@" << path; + break; + } + case EDisposition::RESOURCE: + case EDisposition::RESOURCE_FILE: { + TString skipSlash(TStringBuf(item.Content_).After('/')); + if (item.Type_ == EType::LIBRARY) { + finalQueryText += "pragma library(" + skipSlash.Quote() + ");\n"; + } + + auto path = "files" + ToString(++fileIndex); + TFileOutput dataFile(TFsPath(destinationFolder) / path); + auto resContent = NResource::Find(item.Content_); + dataFile.Write(resContent); + cmdLine << " -F " << (item.Type_ == EType::LIBRARY ? skipSlash : item.Name_).Quote() << "@" << path; + break; + } + case EDisposition::FILESYSTEM: { + cmdLine << " -F " << item.Name_.Quote() << "@" << RealPath(item.Content_).Quote(); + break; + } + case EDisposition::URL: { + cmdLine << " -U " << item.Name_.Quote() << "@" << item.Content_.Quote(); + break; + } + } + } + + + auto patchedQueryText = queryText; + SubstGlobal(patchedQueryText, "import .", "import "); + + finalQueryText += patchedQueryText; + TFileOutput sqlFile(TFsPath(destinationFolder) / "main.sql"); + sqlFile.Write(finalQueryText); + + TFileOutput runFile(TFsPath(destinationFolder) / "run.sh"); + runFile.Write(cmdLine); + runFile.Finish(); + Chmod((TFsPath(destinationFolder) / "run.sh").c_str(), MODE0755); + } + + private: + void EnsureBinary(const TString& path, const TString& name) { + if (path.empty()) { + ythrow yexception() << "Parameter: " << name << " must not be empty"; + } + + if (!NFs::Exists(path)) { + ythrow yexception() << "Binary for parameter: " << name << " is not found at path: " << path; + } + } + + private: + NLog::YqlLoggerScope Logger; + TOperationFactoryOptions Options_; + TFileStoragePtr FileStorage_; + TExprContext ExprContext_; + TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_; + IModuleResolver::TPtr ModuleResolver_; + TGatewaysConfig GatewaysConfig_; + THolder<TProgramFactory> ProgramFactory_; + THashMap<TString, TString> Clusters_; + THashMap<TString, TString> Modules_; + THashSet<TString> Libraries_; + }; + + THolder<IOperationFactory> MakeOperationFactory( + const TOperationFactoryOptions& options, + const TString& configData, + std::function<NFS::IDownloaderPtr(const TFileStorageConfig&)> arcDownloaderFactory) { + return MakeHolder<TOperationFactory>(options, configData, arcDownloaderFactory); + } + } +} diff --git a/ydb/library/yql/public/embedded/yql_embedded.h b/ydb/library/yql/public/embedded/yql_embedded.h new file mode 100644 index 0000000000..ff56c0c00d --- /dev/null +++ b/ydb/library/yql/public/embedded/yql_embedded.h @@ -0,0 +1,83 @@ +#pragma once + +#include <ydb/library/yql/core/user_data/yql_user_data.h> +#include <ydb/library/yql/core/file_storage/defs/downloader.h> + +#include <yt/cpp/mapreduce/interface/logging/logger.h> + +#include <library/cpp/logger/priority.h> +#include <library/cpp/yson/public.h> + +#include <util/generic/hash.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> + +namespace NYql { + class TFileStorageConfig; + + namespace NEmbedded { + class IOperation { + public: + virtual ~IOperation() = default; + virtual const TString& YsonResult() const = 0; + virtual const TString& Plan() const = 0; + virtual const TString& Statistics() const = 0; + virtual const TString& TaskInfo() const = 0; + }; + + enum class EExecuteMode { + Validate, + Optimize, + Run, + Lineage + }; + + struct TOperationOptions { + TMaybe<TString> Title; // title for YT transactions and operations, should contain 'YQL' somewhere + ui16 SyntaxVersion = 1; + TMaybe<TString> Attributes; // yson map with additional attributes + TMaybe<TString> Parameters; // in yson format + EExecuteMode Mode = EExecuteMode::Run; + }; + + // must be allocated only once per process + class IOperationFactory { + public: + virtual ~IOperationFactory() = default; + virtual THolder<IOperation> Run(const TString& queryText, const TOperationOptions& options) const = 0; + virtual void Save(const TString& queryText, const TOperationOptions& options, const TString& destinationFolder) const = 0; + }; + + struct TYtClusterOptions { + TString Name_; + TString Cluster_; + }; + + struct TOperationFactoryOptions { + TString MrJobBinary_; // assume static linking (including UDFs) if empty + TString UdfResolverBinary_; + TString UdfsDir_; + bool PreloadUdfs_ = false; // used when UdfResolverBinary_ is specified, if UdfResolverBinary_ is empty it is considered equal to true + TVector<NUserData::TUserData> UserData_; + + ELogPriority LogLevel_ = TLOG_ERR; + NYT::ILogger::ELevel YtLogLevel_ = NYT::ILogger::ERROR; + NYson::EYsonFormat ResultFormat_ = NYson::EYsonFormat::Pretty; + + TVector<TYtClusterOptions> YtClusters_; + TString YtToken_; + TString YtOwners_; + TString StatToken_; + bool LocalChainTest_ = false; + TString LocalChainFile_; + THashMap<TString, TString> CustomTokens_; + }; + + THolder<IOperationFactory> MakeOperationFactory( + const TOperationFactoryOptions& options, + const TString& configData, + std::function<NFS::IDownloaderPtr(const TFileStorageConfig&)> arcDownloaderFactory); + } +} diff --git a/ydb/library/yql/public/ya.make b/ydb/library/yql/public/ya.make index 9fbf7e6a27..bdbac7a820 100644 --- a/ydb/library/yql/public/ya.make +++ b/ydb/library/yql/public/ya.make @@ -1,5 +1,6 @@ RECURSE( decimal + embedded fastcheck issue purecalc |