diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/internet/_win32serialport.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/_win32serialport.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/internet/_win32serialport.py | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/_win32serialport.py b/contrib/python/Twisted/py3/twisted/internet/_win32serialport.py new file mode 100644 index 0000000000..2dda4b9816 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/internet/_win32serialport.py @@ -0,0 +1,156 @@ +# -*- test-case-name: twisted.internet.test.test_win32serialport -*- + +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Serial port support for Windows. + +Requires PySerial and pywin32. +""" + + +import win32event # type: ignore[import] +import win32file # type: ignore[import] + +# system imports +from serial import PARITY_NONE # type: ignore[import] +from serial import EIGHTBITS, STOPBITS_ONE +from serial.serialutil import to_bytes # type: ignore[import] + +# twisted imports +from twisted.internet import abstract + +# sibling imports +from twisted.internet.serialport import BaseSerialPort + + +class SerialPort(BaseSerialPort, abstract.FileDescriptor): + """A serial device, acting as a transport, that uses a win32 event.""" + + connected = 1 + + def __init__( + self, + protocol, + deviceNameOrPortNumber, + reactor, + baudrate=9600, + bytesize=EIGHTBITS, + parity=PARITY_NONE, + stopbits=STOPBITS_ONE, + xonxoff=0, + rtscts=0, + ): + self._serial = self._serialFactory( + deviceNameOrPortNumber, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits, + timeout=None, + xonxoff=xonxoff, + rtscts=rtscts, + ) + self.flushInput() + self.flushOutput() + self.reactor = reactor + self.protocol = protocol + self.outQueue = [] + self.closed = 0 + self.closedNotifies = 0 + self.writeInProgress = 0 + + self.protocol = protocol + self._overlappedRead = win32file.OVERLAPPED() + self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None) + self._overlappedWrite = win32file.OVERLAPPED() + self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None) + + self.reactor.addEvent(self._overlappedRead.hEvent, self, "serialReadEvent") + self.reactor.addEvent(self._overlappedWrite.hEvent, self, "serialWriteEvent") + + self.protocol.makeConnection(self) + self._finishPortSetup() + + def _finishPortSetup(self): + """ + Finish setting up the serial port. + + This is a separate method to facilitate testing. + """ + flags, comstat = self._clearCommError() + rc, self.read_buf = win32file.ReadFile( + self._serial._port_handle, + win32file.AllocateReadBuffer(1), + self._overlappedRead, + ) + + def _clearCommError(self): + return win32file.ClearCommError(self._serial._port_handle) + + def serialReadEvent(self): + # get that character we set up + n = win32file.GetOverlappedResult( + self._serial._port_handle, self._overlappedRead, 0 + ) + first = to_bytes(self.read_buf[:n]) + # now we should get everything that is already in the buffer + flags, comstat = self._clearCommError() + if comstat.cbInQue: + win32event.ResetEvent(self._overlappedRead.hEvent) + rc, buf = win32file.ReadFile( + self._serial._port_handle, + win32file.AllocateReadBuffer(comstat.cbInQue), + self._overlappedRead, + ) + n = win32file.GetOverlappedResult( + self._serial._port_handle, self._overlappedRead, 1 + ) + # handle all the received data: + self.protocol.dataReceived(first + to_bytes(buf[:n])) + else: + # handle all the received data: + self.protocol.dataReceived(first) + + # set up next one + win32event.ResetEvent(self._overlappedRead.hEvent) + rc, self.read_buf = win32file.ReadFile( + self._serial._port_handle, + win32file.AllocateReadBuffer(1), + self._overlappedRead, + ) + + def write(self, data): + if data: + if self.writeInProgress: + self.outQueue.append(data) + else: + self.writeInProgress = 1 + win32file.WriteFile( + self._serial._port_handle, data, self._overlappedWrite + ) + + def serialWriteEvent(self): + try: + dataToWrite = self.outQueue.pop(0) + except IndexError: + self.writeInProgress = 0 + return + else: + win32file.WriteFile( + self._serial._port_handle, dataToWrite, self._overlappedWrite + ) + + def connectionLost(self, reason): + """ + Called when the serial port disconnects. + + Will call C{connectionLost} on the protocol that is handling the + serial data. + """ + self.reactor.removeEvent(self._overlappedRead.hEvent) + self.reactor.removeEvent(self._overlappedWrite.hEvent) + abstract.FileDescriptor.connectionLost(self, reason) + self._serial.close() + self.protocol.connectionLost(reason) |