diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:19:26 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:29:50 +0300 |
commit | 2661be00f3bc47590fda9218bf0386d6355c8c88 (patch) | |
tree | 3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/mkql_function_registry.cpp | |
parent | cf2a23963ac10add28c50cc114fbf48953eca5aa (diff) | |
download | ydb-2661be00f3bc47590fda9218bf0386d6355c8c88.tar.gz |
Moved yql/minikql YQL-19206
init
[nodiff:caesar]
commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/mkql_function_registry.cpp')
-rw-r--r-- | yql/essentials/minikql/mkql_function_registry.cpp | 573 |
1 files changed, 573 insertions, 0 deletions
diff --git a/yql/essentials/minikql/mkql_function_registry.cpp b/yql/essentials/minikql/mkql_function_registry.cpp new file mode 100644 index 0000000000..ab7c623cb5 --- /dev/null +++ b/yql/essentials/minikql/mkql_function_registry.cpp @@ -0,0 +1,573 @@ +#include "mkql_function_registry.h" +#include "mkql_utils.h" +#include "mkql_type_builder.h" +#include <yql/essentials/public/udf/udf_static_registry.h> + +#include <util/folder/iterator.h> +#include <util/folder/dirut.h> +#include <util/folder/path.h> +#include <util/system/dynlib.h> +#include <util/stream/str.h> +#include <util/string/builder.h> +#include <util/string/split.h> + +namespace { + +using namespace NKikimr; +using namespace NMiniKQL; + +const char MODULE_NAME_DELIMITER = '.'; +const char* RegisterFuncName = "Register"; +const char* AbiVersionFuncName = "AbiVersion"; +#if defined(_win_) || defined(_darwin_) +const char* BindSymbolsFuncName = "BindSymbols"; +#endif +const char* SetBackTraceCallbackName = "SetBackTraceCallback"; + +////////////////////////////////////////////////////////////////////////////// +// TMutableFunctionRegistry +////////////////////////////////////////////////////////////////////////////// +class TMutableFunctionRegistry: public IMutableFunctionRegistry +{ + struct TUdfModule { + TString LibraryPath; + std::shared_ptr<NUdf::IUdfModule> Impl; + }; + + using TUdfModulesMap = THashMap<TString, TUdfModule>; + + struct TUdfLibrary: public TThrRefBase { + ui32 AbiVersion = 0; + TDynamicLibrary Lib; + TUdfLibrary() + { + } + }; + using TUdfLibraryPtr = TIntrusivePtr<TUdfLibrary>; + + class TUdfModuleLoader: public NUdf::IRegistrator { + public: + TUdfModuleLoader( + TUdfModulesMap& modulesMap, + THashSet<TString>* newModules, + const TString& libraryPath, + const TUdfModuleRemappings& remappings, + ui32 abiVersion, + const TString& customUdfPrefix = {}) + : ModulesMap(modulesMap) + , NewModules(newModules) + , LibraryPath(libraryPath) + , Remappings(remappings) + , AbiVersion(NUdf::AbiVersionToStr(abiVersion)) + , CustomUdfPrefix(customUdfPrefix) + { + } + + void AddModule( + const NUdf::TStringRef& name, + NUdf::TUniquePtr<NUdf::IUdfModule> module) override + { + Y_DEBUG_ABORT_UNLESS(module, "Module is empty"); + + if (!HasError()) { + TUdfModule m; + m.LibraryPath = LibraryPath; + m.Impl.reset(module.Release()); + + auto it = Remappings.find(name); + const TString& newName = CustomUdfPrefix + + ((it == Remappings.end()) + ? TString(name) + : it->second); + + auto i = ModulesMap.insert({ newName, std::move(m) }); + if (!i.second) { + TUdfModule* oldModule = ModulesMap.FindPtr(newName); + Y_DEBUG_ABORT_UNLESS(oldModule != nullptr); + Error = (TStringBuilder() + << "UDF module duplication: name " << TStringBuf(name) + << ", already loaded from " << oldModule->LibraryPath + << ", trying to load from " << LibraryPath); + } else if (NewModules) { + NewModules->insert(newName); + } + } + } + + const TString& GetError() const { return Error; } + bool HasError() const { return !Error.empty(); } + + private: + TUdfModulesMap& ModulesMap; + THashSet<TString>* NewModules; + const TString LibraryPath; + const TUdfModuleRemappings& Remappings; + const TString AbiVersion; + TString Error; + const TString CustomUdfPrefix; + }; + +public: + TMutableFunctionRegistry(IBuiltinFunctionRegistry::TPtr builtins) + : Builtins_(std::move(builtins)) + { + } + + TMutableFunctionRegistry(const TMutableFunctionRegistry& rhs) + : Builtins_(rhs.Builtins_) + , LoadedLibraries_(rhs.LoadedLibraries_) + , UdfModules_(rhs.UdfModules_) + , SupportsSizedAllocators_(rhs.SupportsSizedAllocators_) + { + } + + ~TMutableFunctionRegistry() { + } + + void AllowUdfPatch() override { + } + + void LoadUdfs( + const TString& libraryPath, + const TUdfModuleRemappings& remmapings, + ui32 flags = 0, + const TString& customUdfPrefix = {}, + THashSet<TString>* modules = nullptr) override + { + TUdfLibraryPtr lib; + + auto libIt = LoadedLibraries_.find(libraryPath); + if (libIt == LoadedLibraries_.end()) { + lib = MakeIntrusive<TUdfLibrary>(); +#ifdef _win32_ + ui32 loadFlags = 0; +#else + ui32 loadFlags = RTLD_GLOBAL | ((flags & NUdf::IRegistrator::TFlags::TypesOnly) ? RTLD_LAZY : RTLD_NOW); +#endif + TPathSplit absPathSplit(libraryPath); + TString absPath = libraryPath; + if (!absPathSplit.IsAbsolute) { + absPath = JoinPaths(TFsPath::Cwd().PathSplit(), absPathSplit); + } + + lib->Lib.Open(absPath.data(), loadFlags); + lib->Lib.SetUnloadable(false); + + // (1) check ABI version + auto abiVersionFunc = reinterpret_cast<NUdf::TAbiVersionFunctionPtr>( + lib->Lib.SymOptional(AbiVersionFuncName)); + if (!abiVersionFunc) { + return; + } + + ui32 version = abiVersionFunc(); + Y_ENSURE(NUdf::IsAbiCompatible(version) && version >= NUdf::MakeAbiVersion(2, 8, 0), + "Non compatible ABI version of UDF library " << libraryPath + << ", expected up to " << NUdf::AbiVersionToStr(NUdf::CurrentCompatibilityAbiVersion() * 100) + << ", got " << NUdf::AbiVersionToStr(version) + << "; try to re-compile library using " + << "YQL_ABI_VERSION(" << UDF_ABI_VERSION_MAJOR + << " " << UDF_ABI_VERSION_MINOR << " 0) macro in ya.make"); + lib->AbiVersion = version; + if (version < NUdf::MakeAbiVersion(2, 8, 0)) { + SupportsSizedAllocators_ = false; + } + +#if defined(_win_) || defined(_darwin_) + auto bindSymbolsFunc = reinterpret_cast<NUdf::TBindSymbolsFunctionPtr>(lib->Lib.Sym(BindSymbolsFuncName)); + bindSymbolsFunc(NUdf::GetStaticSymbols()); +#endif + + if (BackTraceCallback_) { + auto setter = reinterpret_cast<NUdf::TSetBackTraceCallbackPtr>(lib->Lib.SymOptional(SetBackTraceCallbackName)); + if (setter) { + setter(BackTraceCallback_); + } + } + + libIt = LoadedLibraries_.insert({ libraryPath, lib }).first; + } else { + lib = libIt->second; + } + + // (2) ensure that Register() func is present + auto registerFunc = reinterpret_cast<NUdf::TRegisterFunctionPtr>( + lib->Lib.Sym(RegisterFuncName)); + + // (3) do load + THashSet<TString> newModules; + TUdfModuleLoader loader( + UdfModules_, + &newModules, + libraryPath, + remmapings, + lib->AbiVersion, customUdfPrefix); + registerFunc(loader, flags); + Y_ENSURE(!loader.HasError(), loader.GetError()); + + if (modules) { + *modules = std::move(newModules); + } + } + + void AddModule( + const TStringBuf& libraryPath, + const TStringBuf& moduleName, + NUdf::TUniquePtr<NUdf::IUdfModule> module) override + { + TString libraryPathStr(libraryPath); + auto inserted = LoadedLibraries_.insert({ libraryPathStr, nullptr }); + if (!inserted.second) { + return; + } + + TUdfModuleRemappings remappings; + TUdfModuleLoader loader( + UdfModules_, nullptr, libraryPathStr, + remappings, NUdf::CurrentAbiVersion()); + loader.AddModule(moduleName, std::move(module)); + + Y_ENSURE(!loader.HasError(), loader.GetError()); + } + + void SetSystemModulePaths(const TUdfModulePathsMap& paths) override { + SystemModulePaths_ = paths; + } + + const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override { + return Builtins_; + } + + TStatus FindFunctionTypeInfo( + const TTypeEnvironment& env, + NUdf::ITypeInfoHelper::TPtr typeInfoHelper, + NUdf::ICountersProvider* countersProvider, + const TStringBuf& name, + TType* userType, + const TStringBuf& typeConfig, + ui32 flags, + const NUdf::TSourcePosition& pos, + const NUdf::ISecureParamsProvider* secureParamsProvider, + TFunctionTypeInfo* funcInfo) const override + { + TStringBuf moduleName, funcName; + if (name.TrySplit(MODULE_NAME_DELIMITER, moduleName, funcName)) { + auto it = UdfModules_.find(moduleName); + if (it != UdfModules_.end()) { + TFunctionTypeInfoBuilder typeInfoBuilder(env, typeInfoHelper, moduleName, + (flags & NUdf::IUdfModule::TFlags::TypesOnly) ? nullptr : countersProvider, pos, secureParamsProvider); + const auto& module = *it->second.Impl; + module.BuildFunctionTypeInfo( + funcName, userType, typeConfig, flags, typeInfoBuilder); + + if (typeInfoBuilder.HasError()) { + return TStatus::Error() + << "Module: " << moduleName + << ", function: " << funcName + << ", error: " << typeInfoBuilder.GetError(); + } + + try { + typeInfoBuilder.Build(funcInfo); + } + catch (yexception& e) { + return TStatus::Error() + << "Module: " << moduleName + << ", function: " << funcName + << ", error: " << e.what(); + } + + if ((flags & NUdf::IRegistrator::TFlags::TypesOnly) && + !funcInfo->FunctionType) + { + return TStatus::Error() + << "Module: " << moduleName + << ", function: " << funcName + << ", function not found"; + } + + if (funcInfo->ModuleIRUniqID) { + funcInfo->ModuleIRUniqID.prepend(moduleName); + } + + return TStatus::Ok(); + } + + return TStatus::Error() + << "Module " << moduleName << " is not registered"; + } + + return TStatus::Error() + << "Function name must be in <module>.<func_name> scheme. " + << "But get " << name; + } + + TMaybe<TString> FindUdfPath(const TStringBuf& moduleName) const override { + if (const TUdfModule* udf = UdfModules_.FindPtr(moduleName)) { + return udf->LibraryPath; + } + + if (const TString* path = SystemModulePaths_.FindPtr(moduleName)) { + return *path; + } + + return Nothing(); + } + + bool IsLoadedUdfModule(const TStringBuf& moduleName) const override { + return UdfModules_.contains(moduleName); + } + + THashSet<TString> GetAllModuleNames() const override { + THashSet<TString> names; + names.reserve(UdfModules_.size()); + for (const auto& module: UdfModules_) { + names.insert(module.first); + } + return names; + } + + TFunctionsMap GetModuleFunctions(const TStringBuf& moduleName) const override { + struct TFunctionNamesSink: public NUdf::IFunctionNamesSink { + TFunctionsMap Functions; + class TFuncDescriptor : public NUdf::IFunctionDescriptor { + public: + TFuncDescriptor(TFunctionProperties& properties) + : Properties(properties) + {} + + private: + void SetTypeAwareness() final { + Properties.IsTypeAwareness = true; + } + + TFunctionProperties& Properties; + }; + + NUdf::IFunctionDescriptor::TPtr Add(const NUdf::TStringRef& name) final { + const auto it = Functions.emplace(name, TFunctionProperties{}); + return new TFuncDescriptor(it.first->second); + } + } sink; + + + const auto it = UdfModules_.find(moduleName); + if (UdfModules_.cend() == it) + return TFunctionsMap(); + it->second.Impl->GetAllFunctions(sink); + return sink.Functions; + } + + bool SupportsSizedAllocators() const override { + return SupportsSizedAllocators_; + } + + void PrintInfoTo(IOutputStream& out) const override { + Builtins_->PrintInfoTo(out); + } + + void CleanupModulesOnTerminate() const override { + for (const auto& module : UdfModules_) { + module.second.Impl->CleanupOnTerminate(); + } + } + + TIntrusivePtr<IMutableFunctionRegistry> Clone() const override { + return new TMutableFunctionRegistry(*this); + } + + void SetBackTraceCallback(NUdf::TBackTraceCallback callback) override { + BackTraceCallback_ = callback; + } + +private: + const IBuiltinFunctionRegistry::TPtr Builtins_; + + THashMap<TString, TUdfLibraryPtr> LoadedLibraries_; + TUdfModulesMap UdfModules_; + THolder<TMemoryUsageInfo> UdfMemoryInfo_; + TUdfModulePathsMap SystemModulePaths_; + NUdf::TBackTraceCallback BackTraceCallback_ = nullptr; + bool SupportsSizedAllocators_ = true; +}; + +////////////////////////////////////////////////////////////////////////////// +// TBuiltinsWrapper +////////////////////////////////////////////////////////////////////////////// +class TBuiltinsWrapper: public IFunctionRegistry +{ +public: + TBuiltinsWrapper(IBuiltinFunctionRegistry::TPtr&& builtins) + : Builtins_(std::move(builtins)) + { + } + + const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override { + return Builtins_; + } + + void AllowUdfPatch() override { + } + + TStatus FindFunctionTypeInfo( + const TTypeEnvironment& env, + NUdf::ITypeInfoHelper::TPtr typeInfoHelper, + NUdf::ICountersProvider* countersProvider, + const TStringBuf& name, + TType* userType, + const TStringBuf& typeConfig, + ui32 flags, + const NUdf::TSourcePosition& pos, + const NUdf::ISecureParamsProvider* secureParamsProvider, + TFunctionTypeInfo* funcInfo) const override + { + Y_UNUSED(env); + Y_UNUSED(typeInfoHelper); + Y_UNUSED(countersProvider); + Y_UNUSED(name); + Y_UNUSED(userType); + Y_UNUSED(typeConfig); + Y_UNUSED(flags); + Y_UNUSED(pos); + Y_UNUSED(secureParamsProvider); + Y_UNUSED(funcInfo); + return TStatus::Error(TStringBuf("Unsupported access to builtins registry")); + } + + TMaybe<TString> FindUdfPath( + const TStringBuf& /* moduleName */) const override + { + return{}; + } + + bool IsLoadedUdfModule(const TStringBuf& /* moduleName */) const override { + return false; + } + + THashSet<TString> GetAllModuleNames() const override { + return {}; + } + + TFunctionsMap GetModuleFunctions(const TStringBuf&) const override { + return TFunctionsMap(); + } + + bool SupportsSizedAllocators() const override { + return true; + } + + void PrintInfoTo(IOutputStream& out) const override { + Builtins_->PrintInfoTo(out); + } + + void CleanupModulesOnTerminate() const override { + } + + TIntrusivePtr<IMutableFunctionRegistry> Clone() const override { + return new TMutableFunctionRegistry(Builtins_); + } + +private: + const IBuiltinFunctionRegistry::TPtr Builtins_; +}; + +} // namespace + +namespace NKikimr { +namespace NMiniKQL { + +void FindUdfsInDir(const TString& dirPath, TVector<TString>* paths) +{ + static const TStringBuf libPrefix = TStringBuf(MKQL_UDF_LIB_PREFIX); + static const TStringBuf libSuffix = TStringBuf(MKQL_UDF_LIB_SUFFIX); + + if (!dirPath.empty()) { + std::vector<TString> dirs; + StringSplitter(dirPath).Split(';').AddTo(&dirs); + + for (auto d : dirs) { + TDirIterator dir(d, TDirIterator::TOptions(FTS_LOGICAL).SetMaxLevel(10)); + + for (auto file = dir.begin(), end = dir.end(); file != end; ++file) { + // skip entries with empty name, and all non-files + // all valid symlinks are already dereferenced, provided by FTS_LOGICAL + if (file->fts_pathlen == file->fts_namelen || file->fts_info != FTS_F) { + continue; + } + + TString path(file->fts_path); + TString fileName = GetBaseName(path); + + // skip non shared libraries + if (!fileName.StartsWith(libPrefix) || + !fileName.EndsWith(libSuffix)) + { + continue; + } + + // skip test udfs when scanning dir + auto udfName = TStringBuf(fileName).Skip(libPrefix.length()); + if (udfName.StartsWith(TStringBuf("test_"))) { + continue; + } + + paths->push_back(std::move(path)); + } + } + } +} + +bool SplitModuleAndFuncName(const TStringBuf& name, TStringBuf& module, TStringBuf& func) +{ + return name.TrySplit(MODULE_NAME_DELIMITER, module, func); +} + +TString FullName(const TStringBuf& module, const TStringBuf& func) +{ + TString fullName; + fullName.reserve(module.size() + func.size() + 1); + fullName.append(module); + fullName.append(MODULE_NAME_DELIMITER); + fullName.append(func); + return fullName; +} + +TIntrusivePtr<IFunctionRegistry> CreateFunctionRegistry(IBuiltinFunctionRegistry::TPtr&& builtins) +{ + return new TBuiltinsWrapper(std::move(builtins)); +} + +TIntrusivePtr<IFunctionRegistry> CreateFunctionRegistry( + NKikimr::NUdf::TBackTraceCallback backtraceCallback, + IBuiltinFunctionRegistry::TPtr&& builtins, + bool allowUdfPatch, + const TVector<TString>& udfsPaths, + ui32 flags /* = 0 */) +{ + auto registry = MakeHolder<TMutableFunctionRegistry>(std::move(builtins)); + if (allowUdfPatch) { + registry->AllowUdfPatch(); + } + registry->SetBackTraceCallback(backtraceCallback); + + // system UDFs loaded with default names + TUdfModuleRemappings remappings; + THashSet<TString> usedUdfPaths; + for (const TString& udfPath: udfsPaths) { + if (usedUdfPaths.insert(udfPath).second) { + registry->LoadUdfs(udfPath, remappings, flags); + } + } + + return registry.Release(); +} + +void FillStaticModules(IMutableFunctionRegistry& registry) { + for (const auto& wrapper : NUdf::GetStaticUdfModuleWrapperList()) { + auto [name, ptr] = wrapper(); + registry.AddModule(TString(StaticModulePrefix) + name, name, std::move(ptr)); + } +} + +} // namespace NMiniKQL +} // namespace NKiki |