aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/websocket-client/py3/websocket/_utils.py
diff options
context:
space:
mode:
authoralexv-smirnov <alex@ydb.tech>2023-12-01 12:02:50 +0300
committeralexv-smirnov <alex@ydb.tech>2023-12-01 13:28:10 +0300
commit0e578a4c44d4abd539d9838347b9ebafaca41dfb (patch)
treea0c1969c37f818c830ebeff9c077eacf30be6ef8 /contrib/python/websocket-client/py3/websocket/_utils.py
parent84f2d3d4cc985e63217cff149bd2e6d67ae6fe22 (diff)
downloadydb-0e578a4c44d4abd539d9838347b9ebafaca41dfb.tar.gz
Change "ya.make"
Diffstat (limited to 'contrib/python/websocket-client/py3/websocket/_utils.py')
-rw-r--r--contrib/python/websocket-client/py3/websocket/_utils.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/contrib/python/websocket-client/py3/websocket/_utils.py b/contrib/python/websocket-client/py3/websocket/_utils.py
new file mode 100644
index 0000000000..62ba0b01b8
--- /dev/null
+++ b/contrib/python/websocket-client/py3/websocket/_utils.py
@@ -0,0 +1,106 @@
+from typing import Union
+
+"""
+_url.py
+websocket - WebSocket client library for Python
+
+Copyright 2023 engn33r
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+__all__ = ["NoLock", "validate_utf8", "extract_err_message", "extract_error_code"]
+
+
+class NoLock:
+
+ def __enter__(self) -> None:
+ pass
+
+ def __exit__(self, exc_type, exc_value, traceback) -> None:
+ pass
+
+
+try:
+ # If wsaccel is available we use compiled routines to validate UTF-8
+ # strings.
+ from wsaccel.utf8validator import Utf8Validator
+
+ def _validate_utf8(utfbytes: bytes) -> bool:
+ return Utf8Validator().validate(utfbytes)[0]
+
+except ImportError:
+ # UTF-8 validator
+ # python implementation of http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
+
+ _UTF8_ACCEPT = 0
+ _UTF8_REJECT = 12
+
+ _UTF8D = [
+ # The first part of the table maps bytes to character classes that
+ # to reduce the size of the transition table and create bitmasks.
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,
+ 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,
+ 8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
+ 10,3,3,3,3,3,3,3,3,3,3,3,3,4,3,3, 11,6,6,6,5,8,8,8,8,8,8,8,8,8,8,8,
+
+ # The second part is a transition table that maps a combination
+ # of a state of the automaton and a character class to a state.
+ 0,12,24,36,60,96,84,12,12,12,48,72, 12,12,12,12,12,12,12,12,12,12,12,12,
+ 12, 0,12,12,12,12,12, 0,12, 0,12,12, 12,24,12,12,12,12,12,24,12,24,12,12,
+ 12,12,12,12,12,12,12,24,12,12,12,12, 12,24,12,12,12,12,12,12,12,24,12,12,
+ 12,12,12,12,12,12,12,36,12,36,12,12, 12,36,12,12,12,12,12,36,12,36,12,12,
+ 12,36,12,12,12,12,12,12,12,12,12,12, ]
+
+ def _decode(state: int, codep: int, ch: int) -> tuple:
+ tp = _UTF8D[ch]
+
+ codep = (ch & 0x3f) | (codep << 6) if (
+ state != _UTF8_ACCEPT) else (0xff >> tp) & ch
+ state = _UTF8D[256 + state + tp]
+
+ return state, codep
+
+ def _validate_utf8(utfbytes: Union[str, bytes]) -> bool:
+ state = _UTF8_ACCEPT
+ codep = 0
+ for i in utfbytes:
+ state, codep = _decode(state, codep, i)
+ if state == _UTF8_REJECT:
+ return False
+
+ return True
+
+
+def validate_utf8(utfbytes: Union[str, bytes]) -> bool:
+ """
+ validate utf8 byte string.
+ utfbytes: utf byte string to check.
+ return value: if valid utf8 string, return true. Otherwise, return false.
+ """
+ return _validate_utf8(utfbytes)
+
+
+def extract_err_message(exception: Exception) -> Union[str, None]:
+ if exception.args:
+ return exception.args[0]
+ else:
+ return None
+
+
+def extract_error_code(exception: Exception) -> Union[int, None]:
+ if exception.args and len(exception.args) > 1:
+ return exception.args[0] if isinstance(exception.args[0], int) else None