aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/runtime_py3/test/subinterpreter/stdout_interceptor.cpp
blob: 3cd4b69d01215993b2d3e61a841c75fbfce8f185 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include "stdout_interceptor.h"

#include <util/stream/output.h>

namespace {

// Instance layout of the Python-side object that forwards writes to a C++ stream.
// PyObject_HEAD must stay first so the struct can be used through PyObject*.
struct TOStreamWrapper {
    PyObject_HEAD
    // Non-owning pointer to the destination stream; assigned by whoever
    // allocates the object.
    IOutputStream* Stm{nullptr};
};

/// Python-visible `write(*chunks)` implementation (METH_FASTCALL calling convention).
///
/// Every positional argument is forwarded, in order, to the wrapped IOutputStream.
/// `str` arguments are encoded to UTF-8 first; any other argument must expose a
/// C-contiguous buffer.
///
/// @param self  wrapper instance whose `Stm` receives the data
/// @param args  array of positional arguments (borrowed references)
/// @param nargs number of entries in `args`
/// @return a new reference to None on success; nullptr with a Python exception
///         set on failure (C++ exceptions are translated to IOError / RuntimeError).
PyObject* Write(TOStreamWrapper *self, PyObject *const *args, Py_ssize_t nargs) noexcept {
    // Drops the owned reference (if any) when the scope is left, including
    // early returns and C++ exceptions.
    struct TRefHolder {
        PyObject* Obj = nullptr;
        ~TRefHolder() {
            Py_XDECREF(Obj);
        }
    };

    // Releases a successfully acquired buffer view when the scope is left.
    struct TViewReleaser {
        Py_buffer* View;
        ~TViewReleaser() {
            PyBuffer_Release(View);
        }
    };

    try {
        for (Py_ssize_t i = 0; i < nargs; ++i) {
            PyObject* buf = args[i];

            // PyUnicode_AsUTF8String returns a NEW reference; keep it in a
            // holder so it is released once this argument has been written.
            TRefHolder encoded;
            if (PyUnicode_Check(buf)) {
                encoded.Obj = PyUnicode_AsUTF8String(buf);
                if (!encoded.Obj) {
                    return nullptr;
                }
                buf = encoded.Obj;
            }

            Py_buffer view;
            if (PyObject_GetBuffer(buf, &view, PyBUF_SIMPLE | PyBUF_C_CONTIGUOUS) == -1) {
                return nullptr;
            }
            // Declared after `encoded`, so the view is released before the
            // bytes object backing it is dropped.
            TViewReleaser releaser{&view};

            self->Stm->Write(reinterpret_cast<const char*>(view.buf), view.len);
        }

        // The caller receives ownership of the result, so None must be
        // returned as a new reference.
        Py_RETURN_NONE;
    } catch(const std::exception& err) {
        PyErr_SetString(PyExc_IOError, err.what());
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError, "Unhandled C++ exception of unknown type");
    }
    return nullptr;
}

// Method table of the wrapper type: a single `write` method using the
// METH_FASTCALL calling convention, followed by the terminating sentinel.
PyMethodDef TOStreamWrapperMethods[] = {
    {
        "write",
        reinterpret_cast<PyCFunction>(Write),
        METH_FASTCALL,
        PyDoc_STR("write buffer to wrapped C++ stream"),
    },
    {nullptr, nullptr, 0, nullptr},  // sentinel entry
};

// Static type object for `testwrap.OStream`. It is finalised by
// PyType_Ready() in TPyStdoutInterceptor::SetupInterceptionSupport().
PyTypeObject TOStreamWrapperType = {
    // No comma after the macro: its expansion already ends with one.
    .ob_base = PyVarObject_HEAD_INIT(nullptr, 0)
    .tp_name = "testwrap.OStream",
    .tp_basicsize = sizeof(TOStreamWrapper),
    .tp_itemsize = 0,  // fixed-size instances
    .tp_flags = Py_TPFLAGS_DEFAULT,
    .tp_doc = PyDoc_STR("C++ IOStream wrapper"),
    .tp_methods = TOStreamWrapperMethods,
    .tp_new = PyType_GenericNew,
};

}

/// Replaces `sys.stdout` with a wrapper object that forwards writes to
/// `redirectionStream`, remembering the previous `sys.stdout` in RealStdout_.
///
/// Only the address of `redirectionStream` is stored, so the stream must
/// outlive every use of the installed wrapper.
///
/// The wrapper is created through TOStreamWrapperType.tp_alloc, which the
/// static type initializer does not set; SetupInterceptionSupport() must
/// therefore have succeeded before an interceptor is constructed.
///
/// If the wrapper cannot be allocated, `sys.stdout` is left untouched and the
/// Python error indicator set by the allocator stays pending (this constructor
/// is noexcept and has no other way to report the failure).
TPyStdoutInterceptor::TPyStdoutInterceptor(IOutputStream& redirectionStream) noexcept
    : RealStdout_{PySys_GetObject("stdout")}
{
    // PySys_GetObject returns a borrowed reference; take our own so the object
    // survives being replaced in `sys`.
    // NOTE(review): PySys_GetObject returns NULL when `sys.stdout` does not
    // exist; this code assumes it is present - confirm against callers.
    Py_INCREF(RealStdout_);

    PyObject* redirect = TOStreamWrapperType.tp_alloc(&TOStreamWrapperType, 0);
    if (!redirect) {
        // Allocation failed: do not dereference the null result.
        return;
    }
    reinterpret_cast<TOStreamWrapper*>(redirect)->Stm = &redirectionStream;

    PySys_SetObject("stdout", redirect);
    // `sys` now holds its own reference to the wrapper; drop the one from tp_alloc.
    Py_DECREF(redirect);
}

/// Puts the remembered object back as `sys.stdout` and gives up the reference
/// held in RealStdout_.
TPyStdoutInterceptor::~TPyStdoutInterceptor() noexcept {
    auto* const original = RealStdout_;
    // Reinstall first, then release our reference to the same object.
    PySys_SetObject("stdout", original);
    Py_DECREF(original);
}

/// Finalises the wrapper type object via PyType_Ready().
///
/// @return true when PyType_Ready() reports success (returns 0), false otherwise.
bool TPyStdoutInterceptor::SetupInterceptionSupport() noexcept {
    const int status = PyType_Ready(&TOStreamWrapperType);
    return status == 0;
}