aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/udfs/common/python/python_udf/python_udf.cpp
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2024-11-08 10:00:23 +0300
committerimunkin <imunkin@yandex-team.com>2024-11-08 10:12:13 +0300
commita784a2f943d6e15caa6241e2e96d80aac6dbf375 (patch)
tree05f1e5366c916b988a8afb75bdab8ddeee0f6e6d /yql/essentials/udfs/common/python/python_udf/python_udf.cpp
parentd70137a7b530ccaa52834274913bbb5a3d1ca06e (diff)
downloadydb-a784a2f943d6e15caa6241e2e96d80aac6dbf375.tar.gz
Move yql/udfs/common/ to /yql/essentials YQL-19206
Except the following directories: * clickhouse/client * datetime * knn * roaring commit_hash:c7da95636144d28db109d6b17ddc762e9bacb59f
Diffstat (limited to 'yql/essentials/udfs/common/python/python_udf/python_udf.cpp')
-rw-r--r--yql/essentials/udfs/common/python/python_udf/python_udf.cpp232
1 files changed, 232 insertions, 0 deletions
diff --git a/yql/essentials/udfs/common/python/python_udf/python_udf.cpp b/yql/essentials/udfs/common/python/python_udf/python_udf.cpp
new file mode 100644
index 0000000000..b1739a1775
--- /dev/null
+++ b/yql/essentials/udfs/common/python/python_udf/python_udf.cpp
@@ -0,0 +1,232 @@
+#include "python_udf.h"
+#include "python_function_factory.h"
+
+#include <yql/essentials/public/udf/udf_version.h>
+#include <yql/essentials/udfs/common/python/bindings/py_utils.h>
+
+#include <util/generic/vector.h>
+#include <util/system/execpath.h>
+
+namespace {
+
+#if PY_MAJOR_VERSION >= 3
+#define PYTHON_PROGRAMM_NAME L"YQL::Python3"
+#else
+#define PYTHON_PROGRAMM_NAME "YQL::Python2"
+#endif
+
+int AddToPythonPath(const TVector<TStringBuf>& pathVals)
+{
+ char pathVar[] = "path"; // PySys_{Get,Set}Object take a non-const char* arg
+
+ TPyObjectPtr sysPath(PySys_GetObject(pathVar), TPyObjectPtr::ADD_REF);
+ if (!sysPath) return -1;
+
+ for (const auto& val: pathVals) {
+ TPyObjectPtr pyStr = PyRepr(val.data());
+ int rc = PyList_Append(sysPath.Get(), pyStr.Get());
+ if (rc != 0) {
+ return rc;
+ }
+ }
+
+ return PySys_SetObject(pathVar, sysPath.Get());
+}
+
+void InitArcadiaPythonRuntime()
+{
+ // Arcadia static python import hook resides in __res module
+ // It modifies sys.meta_path upon import
+
+ TPyObjectPtr mod(PyImport_ImportModule("__res"));
+ Y_ABORT_UNLESS(mod, "Can't import arcadia python runtime");
+}
+
+//////////////////////////////////////////////////////////////////////////////
+// TPythonModule
+//////////////////////////////////////////////////////////////////////////////
+class TPythonModule: public IUdfModule
+{
+public:
+ TPythonModule(const TString& resourceName, EPythonFlavor pythonFlavor, bool standalone = true)
+ : ResourceName(resourceName), Standalone(standalone)
+ {
+ if (Standalone) {
+ Py_SetProgramName(PYTHON_PROGRAMM_NAME);
+ PrepareYqlModule();
+ Py_Initialize();
+ }
+
+ InitYqlModule(pythonFlavor, standalone);
+
+ const auto rc = PyRun_SimpleString(R"(
+# numpy on import may find installed openblas library and load it,
+# which in turn causes it to start CPUCOUNT threads
+# with approx. 40Mb memory reserved for each thread;
+#
+# See more detailed explanation here: https://st.yandex-team.ru/STATLIBS-1715#5bfc68ecbbc039001cec572a
+#
+# Thus, we reduce negative effects as much as possible
+import os
+os.environ['OPENBLAS_NUM_THREADS'] = '1'
+
+
+# Following part allows us later to format tracebacks via sys.excepthook
+# in thread-safe manner
+import sys
+import threading
+if sys.version_info >= (3, 0):
+ from io import StringIO, TextIOWrapper as SysStderrType
+else:
+ from cStringIO import StringIO
+ SysStderrType = file
+
+class StderrLocal(threading.local):
+
+ def __init__(self):
+ self.is_real_mode = True
+ self.buffer = StringIO()
+
+
+class StderrProxy(object):
+ def __init__(self, stderr):
+ self._stderr = stderr
+ self._tls = StderrLocal()
+
+ def _toggle_real_mode(self):
+ self._tls.is_real_mode = not self._tls.is_real_mode
+ if not self._tls.is_real_mode:
+ self._tls.buffer.clear()
+
+ def _get_value(self):
+ assert not self._tls.is_real_mode
+ return self._tls.buffer.getvalue()
+
+ def __getattr__(self, attr):
+ target = self._stderr
+ if not self._tls.is_real_mode:
+ target = self._tls.buffer
+
+ return getattr(target, attr)
+
+if isinstance(sys.stderr, SysStderrType):
+ sys.stderr = StderrProxy(sys.stderr)
+)");
+ Y_ABORT_UNLESS(rc >= 0, "Can't setup module");
+
+ if (pythonFlavor == EPythonFlavor::Arcadia) {
+ InitArcadiaPythonRuntime();
+ }
+
+#ifndef _win_
+ if (Standalone) {
+ TVector<TStringBuf> paths;
+ if (pythonFlavor == EPythonFlavor::System) {
+ paths.push_back(TStringBuf("/usr/lib/python2.7/dist-packages"));
+ }
+ paths.push_back(TStringBuf("."));
+ const auto r = AddToPythonPath(paths);
+ Y_ABORT_UNLESS(r >= 0, "Can't add dist-packages into sys.path");
+ }
+#endif
+
+ char executableVar[] = "executable"; // PySys_{Get,Set}Object take a non-const char* arg
+ TPyObjectPtr pyExecutableStr = PyRepr(GetExecPath().data());
+ Y_ABORT_UNLESS(PySys_SetObject(executableVar, pyExecutableStr.Get()) >= 0, "Can't set sys.executable");
+
+ if (Standalone) {
+ PyEval_InitThreads();
+ MainThreadState_ = PyEval_SaveThread();
+ }
+ }
+
+ ~TPythonModule() {
+ if (Standalone) {
+ PyEval_RestoreThread(MainThreadState_);
+ Py_Finalize();
+ }
+ }
+
+ void CleanupOnTerminate() const final {
+ PyCleanup();
+ }
+
+ void GetAllFunctions(IFunctionsSink&) const final {}
+
+ void BuildFunctionTypeInfo(
+ const TStringRef& name,
+ TType* userType,
+ const TStringRef& typeConfig,
+ ui32 flags,
+ IFunctionTypeInfoBuilder& builder) const final
+ {
+ Y_UNUSED(typeConfig);
+
+ if (flags & TFlags::TypesOnly) {
+ return;
+ }
+
+ try {
+ auto typeHelper = builder.TypeInfoHelper();
+ if (ETypeKind::Callable != typeHelper->GetTypeKind(userType)) {
+ return builder.SetError(TStringRef::Of("Expected callable type"));
+ }
+
+ const auto pos = builder.GetSourcePosition();
+ builder.Implementation(new TPythonFunctionFactory(name, ResourceName, userType, std::move(typeHelper), pos));
+ } catch (const yexception& e) {
+ builder.SetError(TStringBuf(e.what()));
+ }
+ }
+
+private:
+ TString ResourceName;
+ bool Standalone;
+ PyThreadState* MainThreadState_;
+};
+
+//////////////////////////////////////////////////////////////////////////////
+// TStubModule
+//////////////////////////////////////////////////////////////////////////////
+class TStubModule: public IUdfModule {
+ void GetAllFunctions(IFunctionsSink&) const final {}
+
+ void BuildFunctionTypeInfo(
+ const TStringRef& /*name*/,
+ TType* /*userType*/,
+ const TStringRef& /*typeConfig*/,
+ ui32 flags,
+ IFunctionTypeInfoBuilder& /*builder*/) const final
+ {
+ Y_DEBUG_ABORT_UNLESS(flags & TFlags::TypesOnly,
+ "in stub module this function can be called only for types loading");
+ }
+
+ void CleanupOnTerminate() const final {}
+};
+
+} // namespace
+
+void NKikimr::NUdf::RegisterYqlPythonUdf(
+ IRegistrator& registrator,
+ ui32 flags,
+ TStringBuf moduleName,
+ TStringBuf resourceName,
+ EPythonFlavor pythonFlavor)
+{
+ if (flags & IRegistrator::TFlags::TypesOnly) {
+ registrator.AddModule(moduleName, new TStubModule);
+ } else {
+ registrator.AddModule(
+ moduleName,
+ NKikimr::NUdf::GetYqlPythonUdfModule(resourceName, pythonFlavor, true)
+ );
+ }
+}
+
+TUniquePtr<NKikimr::NUdf::IUdfModule> NKikimr::NUdf::GetYqlPythonUdfModule(
+ TStringBuf resourceName, NKikimr::NUdf::EPythonFlavor pythonFlavor,
+ bool standalone
+) {
+ return new TPythonModule(TString(resourceName), pythonFlavor, standalone);
+}