#include "python_udf.h"
#include "python_function_factory.h"
#include <yql/essentials/public/udf/udf_version.h>
#include <yql/essentials/udfs/common/python/bindings/py_utils.h>
#include <util/generic/vector.h>
#include <util/system/execpath.h>
namespace {
#if PY_MAJOR_VERSION >= 3
#define PYTHON_PROGRAMM_NAME L"YQL::Python3"
#else
#define PYTHON_PROGRAMM_NAME "YQL::Python2"
#endif
int AddToPythonPath(const TVector<TStringBuf>& pathVals)
{
char pathVar[] = "path"; // PySys_{Get,Set}Object take a non-const char* arg
TPyObjectPtr sysPath(PySys_GetObject(pathVar), TPyObjectPtr::ADD_REF);
if (!sysPath) return -1;
for (const auto& val: pathVals) {
TPyObjectPtr pyStr = PyRepr(val.data());
int rc = PyList_Append(sysPath.Get(), pyStr.Get());
if (rc != 0) {
return rc;
}
}
return PySys_SetObject(pathVar, sysPath.Get());
}
void InitArcadiaPythonRuntime()
{
// Arcadia static python import hook resides in __res module
// It modifies sys.meta_path upon import
TPyObjectPtr mod(PyImport_ImportModule("__res"));
Y_ABORT_UNLESS(mod, "Can't import arcadia python runtime");
}
//////////////////////////////////////////////////////////////////////////////
// TPythonModule
//////////////////////////////////////////////////////////////////////////////
class TPythonModule: public IUdfModule
{
public:
TPythonModule(const TString& resourceName, EPythonFlavor pythonFlavor, bool standalone = true)
: ResourceName(resourceName), Standalone(standalone)
{
if (Standalone) {
Py_SetProgramName(PYTHON_PROGRAMM_NAME);
PrepareYqlModule();
Py_Initialize();
}
InitYqlModule(pythonFlavor, standalone);
const auto rc = PyRun_SimpleString(R"(
# numpy on import may find installed openblas library and load it,
# which in turn causes it to start CPUCOUNT threads
# with approx. 40Mb memory reserved for each thread;
#
# See more detailed explanation here: https://st.yandex-team.ru/STATLIBS-1715#5bfc68ecbbc039001cec572a
#
# Thus, we reduce negative effects as much as possible
import os
os.environ['OPENBLAS_NUM_THREADS'] = '1'
# Following part allows us later to format tracebacks via sys.excepthook
# in thread-safe manner
import sys
import threading
if sys.version_info >= (3, 0):
from io import StringIO, TextIOWrapper as SysStderrType
else:
from cStringIO import StringIO
SysStderrType = file
class StderrLocal(threading.local):
def __init__(self):
self.is_real_mode = True
self.buffer = StringIO()
class StderrProxy(object):
def __init__(self, stderr):
self._stderr = stderr
self._tls = StderrLocal()
def _toggle_real_mode(self):
self._tls.is_real_mode = not self._tls.is_real_mode
if not self._tls.is_real_mode:
self._tls.buffer.clear()
def _get_value(self):
assert not self._tls.is_real_mode
return self._tls.buffer.getvalue()
def __getattr__(self, attr):
target = self._stderr
if not self._tls.is_real_mode:
target = self._tls.buffer
return getattr(target, attr)
if isinstance(sys.stderr, SysStderrType):
sys.stderr = StderrProxy(sys.stderr)
)");
Y_ABORT_UNLESS(rc >= 0, "Can't setup module");
if (pythonFlavor == EPythonFlavor::Arcadia) {
InitArcadiaPythonRuntime();
}
#ifndef _win_
if (Standalone) {
TVector<TStringBuf> paths;
if (pythonFlavor == EPythonFlavor::System) {
paths.push_back(TStringBuf("/usr/lib/python2.7/dist-packages"));
}
paths.push_back(TStringBuf("."));
const auto r = AddToPythonPath(paths);
Y_ABORT_UNLESS(r >= 0, "Can't add dist-packages into sys.path");
}
#endif
char executableVar[] = "executable"; // PySys_{Get,Set}Object take a non-const char* arg
TPyObjectPtr pyExecutableStr = PyRepr(GetExecPath().data());
Y_ABORT_UNLESS(PySys_SetObject(executableVar, pyExecutableStr.Get()) >= 0, "Can't set sys.executable");
if (Standalone) {
PyEval_InitThreads();
MainThreadState_ = PyEval_SaveThread();
}
}
~TPythonModule() {
if (Standalone) {
PyEval_RestoreThread(MainThreadState_);
Py_Finalize();
}
}
void CleanupOnTerminate() const final {
PyCleanup();
}
void GetAllFunctions(IFunctionsSink&) const final {}
void BuildFunctionTypeInfo(
const TStringRef& name,
TType* userType,
const TStringRef& typeConfig,
ui32 flags,
IFunctionTypeInfoBuilder& builder) const final
{
Y_UNUSED(typeConfig);
if (flags & TFlags::TypesOnly) {
return;
}
try {
auto typeHelper = builder.TypeInfoHelper();
if (ETypeKind::Callable != typeHelper->GetTypeKind(userType)) {
return builder.SetError(TStringRef::Of("Expected callable type"));
}
const auto pos = builder.GetSourcePosition();
builder.Implementation(new TPythonFunctionFactory(name, ResourceName, userType, std::move(typeHelper), pos));
} catch (const yexception& e) {
builder.SetError(TStringBuf(e.what()));
}
}
private:
TString ResourceName;
bool Standalone;
PyThreadState* MainThreadState_;
};
//////////////////////////////////////////////////////////////////////////////
// TStubModule
//////////////////////////////////////////////////////////////////////////////
class TStubModule: public IUdfModule {
void GetAllFunctions(IFunctionsSink&) const final {}
void BuildFunctionTypeInfo(
const TStringRef& /*name*/,
TType* /*userType*/,
const TStringRef& /*typeConfig*/,
ui32 flags,
IFunctionTypeInfoBuilder& /*builder*/) const final
{
Y_DEBUG_ABORT_UNLESS(flags & TFlags::TypesOnly,
"in stub module this function can be called only for types loading");
}
void CleanupOnTerminate() const final {}
};
} // namespace
void NKikimr::NUdf::RegisterYqlPythonUdf(
IRegistrator& registrator,
ui32 flags,
TStringBuf moduleName,
TStringBuf resourceName,
EPythonFlavor pythonFlavor)
{
if (flags & IRegistrator::TFlags::TypesOnly) {
registrator.AddModule(moduleName, new TStubModule);
} else {
registrator.AddModule(
moduleName,
NKikimr::NUdf::GetYqlPythonUdfModule(resourceName, pythonFlavor, true)
);
}
}
TUniquePtr<NKikimr::NUdf::IUdfModule> NKikimr::NUdf::GetYqlPythonUdfModule(
TStringBuf resourceName, NKikimr::NUdf::EPythonFlavor pythonFlavor,
bool standalone
) {
return new TPythonModule(TString(resourceName), pythonFlavor, standalone);
}