aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/py2/websocket/_abnf.py
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-12-01 12:02:50 +0300
committeralexv-smirnov <alex@ydb.tech>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/websocket-client/py2/websocket/_abnf.py
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
downloadydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz
Change "ya.make"
Diffstat (limited to 'contrib/python/websocket-client/py2/websocket/_abnf.py')
-rw-r--r--contrib/python/websocket-client/py2/websocket/_abnf.py458
1 files changed, 458 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/py2/websocket/_abnf.py b/contrib/python/websocket-client/py2/websocket/_abnf.py
new file mode 100644
index 0000000000..80fbe1f9b9
--- /dev/null
+++ b/contrib/python/websocket-client/py2/websocket/_abnf.py
@@ -0,0 +1,458 @@
+"""
+
+"""
+
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""
+import array
+import os
+import struct
+
+import six
+
+from ._exceptions import *
+from ._utils import validate_utf8
+from threading import Lock
+
+try:
+ if six.PY3:
+ import numpy
+ else:
+ numpy = None
+except ImportError:
+ numpy = None
+
+try:
+ # If wsaccel is available we use compiled routines to mask data.
+ if not numpy:
+ from wsaccel.xormask import XorMaskerSimple
+
+ def _mask(_m, _d):
+ return XorMaskerSimple(_m).process(_d)
+except ImportError:
+ # wsaccel is not available, we rely on python implementations.
+ def _mask(_m, _d):
+ for i in range(len(_d)):
+ _d[i] ^= _m[i % 4]
+
+ if six.PY3:
+ return _d.tobytes()
+ else:
+ return _d.tostring()
+
+
+__all__ = [
+ 'ABNF', 'continuous_frame', 'frame_buffer',
+ 'STATUS_NORMAL',
+ 'STATUS_GOING_AWAY',
+ 'STATUS_PROTOCOL_ERROR',
+ 'STATUS_UNSUPPORTED_DATA_TYPE',
+ 'STATUS_STATUS_NOT_AVAILABLE',
+ 'STATUS_ABNORMAL_CLOSED',
+ 'STATUS_INVALID_PAYLOAD',
+ 'STATUS_POLICY_VIOLATION',
+ 'STATUS_MESSAGE_TOO_BIG',
+ 'STATUS_INVALID_EXTENSION',
+ 'STATUS_UNEXPECTED_CONDITION',
+ 'STATUS_BAD_GATEWAY',
+ 'STATUS_TLS_HANDSHAKE_ERROR',
+]
+
+# closing frame status codes.
+STATUS_NORMAL = 1000
+STATUS_GOING_AWAY = 1001
+STATUS_PROTOCOL_ERROR = 1002
+STATUS_UNSUPPORTED_DATA_TYPE = 1003
+STATUS_STATUS_NOT_AVAILABLE = 1005
+STATUS_ABNORMAL_CLOSED = 1006
+STATUS_INVALID_PAYLOAD = 1007
+STATUS_POLICY_VIOLATION = 1008
+STATUS_MESSAGE_TOO_BIG = 1009
+STATUS_INVALID_EXTENSION = 1010
+STATUS_UNEXPECTED_CONDITION = 1011
+STATUS_BAD_GATEWAY = 1014
+STATUS_TLS_HANDSHAKE_ERROR = 1015
+
+VALID_CLOSE_STATUS = (
+ STATUS_NORMAL,
+ STATUS_GOING_AWAY,
+ STATUS_PROTOCOL_ERROR,
+ STATUS_UNSUPPORTED_DATA_TYPE,
+ STATUS_INVALID_PAYLOAD,
+ STATUS_POLICY_VIOLATION,
+ STATUS_MESSAGE_TOO_BIG,
+ STATUS_INVALID_EXTENSION,
+ STATUS_UNEXPECTED_CONDITION,
+ STATUS_BAD_GATEWAY,
+)
+
+
+class ABNF(object):
+ """
+ ABNF frame class.
+ See http://tools.ietf.org/html/rfc5234
+ and http://tools.ietf.org/html/rfc6455#section-5.2
+ """
+
+ # operation code values.
+ OPCODE_CONT = 0x0
+ OPCODE_TEXT = 0x1
+ OPCODE_BINARY = 0x2
+ OPCODE_CLOSE = 0x8
+ OPCODE_PING = 0x9
+ OPCODE_PONG = 0xa
+
+ # available operation code value tuple
+ OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
+ OPCODE_PING, OPCODE_PONG)
+
+ # opcode human readable string
+ OPCODE_MAP = {
+ OPCODE_CONT: "cont",
+ OPCODE_TEXT: "text",
+ OPCODE_BINARY: "binary",
+ OPCODE_CLOSE: "close",
+ OPCODE_PING: "ping",
+ OPCODE_PONG: "pong"
+ }
+
+ # data length threshold.
+ LENGTH_7 = 0x7e
+ LENGTH_16 = 1 << 16
+ LENGTH_63 = 1 << 63
+
+ def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0,
+ opcode=OPCODE_TEXT, mask=1, data=""):
+ """
+ Constructor for ABNF. Please check RFC for arguments.
+ """
+ self.fin = fin
+ self.rsv1 = rsv1
+ self.rsv2 = rsv2
+ self.rsv3 = rsv3
+ self.opcode = opcode
+ self.mask = mask
+ if data is None:
+ data = ""
+ self.data = data
+ self.get_mask_key = os.urandom
+
+ def validate(self, skip_utf8_validation=False):
+ """
+ Validate the ABNF frame.
+
+ Parameters
+ ----------
+ skip_utf8_validation: skip utf8 validation.
+ """
+ if self.rsv1 or self.rsv2 or self.rsv3:
+ raise WebSocketProtocolException("rsv is not implemented, yet")
+
+ if self.opcode not in ABNF.OPCODES:
+ raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
+
+ if self.opcode == ABNF.OPCODE_PING and not self.fin:
+ raise WebSocketProtocolException("Invalid ping frame.")
+
+ if self.opcode == ABNF.OPCODE_CLOSE:
+ l = len(self.data)
+ if not l:
+ return
+ if l == 1 or l >= 126:
+ raise WebSocketProtocolException("Invalid close frame.")
+ if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
+ raise WebSocketProtocolException("Invalid close frame.")
+
+ code = 256 * \
+ six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
+ if not self._is_valid_close_status(code):
+ raise WebSocketProtocolException("Invalid close opcode.")
+
+ @staticmethod
+ def _is_valid_close_status(code):
+ return code in VALID_CLOSE_STATUS or (3000 <= code < 5000)
+
+ def __str__(self):
+ return "fin=" + str(self.fin) \
+ + " opcode=" + str(self.opcode) \
+ + " data=" + str(self.data)
+
+ @staticmethod
+ def create_frame(data, opcode, fin=1):
+ """
+ Create frame to send text, binary and other data.
+
+ Parameters
+ ----------
+ data: <type>
+ data to send. This is string value(byte array).
+ If opcode is OPCODE_TEXT and this value is unicode,
+ data value is converted into unicode string, automatically.
+ opcode: <type>
+ operation code. please see OPCODE_XXX.
+ fin: <type>
+ fin flag. if set to 0, create continue fragmentation.
+ """
+ if opcode == ABNF.OPCODE_TEXT and isinstance(data, six.text_type):
+ data = data.encode("utf-8")
+ # mask must be set if send data from client
+ return ABNF(fin, 0, 0, 0, opcode, 1, data)
+
+ def format(self):
+ """
+ Format this object to string(byte array) to send data to server.
+ """
+ if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]):
+ raise ValueError("not 0 or 1")
+ if self.opcode not in ABNF.OPCODES:
+ raise ValueError("Invalid OPCODE")
+ length = len(self.data)
+ if length >= ABNF.LENGTH_63:
+ raise ValueError("data is too long")
+
+ frame_header = chr(self.fin << 7 |
+ self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 |
+ self.opcode)
+ if length < ABNF.LENGTH_7:
+ frame_header += chr(self.mask << 7 | length)
+ frame_header = six.b(frame_header)
+ elif length < ABNF.LENGTH_16:
+ frame_header += chr(self.mask << 7 | 0x7e)
+ frame_header = six.b(frame_header)
+ frame_header += struct.pack("!H", length)
+ else:
+ frame_header += chr(self.mask << 7 | 0x7f)
+ frame_header = six.b(frame_header)
+ frame_header += struct.pack("!Q", length)
+
+ if not self.mask:
+ return frame_header + self.data
+ else:
+ mask_key = self.get_mask_key(4)
+ return frame_header + self._get_masked(mask_key)
+
+ def _get_masked(self, mask_key):
+ s = ABNF.mask(mask_key, self.data)
+
+ if isinstance(mask_key, six.text_type):
+ mask_key = mask_key.encode('utf-8')
+
+ return mask_key + s
+
+ @staticmethod
+ def mask(mask_key, data):
+ """
+ Mask or unmask data. Just do xor for each byte
+
+ Parameters
+ ----------
+ mask_key: <type>
+ 4 byte string(byte).
+ data: <type>
+ data to mask/unmask.
+ """
+ if data is None:
+ data = ""
+
+ if isinstance(mask_key, six.text_type):
+ mask_key = six.b(mask_key)
+
+ if isinstance(data, six.text_type):
+ data = six.b(data)
+
+ if numpy:
+ origlen = len(data)
+ _mask_key = mask_key[3] << 24 | mask_key[2] << 16 | mask_key[1] << 8 | mask_key[0]
+
+ # We need data to be a multiple of four...
+ data += bytes(" " * (4 - (len(data) % 4)), "us-ascii")
+ a = numpy.frombuffer(data, dtype="uint32")
+ masked = numpy.bitwise_xor(a, [_mask_key]).astype("uint32")
+ if len(data) > origlen:
+ return masked.tobytes()[:origlen]
+ return masked.tobytes()
+ else:
+ _m = array.array("B", mask_key)
+ _d = array.array("B", data)
+ return _mask(_m, _d)
+
+
+class frame_buffer(object):
+ _HEADER_MASK_INDEX = 5
+ _HEADER_LENGTH_INDEX = 6
+
+ def __init__(self, recv_fn, skip_utf8_validation):
+ self.recv = recv_fn
+ self.skip_utf8_validation = skip_utf8_validation
+ # Buffers over the packets from the layer beneath until desired amount
+ # bytes of bytes are received.
+ self.recv_buffer = []
+ self.clear()
+ self.lock = Lock()
+
+ def clear(self):
+ self.header = None
+ self.length = None
+ self.mask = None
+
+ def has_received_header(self):
+ return self.header is None
+
+ def recv_header(self):
+ header = self.recv_strict(2)
+ b1 = header[0]
+
+ if six.PY2:
+ b1 = ord(b1)
+
+ fin = b1 >> 7 & 1
+ rsv1 = b1 >> 6 & 1
+ rsv2 = b1 >> 5 & 1
+ rsv3 = b1 >> 4 & 1
+ opcode = b1 & 0xf
+ b2 = header[1]
+
+ if six.PY2:
+ b2 = ord(b2)
+
+ has_mask = b2 >> 7 & 1
+ length_bits = b2 & 0x7f
+
+ self.header = (fin, rsv1, rsv2, rsv3, opcode, has_mask, length_bits)
+
+ def has_mask(self):
+ if not self.header:
+ return False
+ return self.header[frame_buffer._HEADER_MASK_INDEX]
+
+ def has_received_length(self):
+ return self.length is None
+
+ def recv_length(self):
+ bits = self.header[frame_buffer._HEADER_LENGTH_INDEX]
+ length_bits = bits & 0x7f
+ if length_bits == 0x7e:
+ v = self.recv_strict(2)
+ self.length = struct.unpack("!H", v)[0]
+ elif length_bits == 0x7f:
+ v = self.recv_strict(8)
+ self.length = struct.unpack("!Q", v)[0]
+ else:
+ self.length = length_bits
+
+ def has_received_mask(self):
+ return self.mask is None
+
+ def recv_mask(self):
+ self.mask = self.recv_strict(4) if self.has_mask() else ""
+
+ def recv_frame(self):
+
+ with self.lock:
+ # Header
+ if self.has_received_header():
+ self.recv_header()
+ (fin, rsv1, rsv2, rsv3, opcode, has_mask, _) = self.header
+
+ # Frame length
+ if self.has_received_length():
+ self.recv_length()
+ length = self.length
+
+ # Mask
+ if self.has_received_mask():
+ self.recv_mask()
+ mask = self.mask
+
+ # Payload
+ payload = self.recv_strict(length)
+ if has_mask:
+ payload = ABNF.mask(mask, payload)
+
+ # Reset for next frame
+ self.clear()
+
+ frame = ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload)
+ frame.validate(self.skip_utf8_validation)
+
+ return frame
+
+ def recv_strict(self, bufsize):
+ shortage = bufsize - sum(len(x) for x in self.recv_buffer)
+ while shortage > 0:
+ # Limit buffer size that we pass to socket.recv() to avoid
+ # fragmenting the heap -- the number of bytes recv() actually
+ # reads is limited by socket buffer and is relatively small,
+ # yet passing large numbers repeatedly causes lots of large
+ # buffers allocated and then shrunk, which results in
+ # fragmentation.
+ bytes_ = self.recv(min(16384, shortage))
+ self.recv_buffer.append(bytes_)
+ shortage -= len(bytes_)
+
+ unified = six.b("").join(self.recv_buffer)
+
+ if shortage == 0:
+ self.recv_buffer = []
+ return unified
+ else:
+ self.recv_buffer = [unified[bufsize:]]
+ return unified[:bufsize]
+
+
+class continuous_frame(object):
+
+ def __init__(self, fire_cont_frame, skip_utf8_validation):
+ self.fire_cont_frame = fire_cont_frame
+ self.skip_utf8_validation = skip_utf8_validation
+ self.cont_data = None
+ self.recving_frames = None
+
+ def validate(self, frame):
+ if not self.recving_frames and frame.opcode == ABNF.OPCODE_CONT:
+ raise WebSocketProtocolException("Illegal frame")
+ if self.recving_frames and \
+ frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
+ raise WebSocketProtocolException("Illegal frame")
+
+ def add(self, frame):
+ if self.cont_data:
+ self.cont_data[1] += frame.data
+ else:
+ if frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
+ self.recving_frames = frame.opcode
+ self.cont_data = [frame.opcode, frame.data]
+
+ if frame.fin:
+ self.recving_frames = None
+
+ def is_fire(self, frame):
+ return frame.fin or self.fire_cont_frame
+
+ def extract(self, frame):
+ data = self.cont_data
+ self.cont_data = None
+ frame.data = data[1]
+ if not self.fire_cont_frame and data[0] == ABNF.OPCODE_TEXT and not self.skip_utf8_validation and not validate_utf8(frame.data):
+ raise WebSocketPayloadException(
+ "cannot decode: " + repr(frame.data))
+
+ return [data[0], frame]