summaryrefslogtreecommitdiffstats
path: root/yql/essentials/udfs/common/python/bindings/py_stream.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2025-10-06 13:26:25 +0300
committervvvv <[email protected]>2025-10-06 14:06:25 +0300
commiteca8ce9cb1613d5c983185c4e43c20651a9638aa (patch)
tree61ee5ae779948e61af9a7691d19eaa2c09869121 /yql/essentials/udfs/common/python/bindings/py_stream.cpp
parent4adf7eecae16a9b228b28cc5f64c27ef69ad5ec2 (diff)
YQL-20086 udfs
init commit_hash:f9684778bf1ea956965f2360b80b91edb7d4ffbe
Diffstat (limited to 'yql/essentials/udfs/common/python/bindings/py_stream.cpp')
-rw-r--r--yql/essentials/udfs/common/python/bindings/py_stream.cpp176
1 files changed, 89 insertions, 87 deletions
diff --git a/yql/essentials/udfs/common/python/bindings/py_stream.cpp b/yql/essentials/udfs/common/python/bindings/py_stream.cpp
index 24f7e0eb45d..130fc67b83e 100644
--- a/yql/essentials/udfs/common/python/bindings/py_stream.cpp
+++ b/yql/essentials/udfs/common/python/bindings/py_stream.cpp
@@ -41,88 +41,88 @@ struct TPyStream {
}
static PyObject* New(
- const TPyCastContext::TPtr& castCtx,
- const NUdf::TType* type,
- NUdf::IBoxedValuePtr value);
+ const TPyCastContext::TPtr& castCtx,
+ const NUdf::TType* type,
+ NUdf::IBoxedValuePtr value);
static PyObject* Next(PyObject* self);
};
#if PY_MAJOR_VERSION >= 3
-#define Py_TPFLAGS_HAVE_ITER 0 // NOLINT(readability-identifier-naming)
+ #define Py_TPFLAGS_HAVE_ITER 0 // NOLINT(readability-identifier-naming)
#endif
PyTypeObject PyStreamType = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
- INIT_MEMBER(tp_name , "yql.TStream"),
- INIT_MEMBER(tp_basicsize , sizeof(TPyStream)),
- INIT_MEMBER(tp_itemsize , 0),
- INIT_MEMBER(tp_dealloc , TPyStream::Dealloc),
+ INIT_MEMBER(tp_name, "yql.TStream"),
+ INIT_MEMBER(tp_basicsize, sizeof(TPyStream)),
+ INIT_MEMBER(tp_itemsize, 0),
+ INIT_MEMBER(tp_dealloc, TPyStream::Dealloc),
#if PY_VERSION_HEX < 0x030800b4
- INIT_MEMBER(tp_print , nullptr),
+ INIT_MEMBER(tp_print, nullptr),
#else
INIT_MEMBER(tp_vectorcall_offset, 0),
#endif
- INIT_MEMBER(tp_getattr , nullptr),
- INIT_MEMBER(tp_setattr , nullptr),
+ INIT_MEMBER(tp_getattr, nullptr),
+ INIT_MEMBER(tp_setattr, nullptr),
#if PY_MAJOR_VERSION >= 3
- INIT_MEMBER(tp_as_async , nullptr),
+ INIT_MEMBER(tp_as_async, nullptr),
#else
- INIT_MEMBER(tp_compare , nullptr),
+ INIT_MEMBER(tp_compare, nullptr),
#endif
- INIT_MEMBER(tp_repr , TPyStream::Repr),
- INIT_MEMBER(tp_as_number , nullptr),
- INIT_MEMBER(tp_as_sequence , nullptr),
- INIT_MEMBER(tp_as_mapping , nullptr),
- INIT_MEMBER(tp_hash , nullptr),
- INIT_MEMBER(tp_call , nullptr),
- INIT_MEMBER(tp_str , nullptr),
- INIT_MEMBER(tp_getattro , nullptr),
- INIT_MEMBER(tp_setattro , nullptr),
- INIT_MEMBER(tp_as_buffer , nullptr),
- INIT_MEMBER(tp_flags , Py_TPFLAGS_HAVE_ITER),
- INIT_MEMBER(tp_doc , "yql.TStream object"),
- INIT_MEMBER(tp_traverse , nullptr),
- INIT_MEMBER(tp_clear , nullptr),
- INIT_MEMBER(tp_richcompare , nullptr),
- INIT_MEMBER(tp_weaklistoffset , 0),
- INIT_MEMBER(tp_iter , PyObject_SelfIter),
- INIT_MEMBER(tp_iternext , TPyStream::Next),
- INIT_MEMBER(tp_methods , nullptr),
- INIT_MEMBER(tp_members , nullptr),
- INIT_MEMBER(tp_getset , nullptr),
- INIT_MEMBER(tp_base , nullptr),
- INIT_MEMBER(tp_dict , nullptr),
- INIT_MEMBER(tp_descr_get , nullptr),
- INIT_MEMBER(tp_descr_set , nullptr),
- INIT_MEMBER(tp_dictoffset , 0),
- INIT_MEMBER(tp_init , nullptr),
- INIT_MEMBER(tp_alloc , nullptr),
- INIT_MEMBER(tp_new , nullptr),
- INIT_MEMBER(tp_free , nullptr),
- INIT_MEMBER(tp_is_gc , nullptr),
- INIT_MEMBER(tp_bases , nullptr),
- INIT_MEMBER(tp_mro , nullptr),
- INIT_MEMBER(tp_cache , nullptr),
- INIT_MEMBER(tp_subclasses , nullptr),
- INIT_MEMBER(tp_weaklist , nullptr),
- INIT_MEMBER(tp_del , nullptr),
- INIT_MEMBER(tp_version_tag , 0),
+ INIT_MEMBER(tp_repr, TPyStream::Repr),
+ INIT_MEMBER(tp_as_number, nullptr),
+ INIT_MEMBER(tp_as_sequence, nullptr),
+ INIT_MEMBER(tp_as_mapping, nullptr),
+ INIT_MEMBER(tp_hash, nullptr),
+ INIT_MEMBER(tp_call, nullptr),
+ INIT_MEMBER(tp_str, nullptr),
+ INIT_MEMBER(tp_getattro, nullptr),
+ INIT_MEMBER(tp_setattro, nullptr),
+ INIT_MEMBER(tp_as_buffer, nullptr),
+ INIT_MEMBER(tp_flags, Py_TPFLAGS_HAVE_ITER),
+ INIT_MEMBER(tp_doc, "yql.TStream object"),
+ INIT_MEMBER(tp_traverse, nullptr),
+ INIT_MEMBER(tp_clear, nullptr),
+ INIT_MEMBER(tp_richcompare, nullptr),
+ INIT_MEMBER(tp_weaklistoffset, 0),
+ INIT_MEMBER(tp_iter, PyObject_SelfIter),
+ INIT_MEMBER(tp_iternext, TPyStream::Next),
+ INIT_MEMBER(tp_methods, nullptr),
+ INIT_MEMBER(tp_members, nullptr),
+ INIT_MEMBER(tp_getset, nullptr),
+ INIT_MEMBER(tp_base, nullptr),
+ INIT_MEMBER(tp_dict, nullptr),
+ INIT_MEMBER(tp_descr_get, nullptr),
+ INIT_MEMBER(tp_descr_set, nullptr),
+ INIT_MEMBER(tp_dictoffset, 0),
+ INIT_MEMBER(tp_init, nullptr),
+ INIT_MEMBER(tp_alloc, nullptr),
+ INIT_MEMBER(tp_new, nullptr),
+ INIT_MEMBER(tp_free, nullptr),
+ INIT_MEMBER(tp_is_gc, nullptr),
+ INIT_MEMBER(tp_bases, nullptr),
+ INIT_MEMBER(tp_mro, nullptr),
+ INIT_MEMBER(tp_cache, nullptr),
+ INIT_MEMBER(tp_subclasses, nullptr),
+ INIT_MEMBER(tp_weaklist, nullptr),
+ INIT_MEMBER(tp_del, nullptr),
+ INIT_MEMBER(tp_version_tag, 0),
#if PY_MAJOR_VERSION >= 3
- INIT_MEMBER(tp_finalize , nullptr),
+ INIT_MEMBER(tp_finalize, nullptr),
#endif
#if PY_VERSION_HEX >= 0x030800b1
- INIT_MEMBER(tp_vectorcall , nullptr),
+ INIT_MEMBER(tp_vectorcall, nullptr),
#endif
#if PY_VERSION_HEX >= 0x030800b4 && PY_VERSION_HEX < 0x03090000
- INIT_MEMBER(tp_print , nullptr),
+ INIT_MEMBER(tp_print, nullptr),
#endif
};
PyObject* TPyStream::New(
- const TPyCastContext::TPtr& castCtx,
- const NUdf::TType* type,
- NUdf::IBoxedValuePtr value)
+ const TPyCastContext::TPtr& castCtx,
+ const NUdf::TType* type,
+ NUdf::IBoxedValuePtr value)
{
TPyStream* stream = new TPyStream;
PyObject_INIT(stream, &PyStreamType);
@@ -143,18 +143,19 @@ PyObject* TPyStream::Next(PyObject* self) {
auto status = NUdf::TBoxedValueAccessor::Fetch(*stream->Value.Get(), item);
switch (status) {
- case NUdf::EFetchStatus::Ok:
- return ToPyObject(stream->CastCtx, stream->ItemType, item)
+ case NUdf::EFetchStatus::Ok:
+ return ToPyObject(stream->CastCtx, stream->ItemType, item)
.Release();
- case NUdf::EFetchStatus::Finish:
- return nullptr;
- case NUdf::EFetchStatus::Yield:
- PyErr_SetNone(PyYieldIterationException);
- return nullptr;
- default:
- Y_ABORT("Unknown stream status");
+ case NUdf::EFetchStatus::Finish:
+ return nullptr;
+ case NUdf::EFetchStatus::Yield:
+ PyErr_SetNone(PyYieldIterationException);
+ return nullptr;
+ default:
+ Y_ABORT("Unknown stream status");
}
- } PY_CATCH(nullptr)
+ }
+ PY_CATCH(nullptr)
}
//////////////////////////////////////////////////////////////////////////////
@@ -163,13 +164,13 @@ PyObject* TPyStream::Next(PyObject* self) {
class TStreamOverPyIter final: public NUdf::TBoxedValue {
public:
TStreamOverPyIter(
- TPyCastContext::TPtr castCtx,
- const NUdf::TType* itemType,
- TPyObjectPtr pyIter,
- TPyObjectPtr pyIterable,
- TPyObjectPtr pyGeneratorCallable,
- TPyObjectPtr pyGeneratorCallableClosure,
- TPyObjectPtr pyGeneratorCallableArgs)
+ TPyCastContext::TPtr castCtx,
+ const NUdf::TType* itemType,
+ TPyObjectPtr pyIter,
+ TPyObjectPtr pyIterable,
+ TPyObjectPtr pyGeneratorCallable,
+ TPyObjectPtr pyGeneratorCallableClosure,
+ TPyObjectPtr pyGeneratorCallableArgs)
: CastCtx_(std::move(castCtx))
, ItemType_(itemType)
, PyIter_(std::move(pyIter))
@@ -215,7 +216,9 @@ private:
PyIter_.Reset();
TPyObjectPtr result(PyObject_CallObject(PyGeneratorCallable_.Get(), PyGeneratorCallableArgs_.Get()));
if (!result) {
- UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << "Failed to execute:\n" << GetLastErrorAsString()).c_str());
+ UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << "Failed to execute:\n"
+ << GetLastErrorAsString())
+ .c_str());
}
if (PyGen_Check(result.Get())) {
@@ -244,8 +247,7 @@ private:
}
return NUdf::EFetchStatus::Finish;
- }
- catch (const yexception& e) {
+ } catch (const yexception& e) {
UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << e.what()).c_str());
}
}
@@ -260,14 +262,13 @@ private:
TPyObjectPtr PyGeneratorCallableArgs_;
};
-
//////////////////////////////////////////////////////////////////////////////
// public functions
//////////////////////////////////////////////////////////////////////////////
TPyObjectPtr ToPyStream(
- const TPyCastContext::TPtr& castCtx,
- const NKikimr::NUdf::TType* type,
- const NKikimr::NUdf::TUnboxedValuePod& value)
+ const TPyCastContext::TPtr& castCtx,
+ const NKikimr::NUdf::TType* type,
+ const NKikimr::NUdf::TUnboxedValuePod& value)
{
return TPyStream::New(castCtx, type, value.AsBoxed());
}
@@ -278,8 +279,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream(
const TPyObjectPtr& value,
const TPyObjectPtr& originalCallable,
const TPyObjectPtr& originalCallableClosure,
- const TPyObjectPtr& originalCallableArgs
-)
+ const TPyObjectPtr& originalCallableArgs)
{
const NUdf::TStreamTypeInspector inspector(*castCtx->PyCtx->TypeInfoHelper, type);
const NUdf::TType* itemType = inspector.GetItemType();
@@ -290,7 +290,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream(
UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << GetLastErrorAsString()).c_str());
}
return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr,
- originalCallable, originalCallableClosure, originalCallableArgs));
+ originalCallable, originalCallableClosure, originalCallableArgs));
}
if (PyIter_Check(value.Get())
@@ -301,7 +301,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream(
) {
TPyObjectPtr iter(value.Get(), TPyObjectPtr::ADD_REF);
return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr,
- originalCallable, originalCallableClosure, originalCallableArgs));
+ originalCallable, originalCallableClosure, originalCallableArgs));
}
// assume that this function will returns generator
@@ -324,7 +324,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream(
}
return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr,
- originalCallable ? value : nullptr, originalCallable ? callableClosure : nullptr, nullptr));
+ originalCallable ? value : nullptr, originalCallable ? callableClosure : nullptr, nullptr));
}
// must be after checking for callable
@@ -337,7 +337,9 @@ NKikimr::NUdf::TUnboxedValue FromPyStream(
}
UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << "Expected iterator, generator, generator factory, "
- "or iterable object, but got " << PyObjectRepr(value.Get())).c_str());
+ "or iterable object, but got "
+ << PyObjectRepr(value.Get()))
+ .c_str());
}
} // namespace NPython