diff options
author | vvvv <[email protected]> | 2025-10-06 13:26:25 +0300 |
---|---|---|
committer | vvvv <[email protected]> | 2025-10-06 14:06:25 +0300 |
commit | eca8ce9cb1613d5c983185c4e43c20651a9638aa (patch) | |
tree | 61ee5ae779948e61af9a7691d19eaa2c09869121 /yql/essentials/udfs/common/python/bindings/py_stream.cpp | |
parent | 4adf7eecae16a9b228b28cc5f64c27ef69ad5ec2 (diff) |
YQL-20086 udfs
init
commit_hash:f9684778bf1ea956965f2360b80b91edb7d4ffbe
Diffstat (limited to 'yql/essentials/udfs/common/python/bindings/py_stream.cpp')
-rw-r--r-- | yql/essentials/udfs/common/python/bindings/py_stream.cpp | 176 |
1 files changed, 89 insertions, 87 deletions
diff --git a/yql/essentials/udfs/common/python/bindings/py_stream.cpp b/yql/essentials/udfs/common/python/bindings/py_stream.cpp index 24f7e0eb45d..130fc67b83e 100644 --- a/yql/essentials/udfs/common/python/bindings/py_stream.cpp +++ b/yql/essentials/udfs/common/python/bindings/py_stream.cpp @@ -41,88 +41,88 @@ struct TPyStream { } static PyObject* New( - const TPyCastContext::TPtr& castCtx, - const NUdf::TType* type, - NUdf::IBoxedValuePtr value); + const TPyCastContext::TPtr& castCtx, + const NUdf::TType* type, + NUdf::IBoxedValuePtr value); static PyObject* Next(PyObject* self); }; #if PY_MAJOR_VERSION >= 3 -#define Py_TPFLAGS_HAVE_ITER 0 // NOLINT(readability-identifier-naming) + #define Py_TPFLAGS_HAVE_ITER 0 // NOLINT(readability-identifier-naming) #endif PyTypeObject PyStreamType = { PyVarObject_HEAD_INIT(&PyType_Type, 0) - INIT_MEMBER(tp_name , "yql.TStream"), - INIT_MEMBER(tp_basicsize , sizeof(TPyStream)), - INIT_MEMBER(tp_itemsize , 0), - INIT_MEMBER(tp_dealloc , TPyStream::Dealloc), + INIT_MEMBER(tp_name, "yql.TStream"), + INIT_MEMBER(tp_basicsize, sizeof(TPyStream)), + INIT_MEMBER(tp_itemsize, 0), + INIT_MEMBER(tp_dealloc, TPyStream::Dealloc), #if PY_VERSION_HEX < 0x030800b4 - INIT_MEMBER(tp_print , nullptr), + INIT_MEMBER(tp_print, nullptr), #else INIT_MEMBER(tp_vectorcall_offset, 0), #endif - INIT_MEMBER(tp_getattr , nullptr), - INIT_MEMBER(tp_setattr , nullptr), + INIT_MEMBER(tp_getattr, nullptr), + INIT_MEMBER(tp_setattr, nullptr), #if PY_MAJOR_VERSION >= 3 - INIT_MEMBER(tp_as_async , nullptr), + INIT_MEMBER(tp_as_async, nullptr), #else - INIT_MEMBER(tp_compare , nullptr), + INIT_MEMBER(tp_compare, nullptr), #endif - INIT_MEMBER(tp_repr , TPyStream::Repr), - INIT_MEMBER(tp_as_number , nullptr), - INIT_MEMBER(tp_as_sequence , nullptr), - INIT_MEMBER(tp_as_mapping , nullptr), - INIT_MEMBER(tp_hash , nullptr), - INIT_MEMBER(tp_call , nullptr), - INIT_MEMBER(tp_str , nullptr), - INIT_MEMBER(tp_getattro , nullptr), - INIT_MEMBER(tp_setattro , nullptr), - INIT_MEMBER(tp_as_buffer , nullptr), - INIT_MEMBER(tp_flags , Py_TPFLAGS_HAVE_ITER), - INIT_MEMBER(tp_doc , "yql.TStream object"), - INIT_MEMBER(tp_traverse , nullptr), - INIT_MEMBER(tp_clear , nullptr), - INIT_MEMBER(tp_richcompare , nullptr), - INIT_MEMBER(tp_weaklistoffset , 0), - INIT_MEMBER(tp_iter , PyObject_SelfIter), - INIT_MEMBER(tp_iternext , TPyStream::Next), - INIT_MEMBER(tp_methods , nullptr), - INIT_MEMBER(tp_members , nullptr), - INIT_MEMBER(tp_getset , nullptr), - INIT_MEMBER(tp_base , nullptr), - INIT_MEMBER(tp_dict , nullptr), - INIT_MEMBER(tp_descr_get , nullptr), - INIT_MEMBER(tp_descr_set , nullptr), - INIT_MEMBER(tp_dictoffset , 0), - INIT_MEMBER(tp_init , nullptr), - INIT_MEMBER(tp_alloc , nullptr), - INIT_MEMBER(tp_new , nullptr), - INIT_MEMBER(tp_free , nullptr), - INIT_MEMBER(tp_is_gc , nullptr), - INIT_MEMBER(tp_bases , nullptr), - INIT_MEMBER(tp_mro , nullptr), - INIT_MEMBER(tp_cache , nullptr), - INIT_MEMBER(tp_subclasses , nullptr), - INIT_MEMBER(tp_weaklist , nullptr), - INIT_MEMBER(tp_del , nullptr), - INIT_MEMBER(tp_version_tag , 0), + INIT_MEMBER(tp_repr, TPyStream::Repr), + INIT_MEMBER(tp_as_number, nullptr), + INIT_MEMBER(tp_as_sequence, nullptr), + INIT_MEMBER(tp_as_mapping, nullptr), + INIT_MEMBER(tp_hash, nullptr), + INIT_MEMBER(tp_call, nullptr), + INIT_MEMBER(tp_str, nullptr), + INIT_MEMBER(tp_getattro, nullptr), + INIT_MEMBER(tp_setattro, nullptr), + INIT_MEMBER(tp_as_buffer, nullptr), + INIT_MEMBER(tp_flags, Py_TPFLAGS_HAVE_ITER), + INIT_MEMBER(tp_doc, "yql.TStream object"), + INIT_MEMBER(tp_traverse, nullptr), + INIT_MEMBER(tp_clear, nullptr), + INIT_MEMBER(tp_richcompare, nullptr), + INIT_MEMBER(tp_weaklistoffset, 0), + INIT_MEMBER(tp_iter, PyObject_SelfIter), + INIT_MEMBER(tp_iternext, TPyStream::Next), + INIT_MEMBER(tp_methods, nullptr), + INIT_MEMBER(tp_members, nullptr), + INIT_MEMBER(tp_getset, nullptr), + INIT_MEMBER(tp_base, nullptr), + INIT_MEMBER(tp_dict, nullptr), + INIT_MEMBER(tp_descr_get, nullptr), + INIT_MEMBER(tp_descr_set, nullptr), + INIT_MEMBER(tp_dictoffset, 0), + INIT_MEMBER(tp_init, nullptr), + INIT_MEMBER(tp_alloc, nullptr), + INIT_MEMBER(tp_new, nullptr), + INIT_MEMBER(tp_free, nullptr), + INIT_MEMBER(tp_is_gc, nullptr), + INIT_MEMBER(tp_bases, nullptr), + INIT_MEMBER(tp_mro, nullptr), + INIT_MEMBER(tp_cache, nullptr), + INIT_MEMBER(tp_subclasses, nullptr), + INIT_MEMBER(tp_weaklist, nullptr), + INIT_MEMBER(tp_del, nullptr), + INIT_MEMBER(tp_version_tag, 0), #if PY_MAJOR_VERSION >= 3 - INIT_MEMBER(tp_finalize , nullptr), + INIT_MEMBER(tp_finalize, nullptr), #endif #if PY_VERSION_HEX >= 0x030800b1 - INIT_MEMBER(tp_vectorcall , nullptr), + INIT_MEMBER(tp_vectorcall, nullptr), #endif #if PY_VERSION_HEX >= 0x030800b4 && PY_VERSION_HEX < 0x03090000 - INIT_MEMBER(tp_print , nullptr), + INIT_MEMBER(tp_print, nullptr), #endif }; PyObject* TPyStream::New( - const TPyCastContext::TPtr& castCtx, - const NUdf::TType* type, - NUdf::IBoxedValuePtr value) + const TPyCastContext::TPtr& castCtx, + const NUdf::TType* type, + NUdf::IBoxedValuePtr value) { TPyStream* stream = new TPyStream; PyObject_INIT(stream, &PyStreamType); @@ -143,18 +143,19 @@ PyObject* TPyStream::Next(PyObject* self) { auto status = NUdf::TBoxedValueAccessor::Fetch(*stream->Value.Get(), item); switch (status) { - case NUdf::EFetchStatus::Ok: - return ToPyObject(stream->CastCtx, stream->ItemType, item) + case NUdf::EFetchStatus::Ok: + return ToPyObject(stream->CastCtx, stream->ItemType, item) .Release(); - case NUdf::EFetchStatus::Finish: - return nullptr; - case NUdf::EFetchStatus::Yield: - PyErr_SetNone(PyYieldIterationException); - return nullptr; - default: - Y_ABORT("Unknown stream status"); + case NUdf::EFetchStatus::Finish: + return nullptr; + case NUdf::EFetchStatus::Yield: + PyErr_SetNone(PyYieldIterationException); + return nullptr; + default: + Y_ABORT("Unknown stream status"); } - } PY_CATCH(nullptr) + } + PY_CATCH(nullptr) } ////////////////////////////////////////////////////////////////////////////// @@ -163,13 +164,13 @@ PyObject* TPyStream::Next(PyObject* self) { class TStreamOverPyIter final: public NUdf::TBoxedValue { public: TStreamOverPyIter( - TPyCastContext::TPtr castCtx, - const NUdf::TType* itemType, - TPyObjectPtr pyIter, - TPyObjectPtr pyIterable, - TPyObjectPtr pyGeneratorCallable, - TPyObjectPtr pyGeneratorCallableClosure, - TPyObjectPtr pyGeneratorCallableArgs) + TPyCastContext::TPtr castCtx, + const NUdf::TType* itemType, + TPyObjectPtr pyIter, + TPyObjectPtr pyIterable, + TPyObjectPtr pyGeneratorCallable, + TPyObjectPtr pyGeneratorCallableClosure, + TPyObjectPtr pyGeneratorCallableArgs) : CastCtx_(std::move(castCtx)) , ItemType_(itemType) , PyIter_(std::move(pyIter)) @@ -215,7 +216,9 @@ private: PyIter_.Reset(); TPyObjectPtr result(PyObject_CallObject(PyGeneratorCallable_.Get(), PyGeneratorCallableArgs_.Get())); if (!result) { - UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << "Failed to execute:\n" << GetLastErrorAsString()).c_str()); + UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << "Failed to execute:\n" + << GetLastErrorAsString()) + .c_str()); } if (PyGen_Check(result.Get())) { @@ -244,8 +247,7 @@ private: } return NUdf::EFetchStatus::Finish; - } - catch (const yexception& e) { + } catch (const yexception& e) { UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << e.what()).c_str()); } } @@ -260,14 +262,13 @@ private: TPyObjectPtr PyGeneratorCallableArgs_; }; - ////////////////////////////////////////////////////////////////////////////// // public functions ////////////////////////////////////////////////////////////////////////////// TPyObjectPtr ToPyStream( - const TPyCastContext::TPtr& castCtx, - const NKikimr::NUdf::TType* type, - const NKikimr::NUdf::TUnboxedValuePod& value) + const TPyCastContext::TPtr& castCtx, + const NKikimr::NUdf::TType* type, + const NKikimr::NUdf::TUnboxedValuePod& value) { return TPyStream::New(castCtx, type, value.AsBoxed()); } @@ -278,8 +279,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream( const TPyObjectPtr& value, const TPyObjectPtr& originalCallable, const TPyObjectPtr& originalCallableClosure, - const TPyObjectPtr& originalCallableArgs -) + const TPyObjectPtr& originalCallableArgs) { const NUdf::TStreamTypeInspector inspector(*castCtx->PyCtx->TypeInfoHelper, type); const NUdf::TType* itemType = inspector.GetItemType(); @@ -290,7 +290,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream( UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << GetLastErrorAsString()).c_str()); } return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr, - originalCallable, originalCallableClosure, originalCallableArgs)); + originalCallable, originalCallableClosure, originalCallableArgs)); } if (PyIter_Check(value.Get()) @@ -301,7 +301,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream( ) { TPyObjectPtr iter(value.Get(), TPyObjectPtr::ADD_REF); return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr, - originalCallable, originalCallableClosure, originalCallableArgs)); + originalCallable, originalCallableClosure, originalCallableArgs)); } // assume that this function will returns generator @@ -324,7 +324,7 @@ NKikimr::NUdf::TUnboxedValue FromPyStream( } return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr, - originalCallable ? value : nullptr, originalCallable ? callableClosure : nullptr, nullptr)); + originalCallable ? value : nullptr, originalCallable ? callableClosure : nullptr, nullptr)); } // must be after checking for callable @@ -337,7 +337,9 @@ NKikimr::NUdf::TUnboxedValue FromPyStream( } UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << "Expected iterator, generator, generator factory, " - "or iterable object, but got " << PyObjectRepr(value.Get())).c_str()); + "or iterable object, but got " + << PyObjectRepr(value.Get())) + .c_str()); } } // namespace NPython |