diff options
author | imunkin <imunkin@yandex-team.com> | 2024-11-08 10:00:23 +0300 |
---|---|---|
committer | imunkin <imunkin@yandex-team.com> | 2024-11-08 10:12:13 +0300 |
commit | a784a2f943d6e15caa6241e2e96d80aac6dbf375 (patch) | |
tree | 05f1e5366c916b988a8afb75bdab8ddeee0f6e6d /yql/essentials/udfs/common/python/python_udf | |
parent | d70137a7b530ccaa52834274913bbb5a3d1ca06e (diff) | |
download | ydb-a784a2f943d6e15caa6241e2e96d80aac6dbf375.tar.gz |
Move yql/udfs/common/ to /yql/essentials YQL-19206
Except the following directories:
* clickhouse/client
* datetime
* knn
* roaring
commit_hash:c7da95636144d28db109d6b17ddc762e9bacb59f
Diffstat (limited to 'yql/essentials/udfs/common/python/python_udf')
5 files changed, 394 insertions, 0 deletions
diff --git a/yql/essentials/udfs/common/python/python_udf/python_function_factory.h b/yql/essentials/udfs/common/python/python_udf/python_function_factory.h
new file mode 100644
index 0000000000..a4e393b486
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_function_factory.h
@@ -0,0 +1,123 @@
+#pragma once
+
+#include <yql/essentials/public/udf/udf_value.h>
+#include <yql/essentials/public/udf/udf_value_builder.h>
+#include <yql/essentials/public/udf/udf_type_builder.h>
+#include <yql/essentials/public/udf/udf_registrator.h>
+#include <yql/essentials/public/udf/udf_terminator.h>
+#include <yql/essentials/udfs/common/python/bindings/py_ptr.h>
+#include <yql/essentials/udfs/common/python/bindings/py_callable.h>
+#include <yql/essentials/udfs/common/python/bindings/py_cast.h>
+#include <yql/essentials/udfs/common/python/bindings/py_errors.h>
+#include <yql/essentials/udfs/common/python/bindings/py_gil.h>
+#include <yql/essentials/udfs/common/python/bindings/py_utils.h>
+#include <yql/essentials/udfs/common/python/bindings/py_yql_module.h>
+
+#include <util/generic/yexception.h>
+#include <util/stream/str.h>
+#include <util/stream/printf.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+
+#include <atomic>
+
+using namespace NYql::NUdf;
+using namespace NPython;
+
+//////////////////////////////////////////////////////////////////////////////
+// TPythonFunctionFactory
+//
+// Boxed value that turns Python source text into a callable UDF value:
+// Run() compiles args[0] as a module, looks up the attribute named
+// FunctionName in it and passes that callable to FromPyCallable().
+//////////////////////////////////////////////////////////////////////////////
+class TPythonFunctionFactory: public TBoxedValue
+{
+public:
+    TPythonFunctionFactory(
+            const TStringRef& name,
+            const TStringRef& tag,
+            const TType* functionType,
+            ITypeInfoHelper::TPtr&& helper,
+            const NYql::NUdf::TSourcePosition& pos)
+        : Ctx(new TPyContext(helper, tag, pos))
+        , FunctionName(name)
+        , FunctionType_(functionType)
+    {
+    }
+
+    ~TPythonFunctionFactory() {
+        Ctx->Cleanup();
+        PyCleanup();
+    }
+
+private:
+    // Compiles the script passed in args[0] and returns its entry point as a callable value.
+    // Every failure terminates the UDF with a message prefixed by the source position.
+    TUnboxedValue Run(
+            const IValueBuilder* valueBuilder,
+            const TUnboxedValuePod* args) const override
+    {
+        TPyCastContext::TPtr castCtx = MakeIntrusive<TPyCastContext>(valueBuilder, Ctx);
+
+        // Copy the script into a TString to get a proper NUL-terminated C string
+        TString source(args[0].AsStringRef());
+
+        TPyGilLocker lock;
+        TPyObjectPtr module = CompileModule(FunctionName, source);
+        if (!module) {
+            UdfTerminate((TStringBuilder() << Ctx->Pos << "Failed to compile module: " << GetLastErrorAsString()).data());
+        }
+
+        TPyObjectPtr function(PyObject_GetAttrString(module.Get(), FunctionName.data()));
+        if (!function) {
+            UdfTerminate((TStringBuilder() << Ctx->Pos << "Failed to find entry point: " << GetLastErrorAsString()).data());
+        }
+
+        if (!PyCallable_Check(function.Get())) {
+            UdfTerminate((TStringBuilder() << Ctx->Pos << "Entry point is not a callable").data());
+        }
+
+        try {
+            SetupCallableSettings(castCtx, function.Get());
+        } catch (const yexception& e) {
+            UdfTerminate((TStringBuilder() << Ctx->Pos << "Failed to setup callable settings: "
+                << e.what()).data());
+        }
+        return FromPyCallable(castCtx, FunctionType_, function.Release());
+    }
+
+    // Compiles `source` under the file name "embedded:<name>" and executes it as a module
+    // named "<name><N>", where N is taken from the shared atomic counter.
+    // The returned pointer is null when compilation or execution fails.
+    static TPyObjectPtr CompileModule(const TString& name, const TString& source) {
+        unsigned int moduleNum = AtomicCounter++;
+        TString filename(TStringBuf("embedded:"));
+        filename += name;
+
+        TPyObjectPtr module, code;
+        if (HasEncodingCookie(source)) {
+            code.ResetSteal(Py_CompileString(source.data(), filename.data(), Py_file_input));
+        } else {
+            // Zero-initialize: newer CPython versions have more fields than cf_flags
+            PyCompilerFlags cflags = {};
+            cflags.cf_flags = PyCF_SOURCE_IS_UTF8;
+
+            code.ResetSteal(Py_CompileStringFlags(
+                source.data(), filename.data(), Py_file_input, &cflags));
+        }
+
+        if (code) {
+            TString nameWithNum = name + ToString(moduleNum);
+            char* moduleName = const_cast<char*>(nameWithNum.data());
+            module.ResetSteal(PyImport_ExecCodeModule(moduleName, code.Get()));
+        }
+
+        return module;
+    }
+
+    const TPyContext::TPtr Ctx;
+    const TString FunctionName;
+    const TType* FunctionType_;
+    inline static std::atomic_uint AtomicCounter = 0;
+};
diff 
--git a/yql/essentials/udfs/common/python/python_udf/python_udf.cpp b/yql/essentials/udfs/common/python/python_udf/python_udf.cpp
new file mode 100644
index 0000000000..b1739a1775
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_udf.cpp
@@ -0,0 +1,244 @@
+#include "python_udf.h"
+#include "python_function_factory.h"
+
+#include <yql/essentials/public/udf/udf_version.h>
+#include <yql/essentials/udfs/common/python/bindings/py_utils.h>
+
+#include <util/generic/vector.h>
+#include <util/system/execpath.h>
+
+namespace {
+
+#if PY_MAJOR_VERSION >= 3
+#define PYTHON_PROGRAMM_NAME L"YQL::Python3"
+#else
+#define PYTHON_PROGRAMM_NAME "YQL::Python2"
+#endif
+
+// Appends every entry of pathVals to sys.path and stores the list back into sys.
+// Returns -1 if sys.path is missing, a non-zero PyList_Append result, or the PySys_SetObject result.
+int AddToPythonPath(const TVector<TStringBuf>& pathVals)
+{
+    char pathVar[] = "path"; // PySys_{Get,Set}Object take a non-const char* arg
+
+    TPyObjectPtr sysPath(PySys_GetObject(pathVar), TPyObjectPtr::ADD_REF);
+    if (!sysPath) {
+        return -1;
+    }
+
+    for (const auto& val: pathVals) {
+        TPyObjectPtr pyStr = PyRepr(val.data());
+        int rc = PyList_Append(sysPath.Get(), pyStr.Get());
+        if (rc != 0) {
+            return rc;
+        }
+    }
+
+    return PySys_SetObject(pathVar, sysPath.Get());
+}
+
+// Imports the Arcadia Python runtime module; aborts the process if the import fails.
+void InitArcadiaPythonRuntime()
+{
+    // Arcadia static python import hook resides in __res module
+    // It modifies sys.meta_path upon import
+
+    TPyObjectPtr mod(PyImport_ImportModule("__res"));
+    Y_ABORT_UNLESS(mod, "Can't import arcadia python runtime");
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// TPythonModule
+//
+// UDF module that sets up the embedded interpreter (when standalone) and
+// hands out a TPythonFunctionFactory for every requested callable type.
+//////////////////////////////////////////////////////////////////////////////
+class TPythonModule: public IUdfModule
+{
+public:
+    TPythonModule(const TString& resourceName, EPythonFlavor pythonFlavor, bool standalone = true)
+        : ResourceName(resourceName), Standalone(standalone)
+    {
+        if (Standalone) {
+            Py_SetProgramName(PYTHON_PROGRAMM_NAME);
+            PrepareYqlModule();
+            Py_Initialize();
+        }
+
+        InitYqlModule(pythonFlavor, standalone);
+
+        const auto rc = PyRun_SimpleString(R"(
+# numpy on import may find installed openblas library and load it,
+# which in turn causes it to start CPUCOUNT threads
+# with approx. 40Mb memory reserved for each thread;
+#
+# See more detailed explanation here: https://st.yandex-team.ru/STATLIBS-1715#5bfc68ecbbc039001cec572a
+#
+# Thus, we reduce negative effects as much as possible
+import os
+os.environ['OPENBLAS_NUM_THREADS'] = '1'
+
+
+# Following part allows us later to format tracebacks via sys.excepthook
+# in thread-safe manner
+import sys
+import threading
+if sys.version_info >= (3, 0):
+    from io import StringIO, TextIOWrapper as SysStderrType
+else:
+    from cStringIO import StringIO
+    SysStderrType = file
+
+class StderrLocal(threading.local):
+
+    def __init__(self):
+        self.is_real_mode = True
+        self.buffer = StringIO()
+
+
+class StderrProxy(object):
+    def __init__(self, stderr):
+        self._stderr = stderr
+        self._tls = StderrLocal()
+
+    def _toggle_real_mode(self):
+        self._tls.is_real_mode = not self._tls.is_real_mode
+        if not self._tls.is_real_mode:
+            # StringIO has no clear(); rewind and truncate to drop stale output
+            self._tls.buffer.seek(0)
+            self._tls.buffer.truncate()
+
+    def _get_value(self):
+        assert not self._tls.is_real_mode
+        return self._tls.buffer.getvalue()
+
+    def __getattr__(self, attr):
+        target = self._stderr
+        if not self._tls.is_real_mode:
+            target = self._tls.buffer
+
+        return getattr(target, attr)
+
+if isinstance(sys.stderr, SysStderrType):
+    sys.stderr = StderrProxy(sys.stderr)
+)");
+        Y_ABORT_UNLESS(rc >= 0, "Can't setup module");
+
+        if (pythonFlavor == EPythonFlavor::Arcadia) {
+            InitArcadiaPythonRuntime();
+        }
+
+#ifndef _win_
+        if (Standalone) {
+            TVector<TStringBuf> paths;
+            if (pythonFlavor == EPythonFlavor::System) {
+                paths.push_back(TStringBuf("/usr/lib/python2.7/dist-packages"));
+            }
+            paths.push_back(TStringBuf("."));
+            const auto r = AddToPythonPath(paths);
+            Y_ABORT_UNLESS(r >= 0, "Can't add dist-packages into sys.path");
+        }
+#endif
+
+        char executableVar[] = "executable"; // PySys_{Get,Set}Object take a non-const char* arg
+        TPyObjectPtr pyExecutableStr = PyRepr(GetExecPath().data());
+        Y_ABORT_UNLESS(PySys_SetObject(executableVar, pyExecutableStr.Get()) >= 0, "Can't set sys.executable");
+
+        if (Standalone) {
+            PyEval_InitThreads();
+            MainThreadState_ = PyEval_SaveThread();
+        }
+    }
+
+    ~TPythonModule() {
+        if (Standalone) {
+            PyEval_RestoreThread(MainThreadState_);
+            Py_Finalize();
+        }
+    }
+
+    void CleanupOnTerminate() const final {
+        PyCleanup();
+    }
+
+    void GetAllFunctions(IFunctionsSink&) const final {}
+
+    // Installs a TPythonFunctionFactory for `name`; reports an error unless userType is callable.
+    void BuildFunctionTypeInfo(
+            const TStringRef& name,
+            TType* userType,
+            const TStringRef& typeConfig,
+            ui32 flags,
+            IFunctionTypeInfoBuilder& builder) const final
+    {
+        Y_UNUSED(typeConfig);
+
+        if (flags & TFlags::TypesOnly) {
+            return;
+        }
+
+        try {
+            auto typeHelper = builder.TypeInfoHelper();
+            if (ETypeKind::Callable != typeHelper->GetTypeKind(userType)) {
+                return builder.SetError(TStringRef::Of("Expected callable type"));
+            }
+
+            const auto pos = builder.GetSourcePosition();
+            builder.Implementation(new TPythonFunctionFactory(name, ResourceName, userType, std::move(typeHelper), pos));
+        } catch (const yexception& e) {
+            builder.SetError(TStringBuf(e.what()));
+        }
+    }
+
+private:
+    TString ResourceName;
+    bool Standalone;
+    // Assigned from PyEval_SaveThread() in standalone mode only; null otherwise
+    PyThreadState* MainThreadState_ = nullptr;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+// TStubModule
+//////////////////////////////////////////////////////////////////////////////
+class TStubModule: public IUdfModule {
+    void GetAllFunctions(IFunctionsSink&) const final {}
+
+    void BuildFunctionTypeInfo(
+            const TStringRef& /*name*/,
+            TType* /*userType*/,
+            const TStringRef& /*typeConfig*/,
+            ui32 flags,
+            IFunctionTypeInfoBuilder& /*builder*/) const final
+    {
+        Y_DEBUG_ABORT_UNLESS(flags & TFlags::TypesOnly,
+                "in stub module this function can be called only for types loading");
+    }
+
+    void CleanupOnTerminate() const final {}
+};
+
+} // namespace
+
+void NKikimr::NUdf::RegisterYqlPythonUdf(
+        IRegistrator& registrator,
+        ui32 flags,
+        TStringBuf moduleName,
+        TStringBuf resourceName,
+        EPythonFlavor pythonFlavor)
+{
+    if (flags & IRegistrator::TFlags::TypesOnly) {
+        registrator.AddModule(moduleName, new TStubModule);
+    } else {
+        registrator.AddModule(
+            moduleName,
+            NKikimr::NUdf::GetYqlPythonUdfModule(resourceName, pythonFlavor, true)
+        );
+    }
+}
+
+TUniquePtr<NKikimr::NUdf::IUdfModule> NKikimr::NUdf::GetYqlPythonUdfModule(
+    TStringBuf resourceName, NKikimr::NUdf::EPythonFlavor pythonFlavor,
+    bool standalone
+) {
+    return new TPythonModule(TString(resourceName), pythonFlavor, standalone);
+}
diff --git a/yql/essentials/udfs/common/python/python_udf/python_udf.h b/yql/essentials/udfs/common/python/python_udf/python_udf.h
new file mode 100644
index 0000000000..16d7da096d
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_udf.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <yql/essentials/public/udf/udf_registrator.h>
+
+namespace NYql {
+namespace NUdf {
+
+enum class EPythonFlavor {
+    System,
+    Arcadia,
+};
+
+void RegisterYqlPythonUdf(
+        IRegistrator& registrator,
+        ui32 flags,
+        TStringBuf moduleName,
+        TStringBuf resourceName,
+        EPythonFlavor pythonFlavor);
+
+TUniquePtr<IUdfModule> GetYqlPythonUdfModule(
+        TStringBuf resourceName,
+        EPythonFlavor pythonFlavor,
+        bool standalone);
+
+} // namespace NUdf
+} // namespace NYql
diff --git a/yql/essentials/udfs/common/python/python_udf/python_udfs_exports.exports b/yql/essentials/udfs/common/python/python_udf/python_udfs_exports.exports
new file mode 100644
index 0000000000..2ffd6f54b5
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_udfs_exports.exports
@@ -0,0 +1,5 @@
+C Register
+C AbiVersion
+C RunPython
+C BindSymbols
+C SetBackTraceCallback
diff --git a/yql/essentials/udfs/common/python/python_udf/ya.make b/yql/essentials/udfs/common/python/python_udf/ya.make
new file mode 100644
index 0000000000..9a2090665a
--- /dev/null
+++ 
b/yql/essentials/udfs/common/python/python_udf/ya.make @@ -0,0 +1,20 @@ +PY23_NATIVE_LIBRARY() + +YQL_ABI_VERSION(2 27 0) + +SRCS( + python_udf.cpp +) + +PEERDIR( + yql/essentials/public/udf + yql/essentials/udfs/common/python/bindings +) + +CFLAGS( + -DDISABLE_PYDEBUG +) + +NO_COMPILER_WARNINGS() + +END() |