aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/udfs/common/python/python_udf
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2024-11-08 10:00:23 +0300
committerimunkin <imunkin@yandex-team.com>2024-11-08 10:12:13 +0300
commita784a2f943d6e15caa6241e2e96d80aac6dbf375 (patch)
tree05f1e5366c916b988a8afb75bdab8ddeee0f6e6d /yql/essentials/udfs/common/python/python_udf
parentd70137a7b530ccaa52834274913bbb5a3d1ca06e (diff)
downloadydb-a784a2f943d6e15caa6241e2e96d80aac6dbf375.tar.gz
Move yql/udfs/common/ to /yql/essentials YQL-19206
Except the following directories: * clickhouse/client * datetime * knn * roaring commit_hash:c7da95636144d28db109d6b17ddc762e9bacb59f
Diffstat (limited to 'yql/essentials/udfs/common/python/python_udf')
-rw-r--r--yql/essentials/udfs/common/python/python_udf/python_function_factory.h111
-rw-r--r--yql/essentials/udfs/common/python/python_udf/python_udf.cpp232
-rw-r--r--yql/essentials/udfs/common/python/python_udf/python_udf.h26
-rw-r--r--yql/essentials/udfs/common/python/python_udf/python_udfs_exports.exports5
-rw-r--r--yql/essentials/udfs/common/python/python_udf/ya.make20
5 files changed, 394 insertions, 0 deletions
diff --git a/yql/essentials/udfs/common/python/python_udf/python_function_factory.h b/yql/essentials/udfs/common/python/python_udf/python_function_factory.h
new file mode 100644
index 0000000000..a4e393b486
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_function_factory.h
@@ -0,0 +1,111 @@
+#pragma once
+
+#include <yql/essentials/public/udf/udf_value.h>
+#include <yql/essentials/public/udf/udf_value_builder.h>
+#include <yql/essentials/public/udf/udf_type_builder.h>
+#include <yql/essentials/public/udf/udf_registrator.h>
+#include <yql/essentials/public/udf/udf_terminator.h>
+#include <yql/essentials/udfs/common/python/bindings/py_ptr.h>
+#include <yql/essentials/udfs/common/python/bindings/py_callable.h>
+#include <yql/essentials/udfs/common/python/bindings/py_cast.h>
+#include <yql/essentials/udfs/common/python/bindings/py_errors.h>
+#include <yql/essentials/udfs/common/python/bindings/py_gil.h>
+#include <yql/essentials/udfs/common/python/bindings/py_utils.h>
+#include <yql/essentials/udfs/common/python/bindings/py_yql_module.h>
+
+#include <util/generic/yexception.h>
+#include <util/stream/str.h>
+#include <util/stream/printf.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+
+using namespace NYql::NUdf;
+using namespace NPython;
+
+//////////////////////////////////////////////////////////////////////////////
+// TPythonFunctionFactory
+//////////////////////////////////////////////////////////////////////////////
+class TPythonFunctionFactory: public TBoxedValue
+{
+public:
+ TPythonFunctionFactory(
+ const TStringRef& name,
+ const TStringRef& tag,
+ const TType* functionType,
+ ITypeInfoHelper::TPtr&& helper,
+ const NYql::NUdf::TSourcePosition& pos)
+ : Ctx(new TPyContext(helper, tag, pos))
+ , FunctionName(name)
+ , FunctionType_(functionType)
+ {
+ }
+
+ ~TPythonFunctionFactory() {
+ Ctx->Cleanup();
+ PyCleanup();
+ }
+
+private:
+ TUnboxedValue Run(
+ const IValueBuilder* valueBuilder,
+ const TUnboxedValuePod* args) const override
+ {
+ TPyCastContext::TPtr castCtx = MakeIntrusive<TPyCastContext>(valueBuilder, Ctx);
+
+ // for get propper c-compatible null-terminating string
+ TString source(args[0].AsStringRef());
+
+ TPyGilLocker lock;
+ TPyObjectPtr module = CompileModule(FunctionName, source);
+ if (!module) {
+ UdfTerminate((TStringBuilder() << Ctx->Pos << "Failed to compile module: " << GetLastErrorAsString()).data());
+ }
+
+ TPyObjectPtr function(PyObject_GetAttrString(module.Get(), FunctionName.data()));
+ if (!function) {
+ UdfTerminate((TStringBuilder() << Ctx->Pos << "Failed to find entry point: " << GetLastErrorAsString()).data());
+ }
+
+ if (!PyCallable_Check(function.Get())) {
+ UdfTerminate((TStringBuilder() << Ctx->Pos << "Entry point is not a callable").data());
+ }
+
+ try {
+ SetupCallableSettings(castCtx, function.Get());
+ } catch (const yexception& e) {
+ UdfTerminate((TStringBuilder() << Ctx->Pos << "Failed to setup callable settings: "
+ << e.what()).data());
+ }
+ return FromPyCallable(castCtx, FunctionType_, function.Release());
+ }
+
+ static TPyObjectPtr CompileModule(const TString& name, const TString& source) {
+ unsigned int moduleNum = AtomicCounter++;
+ TString filename(TStringBuf("embedded:"));
+ filename += name;
+
+ TPyObjectPtr module, code;
+ if (HasEncodingCookie(source)) {
+ code.ResetSteal(Py_CompileString(source.data(), filename.data(), Py_file_input));
+ } else {
+ PyCompilerFlags cflags;
+ cflags.cf_flags = PyCF_SOURCE_IS_UTF8;
+
+ code.ResetSteal(Py_CompileStringFlags(
+ source.data(), filename.data(), Py_file_input, &cflags));
+ }
+
+ if (code) {
+ TString nameWithNum = name + ToString(moduleNum);
+ char* moduleName = const_cast<char*>(nameWithNum.data());
+ module.ResetSteal(PyImport_ExecCodeModule(moduleName, code.Get()));
+ }
+
+ return module;
+ }
+
+ const TPyContext::TPtr Ctx;
+ const TString FunctionName;
+ const TType* FunctionType_;
+ inline static std::atomic_uint AtomicCounter = 0;
+};
diff --git a/yql/essentials/udfs/common/python/python_udf/python_udf.cpp b/yql/essentials/udfs/common/python/python_udf/python_udf.cpp
new file mode 100644
index 0000000000..b1739a1775
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_udf.cpp
@@ -0,0 +1,232 @@
+#include "python_udf.h"
+#include "python_function_factory.h"
+
+#include <yql/essentials/public/udf/udf_version.h>
+#include <yql/essentials/udfs/common/python/bindings/py_utils.h>
+
+#include <util/generic/vector.h>
+#include <util/system/execpath.h>
+
+namespace {
+
+#if PY_MAJOR_VERSION >= 3
+#define PYTHON_PROGRAMM_NAME L"YQL::Python3"
+#else
+#define PYTHON_PROGRAMM_NAME "YQL::Python2"
+#endif
+
+int AddToPythonPath(const TVector<TStringBuf>& pathVals)
+{
+ char pathVar[] = "path"; // PySys_{Get,Set}Object take a non-const char* arg
+
+ TPyObjectPtr sysPath(PySys_GetObject(pathVar), TPyObjectPtr::ADD_REF);
+ if (!sysPath) return -1;
+
+ for (const auto& val: pathVals) {
+ TPyObjectPtr pyStr = PyRepr(val.data());
+ int rc = PyList_Append(sysPath.Get(), pyStr.Get());
+ if (rc != 0) {
+ return rc;
+ }
+ }
+
+ return PySys_SetObject(pathVar, sysPath.Get());
+}
+
+void InitArcadiaPythonRuntime()
+{
+ // Arcadia static python import hook resides in __res module
+ // It modifies sys.meta_path upon import
+
+ TPyObjectPtr mod(PyImport_ImportModule("__res"));
+ Y_ABORT_UNLESS(mod, "Can't import arcadia python runtime");
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// TPythonModule
+//////////////////////////////////////////////////////////////////////////////
+class TPythonModule: public IUdfModule
+{
+public:
+ TPythonModule(const TString& resourceName, EPythonFlavor pythonFlavor, bool standalone = true)
+ : ResourceName(resourceName), Standalone(standalone)
+ {
+ if (Standalone) {
+ Py_SetProgramName(PYTHON_PROGRAMM_NAME);
+ PrepareYqlModule();
+ Py_Initialize();
+ }
+
+ InitYqlModule(pythonFlavor, standalone);
+
+ const auto rc = PyRun_SimpleString(R"(
+# numpy on import may find installed openblas library and load it,
+# which in turn causes it to start CPUCOUNT threads
+# with approx. 40Mb memory reserved for each thread;
+#
+# See more detailed explanation here: https://st.yandex-team.ru/STATLIBS-1715#5bfc68ecbbc039001cec572a
+#
+# Thus, we reduce negative effects as much as possible
+import os
+os.environ['OPENBLAS_NUM_THREADS'] = '1'
+
+
+# Following part allows us later to format tracebacks via sys.excepthook
+# in thread-safe manner
+import sys
+import threading
+if sys.version_info >= (3, 0):
+ from io import StringIO, TextIOWrapper as SysStderrType
+else:
+ from cStringIO import StringIO
+ SysStderrType = file
+
+class StderrLocal(threading.local):
+
+ def __init__(self):
+ self.is_real_mode = True
+ self.buffer = StringIO()
+
+
+class StderrProxy(object):
+ def __init__(self, stderr):
+ self._stderr = stderr
+ self._tls = StderrLocal()
+
+ def _toggle_real_mode(self):
+ self._tls.is_real_mode = not self._tls.is_real_mode
+ if not self._tls.is_real_mode:
+ self._tls.buffer.clear()
+
+ def _get_value(self):
+ assert not self._tls.is_real_mode
+ return self._tls.buffer.getvalue()
+
+ def __getattr__(self, attr):
+ target = self._stderr
+ if not self._tls.is_real_mode:
+ target = self._tls.buffer
+
+ return getattr(target, attr)
+
+if isinstance(sys.stderr, SysStderrType):
+ sys.stderr = StderrProxy(sys.stderr)
+)");
+ Y_ABORT_UNLESS(rc >= 0, "Can't setup module");
+
+ if (pythonFlavor == EPythonFlavor::Arcadia) {
+ InitArcadiaPythonRuntime();
+ }
+
+#ifndef _win_
+ if (Standalone) {
+ TVector<TStringBuf> paths;
+ if (pythonFlavor == EPythonFlavor::System) {
+ paths.push_back(TStringBuf("/usr/lib/python2.7/dist-packages"));
+ }
+ paths.push_back(TStringBuf("."));
+ const auto r = AddToPythonPath(paths);
+ Y_ABORT_UNLESS(r >= 0, "Can't add dist-packages into sys.path");
+ }
+#endif
+
+ char executableVar[] = "executable"; // PySys_{Get,Set}Object take a non-const char* arg
+ TPyObjectPtr pyExecutableStr = PyRepr(GetExecPath().data());
+ Y_ABORT_UNLESS(PySys_SetObject(executableVar, pyExecutableStr.Get()) >= 0, "Can't set sys.executable");
+
+ if (Standalone) {
+ PyEval_InitThreads();
+ MainThreadState_ = PyEval_SaveThread();
+ }
+ }
+
+ ~TPythonModule() {
+ if (Standalone) {
+ PyEval_RestoreThread(MainThreadState_);
+ Py_Finalize();
+ }
+ }
+
+ void CleanupOnTerminate() const final {
+ PyCleanup();
+ }
+
+ void GetAllFunctions(IFunctionsSink&) const final {}
+
+ void BuildFunctionTypeInfo(
+ const TStringRef& name,
+ TType* userType,
+ const TStringRef& typeConfig,
+ ui32 flags,
+ IFunctionTypeInfoBuilder& builder) const final
+ {
+ Y_UNUSED(typeConfig);
+
+ if (flags & TFlags::TypesOnly) {
+ return;
+ }
+
+ try {
+ auto typeHelper = builder.TypeInfoHelper();
+ if (ETypeKind::Callable != typeHelper->GetTypeKind(userType)) {
+ return builder.SetError(TStringRef::Of("Expected callable type"));
+ }
+
+ const auto pos = builder.GetSourcePosition();
+ builder.Implementation(new TPythonFunctionFactory(name, ResourceName, userType, std::move(typeHelper), pos));
+ } catch (const yexception& e) {
+ builder.SetError(TStringBuf(e.what()));
+ }
+ }
+
+private:
+ TString ResourceName;
+ bool Standalone;
+ PyThreadState* MainThreadState_;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+// TStubModule
+//////////////////////////////////////////////////////////////////////////////
+class TStubModule: public IUdfModule {
+ void GetAllFunctions(IFunctionsSink&) const final {}
+
+ void BuildFunctionTypeInfo(
+ const TStringRef& /*name*/,
+ TType* /*userType*/,
+ const TStringRef& /*typeConfig*/,
+ ui32 flags,
+ IFunctionTypeInfoBuilder& /*builder*/) const final
+ {
+ Y_DEBUG_ABORT_UNLESS(flags & TFlags::TypesOnly,
+ "in stub module this function can be called only for types loading");
+ }
+
+ void CleanupOnTerminate() const final {}
+};
+
+} // namespace
+
+void NKikimr::NUdf::RegisterYqlPythonUdf(
+ IRegistrator& registrator,
+ ui32 flags,
+ TStringBuf moduleName,
+ TStringBuf resourceName,
+ EPythonFlavor pythonFlavor)
+{
+ if (flags & IRegistrator::TFlags::TypesOnly) {
+ registrator.AddModule(moduleName, new TStubModule);
+ } else {
+ registrator.AddModule(
+ moduleName,
+ NKikimr::NUdf::GetYqlPythonUdfModule(resourceName, pythonFlavor, true)
+ );
+ }
+}
+
+TUniquePtr<NKikimr::NUdf::IUdfModule> NKikimr::NUdf::GetYqlPythonUdfModule(
+ TStringBuf resourceName, NKikimr::NUdf::EPythonFlavor pythonFlavor,
+ bool standalone
+) {
+ return new TPythonModule(TString(resourceName), pythonFlavor, standalone);
+}
diff --git a/yql/essentials/udfs/common/python/python_udf/python_udf.h b/yql/essentials/udfs/common/python/python_udf/python_udf.h
new file mode 100644
index 0000000000..16d7da096d
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_udf.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <yql/essentials/public/udf/udf_registrator.h>
+
+namespace NYql {
+namespace NUdf {
+
+enum class EPythonFlavor {
+ System,
+ Arcadia,
+};
+
+void RegisterYqlPythonUdf(
+ IRegistrator& registrator,
+ ui32 flags,
+ TStringBuf moduleName,
+ TStringBuf resourceName,
+ EPythonFlavor pythonFlavor);
+
+TUniquePtr<IUdfModule> GetYqlPythonUdfModule(
+ TStringBuf resourceName,
+ EPythonFlavor pythonFlavor,
+ bool standalone);
+
+} // namespace NUdf
+} // namespace NYql
diff --git a/yql/essentials/udfs/common/python/python_udf/python_udfs_exports.exports b/yql/essentials/udfs/common/python/python_udf/python_udfs_exports.exports
new file mode 100644
index 0000000000..2ffd6f54b5
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_udfs_exports.exports
@@ -0,0 +1,5 @@
+C Register
+C AbiVersion
+C RunPython
+C BindSymbols
+C SetBackTraceCallback
diff --git a/yql/essentials/udfs/common/python/python_udf/ya.make b/yql/essentials/udfs/common/python/python_udf/ya.make
new file mode 100644
index 0000000000..9a2090665a
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/ya.make
@@ -0,0 +1,20 @@
+PY23_NATIVE_LIBRARY()
+
+YQL_ABI_VERSION(2 27 0)
+
+SRCS(
+ python_udf.cpp
+)
+
+PEERDIR(
+ yql/essentials/public/udf
+ yql/essentials/udfs/common/python/bindings
+)
+
+CFLAGS(
+ -DDISABLE_PYDEBUG
+)
+
+NO_COMPILER_WARNINGS()
+
+END()