aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-08-01 17:56:49 +0300
committervvvv <vvvv@ydb.tech>2023-08-01 17:56:49 +0300
commitbbd8e0e8a955abfb33f933a5653d97d3eef57285 (patch)
tree9f033a6ed845793a60cfcd91fea5e2a76aa3fce9
parent20efd2f292b02a79125f61b2b5790a89625d674a (diff)
downloadydb-bbd8e0e8a955abfb33f933a5653d97d3eef57285.tar.gz
Move embedded to public
-rw-r--r--ydb/library/yql/public/CMakeLists.txt1
-rw-r--r--ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt48
-rw-r--r--ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt49
-rw-r--r--ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt49
-rw-r--r--ydb/library/yql/public/embedded/CMakeLists.txt17
-rw-r--r--ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt48
-rw-r--r--ydb/library/yql/public/embedded/ya.make41
-rw-r--r--ydb/library/yql/public/embedded/yql_embedded.cpp556
-rw-r--r--ydb/library/yql/public/embedded/yql_embedded.h83
-rw-r--r--ydb/library/yql/public/ya.make1
10 files changed, 893 insertions, 0 deletions
diff --git a/ydb/library/yql/public/CMakeLists.txt b/ydb/library/yql/public/CMakeLists.txt
index 812b435b79..25984d54c8 100644
--- a/ydb/library/yql/public/CMakeLists.txt
+++ b/ydb/library/yql/public/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(decimal)
+add_subdirectory(embedded)
add_subdirectory(fastcheck)
add_subdirectory(issue)
add_subdirectory(purecalc)
diff --git a/ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..a50913ea6b
--- /dev/null
+++ b/ydb/library/yql/public/embedded/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,48 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-public-embedded)
+target_compile_options(yql-public-embedded PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(yql-public-embedded PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+ library-cpp-resource
+ library-cpp-yson
+ cpp-yson-node
+ cpp-mapreduce-client
+ cpp-mapreduce-common
+ library-yql-ast
+ yql-sql-pg
+ yql-core-facade
+ yql-core-file_storage
+ core-file_storage-defs
+ core-file_storage-proto
+ core-file_storage-http_download
+ core-services-mounts
+ yql-core-user_data
+ library-yql-minikql
+ library-yql-protos
+ udf-service-exception_policy
+ yql-utils-backtrace
+ yql-utils-log
+ yql-parser-pg_wrapper
+ providers-common-proto
+ providers-common-udf_resolve
+ yql-core-url_preprocessing
+ yt-gateway-native
+ yt-lib-log
+ yt-lib-yt_download
+ providers-yt-provider
+)
+target_sources(yql-public-embedded PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp
+)
diff --git a/ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt b/ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..fb20043f75
--- /dev/null
+++ b/ydb/library/yql/public/embedded/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,49 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-public-embedded)
+target_compile_options(yql-public-embedded PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(yql-public-embedded PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+ library-cpp-resource
+ library-cpp-yson
+ cpp-yson-node
+ cpp-mapreduce-client
+ cpp-mapreduce-common
+ library-yql-ast
+ yql-sql-pg
+ yql-core-facade
+ yql-core-file_storage
+ core-file_storage-defs
+ core-file_storage-proto
+ core-file_storage-http_download
+ core-services-mounts
+ yql-core-user_data
+ library-yql-minikql
+ library-yql-protos
+ udf-service-exception_policy
+ yql-utils-backtrace
+ yql-utils-log
+ yql-parser-pg_wrapper
+ providers-common-proto
+ providers-common-udf_resolve
+ yql-core-url_preprocessing
+ yt-gateway-native
+ yt-lib-log
+ yt-lib-yt_download
+ providers-yt-provider
+)
+target_sources(yql-public-embedded PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp
+)
diff --git a/ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt b/ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..fb20043f75
--- /dev/null
+++ b/ydb/library/yql/public/embedded/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,49 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-public-embedded)
+target_compile_options(yql-public-embedded PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(yql-public-embedded PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+ library-cpp-resource
+ library-cpp-yson
+ cpp-yson-node
+ cpp-mapreduce-client
+ cpp-mapreduce-common
+ library-yql-ast
+ yql-sql-pg
+ yql-core-facade
+ yql-core-file_storage
+ core-file_storage-defs
+ core-file_storage-proto
+ core-file_storage-http_download
+ core-services-mounts
+ yql-core-user_data
+ library-yql-minikql
+ library-yql-protos
+ udf-service-exception_policy
+ yql-utils-backtrace
+ yql-utils-log
+ yql-parser-pg_wrapper
+ providers-common-proto
+ providers-common-udf_resolve
+ yql-core-url_preprocessing
+ yt-gateway-native
+ yt-lib-log
+ yt-lib-yt_download
+ providers-yt-provider
+)
+target_sources(yql-public-embedded PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp
+)
diff --git a/ydb/library/yql/public/embedded/CMakeLists.txt b/ydb/library/yql/public/embedded/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/public/embedded/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt b/ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..a50913ea6b
--- /dev/null
+++ b/ydb/library/yql/public/embedded/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,48 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(yql-public-embedded)
+target_compile_options(yql-public-embedded PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(yql-public-embedded PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+ library-cpp-resource
+ library-cpp-yson
+ cpp-yson-node
+ cpp-mapreduce-client
+ cpp-mapreduce-common
+ library-yql-ast
+ yql-sql-pg
+ yql-core-facade
+ yql-core-file_storage
+ core-file_storage-defs
+ core-file_storage-proto
+ core-file_storage-http_download
+ core-services-mounts
+ yql-core-user_data
+ library-yql-minikql
+ library-yql-protos
+ udf-service-exception_policy
+ yql-utils-backtrace
+ yql-utils-log
+ yql-parser-pg_wrapper
+ providers-common-proto
+ providers-common-udf_resolve
+ yql-core-url_preprocessing
+ yt-gateway-native
+ yt-lib-log
+ yt-lib-yt_download
+ providers-yt-provider
+)
+target_sources(yql-public-embedded PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/embedded/yql_embedded.cpp
+)
diff --git a/ydb/library/yql/public/embedded/ya.make b/ydb/library/yql/public/embedded/ya.make
new file mode 100644
index 0000000000..b740e1f1dc
--- /dev/null
+++ b/ydb/library/yql/public/embedded/ya.make
@@ -0,0 +1,41 @@
+LIBRARY()
+
+SRCS(
+ yql_embedded.cpp
+ yql_embedded.h
+)
+
+PEERDIR(
+ contrib/libs/protobuf
+ library/cpp/resource
+ library/cpp/yson
+ library/cpp/yson/node
+ yt/cpp/mapreduce/client
+ yt/cpp/mapreduce/common
+ ydb/library/yql/ast
+ ydb/library/yql/sql/pg
+ ydb/library/yql/core/facade
+ ydb/library/yql/core/file_storage
+ ydb/library/yql/core/file_storage/defs
+ ydb/library/yql/core/file_storage/proto
+ ydb/library/yql/core/file_storage/http_download
+ ydb/library/yql/core/services/mounts
+ ydb/library/yql/core/user_data
+ ydb/library/yql/minikql
+ ydb/library/yql/protos
+ ydb/library/yql/public/udf/service/exception_policy
+ ydb/library/yql/utils/backtrace
+ ydb/library/yql/utils/log
+ ydb/library/yql/parser/pg_wrapper
+ ydb/library/yql/providers/common/proto
+ ydb/library/yql/providers/common/udf_resolve
+ ydb/library/yql/core/url_preprocessing
+ ydb/library/yql/providers/yt/gateway/native
+ ydb/library/yql/providers/yt/lib/log
+ ydb/library/yql/providers/yt/lib/yt_download
+ ydb/library/yql/providers/yt/provider
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/public/embedded/yql_embedded.cpp b/ydb/library/yql/public/embedded/yql_embedded.cpp
new file mode 100644
index 0000000000..102f302269
--- /dev/null
+++ b/ydb/library/yql/public/embedded/yql_embedded.cpp
@@ -0,0 +1,556 @@
+#include "yql_embedded.h"
+
+#include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
+#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
+
+#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
+
+#include <ydb/library/yql/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
+#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/core/facade/yql_facade.h>
+#include <ydb/library/yql/core/file_storage/file_storage.h>
+#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
+#include <ydb/library/yql/core/file_storage/http_download/http_download.h>
+#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <library/cpp/yson/parser.h>
+#include <library/cpp/json/json_writer.h>
+
+#include <library/cpp/resource/resource.h>
+#include <google/protobuf/text_format.h>
+#include <library/cpp/digest/md5/md5.h>
+
+#include <util/folder/dirut.h>
+#include <util/folder/path.h>
+#include <util/stream/file.h>
+#include <util/string/builder.h>
+#include <util/string/subst.h>
+#include <util/system/fs.h>
+#include <util/system/user.h>
+#include <util/system/env.h>
+
+namespace NYql {
+ namespace NEmbedded {
+ namespace {
+ void ThrowNotSupported() {
+ ythrow yexception() << "Yson element is not supported";
+ }
+
+ class TJsonConsumer : public NYson::TYsonConsumerBase {
+ public:
+ TJsonConsumer(NJson::TJsonWriter& writer)
+ : Writer(writer)
+ {}
+
+ void OnStringScalar(TStringBuf value) override {
+ Writer.Write(value);
+ }
+
+ void OnInt64Scalar(i64 value) override {
+ Y_UNUSED(value);
+ ThrowNotSupported();
+ }
+
+ void OnUint64Scalar(ui64 value) override {
+ Y_UNUSED(value);
+ ThrowNotSupported();
+ }
+
+ void OnDoubleScalar(double value) override {
+ Y_UNUSED(value);
+ ThrowNotSupported();
+ }
+
+ void OnBooleanScalar(bool value) override {
+ Writer.Write(value);
+ }
+
+ void OnEntity() override {
+ Writer.WriteNull();
+ }
+
+ void OnBeginList() override {
+ Writer.OpenArray();
+ }
+
+ void OnListItem() override {
+ }
+
+ void OnEndList() override {
+ Writer.CloseArray();
+ }
+
+ void OnBeginMap() override {
+ Writer.OpenMap();
+ }
+
+ void OnKeyedItem(TStringBuf key) override {
+ Writer.WriteKey(key);
+ }
+
+ void OnEndMap() override {
+ Writer.CloseMap();
+ }
+
+ void OnBeginAttributes() override {
+ ThrowNotSupported();
+ }
+
+ void OnEndAttributes() override {
+ ThrowNotSupported();
+ }
+
+
+ private:
+ NJson::TJsonWriter& Writer;
+ };
+
+ TString Yson2Json(const TString& yson) {
+ TString jsonString;
+
+ TStringOutput jsonStream{ jsonString };
+ NJson::TJsonWriter jsonWriter{ &jsonStream, false };
+ TJsonConsumer jsonConsumer{ jsonWriter };
+ NYson::ParseYsonStringBuffer(yson, &jsonConsumer);
+ jsonWriter.Flush();
+ return jsonString;
+ }
+ }
+
+ class TOperation: public IOperation {
+ public:
+ TOperation(const TString& result, const TString& plan, const TString& statistics, const TString& taskInfo)
+ : Result_(result)
+ , Plan_(plan)
+ , Statistics_(statistics)
+ , TaskInfo_(taskInfo)
+ {
+ }
+
+ const TString& YsonResult() const override {
+ return Result_;
+ }
+
+ const TString& Plan() const override {
+ return Plan_;
+ }
+
+ const TString& Statistics() const override {
+ return Statistics_;
+ }
+
+ const TString& TaskInfo() const override {
+ return TaskInfo_;
+ }
+
+ private:
+ const TString Result_;
+ const TString Plan_;
+ const TString Statistics_;
+ const TString TaskInfo_;
+ };
+
+ class TOperationFactory: public IOperationFactory {
+ public:
+ TOperationFactory(const TOperationFactoryOptions& options,
+ const TString& configData,
+ std::function<NFS::IDownloaderPtr(const TFileStorageConfig&)> arcDownloaderFactory)
+ : Logger(&Cerr)
+ , Options_(options)
+ {
+ auto& logger = NLog::YqlLogger();
+ logger.SetDefaultPriority(Options_.LogLevel_);
+ for (int i = 0; i < NLog::EComponentHelpers::ToInt(NLog::EComponent::MaxValue); ++i) {
+ logger.SetComponentLevel((NLog::EComponent)i, (NLog::ELevel)Options_.LogLevel_);
+ }
+
+ NYql::SetYtLoggerGlobalBackend(Options_.YtLogLevel_);
+ if (NYT::TConfig::Get()->Prefix.empty()) {
+ NYT::TConfig::Get()->Prefix = "//";
+ }
+
+ if (GetEnv("YT_FORCE_IPV6").empty()) {
+ NYT::TConfig::Get()->ForceIpV6 = true;
+ }
+
+ const bool useStaticLinking = Options_.MrJobBinary_.empty();
+ if (Options_.MrJobBinary_) {
+ EnsureBinary(Options_.MrJobBinary_, "MrJobBinary");
+ }
+
+ if (!::google::protobuf::TextFormat::ParseFromString(configData, &GatewaysConfig_)) {
+ ythrow yexception() << "Bad format of gateways configuration";
+ }
+ auto yqlCoreFlags = GatewaysConfig_.GetYqlCore().GetFlags();
+ GatewaysConfig_.MutableYqlCore()->ClearFlags();
+ for (auto flag : yqlCoreFlags) {
+ if (flag.GetName() != "GeobaseDownloadUrl") {
+ *GatewaysConfig_.MutableYqlCore()->AddFlags() = flag;
+ }
+ }
+
+ auto ytConfig = GatewaysConfig_.MutableYt();
+ if (!ytConfig->HasExecuteUdfLocallyIfPossible()) {
+ ytConfig->SetExecuteUdfLocallyIfPossible(true);
+ }
+ ytConfig->SetYtLogLevel(static_cast<NYql::EYtLogLevel>(Options_.YtLogLevel_));
+ if (useStaticLinking) {
+ ytConfig->ClearMrJobBin();
+ } else {
+ ytConfig->SetMrJobBin(Options_.MrJobBinary_);
+ ytConfig->SetMrJobBinMd5(MD5::File(Options_.MrJobBinary_));
+ }
+
+ ytConfig->ClearMrJobUdfsDir();
+ if (Options_.LocalChainTest_) {
+ ytConfig->SetLocalChainTest(true);
+ ytConfig->SetLocalChainFile(Options_.LocalChainFile_);
+ }
+
+ for (const auto& cluster : Options_.YtClusters_) {
+ auto clusterMapping = ytConfig->AddClusterMapping();
+ clusterMapping->SetName(cluster.Name_);
+ clusterMapping->SetCluster(cluster.Cluster_);
+ }
+ for (size_t index = 0; index < ytConfig->ClusterMappingSize(); ++index) {
+ auto cluster = ytConfig->MutableClusterMapping(index);
+ auto settings = cluster->MutableSettings();
+ bool hasOwners = false;
+ for (int settingsIndex = 0; settingsIndex < settings->size(); ++settingsIndex) {
+ auto attr = settings->Mutable(settingsIndex);
+ if (attr->GetName() == "Owners") {
+ hasOwners = true;
+ if (!Options_.YtOwners_.empty()) {
+ attr->SetValue(Options_.YtOwners_);
+ }
+ }
+ }
+
+ if (!hasOwners && !Options_.YtOwners_.empty()) {
+ auto newSetting = settings->Add();
+ newSetting->SetName("Owners");
+ newSetting->SetValue(Options_.YtOwners_);
+ }
+
+ Clusters_.insert({cluster->GetName(), TString(YtProviderName)});
+ }
+
+ TFileStorageConfig fileStorageConfig;
+ fileStorageConfig.SetMaxSizeMb(1 << 14);
+
+ std::vector<NFS::IDownloaderPtr> downloaders;
+ downloaders.push_back(MakeYtDownloader(fileStorageConfig));
+ auto arcDownloader = arcDownloaderFactory(fileStorageConfig);
+ if (arcDownloader) {
+ downloaders.push_back(arcDownloader);
+ }
+
+ FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, downloaders));
+
+ NResource::TResources libs;
+ const TStringBuf prefix = "resfs/file/yql_libs/";
+ NResource::FindMatch(prefix, &libs);
+ for (auto x : libs) {
+ auto libName = x.Key;
+ libName.SkipPrefix(prefix);
+ NUserData::TUserData d{ NUserData::EType::LIBRARY,
+ NUserData::EDisposition::RESOURCE_FILE,
+ TString("yql_libs/") + libName,
+ TString(x.Key) };
+ Options_.UserData_.push_back(d);
+ }
+
+ NUserData::TUserData::UserDataToLibraries(Options_.UserData_, Modules_);
+
+ FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();
+
+ const NKikimr::NMiniKQL::TUdfModuleRemappings emptyRemappings;
+ if (!useStaticLinking && !Options_.UdfsDir_.empty()) {
+ FuncRegistry_->SetBackTraceCallback(&NYql::NBacktrace::KikimrBackTrace);
+
+ NKikimr::NMiniKQL::TUdfModulePathsMap systemModules;
+ if (!Options_.UdfResolverBinary_.empty()) {
+ NCommon::LoadSystemModulePaths(
+ Options_.UdfResolverBinary_,
+ Options_.UdfsDir_,
+ &systemModules);
+
+ if (Options_.PreloadUdfs_) {
+ for (const auto& p : systemModules) {
+ FuncRegistry_->LoadUdfs(p.second, emptyRemappings, 0);
+ }
+ }
+ } else {
+ TVector<TString> udfPaths;
+ NKikimr::NMiniKQL::FindUdfsInDir(Options_.UdfsDir_, &udfPaths);
+ for (const auto& path : udfPaths) {
+ Cerr << path << "\n";
+ FuncRegistry_->LoadUdfs(path, emptyRemappings, 0);
+ }
+
+ for (auto& m : FuncRegistry_->GetAllModuleNames()) {
+ TMaybe<TString> path = FuncRegistry_->FindUdfPath(m);
+ if (!path) {
+ // should not happen
+ ythrow yexception() << "Unable to detect UDF path for module " << m;
+ }
+ systemModules.emplace(m, *path);
+ }
+ }
+
+ FuncRegistry_->SetSystemModulePaths(systemModules);
+ }
+
+ if (useStaticLinking) {
+ NKikimr::NMiniKQL::FillStaticModules(*FuncRegistry_);
+ }
+
+ TUserDataTable userDataTable = GetYqlModuleResolver(ExprContext_, ModuleResolver_, Options_.UserData_, Clusters_, {});
+
+ if (!userDataTable) {
+ TStringStream err;
+ ExprContext_.IssueManager.GetIssues().PrintTo(err);
+ ythrow yexception() << "Failed to compile modules:\n"
+ << err.Str();
+ }
+
+ TVector<TDataProviderInitializer> dataProvidersInit;
+
+ TYtNativeServices ytServices;
+ ytServices.FunctionRegistry = FuncRegistry_.Get();
+ ytServices.FileStorage = FileStorage_;
+ ytServices.Config = std::make_shared<TYtGatewayConfig>(*ytConfig);
+ auto ytNativeGateway = CreateYtNativeGateway(ytServices);
+ dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
+
+ ProgramFactory_ = MakeHolder<TProgramFactory>(
+ false, FuncRegistry_.Get(), ExprContext_.NextUniqueId, dataProvidersInit, "embedded");
+ auto credentials = MakeIntrusive<TCredentials>();
+ if (!Options_.YtToken_.empty()) {
+ credentials->AddCredential("default_yt", TCredential("yt", "", Options_.YtToken_));
+ }
+ if (!Options_.StatToken_.empty()) {
+ credentials->AddCredential("default_statface", TCredential("statface", "", Options_.StatToken_));
+ }
+ for (const auto& [name, value] : Options_.CustomTokens_) {
+ credentials->AddCredential(name, TCredential("custom", "", value));
+ }
+
+ ProgramFactory_->AddUserDataTable(userDataTable);
+ ProgramFactory_->SetCredentials(credentials);
+ ProgramFactory_->SetModules(ModuleResolver_);
+ ProgramFactory_->SetUdfResolver((useStaticLinking || Options_.UdfResolverBinary_.empty()) ? NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_) :
+ NCommon::CreateOutProcUdfResolver(FuncRegistry_.Get(), FileStorage_, Options_.UdfResolverBinary_, {}, {}, false, {}));
+ ProgramFactory_->SetGatewaysConfig(&GatewaysConfig_);
+ ProgramFactory_->SetFileStorage(FileStorage_);
+ ProgramFactory_->SetUrlPreprocessing(MakeIntrusive<TUrlPreprocessing>(GatewaysConfig_));
+ }
+
+ THolder<IOperation> Run(const TString& queryText, const TOperationOptions& options) const override {
+ TProgramPtr program = ProgramFactory_->Create("-memory-", queryText);
+ if (options.Title) {
+ program->SetOperationTitle(*options.Title);
+ }
+
+ if (options.Attributes) {
+ program->SetOperationAttrsYson(*options.Attributes);
+ }
+
+ if (options.Parameters) {
+ program->SetParametersYson(*options.Parameters);
+ }
+
+ NSQLTranslation::TTranslationSettings sqlSettings;
+ sqlSettings.ClusterMapping = Clusters_;
+ sqlSettings.ModuleMapping = Modules_;
+ sqlSettings.SyntaxVersion = options.SyntaxVersion;
+ sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
+ for (const auto& item : Options_.UserData_) {
+ if (item.Type_ == NUserData::EType::LIBRARY) {
+ sqlSettings.Libraries.emplace(item.Disposition_ == NUserData::EDisposition::RESOURCE_FILE ?
+ item.Name_ : item.Content_);
+ }
+ }
+
+ if (!program->ParseSql(sqlSettings)) {
+ TStringStream err;
+ program->PrintErrorsTo(err);
+ ythrow yexception() << "Failed to parse SQL: " << err.Str();
+ }
+
+ if (!program->Compile(GetUsername())) {
+ TStringStream err;
+ program->PrintErrorsTo(err);
+ ythrow yexception() << "Failed to compile: " << err.Str();
+ }
+
+ TProgram::TStatus status = TProgram::TStatus::Error;
+ switch (options.Mode) {
+ case EExecuteMode::Run:
+ status = program->Run(GetUsername(), nullptr, nullptr, nullptr);
+ break;
+ case EExecuteMode::Optimize:
+ status = program->Optimize(GetUsername(), nullptr, nullptr, nullptr);
+ break;
+ case EExecuteMode::Validate:
+ status = program->Validate(GetUsername(), nullptr);
+ break;
+ case EExecuteMode::Lineage:
+ status = program->Lineage(GetUsername(), nullptr, nullptr);
+ break;
+ }
+
+ if (status == TProgram::TStatus::Error) {
+ TStringStream err;
+ program->PrintErrorsTo(err);
+ ythrow yexception() << "Failed to run: " << err.Str();
+ }
+
+ TStringStream result;
+ if (options.Mode == EExecuteMode::Lineage) {
+ if (auto data = program->GetLineage()) {
+ TStringInput in(*data);
+ NYson::ReformatYsonStream(&in, &result, Options_.ResultFormat_);
+ }
+ } else if (program->HasResults()) {
+ NYson::TYsonWriter yson(&result, Options_.ResultFormat_);
+ yson.OnBeginList();
+ for (const auto& result : program->Results()) {
+ yson.OnListItem();
+ yson.OnRaw(result);
+ }
+ yson.OnEndList();
+ }
+
+ auto plan = program->GetQueryPlan(TPlanSettings().SetLimitInputPins(std::nullopt).SetLimitOutputPins(std::nullopt)).GetOrElse("");
+ auto taskInfo = program->GetTasksInfo().GetOrElse("");
+
+ auto statistics = program->GetStatistics().GetOrElse("");
+ if (statistics) {
+ TStringStream strInput(statistics);
+ TStringStream strFormatted;
+ NYson::ReformatYsonStream(&strInput, &strFormatted, NYson::EYsonFormat::Pretty);
+ statistics = strFormatted.Str();
+ }
+
+ if (taskInfo) {
+ TStringStream strInput(taskInfo);
+ TStringStream strFormatted;
+ NYson::ReformatYsonStream(&strInput, &strFormatted, NYson::EYsonFormat::Pretty);
+ taskInfo = strFormatted.Str();
+ }
+
+ return MakeHolder<TOperation>(result.Str(), plan, statistics, taskInfo);
+ }
+
+ void Save(const TString& queryText, const TOperationOptions& options, const TString& destinationFolder) const override {
+ using namespace NUserData;
+ TString finalQueryText;
+
+ TStringBuilder cmdLine;
+ cmdLine << "#!/usr/bin/env bash\nset -eux\nya yql -i main.sql";
+ cmdLine << " --syntax-version=" << options.SyntaxVersion;
+ if (options.Title) {
+ cmdLine << " --title=" << options.Title.Get()->Quote();
+ }
+
+ if (options.Parameters) {
+ TFileOutput paramFile(TFsPath(destinationFolder) / "params.json");
+ paramFile.Write(Yson2Json(*options.Parameters));
+ cmdLine << " --parameters-file=params.json";
+ }
+
+ ui32 fileIndex = 0;
+ for (const auto& item : Options_.UserData_) {
+ switch (item.Disposition_) {
+ case EDisposition::INLINE: {
+ auto path = "files" + ToString(++fileIndex);
+ TFileOutput dataFile(TFsPath(destinationFolder) / path);
+ dataFile.Write(item.Content_);
+ cmdLine << " -F " << item.Name_.Quote() << "@" << path;
+ break;
+ }
+ case EDisposition::RESOURCE:
+ case EDisposition::RESOURCE_FILE: {
+ TString skipSlash(TStringBuf(item.Content_).After('/'));
+ if (item.Type_ == EType::LIBRARY) {
+ finalQueryText += "pragma library(" + skipSlash.Quote() + ");\n";
+ }
+
+ auto path = "files" + ToString(++fileIndex);
+ TFileOutput dataFile(TFsPath(destinationFolder) / path);
+ auto resContent = NResource::Find(item.Content_);
+ dataFile.Write(resContent);
+ cmdLine << " -F " << (item.Type_ == EType::LIBRARY ? skipSlash : item.Name_).Quote() << "@" << path;
+ break;
+ }
+ case EDisposition::FILESYSTEM: {
+ cmdLine << " -F " << item.Name_.Quote() << "@" << RealPath(item.Content_).Quote();
+ break;
+ }
+ case EDisposition::URL: {
+ cmdLine << " -U " << item.Name_.Quote() << "@" << item.Content_.Quote();
+ break;
+ }
+ }
+ }
+
+
+ auto patchedQueryText = queryText;
+ SubstGlobal(patchedQueryText, "import .", "import ");
+
+ finalQueryText += patchedQueryText;
+ TFileOutput sqlFile(TFsPath(destinationFolder) / "main.sql");
+ sqlFile.Write(finalQueryText);
+
+ TFileOutput runFile(TFsPath(destinationFolder) / "run.sh");
+ runFile.Write(cmdLine);
+ runFile.Finish();
+ Chmod((TFsPath(destinationFolder) / "run.sh").c_str(), MODE0755);
+ }
+
+ private:
+ void EnsureBinary(const TString& path, const TString& name) {
+ if (path.empty()) {
+ ythrow yexception() << "Parameter: " << name << " must not be empty";
+ }
+
+ if (!NFs::Exists(path)) {
+ ythrow yexception() << "Binary for parameter: " << name << " is not found at path: " << path;
+ }
+ }
+
+ private:
+ NLog::YqlLoggerScope Logger;
+ TOperationFactoryOptions Options_;
+ TFileStoragePtr FileStorage_;
+ TExprContext ExprContext_;
+ TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_;
+ IModuleResolver::TPtr ModuleResolver_;
+ TGatewaysConfig GatewaysConfig_;
+ THolder<TProgramFactory> ProgramFactory_;
+ THashMap<TString, TString> Clusters_;
+ THashMap<TString, TString> Modules_;
+ THashSet<TString> Libraries_;
+ };
+
+ THolder<IOperationFactory> MakeOperationFactory(
+ const TOperationFactoryOptions& options,
+ const TString& configData,
+ std::function<NFS::IDownloaderPtr(const TFileStorageConfig&)> arcDownloaderFactory) {
+ return MakeHolder<TOperationFactory>(options, configData, arcDownloaderFactory);
+ }
+ }
+}
diff --git a/ydb/library/yql/public/embedded/yql_embedded.h b/ydb/library/yql/public/embedded/yql_embedded.h
new file mode 100644
index 0000000000..ff56c0c00d
--- /dev/null
+++ b/ydb/library/yql/public/embedded/yql_embedded.h
@@ -0,0 +1,83 @@
+#pragma once
+
+#include <ydb/library/yql/core/user_data/yql_user_data.h>
+#include <ydb/library/yql/core/file_storage/defs/downloader.h>
+
+#include <yt/cpp/mapreduce/interface/logging/logger.h>
+
+#include <library/cpp/logger/priority.h>
+#include <library/cpp/yson/public.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
+namespace NYql {
+ class TFileStorageConfig;
+
+ namespace NEmbedded {
+ class IOperation {
+ public:
+ virtual ~IOperation() = default;
+ virtual const TString& YsonResult() const = 0;
+ virtual const TString& Plan() const = 0;
+ virtual const TString& Statistics() const = 0;
+ virtual const TString& TaskInfo() const = 0;
+ };
+
+ enum class EExecuteMode {
+ Validate,
+ Optimize,
+ Run,
+ Lineage
+ };
+
+ struct TOperationOptions {
+ TMaybe<TString> Title; // title for YT transactions and operations, should contain 'YQL' somewhere
+ ui16 SyntaxVersion = 1;
+ TMaybe<TString> Attributes; // yson map with additional attributes
+ TMaybe<TString> Parameters; // in yson format
+ EExecuteMode Mode = EExecuteMode::Run;
+ };
+
+ // must be allocated only once per process
+ class IOperationFactory {
+ public:
+ virtual ~IOperationFactory() = default;
+ virtual THolder<IOperation> Run(const TString& queryText, const TOperationOptions& options) const = 0;
+ virtual void Save(const TString& queryText, const TOperationOptions& options, const TString& destinationFolder) const = 0;
+ };
+
+ struct TYtClusterOptions {
+ TString Name_;
+ TString Cluster_;
+ };
+
+ struct TOperationFactoryOptions {
+ TString MrJobBinary_; // assume static linking (including UDFs) if empty
+ TString UdfResolverBinary_;
+ TString UdfsDir_;
+ bool PreloadUdfs_ = false; // used when UdfResolverBinary_ is specified, if UdfResolverBinary_ is empty it is considered equal to true
+ TVector<NUserData::TUserData> UserData_;
+
+ ELogPriority LogLevel_ = TLOG_ERR;
+ NYT::ILogger::ELevel YtLogLevel_ = NYT::ILogger::ERROR;
+ NYson::EYsonFormat ResultFormat_ = NYson::EYsonFormat::Pretty;
+
+ TVector<TYtClusterOptions> YtClusters_;
+ TString YtToken_;
+ TString YtOwners_;
+ TString StatToken_;
+ bool LocalChainTest_ = false;
+ TString LocalChainFile_;
+ THashMap<TString, TString> CustomTokens_;
+ };
+
+ THolder<IOperationFactory> MakeOperationFactory(
+ const TOperationFactoryOptions& options,
+ const TString& configData,
+ std::function<NFS::IDownloaderPtr(const TFileStorageConfig&)> arcDownloaderFactory);
+ }
+}
diff --git a/ydb/library/yql/public/ya.make b/ydb/library/yql/public/ya.make
index 9fbf7e6a27..bdbac7a820 100644
--- a/ydb/library/yql/public/ya.make
+++ b/ydb/library/yql/public/ya.make
@@ -1,5 +1,6 @@
RECURSE(
decimal
+ embedded
fastcheck
issue
purecalc