diff options
| author | vvvv <[email protected]> | 2024-11-07 12:29:36 +0300 | 
|---|---|---|
| committer | vvvv <[email protected]> | 2024-11-07 13:49:47 +0300 | 
| commit | d4c258e9431675bab6745c8638df6e3dfd4dca6b (patch) | |
| tree | b5efcfa11351152a4c872fccaea35749141c0b11 /yql/essentials/providers/common/udf_resolve | |
| parent | 13a4f274caef5cfdaf0263b24e4d6bdd5521472b (diff) | |
Moved other yql/essentials libs YQL-19206
init
commit_hash:7d4c435602078407bbf20dd3c32f9c90d2bbcbc0
Diffstat (limited to 'yql/essentials/providers/common/udf_resolve')
9 files changed, 1111 insertions, 0 deletions
diff --git a/yql/essentials/providers/common/udf_resolve/ya.make b/yql/essentials/providers/common/udf_resolve/ya.make new file mode 100644 index 00000000000..65fa463b8c4 --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/ya.make @@ -0,0 +1,30 @@ +LIBRARY() + +SRCS( +    yql_files_box.cpp +    yql_files_box.h +    yql_outproc_udf_resolver.cpp +    yql_outproc_udf_resolver.h +    yql_simple_udf_resolver.cpp +    yql_simple_udf_resolver.h +    yql_udf_resolver_with_index.cpp +    yql_udf_resolver_with_index.h +) + +PEERDIR( +    library/cpp/digest/md5 +    library/cpp/protobuf/util +    yql/essentials/core/file_storage +    yql/essentials/minikql +    yql/essentials/public/udf +    yql/essentials/utils +    yql/essentials/utils/log +    yql/essentials/core +    yql/essentials/providers/common/mkql +    yql/essentials/providers/common/proto +    yql/essentials/providers/common/schema/expr +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yql/essentials/providers/common/udf_resolve/yql_files_box.cpp b/yql/essentials/providers/common/udf_resolve/yql_files_box.cpp new file mode 100644 index 00000000000..0220ba8f6c9 --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_files_box.cpp @@ -0,0 +1,87 @@ +#include "yql_files_box.h" + +#include <yql/essentials/core/file_storage/storage.h> +#include <yql/essentials/utils/log/log.h> + +#include <util/system/fs.h> +#include <util/system/error.h> +#include <util/system/sysstat.h> +#include <util/folder/dirut.h> + +namespace NYql { +namespace NCommon { + +TFilesBox::TFilesBox(TFsPath dir, TRandGuid randGuid) +    : Dir(std::move(dir)) +    , RandGuid(std::move(randGuid)) +{ +} + +TFilesBox::~TFilesBox() { +    try { +        Destroy(); +    } catch (...) { +        YQL_LOG(ERROR) << "Error occurred in files box destroy: " << CurrentExceptionMessage(); +    } +} + +TString TFilesBox::MakeLinkFrom(const TString& source, const TString& filename) { +    if (!filename) { +        if (auto* existingPath = Mapping.FindPtr(source)) { +            return *existingPath; +        } +    } + +    TFsPath sourcePath(source); +    TString sourceAbsolutePath = sourcePath.IsAbsolute() ? source : (TFsPath::Cwd() / sourcePath).GetPath(); +    TString path; +    if (filename) { +        path = Dir / filename; +        if (!NFs::SymLink(sourceAbsolutePath, path)) { +            ythrow TSystemError() << "Failed to create symlink for file " << sourceAbsolutePath.Quote() << " to file " << path.Quote(); +        } +    } else { +        while (true) { +            path = Dir / RandGuid.GenGuid(); +            if (NFs::SymLink(sourceAbsolutePath, path)) { +                break; +            } else if (LastSystemError() != EEXIST) { +                ythrow TSystemError() << "Failed to create symlink for file " << sourceAbsolutePath.Quote() << " to file " << path.Quote(); +            } +        } +        Mapping.emplace(source, path); +    } +    return path; +} + +TString TFilesBox::GetDir() const { +    return Dir; +} + +void TFilesBox::Destroy() { +    Mapping.clear(); +    Dir.ForceDelete(); +} + +THolder<TFilesBox> CreateFilesBox(const TFsPath& baseDir) { +    TRandGuid randGuid; +    TFsPath path = baseDir / randGuid.GenGuid(); + +    while (true) { +        if (!path.Exists()) { +            int r = Mkdir(path.c_str(), MODE0711); +            if (r == 0) { +                break; +            } +            if (LastSystemError() != EEXIST) { +                ythrow TIoSystemError() << "could not create directory " << path; +            } +        } +        path = baseDir / randGuid.GenGuid(); +    } + +    return MakeHolder<TFilesBox>(std::move(path), std::move(randGuid)); +} + +} +} diff --git a/yql/essentials/providers/common/udf_resolve/yql_files_box.h b/yql/essentials/providers/common/udf_resolve/yql_files_box.h new file mode 100644 index 00000000000..d7f348660c2 --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_files_box.h @@ -0,0 +1,33 @@ +#pragma once + +#include <yql/essentials/utils/rand_guid.h> + +#include <util/generic/hash.h> +#include <util/folder/path.h> + +namespace NYql { +namespace NCommon { + +/* +  Resembles sandbox for external UDFs +*/ +class TFilesBox { +public: +    TFilesBox(TFsPath dir, TRandGuid randGuid); +    ~TFilesBox(); + +    TString MakeLinkFrom(const TString& source, const TString& filename = {}); +    TString GetDir() const; + +    void Destroy(); + +private: +    TFsPath Dir; +    TRandGuid RandGuid; +    THashMap<TString, TString> Mapping; +}; + +THolder<TFilesBox> CreateFilesBox(const TFsPath& baseDir); + +} +} diff --git a/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.cpp b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.cpp new file mode 100644 index 00000000000..02637d16d8e --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.cpp @@ -0,0 +1,420 @@ +#include "yql_outproc_udf_resolver.h" +#include "yql_simple_udf_resolver.h" +#include "yql_files_box.h" + +#include <yql/essentials/providers/common/proto/udf_resolver.pb.h> +#include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h> +#include <yql/essentials/core/yql_holding_file_storage.h> +#include <yql/essentials/core/yql_type_annotation.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/retry.h> + +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/mkql_type_builder.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/mkql_utils.h> + +#include <library/cpp/protobuf/util/pb_io.h> + +#include <util/generic/scope.h> +#include <util/stream/str.h> +#include <util/string/strip.h> +#include <util/system/shellcommand.h> +#include <util/string/split.h> + +#include <regex> + +namespace NYql { +namespace NCommon { + +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +namespace { +template <typename F> +void RunResolver( +    const TString& resolverPath, +    const TList<TString>& args, +    IInputStream* input, +    const F& outputHandler, +    const TString& ldLibraryPath = {}) { + +    TShellCommandOptions shellOptions; +    shellOptions +        .SetUseShell(false) +        .SetDetachSession(false) +        .SetInputStream(input); // input can be nullptr + +    if (ldLibraryPath) { +        YQL_LOG(DEBUG) << "Using LD_LIBRARY_PATH = " << ldLibraryPath << " for Udf resolver"; +        shellOptions.Environment["LD_LIBRARY_PATH"] = ldLibraryPath; +    } + +    TShellCommand shell(resolverPath, args, shellOptions); + +    switch (shell.Run().GetStatus()) { +    case TShellCommand::SHELL_INTERNAL_ERROR: +        ythrow yexception() << "Udf resolver internal error: " +            << shell.GetInternalError(); +    case TShellCommand::SHELL_ERROR: +        ythrow yexception() << "Udf resolver shell error: " +            << StripString(shell.GetError()); +    case TShellCommand::SHELL_FINISHED: +        break; +    default: +        ythrow yexception() << "Unexpected udf resolver state: " +            << int(shell.GetStatus()); +    } + +    if (shell.GetError()) { +        YQL_LOG(INFO) << "UdfResolver stderr: " << shell.GetError(); +    } + +    outputHandler(shell.GetOutput()); +} + +template <typename F> +void RunResolver( +    const TString& resolverPath, +    const TList<TString>& args, +    const TResolve& request, +    const F& outputHandler, +    const TString& ldLibraryPath = {}) { + +    TStringStream input; +    YQL_ENSURE(request.SerializeToArcadiaStream(&input), "Cannot serialize TResolve proto message"); +    RunResolver(resolverPath, args, &input, outputHandler, ldLibraryPath); +} + +TString ExtractSharedObjectNameFromErrorMessage(const char* message) { +    if (!message) { +        return ""; +    } + +    // example: +    // util/system/dynlib.cpp:56: libcuda.so.1: cannot open shared object file: No such file or directory +    static std::regex re(".*: (.+): cannot open shared object file: No such file or directory"); +    std::cmatch match; +    if (!std::regex_match(message, match, re)) { +        return ""; +    } + +    return TString(match[1].str()); +} +} + +class TOutProcUdfResolver : public IUdfResolver { +public: +    TOutProcUdfResolver(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, +        const TFileStoragePtr& fileStorage, const TString& resolverPath, +        const TString& user, const TString& group, bool filterSyscalls, +        const TString& udfDependencyStubPath, const TMap<TString, TString>& path2md5) +        : FunctionRegistry_(functionRegistry) +        , TypeInfoHelper_(new TTypeInfoHelper) +        , FileStorage_(fileStorage) +        , ResolverPath_(resolverPath) +        , UdfDependencyStubPath_(udfDependencyStubPath) +        , Path2Md5_(path2md5) +    { +        if (user) { +            UserGroupArgs_ = { "-U", user, "-G", group }; +        } + +        if (filterSyscalls) { +            UserGroupArgs_.push_back("-F"); +        } +    } + +    TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override { +        auto path = FunctionRegistry_->FindUdfPath(moduleName); +        if (!path) { +            return Nothing(); +        } + +        const TString md5 = Path2Md5_.Value(*path, ""); +        return MakeMaybe<TFilePathWithMd5>(*path, md5); +    } + +    bool ContainsModule(const TStringBuf& moduleName) const override { +        return FunctionRegistry_->IsLoadedUdfModule(moduleName); +    } + +    bool LoadMetadata(const TVector<TImport*>& imports, const TVector<TFunction*>& functions, TExprContext& ctx) const override { +        THashSet<TString> requiredLoadedModules; +        THashSet<TString> requiredExternalModules; +        TVector<TFunction*> loadedFunctions; +        TVector<TFunction*> externalFunctions; + +        bool hasErrors = false; +        for (auto udf : functions) { +            TStringBuf moduleName, funcName; +            if (!SplitUdfName(udf->Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) { +                ctx.AddError(TIssue(udf->Pos, TStringBuilder() << +                    "Incorrect format of function name: " << udf->Name)); +                hasErrors = true; +            } else { +                if (FunctionRegistry_->IsLoadedUdfModule(moduleName)) { +                    requiredLoadedModules.insert(TString(moduleName)); +                    loadedFunctions.push_back(udf); +                } else { +                    requiredExternalModules.insert(TString(moduleName)); +                    externalFunctions.push_back(udf); +                } +            } +        } + +        TResolve request; +        TVector<TImport*> usedImports; +        THoldingFileStorage holdingFileStorage(FileStorage_); +        THolder<TFilesBox> filesBox = CreateFilesBoxOverFileStorageTemp(); + +        THashMap<TString, TImport*> path2LoadedImport; +        for (auto import : imports) { +            if (import->Modules) { +                bool needLibrary = false; +                for (auto& m : *import->Modules) { +                    if (requiredLoadedModules.contains(m)) { +                        YQL_ENSURE(import->Block->Type == EUserDataType::PATH); +                        path2LoadedImport[import->Block->Data] = import; +                    } + +                    if (requiredExternalModules.contains(m)) { +                        needLibrary = true; +                        break; +                    } +                } + +                if (!needLibrary) { +                    continue; +                } +            } else { +                import->Modules.ConstructInPlace(); +            } + +            try { +                LoadImport(holdingFileStorage, *filesBox, *import, request); +                usedImports.push_back(import); +            } catch (const std::exception& e) { +                ctx.AddError(ExceptionToIssue(e)); +                hasErrors = true; +            } +        } + +        for (auto& module : requiredExternalModules) { +            if (auto path = FunctionRegistry_->FindUdfPath(module)) { +                auto importRequest = request.AddImports(); +                const TString hiddenPath = filesBox->MakeLinkFrom(*path); +                importRequest->SetFileAlias(hiddenPath); +                importRequest->SetPath(hiddenPath); +                importRequest->SetSystem(true); +            } +        } + + +        for (auto udf : externalFunctions) { +            auto udfRequest = request.AddUdfs(); +            udfRequest->SetName(udf->Name); +            udfRequest->SetTypeConfig(udf->TypeConfig); +            if (udf->UserType) { +                udfRequest->SetUserType(WriteTypeToYson(udf->UserType)); +            } +        } + +        TResolveResult response; +        try { +            response = RunResolverAndParseResult(request, { }, *filesBox); +            filesBox->Destroy(); +        } catch (const std::exception& e) { +            ctx.AddError(ExceptionToIssue(e)); +            return false; +        } + +        // extract regardless of hasErrors value +        hasErrors = !ExtractMetadata(response, usedImports, externalFunctions, ctx) || hasErrors; +        hasErrors = !LoadFunctionsMetadata(loadedFunctions, *FunctionRegistry_, TypeInfoHelper_, ctx) || hasErrors; + +        if (!hasErrors) { +            for (auto& m : FunctionRegistry_->GetAllModuleNames()) { +                auto path = *FunctionRegistry_->FindUdfPath(m); +                if (auto import = path2LoadedImport.FindPtr(path)) { +                    (*import)->Modules->push_back(m); +                } +            } +        } + +        return !hasErrors; +    } + +    TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override { +        TResolve request; +        THoldingFileStorage holdingFileStorage(FileStorage_); +        THolder<TFilesBox> filesBox = CreateFilesBoxOverFileStorageTemp(); +        Y_DEFER { +            filesBox->Destroy(); +        }; + +        for (auto import : imports) { +            LoadImport(holdingFileStorage, *filesBox, import, request); +        } + +        return RunResolverAndParseResult(request, { "--discover-proto" }, *filesBox); +    } + +private: +    THolder<TFilesBox> CreateFilesBoxOverFileStorageTemp() const { +        return CreateFilesBox(FileStorage_->GetTemp()); +    } + +    void LoadImport(THoldingFileStorage& holdingFileStorage, TFilesBox& filesBox, const TImport& import, TResolve& request) const { +        const TString path = (import.Block->Type == EUserDataType::PATH) ? import.Block->Data : holdingFileStorage.FreezeFile(*import.Block)->GetPath().GetPath(); + +        const TString hiddenPath = filesBox.MakeLinkFrom(path); +        auto importRequest = request.AddImports(); +        importRequest->SetFileAlias(import.FileAlias); +        importRequest->SetPath(hiddenPath); +        importRequest->SetCustomUdfPrefix(import.Block->CustomUdfPrefix); +    } + +    TResolveResult RunResolverAndParseResult(const TResolve& request, const TVector<TString>& additionalArgs, TFilesBox& filesBox) const { +        auto args = UserGroupArgs_; +        args.insert(args.end(), additionalArgs.begin(), additionalArgs.end()); + +        TString ldLibraryPath; +        TSet<TString> stubbedLibraries; +        return WithRetry<yexception>(10, [&]() { +            TResolveResult response; +            RunResolver(ResolverPath_, args, request, [&](const TString& output) { +                YQL_ENSURE(response.ParseFromString(output), "Cannot deserialize TResolveResult proto message"); +            }, ldLibraryPath); +            return response; +        }, [&](const yexception& e, int, int) { +            TStringStream stream; +            SerializeToTextFormat(request, stream); +            YQL_LOG(DEBUG) << "Exception from UdfResolver: " << e.what() << " for request " << stream.Str(); +            if (!UdfDependencyStubPath_) { +                YQL_LOG(DEBUG) << "UdfDependencyStubPath is not specified, unable to recover error " << e.what(); +                throw e; +            } + +            TString sharedLibrary = ExtractSharedObjectNameFromErrorMessage(e.what()); +            if (!sharedLibrary) { +                throw e; +            } + +            YQL_LOG(DEBUG) << "UdfResolver needs shared library " << sharedLibrary; +            if (!stubbedLibraries.emplace(sharedLibrary).second) { +                // already tried, giving up +                YQL_LOG(ERROR) << "Unable to load shared library " << sharedLibrary << " even after using dependency stub"; +                throw e; +            } + +            YQL_LOG(DEBUG) << "Using dependency stub for shared library " << sharedLibrary; +            PutSharedLibraryStub(sharedLibrary, filesBox); +            ldLibraryPath = filesBox.GetDir(); +        }); +    } + +    void PutSharedLibraryStub(const TString& sharedLibrary, TFilesBox& filesBox) const { +        YQL_ENSURE(UdfDependencyStubPath_); +        filesBox.MakeLinkFrom(UdfDependencyStubPath_, sharedLibrary); +    } + +    static bool ExtractMetadata(const TResolveResult& response, const TVector<TImport*>& usedImports, const TVector<TFunction*>& functions, TExprContext& ctx) { +        bool hasErrors = false; +        YQL_ENSURE(response.UdfsSize() == functions.size(), "Number of returned udf signatures doesn't match original one"); +        YQL_ENSURE(response.ImportsSize() >= usedImports.size(), "Number of returned udf modules is too low"); + +        for (size_t i = 0; i < usedImports.size(); ++i) { +            const TImportResult& importRes = response.GetImports(i); + +            TImport* import = usedImports[i]; +            if (importRes.HasError()) { +                ctx.AddError(TIssue(import ? import->Pos : TPosition(), importRes.GetError())); +                hasErrors = true; +            } else { +                import->Modules.ConstructInPlace(); +                for (auto& module : importRes.GetModules()) { +                    import->Modules->push_back(module); +                } +            } +        } + +        for (size_t i = 0; i < response.UdfsSize(); ++i) { +            TFunction* udf = functions[i]; +            const TFunctionResult& udfRes = response.GetUdfs(i); +            if (udfRes.HasError()) { +                ctx.AddError(TIssue(udf->Pos, udfRes.GetError())); +                hasErrors = true; +            } else { +                udf->CallableType = ParseTypeFromYson(TStringBuf{udfRes.GetCallableType()}, ctx, udf->Pos); +                if (!udf->CallableType) { +                    hasErrors = true; +                    continue; +                } +                if (udfRes.HasNormalizedUserType()) { +                    udf->NormalizedUserType = ParseTypeFromYson(TStringBuf{udfRes.GetNormalizedUserType()}, ctx, udf->Pos); +                    if (!udf->NormalizedUserType) { +                        hasErrors = true; +                        continue; +                    } +                } +                if (udfRes.HasRunConfigType()) { +                    udf->RunConfigType = ParseTypeFromYson(TStringBuf{udfRes.GetRunConfigType()}, ctx, udf->Pos); +                    if (!udf->RunConfigType) { +                        hasErrors = true; +                        continue; +                    } +                } +                udf->SupportsBlocks = udfRes.GetSupportsBlocks(); +                udf->IsStrict = udfRes.GetIsStrict(); +            } +        } + +        return !hasErrors; +    } + +private: +    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_; +    NUdf::ITypeInfoHelper::TPtr TypeInfoHelper_; +    TFileStoragePtr FileStorage_; +    const TString ResolverPath_; +    const TString UdfDependencyStubPath_; +    TList<TString> UserGroupArgs_; +    const TMap<TString, TString> Path2Md5_; +}; + +void LoadSystemModulePaths( +        const TString& resolverPath, +        const TString& dir, +        TUdfModulePathsMap* paths) +{ +    const TList<TString> args = { TString("--list"), dir }; +    RunResolver(resolverPath, args, nullptr, [&](const TString& output) { +        // output format is: +        // {{module_name}}\t{{module_path}}\n + +        for (const auto& it : StringSplitter(output).Split('\n')) { +            TStringBuf moduleName, modulePath; +            const TStringBuf& line = it.Token(); +            if (!line.empty()) { +                line.Split('\t', moduleName, modulePath); +                paths->emplace(moduleName, modulePath); +            } +        } +    }); +} + +IUdfResolver::TPtr CreateOutProcUdfResolver( +    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, +    const TFileStoragePtr& fileStorage, +    const TString& resolverPath, +    const TString& user, +    const TString& group, +    bool filterSyscalls, +    const TString& udfDependencyStubPath, +    const TMap<TString, TString>& path2md5) { +    return new TOutProcUdfResolver(functionRegistry, fileStorage, resolverPath, user, group, filterSyscalls, udfDependencyStubPath, path2md5); +} + +} // namespace NCommon +} // namespace NYql diff --git a/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h new file mode 100644 index 00000000000..35116a1af1f --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h @@ -0,0 +1,30 @@ +#pragma once + +#include <yql/essentials/core/yql_udf_resolver.h> +#include <yql/essentials/core/file_storage/file_storage.h> +#include <yql/essentials/minikql/mkql_function_registry.h> +#include <yql/essentials/providers/common/proto/udf_resolver.pb.h> + +#include <util/generic/map.h> +#include <util/generic/string.h> + +namespace NYql { +namespace NCommon { + +void LoadSystemModulePaths( +        const TString& resolverPath, +        const TString& dir, +        NKikimr::NMiniKQL::TUdfModulePathsMap* paths); + +IUdfResolver::TPtr CreateOutProcUdfResolver( +    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, +    const TFileStoragePtr& fileStorage, +    const TString& resolverPath, +    const TString& user, +    const TString& group, +    bool filterSysCalls, +    const TString& udfDependencyStubPath, +    const TMap<TString, TString>& path2md5 = {}); + +} // namespace NCommon +} // namespace NYql diff --git a/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.cpp b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.cpp new file mode 100644 index 00000000000..73aab72bde9 --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.cpp @@ -0,0 +1,229 @@ +#include "yql_simple_udf_resolver.h" + +#include <yql/essentials/providers/common/mkql/yql_type_mkql.h> +#include <yql/essentials/core/yql_holding_file_storage.h> + +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/mkql_type_builder.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/mkql_utils.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> + +#include <library/cpp/digest/md5/md5.h> + +#include <util/generic/vector.h> +#include <util/generic/hash_set.h> +#include <util/generic/hash.h> +#include <util/generic/string.h> +#include <util/system/guard.h> +#include <util/system/spinlock.h> + +namespace NYql { +namespace NCommon { + +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +class TSimpleUdfResolver : public IUdfResolver { +public: +    TSimpleUdfResolver(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TFileStoragePtr& fileStorage, bool useFakeMD5) +        : FunctionRegistry_(functionRegistry) +        , FileStorage_(fileStorage) +        , TypeInfoHelper_(new TTypeInfoHelper) +        , UseFakeMD5_(useFakeMD5) +    {} + +    TString GetMD5(const TString& path) const { +        if (UseFakeMD5_) { +            return MD5::Calc(path); +        } else { +            return {}; +        } +    } + +    TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override { +        with_lock(Lock_) { +            auto path = FunctionRegistry_->FindUdfPath(moduleName); +            return path ? MakeMaybe<TFilePathWithMd5>(*path, GetMD5(*path)) : Nothing(); +        } +    } + +    bool LoadMetadata(const TVector<TImport*>& imports, +        const TVector<TFunction*>& functions, TExprContext& ctx) const override { + +        with_lock(Lock_) { +            bool hasErrors = false; +            THashSet<TString> requiredModules; +            for (auto udfPtr : functions) { +                auto& udf = *udfPtr; +                TStringBuf moduleName, funcName; +                if (!SplitUdfName(udf.Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) { +                    ctx.AddError(TIssue(udf.Pos, TStringBuilder() << +                        "Incorrect format of function name: " << udf.Name)); +                    hasErrors = true; +                } else { +                    requiredModules.insert(TString(moduleName)); +                } +            } + +            THoldingFileStorage holdingFileStorage(FileStorage_); +            auto newRegistry = FunctionRegistry_->Clone(); +            THashMap<std::pair<TString, TString>, THashSet<TString>> cachedModules; +            for (auto import: imports) { +                if (import->Modules) { +                    bool needLibrary = false; +                    for (auto& m : *import->Modules) { +                        if (requiredModules.contains(m)) { +                            needLibrary = true; +                            break; +                        } +                    } + +                    if (!needLibrary) { +                        continue; +                    } +                } else { +                    import->Modules.ConstructInPlace(); +                } +                const TString& customUdfPrefix = import->Block->CustomUdfPrefix; +                try { +                    THashSet<TString> modules; +                    if (FileStorage_) { +                        auto link = holdingFileStorage.FreezeFile(*import->Block); +                        auto path = link->GetPath().GetPath(); +                        auto [it, inserted] = cachedModules.emplace(std::make_pair(path, customUdfPrefix), THashSet<TString>()); +                        if (inserted) { +                            newRegistry->LoadUdfs( +                                path, +                                {}, +                                NUdf::IRegistrator::TFlags::TypesOnly, +                                customUdfPrefix, +                                &modules); +                            it->second = modules; +                        } else { +                            modules = it->second; +                        } +                    } else { +                        if (import->Block->Type != EUserDataType::PATH) { +                            ctx.AddError(TIssue(import->Pos, TStringBuilder() << +                                "Only path file type is supported, cannot load file with alias: " << import->FileAlias)); +                            hasErrors = true; +                            continue; +                        } +                        auto [it, inserted] = cachedModules.emplace(std::make_pair(import->Block->Data, customUdfPrefix), THashSet<TString>()); +                        if (inserted) { +                            newRegistry->LoadUdfs( +                                import->Block->Data, +                                {}, +                                NUdf::IRegistrator::TFlags::TypesOnly, +                                customUdfPrefix, +                                &modules); +                            it->second = modules; +                        } else { +                            modules = it->second; +                        } +                    } + +                    import->Modules->assign(modules.begin(), modules.end()); +                } +                catch (yexception& e) { +                    ctx.AddError(TIssue(import->Pos, TStringBuilder() +                        << "Internal error of loading udf module: " << import->FileAlias +                        << ", reason: " << e.what())); +                    hasErrors = true; +                } +            } + +            hasErrors = !LoadFunctionsMetadata(functions, *newRegistry, TypeInfoHelper_, ctx) || hasErrors; +            return !hasErrors; +        } +    } + +    TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override { +        Y_UNUSED(imports); +        ythrow yexception() << "LoadRichMetadata is not supported in SimpleUdfResolver"; +    } + +    bool ContainsModule(const TStringBuf& moduleName) const override { +        return FunctionRegistry_->IsLoadedUdfModule(moduleName); +    } + +private: +    mutable TAdaptiveLock Lock_; +    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_; +    TFileStoragePtr FileStorage_; +    NUdf::ITypeInfoHelper::TPtr TypeInfoHelper_; +    const bool UseFakeMD5_; +}; + +IUdfResolver::TPtr CreateSimpleUdfResolver( +    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, +    const TFileStoragePtr& fileStorage, +    bool useFakeMD5 +) { +    return new TSimpleUdfResolver(functionRegistry, fileStorage, useFakeMD5); +} + +bool LoadFunctionsMetadata(const TVector<IUdfResolver::TFunction*>& functions, +    const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, +    NUdf::ITypeInfoHelper::TPtr typeInfoHelper, +    TExprContext& ctx) { + +    bool hasErrors = false; +    TScopedAlloc alloc(__LOCATION__); +    TTypeEnvironment env(alloc); + +    for (auto udfPtr : functions) { +        auto& udf = *udfPtr; +        try { +            TType* mkqlUserType = nullptr; +            if (udf.UserType) { +                TStringStream err; +                mkqlUserType = BuildType(*udf.UserType, {env}, err);// +                if (!mkqlUserType) { +                    ctx.AddError(TIssue(udf.Pos, TStringBuilder() << "Invalid user type for function: " +                        << udf.Name << ", error: " << err.Str())); +                    hasErrors = true; +                    continue; +                } +            } + +            auto secureParamsProvider = MakeSimpleSecureParamsProvider(udf.SecureParams); + +            TFunctionTypeInfo funcInfo; +            auto status = functionRegistry.FindFunctionTypeInfo(env, typeInfoHelper, nullptr, +                udf.Name, mkqlUserType, udf.TypeConfig, NUdf::IUdfModule::TFlags::TypesOnly, {}, secureParamsProvider.get(), &funcInfo); +            if (!status.IsOk()) { +                ctx.AddError(TIssue(udf.Pos, TStringBuilder() << "Failed to find UDF function: " << udf.Name +                    << ", reason: " << status.GetError())); +                hasErrors = true; +                continue; +            } + +            udf.CallableType = ConvertMiniKQLType(udf.Pos, funcInfo.FunctionType, ctx); +            YQL_ENSURE(udf.CallableType); +            if (funcInfo.RunConfigType) { +                udf.RunConfigType = ConvertMiniKQLType(udf.Pos, const_cast<TType*>(funcInfo.RunConfigType), ctx); +                YQL_ENSURE(udf.RunConfigType); +            } + +            if (funcInfo.UserType) { +                udf.NormalizedUserType = ConvertMiniKQLType(udf.Pos, const_cast<TType*>(funcInfo.UserType), ctx); +                YQL_ENSURE(udf.NormalizedUserType); +            } + +            udf.SupportsBlocks = funcInfo.SupportsBlocks; +            udf.IsStrict = funcInfo.IsStrict; +        } catch (const std::exception& e) { +            ctx.AddError(TIssue(udf.Pos, TStringBuilder() +                << "Internal error was found when udf metadata is loading for function: " << udf.Name +                << ", reason: " << e.what())); +            hasErrors = true; +        } +    } + +    return !hasErrors; +} + +} // namespace NCommon +} // namespace NYql diff --git a/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h new file mode 100644 index 00000000000..87b40e97b52 --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h @@ -0,0 +1,22 @@ +#pragma once + +#include <yql/essentials/core/yql_type_annotation.h> +#include <yql/essentials/core/file_storage/file_storage.h> + +#include <yql/essentials/minikql/mkql_alloc.h> +#include <yql/essentials/minikql/mkql_function_registry.h> + +namespace NYql { +namespace NCommon { + +IUdfResolver::TPtr CreateSimpleUdfResolver( +    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, +    const TFileStoragePtr& fileStorage = {}, bool useFakeMD5 = false); + +bool LoadFunctionsMetadata(const TVector<IUdfResolver::TFunction*>& functions, +    const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, +    NUdf::ITypeInfoHelper::TPtr typeInfoHelper, +    TExprContext& ctx); + +} // namespace NCommon +} // namespace NYql diff --git a/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.cpp b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.cpp new file mode 100644 index 00000000000..2ce67fa0f60 --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.cpp @@ -0,0 +1,248 @@ +#include "yql_udf_resolver_with_index.h" + +#include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h> +#include <yql/essentials/core/yql_type_annotation.h> + +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/mkql_type_builder.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/mkql_utils.h> + +#include <util/generic/hash_set.h> +#include <util/generic/hash.h> +#include <util/generic/map.h> +#include <util/generic/set.h> +#include <util/generic/string.h> +#include <util/system/guard.h> +#include <util/system/mutex.h> + +namespace NYql { +namespace NCommon { + +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +class TUdfResolverWithIndex : public IUdfResolver { +    class TResourceFile : public TThrRefBase { +    public: +        typedef TIntrusivePtr<TResourceFile> TPtr; + +    public: +        TResourceFile(TString alias, const TVector<TString>& modules, TFileLinkPtr link) +            : Link_(std::move(link)) +        { +            Import_.FileAlias = alias; +            Import_.Block = &Block_; +            Import_.Modules = MakeMaybe(modules); + +            Block_.Type = EUserDataType::PATH; +            Block_.Data = Link_->GetPath(); +            Block_.Usage.Set(EUserDataBlockUsage::Udf); +        } + +        static TResourceFile::TPtr Create(const TString& packageName, const TSet<TString>& modules, TFileLinkPtr link) { +            // assume package name has no bad symbols for file name +            TString basename =  link->GetPath().Basename(); +            TString alias = basename.StartsWith("lib") ? basename : ("lib_" + packageName + "_udf.so"); +            alias.to_lower(); +            return MakeIntrusive<TResourceFile>(std::move(alias), TVector<TString>(modules.begin(), modules.end()), std::move(link)); +        } + +    public: +        TFileLinkPtr Link_; +        TUserDataBlock Block_; +        TImport Import_; +    }; + +public: +    TUdfResolverWithIndex(TUdfIndex::TPtr udfIndex, IUdfResolver::TPtr fallback, TFileStoragePtr fileStorage) +        : UdfIndex_(udfIndex) +        , Fallback_(fallback) +        , FileStorage_(fileStorage) +    { +        Y_ENSURE(UdfIndex_); +        Y_ENSURE(FileStorage_); +        // fallback is required only to handle type aware functions and loading rich metadata +        Y_ENSURE(Fallback_); +    } + +    TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override { +        with_lock(Lock_) { +            TString moduleNameStr(moduleName); +            if (!UdfIndex_->ContainsModule(moduleNameStr)) { +                return Nothing(); +            } + +            auto file = DownloadFileWithModule(moduleNameStr); +            return MakeMaybe<TFilePathWithMd5>(file->Link_->GetPath(), file->Link_->GetMd5()); +        } +    } + +    bool LoadMetadata(const TVector<TImport*>& imports, const TVector<TFunction*>& functions, TExprContext& ctx) const override { +        with_lock(Lock_) { +            bool hasErrors = false; +            THashSet<TString> requiredModules; +            TVector<TFunction*> fallbackFunctions; +            TVector<TImport*> fallbackImports = imports; +            TSet<TImport*> additionalImports; // avoid duplicates + +            for (auto udfPtr : functions) { +                TImport* additionalImport = nullptr; +                TFunction* fallbackFunction = nullptr; +                if (!LoadFunctionMetadata(*udfPtr, ctx, fallbackFunction, additionalImport)) { +                    hasErrors = true; +                    continue; +                } + +                if (additionalImport) { +                    additionalImports.insert(additionalImport); +                } + +                if (fallbackFunction) { +                    fallbackFunctions.push_back(fallbackFunction); +                } +            } + +            fallbackImports.insert(fallbackImports.end(), additionalImports.begin(), additionalImports.end()); + +            return Fallback_->LoadMetadata(fallbackImports, fallbackFunctions, ctx) && !hasErrors; +        } +    } + +    TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override { +        return Fallback_->LoadRichMetadata(imports); +    } + +    bool ContainsModule(const TStringBuf& moduleName) const override { +        TString moduleNameStr = TString(moduleName); +        if (UdfIndex_->ContainsModule(moduleNameStr)) { +            return true; +        } + +        return Fallback_->ContainsModule(moduleName); +    } + +private: +    bool LoadFunctionMetadata(TFunction& function, TExprContext& ctx, TFunction*& fallbackFunction, TImport*& additionalImport) const { +        TStringBuf moduleName, funcName; +        if (!SplitUdfName(function.Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) { +            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Incorrect format of function name: " << function.Name)); +            return false; +        } + +        /* +        the order is really important: +        1) check we have such module +            no-> fallback function +        2) check we have such function +            no -> error +        3) download resource file +            fail -> error +        4) if polymorphic function -> fallback function with additional Import for downloaded file +        */ + +        TString moduleNameStr = TString(moduleName); +        if (!UdfIndex_->ContainsModule(moduleNameStr)) { +            fallbackFunction = &function; +            return true; +        } + +        TFunctionInfo info; +        if (!UdfIndex_->FindFunction(moduleNameStr, function.Name, info)) { +            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Function not found: " << function.Name)); +            return false; +        } + +        TResourceFile::TPtr file = DownloadFileWithModule(moduleName, function.Pos, ctx); +        if (!file) { +            return false; +        } + +        additionalImport = &file->Import_; + +        if (info.IsTypeAwareness) { +            fallbackFunction = &function; +            return true; +        } + +        if (!info.CallableType) { +            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "CallableType for function " << function.Name << " is empty. Check UDF source code for errors.")); +            return false; +        } + +        function.CallableType = ParseTypeFromYson(TStringBuf{info.CallableType}, ctx, function.Pos); +        if (!function.CallableType) { +            ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Failed to build callable type from YSON for function " << function.Name)); +            return false; +        } + +        if (info.RunConfigType) { +            function.RunConfigType = ParseTypeFromYson(TStringBuf{info.RunConfigType}, ctx, function.Pos); +            if (!function.RunConfigType) { +                ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Failed to build run config type from YSON for function " << function.Name)); +                return false; +            } +        } else { +            function.RunConfigType = std::get<0>(ctx.SingletonTypeCache); +        } + +        function.NormalizedUserType = std::get<0>(ctx.SingletonTypeCache); +        function.IsStrict = info.IsStrict; +        function.SupportsBlocks = info.SupportsBlocks; +        return true; +    } + +    TResourceFile::TPtr DownloadFileWithModule(const TStringBuf& module, const TPosition& pos, TExprContext& ctx) const { +        try { +            return DownloadFileWithModule(module); +        } catch (const std::exception& e) { +            ctx.AddError(ExceptionToIssue(e, pos)); +        } + +        return nullptr; +    } + +    TResourceFile::TPtr DownloadFileWithModule(const TStringBuf& module) const { +        TString moduleName(module); + +        const auto it = DownloadedFiles_.find(module); +        if (it != DownloadedFiles_.end()) { +            return it->second; +        } + +        auto resource = UdfIndex_->FindResourceByModule(moduleName); +        if (!resource) { +            ythrow yexception() << "No resource has been found for registered module " << moduleName; +        } + +        // token is empty for urls for now +        // assumption: file path is frozen already, no need to put into file storage +        const TDownloadLink& downloadLink = resource->Link; +        TFileLinkPtr link = downloadLink.IsUrl ? FileStorage_->PutUrl(downloadLink.Path, {}) : CreateFakeFileLink(downloadLink.Path, downloadLink.Md5); +        TResourceFile::TPtr file = TResourceFile::Create(moduleName, resource->Modules, link); +        for (auto& d : resource->Modules) { +            auto p = DownloadedFiles_.emplace(d, file); +            if (!p.second) { +                // should not happen because UdfIndex handles conflicts +                ythrow yexception() << "file already downloaded for module " << moduleName << ", conflicting path " << downloadLink.Path << ", existing local file " << p.first->second->Link_->GetPath(); +            } +        } + +        return file; +    } + +private: +    mutable TMutex Lock_; +    const TUdfIndex::TPtr UdfIndex_; +    const IUdfResolver::TPtr Fallback_; +    const TFileStoragePtr FileStorage_; +    // module -> downloaded resource file +    mutable TMap<TString, TResourceFile::TPtr> DownloadedFiles_; +}; + +IUdfResolver::TPtr CreateUdfResolverWithIndex(TUdfIndex::TPtr udfIndex, IUdfResolver::TPtr fallback, TFileStoragePtr fileStorage) { +    return new TUdfResolverWithIndex(udfIndex, fallback, fileStorage); +} + +} // namespace NCommon +} // namespace NYql diff --git a/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h new file mode 100644 index 00000000000..86bff50c255 --- /dev/null +++ b/yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h @@ -0,0 +1,12 @@ +#pragma once + +#include <yql/essentials/core/yql_udf_index.h> +#include <yql/essentials/core/file_storage/file_storage.h> + +namespace NYql { +namespace NCommon { + +IUdfResolver::TPtr CreateUdfResolverWithIndex(TUdfIndex::TPtr udfIndex, IUdfResolver::TPtr fallback, TFileStoragePtr fileStorage); + +} // namespace NCommon +} // namespace NYql  | 
