aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/internet/_win32serialport.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/internet/_win32serialport.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/_win32serialport.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_win32serialport.py156
1 files changed, 156 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/_win32serialport.py b/contrib/python/Twisted/py3/twisted/internet/_win32serialport.py
new file mode 100644
index 0000000000..2dda4b9816
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/_win32serialport.py
@@ -0,0 +1,156 @@
+# -*- test-case-name: twisted.internet.test.test_win32serialport -*-
+
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Serial port support for Windows.
+
+Requires PySerial and pywin32.
+"""
+
+
+import win32event # type: ignore[import]
+import win32file # type: ignore[import]
+
+# system imports
+from serial import PARITY_NONE # type: ignore[import]
+from serial import EIGHTBITS, STOPBITS_ONE
+from serial.serialutil import to_bytes # type: ignore[import]
+
+# twisted imports
+from twisted.internet import abstract
+
+# sibling imports
+from twisted.internet.serialport import BaseSerialPort
+
+
+class SerialPort(BaseSerialPort, abstract.FileDescriptor):
+ """A serial device, acting as a transport, that uses a win32 event."""
+
+ connected = 1
+
+ def __init__(
+ self,
+ protocol,
+ deviceNameOrPortNumber,
+ reactor,
+ baudrate=9600,
+ bytesize=EIGHTBITS,
+ parity=PARITY_NONE,
+ stopbits=STOPBITS_ONE,
+ xonxoff=0,
+ rtscts=0,
+ ):
+ self._serial = self._serialFactory(
+ deviceNameOrPortNumber,
+ baudrate=baudrate,
+ bytesize=bytesize,
+ parity=parity,
+ stopbits=stopbits,
+ timeout=None,
+ xonxoff=xonxoff,
+ rtscts=rtscts,
+ )
+ self.flushInput()
+ self.flushOutput()
+ self.reactor = reactor
+ self.protocol = protocol
+ self.outQueue = []
+ self.closed = 0
+ self.closedNotifies = 0
+ self.writeInProgress = 0
+
+ self.protocol = protocol
+ self._overlappedRead = win32file.OVERLAPPED()
+ self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
+ self._overlappedWrite = win32file.OVERLAPPED()
+ self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)
+
+ self.reactor.addEvent(self._overlappedRead.hEvent, self, "serialReadEvent")
+ self.reactor.addEvent(self._overlappedWrite.hEvent, self, "serialWriteEvent")
+
+ self.protocol.makeConnection(self)
+ self._finishPortSetup()
+
+ def _finishPortSetup(self):
+ """
+ Finish setting up the serial port.
+
+ This is a separate method to facilitate testing.
+ """
+ flags, comstat = self._clearCommError()
+ rc, self.read_buf = win32file.ReadFile(
+ self._serial._port_handle,
+ win32file.AllocateReadBuffer(1),
+ self._overlappedRead,
+ )
+
+ def _clearCommError(self):
+ return win32file.ClearCommError(self._serial._port_handle)
+
+ def serialReadEvent(self):
+ # get that character we set up
+ n = win32file.GetOverlappedResult(
+ self._serial._port_handle, self._overlappedRead, 0
+ )
+ first = to_bytes(self.read_buf[:n])
+ # now we should get everything that is already in the buffer
+ flags, comstat = self._clearCommError()
+ if comstat.cbInQue:
+ win32event.ResetEvent(self._overlappedRead.hEvent)
+ rc, buf = win32file.ReadFile(
+ self._serial._port_handle,
+ win32file.AllocateReadBuffer(comstat.cbInQue),
+ self._overlappedRead,
+ )
+ n = win32file.GetOverlappedResult(
+ self._serial._port_handle, self._overlappedRead, 1
+ )
+ # handle all the received data:
+ self.protocol.dataReceived(first + to_bytes(buf[:n]))
+ else:
+ # handle all the received data:
+ self.protocol.dataReceived(first)
+
+ # set up next one
+ win32event.ResetEvent(self._overlappedRead.hEvent)
+ rc, self.read_buf = win32file.ReadFile(
+ self._serial._port_handle,
+ win32file.AllocateReadBuffer(1),
+ self._overlappedRead,
+ )
+
+ def write(self, data):
+ if data:
+ if self.writeInProgress:
+ self.outQueue.append(data)
+ else:
+ self.writeInProgress = 1
+ win32file.WriteFile(
+ self._serial._port_handle, data, self._overlappedWrite
+ )
+
+ def serialWriteEvent(self):
+ try:
+ dataToWrite = self.outQueue.pop(0)
+ except IndexError:
+ self.writeInProgress = 0
+ return
+ else:
+ win32file.WriteFile(
+ self._serial._port_handle, dataToWrite, self._overlappedWrite
+ )
+
+ def connectionLost(self, reason):
+ """
+ Called when the serial port disconnects.
+
+ Will call C{connectionLost} on the protocol that is handling the
+ serial data.
+ """
+ self.reactor.removeEvent(self._overlappedRead.hEvent)
+ self.reactor.removeEvent(self._overlappedWrite.hEvent)
+ abstract.FileDescriptor.connectionLost(self, reason)
+ self._serial.close()
+ self.protocol.connectionLost(reason)