summaryrefslogtreecommitdiffstats
path: root/yql/essentials/providers/common/udf_resolve
diff options
context:
space:
mode:
authorvvvv <[email protected]>2024-11-07 12:29:36 +0300
committervvvv <[email protected]>2024-11-07 13:49:47 +0300
commitd4c258e9431675bab6745c8638df6e3dfd4dca6b (patch)
treeb5efcfa11351152a4c872fccaea35749141c0b11 /yql/essentials/providers/common/udf_resolve
parent13a4f274caef5cfdaf0263b24e4d6bdd5521472b (diff)
Moved other yql/essentials libs YQL-19206
init commit_hash:7d4c435602078407bbf20dd3c32f9c90d2bbcbc0
Diffstat (limited to 'yql/essentials/providers/common/udf_resolve')
-rw-r--r--yql/essentials/providers/common/udf_resolve/ya.make30
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_files_box.cpp87
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_files_box.h33
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.cpp420
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h30
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.cpp229
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h22
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.cpp248
-rw-r--r--yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h12
9 files changed, 1111 insertions, 0 deletions
diff --git a/yql/essentials/providers/common/udf_resolve/ya.make b/yql/essentials/providers/common/udf_resolve/ya.make
new file mode 100644
index 00000000000..65fa463b8c4
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/ya.make
@@ -0,0 +1,30 @@
+LIBRARY()
+
+SRCS(
+ yql_files_box.cpp
+ yql_files_box.h
+ yql_outproc_udf_resolver.cpp
+ yql_outproc_udf_resolver.h
+ yql_simple_udf_resolver.cpp
+ yql_simple_udf_resolver.h
+ yql_udf_resolver_with_index.cpp
+ yql_udf_resolver_with_index.h
+)
+
+PEERDIR(
+ library/cpp/digest/md5
+ library/cpp/protobuf/util
+ yql/essentials/core/file_storage
+ yql/essentials/minikql
+ yql/essentials/public/udf
+ yql/essentials/utils
+ yql/essentials/utils/log
+ yql/essentials/core
+ yql/essentials/providers/common/mkql
+ yql/essentials/providers/common/proto
+ yql/essentials/providers/common/schema/expr
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yql/essentials/providers/common/udf_resolve/yql_files_box.cpp b/yql/essentials/providers/common/udf_resolve/yql_files_box.cpp
new file mode 100644
index 00000000000..0220ba8f6c9
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_files_box.cpp
@@ -0,0 +1,87 @@
+#include "yql_files_box.h"
+
+#include <yql/essentials/core/file_storage/storage.h>
+#include <yql/essentials/utils/log/log.h>
+
+#include <util/system/fs.h>
+#include <util/system/error.h>
+#include <util/system/sysstat.h>
+#include <util/folder/dirut.h>
+
+namespace NYql {
+namespace NCommon {
+
+TFilesBox::TFilesBox(TFsPath dir, TRandGuid randGuid) // Stores the box directory path and the GUID generator (both moved into members).
+ : Dir(std::move(dir))
+ , RandGuid(std::move(randGuid))
+{
+}
+
+TFilesBox::~TFilesBox() { // Cleans up via Destroy(); any exception is logged instead of escaping the destructor.
+ try {
+ Destroy();
+ } catch (...) {
+ YQL_LOG(ERROR) << "Error occurred in files box destroy: " << CurrentExceptionMessage(); // swallow after logging: destructors must not throw
+ }
+}
+
+TString TFilesBox::MakeLinkFrom(const TString& source, const TString& filename) { // Symlinks `source` into the box directory and returns the link path; throws TSystemError on failure.
+ if (!filename) {
+ if (auto* existingPath = Mapping.FindPtr(source)) { // anonymous links are cached per source, reuse the earlier one
+ return *existingPath;
+ }
+ }
+
+ TFsPath sourcePath(source);
+ TString sourceAbsolutePath = sourcePath.IsAbsolute() ? source : (TFsPath::Cwd() / sourcePath).GetPath(); // relative sources are resolved against the current working directory
+ TString path;
+ if (filename) {
+ path = Dir / filename; // caller-chosen link name: single attempt, any failure throws
+ if (!NFs::SymLink(sourceAbsolutePath, path)) {
+ ythrow TSystemError() << "Failed to create symlink for file " << sourceAbsolutePath.Quote() << " to file " << path.Quote();
+ }
+ } else {
+ while (true) { // random link name: retry with a fresh GUID while the name is already taken
+ path = Dir / RandGuid.GenGuid();
+ if (NFs::SymLink(sourceAbsolutePath, path)) {
+ break;
+ } else if (LastSystemError() != EEXIST) { // only a name collision is retried; other errors are fatal
+ ythrow TSystemError() << "Failed to create symlink for file " << sourceAbsolutePath.Quote() << " to file " << path.Quote();
+ }
+ }
+ Mapping.emplace(source, path); // remember only anonymous links (named links are never cached)
+ }
+ return path;
+}
+
+TString TFilesBox::GetDir() const { // Returns the box directory path as a string.
+ return Dir;
+}
+
+void TFilesBox::Destroy() { // Forgets all cached links and deletes the box directory; also called from the destructor.
+ Mapping.clear();
+ Dir.ForceDelete(); // NOTE(review): presumably removes the directory together with the symlinks inside it - confirm against TFsPath
+}
+
+THolder<TFilesBox> CreateFilesBox(const TFsPath& baseDir) { // Creates a fresh GUID-named subdirectory of baseDir and wraps it in a TFilesBox; throws TIoSystemError if mkdir fails.
+ TRandGuid randGuid;
+ TFsPath path = baseDir / randGuid.GenGuid();
+
+ while (true) { // keep generating names until a directory is actually created by us
+ if (!path.Exists()) {
+ int r = Mkdir(path.c_str(), MODE0711);
+ if (r == 0) {
+ break;
+ }
+ if (LastSystemError() != EEXIST) { // EEXIST means the name was taken between Exists() and Mkdir(): just try another name
+ ythrow TIoSystemError() << "could not create directory " << path;
+ }
+ }
+ path = baseDir / randGuid.GenGuid();
+ }
+
+ return MakeHolder<TFilesBox>(std::move(path), std::move(randGuid)); // the same generator is handed over to name links inside the box
+}
+
+}
+}
diff --git a/yql/essentials/providers/common/udf_resolve/yql_files_box.h b/yql/essentials/providers/common/udf_resolve/yql_files_box.h
new file mode 100644
index 00000000000..d7f348660c2
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_files_box.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <yql/essentials/utils/rand_guid.h>
+
+#include <util/generic/hash.h>
+#include <util/folder/path.h>
+
+namespace NYql {
+namespace NCommon {
+
+/*
+ Resembles sandbox for external UDFs
+*/
+class TFilesBox { // Owns a directory of symlinks to source files; the directory is deleted on Destroy() or destruction.
+public:
+ TFilesBox(TFsPath dir, TRandGuid randGuid); // dir: box directory; randGuid: generator for anonymous link names
+ ~TFilesBox(); // calls Destroy(), logging (not propagating) any error
+
+ TString MakeLinkFrom(const TString& source, const TString& filename = {}); // returns the link path; empty filename means a random, cached link name
+ TString GetDir() const; // path of the box directory
+
+ void Destroy(); // clears the link cache and deletes the box directory
+
+private:
+ TFsPath Dir; // directory that holds the links
+ TRandGuid RandGuid; // source of random link names
+ THashMap<TString, TString> Mapping; // source path -> anonymous link path created for it
+};
+
+THolder<TFilesBox> CreateFilesBox(const TFsPath& baseDir); // Creates a new uniquely named box directory under baseDir.
+
+}
+}
diff --git a/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.cpp b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.cpp
new file mode 100644
index 00000000000..02637d16d8e
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.cpp
@@ -0,0 +1,420 @@
+#include "yql_outproc_udf_resolver.h"
+#include "yql_simple_udf_resolver.h"
+#include "yql_files_box.h"
+
+#include <yql/essentials/providers/common/proto/udf_resolver.pb.h>
+#include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
+#include <yql/essentials/core/yql_holding_file_storage.h>
+#include <yql/essentials/core/yql_type_annotation.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/retry.h>
+
+#include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/mkql_utils.h>
+
+#include <library/cpp/protobuf/util/pb_io.h>
+
+#include <util/generic/scope.h>
+#include <util/stream/str.h>
+#include <util/string/strip.h>
+#include <util/system/shellcommand.h>
+#include <util/string/split.h>
+
+#include <regex>
+
+namespace NYql {
+namespace NCommon {
+
+using namespace NKikimr;
+using namespace NKikimr::NMiniKQL;
+
+namespace {
+template <typename F>
+void RunResolver( // Runs the resolver binary, throws yexception unless it finishes normally, then passes its stdout to outputHandler.
+ const TString& resolverPath, // path of the executable to start
+ const TList<TString>& args, // command-line arguments
+ IInputStream* input, // stream fed to the process as stdin
+ const F& outputHandler, // callable invoked with the captured stdout
+ const TString& ldLibraryPath = {}) { // if non-empty, exported to the child as LD_LIBRARY_PATH
+
+ TShellCommandOptions shellOptions;
+ shellOptions
+ .SetUseShell(false)
+ .SetDetachSession(false)
+ .SetInputStream(input); // input can be nullptr
+
+ if (ldLibraryPath) {
+ YQL_LOG(DEBUG) << "Using LD_LIBRARY_PATH = " << ldLibraryPath << " for Udf resolver";
+ shellOptions.Environment["LD_LIBRARY_PATH"] = ldLibraryPath;
+ }
+
+ TShellCommand shell(resolverPath, args, shellOptions);
+
+ switch (shell.Run().GetStatus()) { // every status except SHELL_FINISHED is turned into an exception
+ case TShellCommand::SHELL_INTERNAL_ERROR:
+ ythrow yexception() << "Udf resolver internal error: "
+ << shell.GetInternalError();
+ case TShellCommand::SHELL_ERROR:
+ ythrow yexception() << "Udf resolver shell error: "
+ << StripString(shell.GetError()); // the stripped stderr becomes the exception message
+ case TShellCommand::SHELL_FINISHED:
+ break;
+ default:
+ ythrow yexception() << "Unexpected udf resolver state: "
+ << int(shell.GetStatus());
+ }
+
+ if (shell.GetError()) { // stderr of a successful run is only logged
+ YQL_LOG(INFO) << "UdfResolver stderr: " << shell.GetError();
+ }
+
+ outputHandler(shell.GetOutput());
+}
+
+template <typename F>
+void RunResolver( // Overload that serializes a TResolve request and feeds it to the resolver as stdin.
+ const TString& resolverPath,
+ const TList<TString>& args,
+ const TResolve& request, // protobuf request written to the child's input
+ const F& outputHandler,
+ const TString& ldLibraryPath = {}) {
+
+ TStringStream input;
+ YQL_ENSURE(request.SerializeToArcadiaStream(&input), "Cannot serialize TResolve proto message");
+ RunResolver(resolverPath, args, &input, outputHandler, ldLibraryPath); // delegates to the stream-based overload
+}
+
+TString ExtractSharedObjectNameFromErrorMessage(const char* message) { // Returns the library name from a "cannot open shared object file" error, or "" if the message is null or does not match.
+ if (!message) {
+ return "";
+ }
+
+ // example:
+ // util/system/dynlib.cpp:56: libcuda.so.1: cannot open shared object file: No such file or directory
+ static std::regex re(".*: (.+): cannot open shared object file: No such file or directory"); // compiled once; group 1 is the library name
+ std::cmatch match;
+ if (!std::regex_match(message, match, re)) { // regex_match requires the whole message to match the pattern
+ return "";
+ }
+
+ return TString(match[1].str());
+}
+}
+
+class TOutProcUdfResolver : public IUdfResolver {
+public:
+ TOutProcUdfResolver(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, // Stores the dependencies and precomputes the resolver's fixed command-line flags.
+ const TFileStoragePtr& fileStorage, const TString& resolverPath,
+ const TString& user, const TString& group, bool filterSyscalls,
+ const TString& udfDependencyStubPath, const TMap<TString, TString>& path2md5)
+ : FunctionRegistry_(functionRegistry)
+ , TypeInfoHelper_(new TTypeInfoHelper)
+ , FileStorage_(fileStorage)
+ , ResolverPath_(resolverPath)
+ , UdfDependencyStubPath_(udfDependencyStubPath)
+ , Path2Md5_(path2md5)
+ {
+ if (user) { // group is passed only together with a non-empty user
+ UserGroupArgs_ = { "-U", user, "-G", group };
+ }
+
+ if (filterSyscalls) {
+ UserGroupArgs_.push_back("-F");
+ }
+ }
+
+ TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override { // Path of a module known to the registry plus its MD5 from Path2Md5_, or Nothing() if the registry has no path.
+ auto path = FunctionRegistry_->FindUdfPath(moduleName);
+ if (!path) {
+ return Nothing();
+ }
+
+ const TString md5 = Path2Md5_.Value(*path, ""); // "" is the fallback passed for paths without an entry
+ return MakeMaybe<TFilePathWithMd5>(*path, md5);
+ }
+
+ bool ContainsModule(const TStringBuf& moduleName) const override { // True if the function registry reports the module as loaded.
+ return FunctionRegistry_->IsLoadedUdfModule(moduleName);
+ }
+
+ bool LoadMetadata(const TVector<TImport*>& imports, const TVector<TFunction*>& functions, TExprContext& ctx) const override { // Fills type metadata for `functions`: loaded modules via the registry, the rest via the external resolver. Returns false (errors in ctx) on failure.
+ THashSet<TString> requiredLoadedModules;
+ THashSet<TString> requiredExternalModules;
+ TVector<TFunction*> loadedFunctions;
+ TVector<TFunction*> externalFunctions;
+
+ bool hasErrors = false;
+ for (auto udf : functions) { // split functions by whether their module is already loaded in the registry
+ TStringBuf moduleName, funcName;
+ if (!SplitUdfName(udf->Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) {
+ ctx.AddError(TIssue(udf->Pos, TStringBuilder() <<
+ "Incorrect format of function name: " << udf->Name));
+ hasErrors = true;
+ } else {
+ if (FunctionRegistry_->IsLoadedUdfModule(moduleName)) {
+ requiredLoadedModules.insert(TString(moduleName));
+ loadedFunctions.push_back(udf);
+ } else {
+ requiredExternalModules.insert(TString(moduleName));
+ externalFunctions.push_back(udf);
+ }
+ }
+ }
+
+ TResolve request;
+ TVector<TImport*> usedImports; // imports actually sent to the resolver, in request order
+ THoldingFileStorage holdingFileStorage(FileStorage_);
+ THolder<TFilesBox> filesBox = CreateFilesBoxOverFileStorageTemp();
+
+ THashMap<TString, TImport*> path2LoadedImport; // path of an import that provides an already loaded module -> that import
+ for (auto import : imports) {
+ if (import->Modules) { // module list already known: send the import only if it provides a required external module
+ bool needLibrary = false;
+ for (auto& m : *import->Modules) {
+ if (requiredLoadedModules.contains(m)) {
+ YQL_ENSURE(import->Block->Type == EUserDataType::PATH);
+ path2LoadedImport[import->Block->Data] = import;
+ }
+
+ if (requiredExternalModules.contains(m)) {
+ needLibrary = true;
+ break;
+ }
+ }
+
+ if (!needLibrary) {
+ continue;
+ }
+ } else {
+ import->Modules.ConstructInPlace(); // unknown module list: always send the import
+ }
+
+ try {
+ LoadImport(holdingFileStorage, *filesBox, *import, request);
+ usedImports.push_back(import);
+ } catch (const std::exception& e) {
+ ctx.AddError(ExceptionToIssue(e));
+ hasErrors = true;
+ }
+ }
+
+ for (auto& module : requiredExternalModules) { // required modules the registry has a path for are added as system imports, after the user imports
+ if (auto path = FunctionRegistry_->FindUdfPath(module)) {
+ auto importRequest = request.AddImports();
+ const TString hiddenPath = filesBox->MakeLinkFrom(*path);
+ importRequest->SetFileAlias(hiddenPath);
+ importRequest->SetPath(hiddenPath);
+ importRequest->SetSystem(true);
+ }
+ }
+
+
+ for (auto udf : externalFunctions) { // one request entry per external function, same order as externalFunctions
+ auto udfRequest = request.AddUdfs();
+ udfRequest->SetName(udf->Name);
+ udfRequest->SetTypeConfig(udf->TypeConfig);
+ if (udf->UserType) {
+ udfRequest->SetUserType(WriteTypeToYson(udf->UserType));
+ }
+ }
+
+ TResolveResult response;
+ try {
+ response = RunResolverAndParseResult(request, { }, *filesBox);
+ filesBox->Destroy(); // links are no longer needed once the resolver has answered
+ } catch (const std::exception& e) {
+ ctx.AddError(ExceptionToIssue(e));
+ return false;
+ }
+
+ // extract regardless of hasErrors value
+ hasErrors = !ExtractMetadata(response, usedImports, externalFunctions, ctx) || hasErrors;
+ hasErrors = !LoadFunctionsMetadata(loadedFunctions, *FunctionRegistry_, TypeInfoHelper_, ctx) || hasErrors;
+
+ if (!hasErrors) {
+ for (auto& m : FunctionRegistry_->GetAllModuleNames()) { // append each registry module to the import whose path it was loaded from
+ auto path = *FunctionRegistry_->FindUdfPath(m); // NOTE(review): dereferences the TMaybe unchecked - assumes every listed module has a path; confirm
+ if (auto import = path2LoadedImport.FindPtr(path)) {
+ (*import)->Modules->push_back(m);
+ }
+ }
+ }
+
+ return !hasErrors;
+ }
+
+ TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override { // Sends all imports to the resolver in "--discover-proto" mode and returns its raw result; exceptions propagate to the caller.
+ TResolve request;
+ THoldingFileStorage holdingFileStorage(FileStorage_);
+ THolder<TFilesBox> filesBox = CreateFilesBoxOverFileStorageTemp();
+ Y_DEFER { // remove the links on every exit path, including exceptions
+ filesBox->Destroy();
+ };
+
+ for (const auto& import : imports) { // bind by reference: LoadImport takes const TImport&, no per-iteration copy needed
+ LoadImport(holdingFileStorage, *filesBox, import, request);
+ }
+
+ return RunResolverAndParseResult(request, { "--discover-proto" }, *filesBox);
+ }
+
+private:
+ THolder<TFilesBox> CreateFilesBoxOverFileStorageTemp() const { // Creates a files box inside the file storage's temp directory.
+ return CreateFilesBox(FileStorage_->GetTemp());
+ }
+
+ void LoadImport(THoldingFileStorage& holdingFileStorage, TFilesBox& filesBox, const TImport& import, TResolve& request) const { // Links the import's file into the box and appends a matching entry to the request.
+ const TString path = (import.Block->Type == EUserDataType::PATH) ? import.Block->Data : holdingFileStorage.FreezeFile(*import.Block)->GetPath().GetPath(); // PATH blocks are used as is, other block types go through FreezeFile first
+
+ const TString hiddenPath = filesBox.MakeLinkFrom(path);
+ auto importRequest = request.AddImports();
+ importRequest->SetFileAlias(import.FileAlias);
+ importRequest->SetPath(hiddenPath); // the resolver sees the link inside the box, not the original path
+ importRequest->SetCustomUdfPrefix(import.Block->CustomUdfPrefix);
+ }
+
+ TResolveResult RunResolverAndParseResult(const TResolve& request, const TVector<TString>& additionalArgs, TFilesBox& filesBox) const { // Runs the resolver (up to 10 attempts), stubbing missing shared libraries between attempts, and returns the parsed result.
+ auto args = UserGroupArgs_;
+ args.insert(args.end(), additionalArgs.begin(), additionalArgs.end());
+
+ TString ldLibraryPath; // stays empty until the first stub is placed into the box
+ TSet<TString> stubbedLibraries; // libraries a stub was already provided for
+ return WithRetry<yexception>(10, [&]() {
+ TResolveResult response;
+ RunResolver(ResolverPath_, args, request, [&](const TString& output) {
+ YQL_ENSURE(response.ParseFromString(output), "Cannot deserialize TResolveResult proto message");
+ }, ldLibraryPath);
+ return response;
+ }, [&](const yexception& e, int, int) { // failure handler: returning normally lets the call be retried, rethrowing aborts
+ TStringStream stream;
+ SerializeToTextFormat(request, stream);
+ YQL_LOG(DEBUG) << "Exception from UdfResolver: " << e.what() << " for request " << stream.Str();
+ if (!UdfDependencyStubPath_) {
+ YQL_LOG(DEBUG) << "UdfDependencyStubPath is not specified, unable to recover error " << e.what();
+ throw; // rethrow the in-flight exception (no slicing to yexception); NOTE(review): assumes WithRetry calls this handler from inside its catch block - confirm
+ }
+
+ TString sharedLibrary = ExtractSharedObjectNameFromErrorMessage(e.what());
+ if (!sharedLibrary) { // not a "cannot open shared object file" failure: nothing to recover
+ throw;
+ }
+
+ YQL_LOG(DEBUG) << "UdfResolver needs shared library " << sharedLibrary;
+ if (!stubbedLibraries.emplace(sharedLibrary).second) {
+ // already tried, giving up
+ YQL_LOG(ERROR) << "Unable to load shared library " << sharedLibrary << " even after using dependency stub";
+ throw;
+ }
+
+ YQL_LOG(DEBUG) << "Using dependency stub for shared library " << sharedLibrary;
+ PutSharedLibraryStub(sharedLibrary, filesBox);
+ ldLibraryPath = filesBox.GetDir(); // the next attempt resolves the library from the box directory
+ });
+ }
+
+ void PutSharedLibraryStub(const TString& sharedLibrary, TFilesBox& filesBox) const { // Links the dependency stub into the box under the missing library's name.
+ YQL_ENSURE(UdfDependencyStubPath_);
+ filesBox.MakeLinkFrom(UdfDependencyStubPath_, sharedLibrary);
+ }
+
+ static bool ExtractMetadata(const TResolveResult& response, const TVector<TImport*>& usedImports, const TVector<TFunction*>& functions, TExprContext& ctx) {
+ bool hasErrors = false;
+ YQL_ENSURE(response.UdfsSize() == functions.size(), "Number of returned udf signatures doesn't match original one");
+ YQL_ENSURE(response.ImportsSize() >= usedImports.size(), "Number of returned udf modules is too low");
+
+ for (size_t i = 0; i < usedImports.size(); ++i) {
+ const TImportResult& importRes = response.GetImports(i);
+
+ TImport* import = usedImports[i];
+ if (importRes.HasError()) {
+ ctx.AddError(TIssue(import ? import->Pos : TPosition(), importRes.GetError()));
+ hasErrors = true;
+ } else {
+ import->Modules.ConstructInPlace();
+ for (auto& module : importRes.GetModules()) {
+ import->Modules->push_back(module);
+ }
+ }
+ }
+
+ for (size_t i = 0; i < response.UdfsSize(); ++i) {
+ TFunction* udf = functions[i];
+ const TFunctionResult& udfRes = response.GetUdfs(i);
+ if (udfRes.HasError()) {
+ ctx.AddError(TIssue(udf->Pos, udfRes.GetError()));
+ hasErrors = true;
+ } else {
+ udf->CallableType = ParseTypeFromYson(TStringBuf{udfRes.GetCallableType()}, ctx, udf->Pos);
+ if (!udf->CallableType) {
+ hasErrors = true;
+ continue;
+ }
+ if (udfRes.HasNormalizedUserType()) {
+ udf->NormalizedUserType = ParseTypeFromYson(TStringBuf{udfRes.GetNormalizedUserType()}, ctx, udf->Pos);
+ if (!udf->NormalizedUserType) {
+ hasErrors = true;
+ continue;
+ }
+ }
+ if (udfRes.HasRunConfigType()) {
+ udf->RunConfigType = ParseTypeFromYson(TStringBuf{udfRes.GetRunConfigType()}, ctx, udf->Pos);
+ if (!udf->RunConfigType) {
+ hasErrors = true;
+ continue;
+ }
+ }
+ udf->SupportsBlocks = udfRes.GetSupportsBlocks();
+ udf->IsStrict = udfRes.GetIsStrict();
+ }
+ }
+
+ return !hasErrors;
+ }
+
+private:
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_;
+ NUdf::ITypeInfoHelper::TPtr TypeInfoHelper_;
+ TFileStoragePtr FileStorage_;
+ const TString ResolverPath_;
+ const TString UdfDependencyStubPath_;
+ TList<TString> UserGroupArgs_;
+ const TMap<TString, TString> Path2Md5_;
+};
+
+void LoadSystemModulePaths( // Runs the resolver with "--list <dir>" and stores the printed module name/path pairs into *paths.
+ const TString& resolverPath,
+ const TString& dir,
+ TUdfModulePathsMap* paths)
+{
+ const TList<TString> args = { TString("--list"), dir };
+ RunResolver(resolverPath, args, nullptr, [&](const TString& output) { // nullptr: no input stream is passed to the resolver
+ // output format is:
+ // {{module_name}}\t{{module_path}}\n
+
+ for (const auto& it : StringSplitter(output).Split('\n')) {
+ TStringBuf moduleName, modulePath;
+ const TStringBuf& line = it.Token();
+ if (!line.empty()) { // empty lines are skipped
+ line.Split('\t', moduleName, modulePath);
+ paths->emplace(moduleName, modulePath); // emplace keeps an existing entry if the name is already present
+ }
+ }
+ });
+}
+
+IUdfResolver::TPtr CreateOutProcUdfResolver( // Factory: builds a TOutProcUdfResolver, forwarding every argument to its constructor.
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const TFileStoragePtr& fileStorage,
+ const TString& resolverPath,
+ const TString& user,
+ const TString& group,
+ bool filterSyscalls,
+ const TString& udfDependencyStubPath,
+ const TMap<TString, TString>& path2md5) {
+ return new TOutProcUdfResolver(functionRegistry, fileStorage, resolverPath, user, group, filterSyscalls, udfDependencyStubPath, path2md5);
+}
+
+} // namespace NCommon
+} // namespace NYql
diff --git a/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h
new file mode 100644
index 00000000000..35116a1af1f
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <yql/essentials/core/yql_udf_resolver.h>
+#include <yql/essentials/core/file_storage/file_storage.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+#include <yql/essentials/providers/common/proto/udf_resolver.pb.h>
+
+#include <util/generic/map.h>
+#include <util/generic/string.h>
+
+namespace NYql {
+namespace NCommon {
+
+void LoadSystemModulePaths( // Asks the resolver binary to list UDF modules in `dir` and adds name -> path entries to *paths.
+ const TString& resolverPath,
+ const TString& dir,
+ NKikimr::NMiniKQL::TUdfModulePathsMap* paths);
+
+IUdfResolver::TPtr CreateOutProcUdfResolver( // Creates a resolver that obtains UDF metadata by running an external resolver process.
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const TFileStoragePtr& fileStorage,
+ const TString& resolverPath, // path of the resolver executable
+ const TString& user, // if non-empty, passed to the resolver as "-U <user> -G <group>"
+ const TString& group,
+ bool filterSyscalls, // adds the "-F" flag; name now matches the definition
+ const TString& udfDependencyStubPath, // stub linked in place of missing shared libraries; empty disables that recovery
+ const TMap<TString, TString>& path2md5 = {}); // UDF path -> MD5 reported by GetSystemModulePath
+
+} // namespace NCommon
+} // namespace NYql
diff --git a/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.cpp b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.cpp
new file mode 100644
index 00000000000..73aab72bde9
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.cpp
@@ -0,0 +1,229 @@
+#include "yql_simple_udf_resolver.h"
+
+#include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
+#include <yql/essentials/core/yql_holding_file_storage.h>
+
+#include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/mkql_utils.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node.h>
+
+#include <library/cpp/digest/md5/md5.h>
+
+#include <util/generic/vector.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/hash.h>
+#include <util/generic/string.h>
+#include <util/system/guard.h>
+#include <util/system/spinlock.h>
+
+namespace NYql {
+namespace NCommon {
+
+using namespace NKikimr;
+using namespace NKikimr::NMiniKQL;
+
+class TSimpleUdfResolver : public IUdfResolver {
+public:
+ TSimpleUdfResolver(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TFileStoragePtr& fileStorage, bool useFakeMD5) // Stores the registry, optional file storage and the fake-MD5 switch.
+ : FunctionRegistry_(functionRegistry)
+ , FileStorage_(fileStorage)
+ , TypeInfoHelper_(new TTypeInfoHelper)
+ , UseFakeMD5_(useFakeMD5)
+ {}
+
+ TString GetMD5(const TString& path) const { // Returns a digest derived from `path` when fake MD5 is enabled, otherwise an empty string.
+ if (UseFakeMD5_) {
+ return MD5::Calc(path); // NOTE(review): the argument is the path string itself, so this presumably hashes the path text, not the file contents - confirm
+ } else {
+ return {};
+ }
+ }
+
+ TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override { // Under Lock_: the registry's path for the module paired with GetMD5(path), or Nothing().
+ with_lock(Lock_) {
+ auto path = FunctionRegistry_->FindUdfPath(moduleName);
+ return path ? MakeMaybe<TFilePathWithMd5>(*path, GetMD5(*path)) : Nothing();
+ }
+ }
+
+ bool LoadMetadata(const TVector<TImport*>& imports, // Loads needed imports into a cloned registry (types only) and fills metadata for `functions`; returns false with errors in ctx.
+ const TVector<TFunction*>& functions, TExprContext& ctx) const override {
+
+ with_lock(Lock_) { // the whole resolution runs under the resolver lock
+ bool hasErrors = false;
+ THashSet<TString> requiredModules;
+ for (auto udfPtr : functions) { // collect module names of all well-formed function names
+ auto& udf = *udfPtr;
+ TStringBuf moduleName, funcName;
+ if (!SplitUdfName(udf.Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) {
+ ctx.AddError(TIssue(udf.Pos, TStringBuilder() <<
+ "Incorrect format of function name: " << udf.Name));
+ hasErrors = true;
+ } else {
+ requiredModules.insert(TString(moduleName));
+ }
+ }
+
+ THoldingFileStorage holdingFileStorage(FileStorage_);
+ auto newRegistry = FunctionRegistry_->Clone(); // imports are loaded into a clone, FunctionRegistry_ itself is not modified here
+ THashMap<std::pair<TString, TString>, THashSet<TString>> cachedModules; // (file path, custom prefix) -> modules, so each file is loaded once per call
+ for (auto import: imports) {
+ if (import->Modules) { // module list already known: load only if it provides a required module
+ bool needLibrary = false;
+ for (auto& m : *import->Modules) {
+ if (requiredModules.contains(m)) {
+ needLibrary = true;
+ break;
+ }
+ }
+
+ if (!needLibrary) {
+ continue;
+ }
+ } else {
+ import->Modules.ConstructInPlace();
+ }
+ const TString& customUdfPrefix = import->Block->CustomUdfPrefix;
+ try {
+ THashSet<TString> modules;
+ if (FileStorage_) { // with a file storage the block is frozen and loaded from the frozen path
+ auto link = holdingFileStorage.FreezeFile(*import->Block);
+ auto path = link->GetPath().GetPath();
+ auto [it, inserted] = cachedModules.emplace(std::make_pair(path, customUdfPrefix), THashSet<TString>());
+ if (inserted) {
+ newRegistry->LoadUdfs(
+ path,
+ {},
+ NUdf::IRegistrator::TFlags::TypesOnly,
+ customUdfPrefix,
+ &modules);
+ it->second = modules;
+ } else {
+ modules = it->second;
+ }
+ } else { // without a file storage only PATH blocks can be loaded, directly from Block->Data
+ if (import->Block->Type != EUserDataType::PATH) {
+ ctx.AddError(TIssue(import->Pos, TStringBuilder() <<
+ "Only path file type is supported, cannot load file with alias: " << import->FileAlias));
+ hasErrors = true;
+ continue;
+ }
+ auto [it, inserted] = cachedModules.emplace(std::make_pair(import->Block->Data, customUdfPrefix), THashSet<TString>());
+ if (inserted) {
+ newRegistry->LoadUdfs(
+ import->Block->Data,
+ {},
+ NUdf::IRegistrator::TFlags::TypesOnly,
+ customUdfPrefix,
+ &modules);
+ it->second = modules;
+ } else {
+ modules = it->second;
+ }
+ }
+
+ import->Modules->assign(modules.begin(), modules.end()); // overwrite the import's module list with what the file provides
+ }
+ catch (const yexception& e) { // caught by const reference; only yexception-derived errors are turned into issues here
+ ctx.AddError(TIssue(import->Pos, TStringBuilder()
+ << "Internal error of loading udf module: " << import->FileAlias
+ << ", reason: " << e.what()));
+ hasErrors = true;
+ }
+ }
+
+ hasErrors = !LoadFunctionsMetadata(functions, *newRegistry, TypeInfoHelper_, ctx) || hasErrors; // runs even if earlier steps failed, to report every issue
+ return !hasErrors;
+ }
+ }
+
+ TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override { // Not supported by this resolver: always throws yexception.
+ Y_UNUSED(imports);
+ ythrow yexception() << "LoadRichMetadata is not supported in SimpleUdfResolver";
+ }
+
+ bool ContainsModule(const TStringBuf& moduleName) const override { // True if the registry reports the module as loaded; note that Lock_ is not taken here.
+ return FunctionRegistry_->IsLoadedUdfModule(moduleName);
+ }
+
+private:
+ mutable TAdaptiveLock Lock_;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_;
+ TFileStoragePtr FileStorage_;
+ NUdf::ITypeInfoHelper::TPtr TypeInfoHelper_;
+ const bool UseFakeMD5_;
+};
+
+IUdfResolver::TPtr CreateSimpleUdfResolver( // Factory: builds a TSimpleUdfResolver from the given arguments.
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const TFileStoragePtr& fileStorage,
+ bool useFakeMD5
+) {
+ return new TSimpleUdfResolver(functionRegistry, fileStorage, useFakeMD5);
+}
+
+bool LoadFunctionsMetadata(const TVector<IUdfResolver::TFunction*>& functions, // Fills CallableType / RunConfigType / NormalizedUserType and flags of each function from the registry; returns false with issues in ctx on any failure.
+ const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
+ NUdf::ITypeInfoHelper::TPtr typeInfoHelper,
+ TExprContext& ctx) {
+
+ bool hasErrors = false;
+ TScopedAlloc alloc(__LOCATION__);
+ TTypeEnvironment env(alloc); // MiniKQL types built below live in this local environment
+
+ for (auto udfPtr : functions) { // a failure for one function is recorded and the loop moves on to the next
+ auto& udf = *udfPtr;
+ try {
+ TType* mkqlUserType = nullptr;
+ if (udf.UserType) {
+ TStringStream err;
+ mkqlUserType = BuildType(*udf.UserType, {env}, err);// build the MiniKQL form of the user type; diagnostics are collected in err
+ if (!mkqlUserType) {
+ ctx.AddError(TIssue(udf.Pos, TStringBuilder() << "Invalid user type for function: "
+ << udf.Name << ", error: " << err.Str()));
+ hasErrors = true;
+ continue;
+ }
+ }
+
+ auto secureParamsProvider = MakeSimpleSecureParamsProvider(udf.SecureParams);
+
+ TFunctionTypeInfo funcInfo;
+ auto status = functionRegistry.FindFunctionTypeInfo(env, typeInfoHelper, nullptr,
+ udf.Name, mkqlUserType, udf.TypeConfig, NUdf::IUdfModule::TFlags::TypesOnly, {}, secureParamsProvider.get(), &funcInfo);
+ if (!status.IsOk()) {
+ ctx.AddError(TIssue(udf.Pos, TStringBuilder() << "Failed to find UDF function: " << udf.Name
+ << ", reason: " << status.GetError()));
+ hasErrors = true;
+ continue;
+ }
+
+ udf.CallableType = ConvertMiniKQLType(udf.Pos, funcInfo.FunctionType, ctx); // convert the MiniKQL types back to expr-level types
+ YQL_ENSURE(udf.CallableType);
+ if (funcInfo.RunConfigType) {
+ udf.RunConfigType = ConvertMiniKQLType(udf.Pos, const_cast<TType*>(funcInfo.RunConfigType), ctx);
+ YQL_ENSURE(udf.RunConfigType);
+ }
+
+ if (funcInfo.UserType) {
+ udf.NormalizedUserType = ConvertMiniKQLType(udf.Pos, const_cast<TType*>(funcInfo.UserType), ctx);
+ YQL_ENSURE(udf.NormalizedUserType);
+ }
+
+ udf.SupportsBlocks = funcInfo.SupportsBlocks;
+ udf.IsStrict = funcInfo.IsStrict;
+ } catch (const std::exception& e) { // includes YQL_ENSURE failures from the conversions above
+ ctx.AddError(TIssue(udf.Pos, TStringBuilder()
+ << "Internal error was found when udf metadata is loading for function: " << udf.Name
+ << ", reason: " << e.what()));
+ hasErrors = true;
+ }
+ }
+
+ return !hasErrors;
+}
+
+} // namespace NCommon
+} // namespace NYql
diff --git a/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h
new file mode 100644
index 00000000000..87b40e97b52
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <yql/essentials/core/yql_type_annotation.h>
+#include <yql/essentials/core/file_storage/file_storage.h>
+
+#include <yql/essentials/minikql/mkql_alloc.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+
+namespace NYql {
+namespace NCommon {
+
+IUdfResolver::TPtr CreateSimpleUdfResolver( // Creates an in-process UDF resolver; file storage is optional, useFakeMD5 enables path-derived digests.
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const TFileStoragePtr& fileStorage = {}, bool useFakeMD5 = false);
+
+bool LoadFunctionsMetadata(const TVector<IUdfResolver::TFunction*>& functions, // Fills type metadata of each function from functionRegistry; returns false and adds issues to ctx on failure.
+ const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
+ NUdf::ITypeInfoHelper::TPtr typeInfoHelper,
+ TExprContext& ctx);
+
+} // namespace NCommon
+} // namespace NYql
diff --git a/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.cpp b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.cpp
new file mode 100644
index 00000000000..2ce67fa0f60
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.cpp
@@ -0,0 +1,248 @@
+#include "yql_udf_resolver_with_index.h"
+
+#include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
+#include <yql/essentials/core/yql_type_annotation.h>
+
+#include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/mkql_utils.h>
+
+#include <util/generic/hash_set.h>
+#include <util/generic/hash.h>
+#include <util/generic/map.h>
+#include <util/generic/set.h>
+#include <util/generic/string.h>
+#include <util/system/guard.h>
+#include <util/system/mutex.h>
+
+namespace NYql {
+namespace NCommon {
+
+using namespace NKikimr;
+using namespace NKikimr::NMiniKQL;
+
+// IUdfResolver implementation backed by a TUdfIndex.
+//
+// Functions whose module is present in the index get their metadata from the
+// index (types are parsed from YSON), and the resource file that provides the
+// module is obtained once and cached per module. Everything else - modules
+// unknown to the index, type-aware functions and rich metadata - is delegated
+// to the fallback resolver.
+//
+// Lock_ is taken by GetSystemModulePath() and LoadMetadata(); these are the
+// only entry points that reach DownloadedFiles_.
+class TUdfResolverWithIndex : public IUdfResolver {
+    // A resource file plus the TImport / TUserDataBlock describing it.
+    // Import_.Block points at Block_ of the same object, so pointers to
+    // Import_ handed out by the resolver are valid only while this object lives.
+    class TResourceFile : public TThrRefBase {
+    public:
+        using TPtr = TIntrusivePtr<TResourceFile>;
+
+    public:
+        // alias   - file alias stored into the import
+        // modules - module names stored into the import
+        // link    - link to the local file; its path becomes the block data
+        TResourceFile(TString alias, const TVector<TString>& modules, TFileLinkPtr link)
+            : Link_(std::move(link))
+        {
+            Import_.FileAlias = std::move(alias);
+            Import_.Block = &Block_;
+            Import_.Modules = MakeMaybe(modules);
+
+            Block_.Type = EUserDataType::PATH;
+            Block_.Data = Link_->GetPath();
+            Block_.Usage.Set(EUserDataBlockUsage::Udf);
+        }
+
+        // Builds a resource file for `link`. The alias is the file basename when
+        // it starts with "lib", otherwise "lib_<packageName>_udf.so"; in both
+        // cases the alias is lowercased.
+        static TResourceFile::TPtr Create(const TString& packageName, const TSet<TString>& modules, TFileLinkPtr link) {
+            // assume package name has no bad symbols for file name
+            const TString basename = link->GetPath().Basename();
+            TString alias = basename.StartsWith("lib") ? basename : ("lib_" + packageName + "_udf.so");
+            alias.to_lower();
+            return MakeIntrusive<TResourceFile>(std::move(alias), TVector<TString>(modules.begin(), modules.end()), std::move(link));
+        }
+
+    public:
+        TFileLinkPtr Link_;
+        TUserDataBlock Block_;
+        TImport Import_;
+    };
+
+public:
+    // All three arguments are required; a null one fails Y_ENSURE.
+    TUdfResolverWithIndex(TUdfIndex::TPtr udfIndex, IUdfResolver::TPtr fallback, TFileStoragePtr fileStorage)
+        : UdfIndex_(std::move(udfIndex))
+        , Fallback_(std::move(fallback))
+        , FileStorage_(std::move(fileStorage))
+    {
+        Y_ENSURE(UdfIndex_);
+        Y_ENSURE(FileStorage_);
+        // fallback is required only to handle type aware functions and loading rich metadata
+        Y_ENSURE(Fallback_);
+    }
+
+    // Returns the local path and MD5 of the file providing `moduleName`, or
+    // Nothing() when the index does not contain the module. Exceptions from
+    // obtaining the file are not caught here.
+    TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override {
+        with_lock(Lock_) {
+            TString moduleNameStr(moduleName);
+            if (!UdfIndex_->ContainsModule(moduleNameStr)) {
+                return Nothing();
+            }
+
+            auto file = DownloadFileWithModule(moduleNameStr);
+            return MakeMaybe<TFilePathWithMd5>(file->Link_->GetPath(), file->Link_->GetMd5());
+        }
+    }
+
+    // Resolves metadata for `functions`. Functions the index cannot fully
+    // describe are forwarded to the fallback resolver together with the
+    // caller's imports and the imports of every resource file obtained here.
+    // The fallback is invoked even when some functions have already failed.
+    // Returns true only if no function failed and the fallback succeeded.
+    bool LoadMetadata(const TVector<TImport*>& imports, const TVector<TFunction*>& functions, TExprContext& ctx) const override {
+        with_lock(Lock_) {
+            bool hasErrors = false;
+            TVector<TFunction*> fallbackFunctions;
+            TVector<TImport*> fallbackImports = imports;
+            TSet<TImport*> additionalImports; // avoid duplicates
+
+            for (auto udfPtr : functions) {
+                TImport* additionalImport = nullptr;
+                TFunction* fallbackFunction = nullptr;
+                if (!LoadFunctionMetadata(*udfPtr, ctx, fallbackFunction, additionalImport)) {
+                    hasErrors = true;
+                    continue;
+                }
+
+                if (additionalImport) {
+                    additionalImports.insert(additionalImport);
+                }
+
+                if (fallbackFunction) {
+                    fallbackFunctions.push_back(fallbackFunction);
+                }
+            }
+
+            fallbackImports.insert(fallbackImports.end(), additionalImports.begin(), additionalImports.end());
+
+            return Fallback_->LoadMetadata(fallbackImports, fallbackFunctions, ctx) && !hasErrors;
+        }
+    }
+
+    // Rich metadata is always produced by the fallback resolver.
+    TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override {
+        return Fallback_->LoadRichMetadata(imports);
+    }
+
+    // True when either the index or the fallback resolver knows the module.
+    bool ContainsModule(const TStringBuf& moduleName) const override {
+        TString moduleNameStr = TString(moduleName);
+        if (UdfIndex_->ContainsModule(moduleNameStr)) {
+            return true;
+        }
+
+        return Fallback_->ContainsModule(moduleName);
+    }
+
+private:
+    // Processes a single function.
+    //   fallbackFunction - set to &function when the fallback resolver must handle it
+    //   additionalImport - set to the import of the resource file obtained for the module
+    // Returns false after adding an issue to ctx; true otherwise.
+    bool LoadFunctionMetadata(TFunction& function, TExprContext& ctx, TFunction*& fallbackFunction, TImport*& additionalImport) const {
+        TStringBuf moduleName, funcName;
+        if (!SplitUdfName(function.Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) {
+            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Incorrect format of function name: " << function.Name));
+            return false;
+        }
+
+        /*
+        the order is really important:
+        1) check we have such module
+            no-> fallback function
+        2) check we have such function
+            no -> error
+        3) download resource file
+            fail -> error
+        4) if polymorphic function -> fallback function with additional Import for downloaded file
+        */
+
+        TString moduleNameStr = TString(moduleName);
+        if (!UdfIndex_->ContainsModule(moduleNameStr)) {
+            fallbackFunction = &function;
+            return true;
+        }
+
+        TFunctionInfo info;
+        if (!UdfIndex_->FindFunction(moduleNameStr, function.Name, info)) {
+            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Function not found: " << function.Name));
+            return false;
+        }
+
+        TResourceFile::TPtr file = DownloadFileWithModule(moduleName, function.Pos, ctx);
+        if (!file) {
+            return false;
+        }
+
+        // The pointee lives inside a TResourceFile kept in DownloadedFiles_.
+        additionalImport = &file->Import_;
+
+        if (info.IsTypeAwareness) {
+            fallbackFunction = &function;
+            return true;
+        }
+
+        if (!info.CallableType) {
+            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "CallableType for function " << function.Name << " is empty. Check UDF source code for errors."));
+            return false;
+        }
+
+        function.CallableType = ParseTypeFromYson(TStringBuf{info.CallableType}, ctx, function.Pos);
+        if (!function.CallableType) {
+            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Failed to build callable type from YSON for function " << function.Name));
+            return false;
+        }
+
+        if (info.RunConfigType) {
+            function.RunConfigType = ParseTypeFromYson(TStringBuf{info.RunConfigType}, ctx, function.Pos);
+            if (!function.RunConfigType) {
+                ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Failed to build run config type from YSON for function " << function.Name));
+                return false;
+            }
+        } else {
+            function.RunConfigType = std::get<0>(ctx.SingletonTypeCache);
+        }
+
+        function.NormalizedUserType = std::get<0>(ctx.SingletonTypeCache);
+        function.IsStrict = info.IsStrict;
+        function.SupportsBlocks = info.SupportsBlocks;
+        return true;
+    }
+
+    // Non-throwing wrapper for std::exception: the exception is converted into
+    // an issue at `pos` and nullptr is returned.
+    TResourceFile::TPtr DownloadFileWithModule(const TStringBuf& module, const TPosition& pos, TExprContext& ctx) const {
+        try {
+            return DownloadFileWithModule(module);
+        } catch (const std::exception& e) {
+            ctx.AddError(ExceptionToIssue(e, pos));
+        }
+
+        return nullptr;
+    }
+
+    // Returns the resource file providing `module`, obtaining it on first use
+    // and registering it in DownloadedFiles_ under every module of the
+    // resource. Throws yexception when the index has no resource for the
+    // module or when one of the resource's modules is already registered.
+    TResourceFile::TPtr DownloadFileWithModule(const TStringBuf& module) const {
+        const auto cached = DownloadedFiles_.find(module);
+        if (cached != DownloadedFiles_.end()) {
+            return cached->second;
+        }
+
+        // An owning string is needed only on a cache miss.
+        const TString moduleName(module);
+
+        auto resource = UdfIndex_->FindResourceByModule(moduleName);
+        if (!resource) {
+            ythrow yexception() << "No resource has been found for registered module " << moduleName;
+        }
+
+        // token is empty for urls for now
+        // assumption: file path is frozen already, no need to put into file storage
+        const TDownloadLink& downloadLink = resource->Link;
+        TFileLinkPtr link = downloadLink.IsUrl ? FileStorage_->PutUrl(downloadLink.Path, {}) : CreateFakeFileLink(downloadLink.Path, downloadLink.Md5);
+        TResourceFile::TPtr file = TResourceFile::Create(moduleName, resource->Modules, std::move(link));
+        for (const auto& resourceModule : resource->Modules) {
+            const auto inserted = DownloadedFiles_.emplace(resourceModule, file);
+            if (!inserted.second) {
+                // should not happen because UdfIndex handles conflicts
+                ythrow yexception() << "file already downloaded for module " << moduleName << ", conflicting path " << downloadLink.Path << ", existing local file " << inserted.first->second->Link_->GetPath();
+            }
+        }
+
+        return file;
+    }
+
+private:
+    mutable TMutex Lock_;
+    const TUdfIndex::TPtr UdfIndex_;
+    const IUdfResolver::TPtr Fallback_;
+    const TFileStoragePtr FileStorage_;
+    // module -> downloaded resource file
+    mutable TMap<TString, TResourceFile::TPtr> DownloadedFiles_;
+};
+
+// Factory: allocates a TUdfResolverWithIndex from the given index, fallback
+// resolver and file storage, and hands it back as an IUdfResolver::TPtr.
+IUdfResolver::TPtr CreateUdfResolverWithIndex(TUdfIndex::TPtr udfIndex, IUdfResolver::TPtr fallback, TFileStoragePtr fileStorage) {
+    IUdfResolver::TPtr resolver = new TUdfResolverWithIndex(udfIndex, fallback, fileStorage);
+    return resolver;
+}
+
+} // namespace NCommon
+} // namespace NYql
diff --git a/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h
new file mode 100644
index 00000000000..86bff50c255
--- /dev/null
+++ b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <yql/essentials/core/yql_udf_index.h>
+#include <yql/essentials/core/file_storage/file_storage.h>
+
+namespace NYql {
+namespace NCommon {
+
+IUdfResolver::TPtr CreateUdfResolverWithIndex(TUdfIndex::TPtr udfIndex, IUdfResolver::TPtr fallback, TFileStoragePtr fileStorage); // all three arguments must be non-null: the resolver's constructor checks each with Y_ENSURE
+
+} // namespace NCommon
+} // namespace NYql