1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
#include "stdout_interceptor.h"
#include <util/stream/output.h>
namespace {
struct TOStreamWrapper {
PyObject_HEAD
IOutputStream* Stm = nullptr;
};
PyObject* Write(TOStreamWrapper *self, PyObject *const *args, Py_ssize_t nargs) noexcept {
try {
Py_buffer view;
for (Py_ssize_t i = 0; i < nargs; ++i) {
PyObject* buf = args[i];
if (PyUnicode_Check(args[i])) {
buf = PyUnicode_AsUTF8String(buf);
if (!buf) {
return nullptr;
}
}
if (PyObject_GetBuffer(buf, &view, PyBUF_SIMPLE | PyBUF_C_CONTIGUOUS) == -1) {
return nullptr;
}
self->Stm->Write(reinterpret_cast<const char*>(view.buf), view.len);
PyBuffer_Release(&view);
}
return Py_None;
} catch(const std::exception& err) {
PyErr_SetString(PyExc_IOError, err.what());
} catch (...) {
PyErr_SetString(PyExc_RuntimeError, "Unhandled C++ exception of unknown type");
}
return nullptr;
}
PyMethodDef TOStreamWrapperMethods[] = {
{"write", reinterpret_cast<PyCFunction>(Write), METH_FASTCALL, PyDoc_STR("write buffer to wrapped C++ stream")},
{}
};
PyTypeObject TOStreamWrapperType {
.ob_base = PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "testwrap.OStream",
.tp_basicsize = sizeof(TOStreamWrapper),
.tp_itemsize = 0,
.tp_flags = Py_TPFLAGS_DEFAULT,
.tp_doc = PyDoc_STR("C++ IOStream wrapper"),
.tp_methods = TOStreamWrapperMethods,
.tp_new = PyType_GenericNew,
};
}
TPyStdoutInterceptor::TPyStdoutInterceptor(IOutputStream& redirectionStream) noexcept
: RealStdout_{PySys_GetObject("stdout")}
{
Py_INCREF(RealStdout_);
PyObject* redirect = TOStreamWrapperType.tp_alloc(&TOStreamWrapperType, 0);
reinterpret_cast<TOStreamWrapper*>(redirect)->Stm = &redirectionStream;
PySys_SetObject("stdout", redirect);
Py_DECREF(redirect);
}
TPyStdoutInterceptor::~TPyStdoutInterceptor() noexcept {
PySys_SetObject("stdout", RealStdout_);
Py_DECREF(RealStdout_);
}
bool TPyStdoutInterceptor::SetupInterceptionSupport() noexcept {
return PyType_Ready(&TOStreamWrapperType) == 0;
}
|