diff options
Diffstat (limited to 'contrib/python')
47 files changed, 1195 insertions, 249 deletions
diff --git a/contrib/python/argcomplete/py3/.dist-info/METADATA b/contrib/python/argcomplete/py3/.dist-info/METADATA index bf74fb4961..8eff29ade2 100644 --- a/contrib/python/argcomplete/py3/.dist-info/METADATA +++ b/contrib/python/argcomplete/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.4 Name: argcomplete -Version: 3.6.0 +Version: 3.6.1 Summary: Bash tab completion for argparse Project-URL: Homepage, https://github.com/kislyuk/argcomplete Project-URL: Documentation, https://kislyuk.github.io/argcomplete diff --git a/contrib/python/argcomplete/py3/argcomplete/finders.py b/contrib/python/argcomplete/py3/argcomplete/finders.py index 793b462eed..8d248fd973 100644 --- a/contrib/python/argcomplete/py3/argcomplete/finders.py +++ b/contrib/python/argcomplete/py3/argcomplete/finders.py @@ -515,7 +515,7 @@ class CompletionFinder(object): # Bash mangles completions which contain characters in COMP_WORDBREAKS. # This workaround has the same effect as __ltrim_colon_completions in bash_completion # (extended to characters other than the colon). 
- if last_wordbreak_pos: + if last_wordbreak_pos is not None: completions = [c[last_wordbreak_pos + 1 :] for c in completions] special_chars += "();<>|&!`$* \t\n\"'" elif cword_prequote == '"': diff --git a/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py b/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py index ecd785b80b..890a38f43f 100644 --- a/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py +++ b/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py @@ -177,6 +177,9 @@ class shlex: elif self.whitespace_split: self.token = nextchar self.state = 'a' + # Modified by argcomplete: Record last wordbreak position + if nextchar in self.wordbreaks: + self.last_wordbreak_pos = len(self.token) - 1 else: self.token = nextchar if self.token or (self.posix and quoted): diff --git a/contrib/python/argcomplete/py3/argcomplete/scripts/activate_global_python_argcomplete.py b/contrib/python/argcomplete/py3/argcomplete/scripts/activate_global_python_argcomplete.py index 768b8aa6bf..299d081c0e 100644 --- a/contrib/python/argcomplete/py3/argcomplete/scripts/activate_global_python_argcomplete.py +++ b/contrib/python/argcomplete/py3/argcomplete/scripts/activate_global_python_argcomplete.py @@ -121,16 +121,33 @@ def append_to_config_file(path, shellcode): fh.write(shellcode) print("Added.", file=sys.stderr) - -def link_user_rcfiles(): - # TODO: warn if running as superuser +def link_zsh_user_rcfile(zsh_fpath=None): zsh_rcfile = os.path.join(os.path.expanduser(os.environ.get("ZDOTDIR", "~")), ".zshenv") - append_to_config_file(zsh_rcfile, zsh_shellcode.format(zsh_fpath=get_activator_dir())) + append_to_config_file(zsh_rcfile, zsh_shellcode.format(zsh_fpath=zsh_fpath or get_activator_dir())) +def link_bash_user_rcfile(): bash_completion_user_file = os.path.expanduser("~/.bash_completion") append_to_config_file(bash_completion_user_file, bash_shellcode.format(activator=get_activator_path())) +def link_user_rcfiles(): + # TODO: warn if running 
as superuser + link_zsh_user_rcfile() + link_bash_user_rcfile() + +def add_zsh_system_dir_to_fpath_for_user(): + if "zsh" not in os.environ.get("SHELL", ""): + return + try: + zsh_system_dir = get_zsh_system_dir() + fpath_output = subprocess.check_output([os.environ["SHELL"], "-c", 'printf "%s\n" "${fpath[@]}"']) + for fpath in fpath_output.decode().splitlines(): + if fpath == zsh_system_dir: + return + link_zsh_user_rcfile(zsh_fpath=zsh_system_dir) + except (FileNotFoundError, subprocess.CalledProcessError): + pass + def main(): global args args = parser.parse_args() @@ -160,6 +177,8 @@ def main(): for destination in destinations: install_to_destination(destination) + add_zsh_system_dir_to_fpath_for_user() + if args.dest is None: print("Please restart your shell or source the installed file to activate it.", file=sys.stderr) diff --git a/contrib/python/argcomplete/py3/argcomplete/shell_integration.py b/contrib/python/argcomplete/py3/argcomplete/shell_integration.py index 37b5603b11..cac48902fa 100644 --- a/contrib/python/argcomplete/py3/argcomplete/shell_integration.py +++ b/contrib/python/argcomplete/py3/argcomplete/shell_integration.py @@ -166,7 +166,8 @@ def shellcode(executables, use_defaults=True, shell="bash", complete_arguments=N executables_list = " ".join(quoted_executables) script = argcomplete_script if script: - function_suffix = "_" + script + # If the script path contain a space, this would generate an invalid function name. 
+ function_suffix = "_" + script.replace(" ", "_SPACE_") else: script = "" function_suffix = "" diff --git a/contrib/python/argcomplete/py3/ya.make b/contrib/python/argcomplete/py3/ya.make index 74c5629658..327bc4e34e 100644 --- a/contrib/python/argcomplete/py3/ya.make +++ b/contrib/python/argcomplete/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.6.0) +VERSION(3.6.1) LICENSE(Apache-2.0) diff --git a/contrib/python/iniconfig/.dist-info/METADATA b/contrib/python/iniconfig/.dist-info/METADATA index 3ea1e01cb0..3a8ef46a3b 100644 --- a/contrib/python/iniconfig/.dist-info/METADATA +++ b/contrib/python/iniconfig/.dist-info/METADATA @@ -1,6 +1,6 @@ -Metadata-Version: 2.1 +Metadata-Version: 2.4 Name: iniconfig -Version: 2.0.0 +Version: 2.1.0 Summary: brain-dead simple config-ini parsing Project-URL: Homepage, https://github.com/pytest-dev/iniconfig Author-email: Ronny Pfannschmidt <opensource@ronnypfannschmidt.de>, Holger Krekel <holger.krekel@gmail.com> @@ -14,14 +14,15 @@ Classifier: Operating System :: Microsoft :: Windows Classifier: Operating System :: POSIX Classifier: Programming Language :: Python :: 3 Classifier: Programming Language :: Python :: 3 :: Only -Classifier: Programming Language :: Python :: 3.7 Classifier: Programming Language :: Python :: 3.8 Classifier: Programming Language :: Python :: 3.9 Classifier: Programming Language :: Python :: 3.10 Classifier: Programming Language :: Python :: 3.11 +Classifier: Programming Language :: Python :: 3.12 +Classifier: Programming Language :: Python :: 3.13 Classifier: Topic :: Software Development :: Libraries Classifier: Topic :: Utilities -Requires-Python: >=3.7 +Requires-Python: >=3.8 Description-Content-Type: text/x-rst iniconfig: brain-dead simple parsing of ini files diff --git a/contrib/python/iniconfig/LICENSE b/contrib/python/iniconfig/LICENSE index 31ecdfb1db..46f4b2846f 100644 --- a/contrib/python/iniconfig/LICENSE +++ b/contrib/python/iniconfig/LICENSE @@ -1,19 +1,21 @@ +The MIT License (MIT) - 
Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. +Copyright (c) 2010 - 2023 Holger Krekel and others +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/contrib/python/iniconfig/iniconfig/__init__.py b/contrib/python/iniconfig/iniconfig/__init__.py index c1c94f70ae..ed6499bc6c 100644 --- a/contrib/python/iniconfig/iniconfig/__init__.py +++ b/contrib/python/iniconfig/iniconfig/__init__.py @@ -20,7 +20,7 @@ from typing import ( import os if TYPE_CHECKING: - from typing_extensions import Final + from typing import Final __all__ = ["IniConfig", "ParseError", "COMMENTCHARS", "iscommentline"] diff --git a/contrib/python/iniconfig/iniconfig/_version.py b/contrib/python/iniconfig/iniconfig/_version.py index dd1883d734..e058e2c657 100644 --- a/contrib/python/iniconfig/iniconfig/_version.py +++ b/contrib/python/iniconfig/iniconfig/_version.py @@ -1,4 +1,21 @@ -# file generated by setuptools_scm +# file generated by setuptools-scm # don't change, don't track in version control -__version__ = version = '2.0.0' -__version_tuple__ = version_tuple = (2, 0, 0) + +__all__ = ["__version__", "__version_tuple__", "version", "version_tuple"] + +TYPE_CHECKING = False +if TYPE_CHECKING: + from typing import Tuple + from typing import Union + + VERSION_TUPLE = Tuple[Union[int, str], ...] 
+else: + VERSION_TUPLE = object + +version: str +__version__: str +__version_tuple__: VERSION_TUPLE +version_tuple: VERSION_TUPLE + +__version__ = version = '2.1.0' +__version_tuple__ = version_tuple = (2, 1, 0) diff --git a/contrib/python/iniconfig/iniconfig/exceptions.py b/contrib/python/iniconfig/iniconfig/exceptions.py index bc898e68ee..8c4dc9a8b0 100644 --- a/contrib/python/iniconfig/iniconfig/exceptions.py +++ b/contrib/python/iniconfig/iniconfig/exceptions.py @@ -2,7 +2,7 @@ from __future__ import annotations from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing_extensions import Final + from typing import Final class ParseError(Exception): diff --git a/contrib/python/iniconfig/ya.make b/contrib/python/iniconfig/ya.make index 0121cca743..20492d75c6 100644 --- a/contrib/python/iniconfig/ya.make +++ b/contrib/python/iniconfig/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(2.0.0) +VERSION(2.1.0) LICENSE(MIT) diff --git a/contrib/python/pyparsing/py3/.dist-info/METADATA b/contrib/python/pyparsing/py3/.dist-info/METADATA index 6b5fbefef6..ed52278486 100644 --- a/contrib/python/pyparsing/py3/.dist-info/METADATA +++ b/contrib/python/pyparsing/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: pyparsing -Version: 3.2.1 +Version: 3.2.2 Summary: pyparsing module - Classes and methods to define and execute parsing grammars Author-email: Paul McGuire <ptmcg.gm+pyparsing@gmail.com> Requires-Python: >=3.9 @@ -17,6 +17,7 @@ Classifier: Programming Language :: Python :: 3.10 Classifier: Programming Language :: Python :: 3.11 Classifier: Programming Language :: Python :: 3.12 Classifier: Programming Language :: Python :: 3.13 +Classifier: Programming Language :: Python :: 3.14 Classifier: Programming Language :: Python :: 3 :: Only Classifier: Programming Language :: Python :: Implementation :: CPython Classifier: Programming Language :: Python :: Implementation :: PyPy @@ -56,7 +57,7 @@ Here is a program to parse ``"Hello, World!"`` (or any 
greeting of the form from pyparsing import Word, alphas greet = Word(alphas) + "," + Word(alphas) + "!" hello = "Hello, World!" - print(hello, "->", greet.parseString(hello)) + print(hello, "->", greet.parse_string(hello)) The program outputs the following:: @@ -66,7 +67,7 @@ The Python representation of the grammar is quite readable, owing to the self-explanatory class names, and the use of '+', '|' and '^' operator definitions. -The parsed results returned from ``parseString()`` is a collection of type +The parsed results returned from ``parse_string()`` is a collection of type ``ParseResults``, which can be accessed as a nested list, a dictionary, or an object with named attributes. diff --git a/contrib/python/pyparsing/py3/README.rst b/contrib/python/pyparsing/py3/README.rst index 24d603c7bc..cfb9889f85 100644 --- a/contrib/python/pyparsing/py3/README.rst +++ b/contrib/python/pyparsing/py3/README.rst @@ -26,7 +26,7 @@ Here is a program to parse ``"Hello, World!"`` (or any greeting of the form from pyparsing import Word, alphas greet = Word(alphas) + "," + Word(alphas) + "!" hello = "Hello, World!" - print(hello, "->", greet.parseString(hello)) + print(hello, "->", greet.parse_string(hello)) The program outputs the following:: @@ -36,7 +36,7 @@ The Python representation of the grammar is quite readable, owing to the self-explanatory class names, and the use of '+', '|' and '^' operator definitions. -The parsed results returned from ``parseString()`` is a collection of type +The parsed results returned from ``parse_string()`` is a collection of type ``ParseResults``, which can be accessed as a nested list, a dictionary, or an object with named attributes. 
diff --git a/contrib/python/pyparsing/py3/pyparsing/__init__.py b/contrib/python/pyparsing/py3/pyparsing/__init__.py index 726c76cb24..fa1f2abe67 100644 --- a/contrib/python/pyparsing/py3/pyparsing/__init__.py +++ b/contrib/python/pyparsing/py3/pyparsing/__init__.py @@ -120,8 +120,8 @@ class version_info(NamedTuple): return f"{__name__}.{type(self).__name__}({', '.join('{}={!r}'.format(*nv) for nv in zip(self._fields, self))})" -__version_info__ = version_info(3, 2, 1, "final", 1) -__version_time__ = "31 Dec 2024 20:41 UTC" +__version_info__ = version_info(3, 2, 2, "final", 1) +__version_time__ = "22 Mar 2025 22:09 UTC" __version__ = __version_info__.__version__ __versionTime__ = __version_time__ __author__ = "Paul McGuire <ptmcg.gm+pyparsing@gmail.com>" diff --git a/contrib/python/pyparsing/py3/pyparsing/actions.py b/contrib/python/pyparsing/py3/pyparsing/actions.py index f491aab986..0153cc7132 100644 --- a/contrib/python/pyparsing/py3/pyparsing/actions.py +++ b/contrib/python/pyparsing/py3/pyparsing/actions.py @@ -22,7 +22,7 @@ class OnlyOnce: Note: parse action signature must include all 3 arguments. """ - def __init__(self, method_call: Callable[[str, int, ParseResults], Any]): + def __init__(self, method_call: Callable[[str, int, ParseResults], Any]) -> None: from .core import _trim_arity self.callable = _trim_arity(method_call) diff --git a/contrib/python/pyparsing/py3/pyparsing/core.py b/contrib/python/pyparsing/py3/pyparsing/core.py index b884e2d4a4..86be949ad4 100644 --- a/contrib/python/pyparsing/py3/pyparsing/core.py +++ b/contrib/python/pyparsing/py3/pyparsing/core.py @@ -38,7 +38,6 @@ from .util import ( __config_flags, _collapse_string_to_ranges, _escape_regex_range_chars, - _bslash, _flatten, LRUMemo as _LRUMemo, UnboundedMemo as _UnboundedMemo, @@ -246,7 +245,7 @@ class _ParseActionIndexError(Exception): ParserElement parseImpl methods. 
""" - def __init__(self, msg: str, exc: BaseException): + def __init__(self, msg: str, exc: BaseException) -> None: self.msg: str = msg self.exc: BaseException = exc @@ -355,7 +354,7 @@ def _default_start_debug_action( ( f"{cache_hit_str}Match {expr} at loc {loc}({lineno(loc, instring)},{col(loc, instring)})\n" f" {line(loc, instring)}\n" - f" {' ' * (col(loc, instring) - 1)}^" + f" {'^':>{col(loc, instring)}}" ) ) @@ -454,7 +453,7 @@ class ParserElement(ABC): debug_match: typing.Optional[DebugSuccessAction] debug_fail: typing.Optional[DebugExceptionAction] - def __init__(self, savelist: bool = False): + def __init__(self, savelist: bool = False) -> None: self.parseAction: list[ParseAction] = list() self.failAction: typing.Optional[ParseFailAction] = None self.customName: str = None # type: ignore[assignment] @@ -465,7 +464,7 @@ class ParserElement(ABC): self.whiteChars = set(ParserElement.DEFAULT_WHITE_CHARS) self.copyDefaultWhiteChars = True # used when checking for left-recursion - self.mayReturnEmpty = False + self._may_return_empty = False self.keepTabs = False self.ignoreExprs: list[ParserElement] = list() self.debug = False @@ -483,6 +482,14 @@ class ParserElement(ABC): self.suppress_warnings_: list[Diagnostics] = [] self.show_in_diagram = True + @property + def mayReturnEmpty(self): + return self._may_return_empty + + @mayReturnEmpty.setter + def mayReturnEmpty(self, value): + self._may_return_empty = value + def suppress_warning(self, warning_type: Diagnostics) -> ParserElement: """ Suppress warnings emitted for a particular diagnostic on this expression. 
@@ -2264,6 +2271,7 @@ class ParserElement(ABC): show_results_names: bool = False, show_groups: bool = False, embed: bool = False, + show_hidden: bool = False, **kwargs, ) -> None: """ @@ -2278,6 +2286,7 @@ class ParserElement(ABC): - ``show_results_names`` - bool flag whether diagram should show annotations for defined results names - ``show_groups`` - bool flag whether groups should be highlighted with an unlabeled surrounding box + - ``show_hidden`` - bool flag to show diagram elements for internal elements that are usually hidden - ``embed`` - bool flag whether generated HTML should omit <HEAD>, <BODY>, and <DOCTYPE> tags to embed the resulting HTML in an enclosing HTML source - ``head`` - str containing additional HTML to insert into the <HEAD> section of the generated code; @@ -2303,6 +2312,7 @@ class ParserElement(ABC): vertical=vertical, show_results_names=show_results_names, show_groups=show_groups, + show_hidden=show_hidden, diagram_kwargs=kwargs, ) if not isinstance(output_html, (str, Path)): @@ -2352,7 +2362,7 @@ class ParserElement(ABC): class _PendingSkip(ParserElement): # internal placeholder class to hold a place were '...' is added to a parser element, # once another ParserElement is added, this placeholder will be replaced with a SkipTo - def __init__(self, expr: ParserElement, must_skip: bool = False): + def __init__(self, expr: ParserElement, must_skip: bool = False) -> None: super().__init__() self.anchor = expr self.must_skip = must_skip @@ -2395,7 +2405,7 @@ class Token(ParserElement): matching patterns. """ - def __init__(self): + def __init__(self) -> None: super().__init__(savelist=False) def _generateDefaultName(self) -> str: @@ -2407,9 +2417,9 @@ class NoMatch(Token): A token that will never match. 
""" - def __init__(self): + def __init__(self) -> None: super().__init__() - self.mayReturnEmpty = True + self._may_return_empty = True self.mayIndexError = False self.errmsg = "Unmatchable token" @@ -2449,14 +2459,14 @@ class Literal(Token): def __getnewargs__(self): return (self.match,) - def __init__(self, match_string: str = "", *, matchString: str = ""): + def __init__(self, match_string: str = "", *, matchString: str = "") -> None: super().__init__() match_string = matchString or match_string self.match = match_string self.matchLen = len(match_string) self.firstMatchChar = match_string[:1] self.errmsg = f"Expected {self.name}" - self.mayReturnEmpty = False + self._may_return_empty = False self.mayIndexError = False def _generateDefaultName(self) -> str: @@ -2475,9 +2485,9 @@ class Empty(Literal): An empty token, will always match. """ - def __init__(self, match_string="", *, matchString=""): + def __init__(self, match_string="", *, matchString="") -> None: super().__init__("") - self.mayReturnEmpty = True + self._may_return_empty = True self.mayIndexError = False def _generateDefaultName(self) -> str: @@ -2534,7 +2544,7 @@ class Keyword(Token): *, matchString: str = "", identChars: typing.Optional[str] = None, - ): + ) -> None: super().__init__() identChars = identChars or ident_chars if identChars is None: @@ -2546,7 +2556,7 @@ class Keyword(Token): if not self.firstMatchChar: raise ValueError("null string passed to Keyword; use Empty() instead") self.errmsg = f"Expected {type(self).__name__} {self.name}" - self.mayReturnEmpty = False + self._may_return_empty = False self.mayIndexError = False self.caseless = caseless if caseless: @@ -2628,7 +2638,7 @@ class CaselessLiteral(Literal): (Contrast with example for :class:`CaselessKeyword`.) 
""" - def __init__(self, match_string: str = "", *, matchString: str = ""): + def __init__(self, match_string: str = "", *, matchString: str = "") -> None: match_string = matchString or match_string super().__init__(match_string.upper()) # Preserve the defining literal. @@ -2660,7 +2670,7 @@ class CaselessKeyword(Keyword): *, matchString: str = "", identChars: typing.Optional[str] = None, - ): + ) -> None: identChars = identChars or ident_chars match_string = matchString or match_string super().__init__(match_string, identChars, caseless=True) @@ -2708,7 +2718,7 @@ class CloseMatch(Token): *, maxMismatches: int = 1, caseless=False, - ): + ) -> None: maxMismatches = max_mismatches if max_mismatches is not None else maxMismatches super().__init__() self.match_string = match_string @@ -2716,7 +2726,7 @@ class CloseMatch(Token): self.errmsg = f"Expected {self.match_string!r} (with up to {self.maxMismatches} mismatches)" self.caseless = caseless self.mayIndexError = False - self.mayReturnEmpty = False + self._may_return_empty = False def _generateDefaultName(self) -> str: return f"{type(self).__name__}:{self.match_string!r}" @@ -2834,7 +2844,7 @@ class Word(Token): bodyChars: typing.Optional[str] = None, asKeyword: bool = False, excludeChars: typing.Optional[str] = None, - ): + ) -> None: initChars = initChars or init_chars bodyChars = bodyChars or body_chars asKeyword = asKeyword or as_keyword @@ -3018,7 +3028,7 @@ class Char(Word): *, asKeyword: bool = False, excludeChars: typing.Optional[str] = None, - ): + ) -> None: asKeyword = asKeyword or as_keyword excludeChars = excludeChars or exclude_chars super().__init__( @@ -3060,7 +3070,7 @@ class Regex(Token): *, asGroupList: bool = False, asMatch: bool = False, - ): + ) -> None: """The parameters ``pattern`` and ``flags`` are passed to the ``re.compile()`` function as-is. 
See the Python `re module <https://docs.python.org/3/library/re.html>`_ module for an @@ -3075,15 +3085,18 @@ class Regex(Token): raise ValueError("null string passed to Regex; use Empty() instead") self._re = None + self._may_return_empty = None # type: ignore [assignment] self.reString = self.pattern = pattern elif hasattr(pattern, "pattern") and hasattr(pattern, "match"): self._re = pattern + self._may_return_empty = None # type: ignore [assignment] self.pattern = self.reString = pattern.pattern elif callable(pattern): # defer creating this pattern until we really need it self.pattern = pattern + self._may_return_empty = None # type: ignore [assignment] self._re = None else: @@ -3120,23 +3133,38 @@ class Regex(Token): try: self._re = re.compile(self.pattern, self.flags) - return self._re except re.error: raise ValueError(f"invalid pattern ({self.pattern!r}) passed to Regex") + else: + self._may_return_empty = self.re.match("", pos=0) is not None + return self._re @cached_property def re_match(self) -> Callable[[str, int], Any]: return self.re.match - @cached_property - def mayReturnEmpty(self) -> bool: # type: ignore[override] - return self.re_match("", 0) is not None + @property + def mayReturnEmpty(self): + if self._may_return_empty is None: + # force compile of regex pattern, to set may_return_empty flag + self.re # noqa + return self._may_return_empty + + @mayReturnEmpty.setter + def mayReturnEmpty(self, value): + self._may_return_empty = value def _generateDefaultName(self) -> str: unescaped = repr(self.pattern).replace("\\\\", "\\") return f"Re:({unescaped})" def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: + # explicit check for matching past the length of the string; + # this is done because the re module will not complain about + # a match with `pos > len(instring)`, it will just return "" + if loc > len(instring) and self.mayReturnEmpty: + raise ParseException(instring, loc, self.errmsg, self) + result = 
self.re_match(instring, loc) if not result: raise ParseException(instring, loc, self.errmsg, self) @@ -3151,6 +3179,9 @@ class Regex(Token): return loc, ret def parseImplAsGroupList(self, instring, loc, do_actions=True): + if loc > len(instring) and self.mayReturnEmpty: + raise ParseException(instring, loc, self.errmsg, self) + result = self.re_match(instring, loc) if not result: raise ParseException(instring, loc, self.errmsg, self) @@ -3160,6 +3191,9 @@ class Regex(Token): return loc, ret def parseImplAsMatch(self, instring, loc, do_actions=True): + if loc > len(instring) and self.mayReturnEmpty: + raise ParseException(instring, loc, self.errmsg, self) + result = self.re_match(instring, loc) if not result: raise ParseException(instring, loc, self.errmsg, self) @@ -3258,7 +3292,7 @@ class QuotedString(Token): unquoteResults: bool = True, endQuoteChar: typing.Optional[str] = None, convertWhitespaceEscapes: bool = True, - ): + ) -> None: super().__init__() esc_char = escChar or esc_char esc_quote = escQuote or esc_quote @@ -3362,7 +3396,7 @@ class QuotedString(Token): self.errmsg = f"Expected {self.name}" self.mayIndexError = False - self.mayReturnEmpty = True + self._may_return_empty = True def _generateDefaultName(self) -> str: if self.quote_char == self.end_quote_char and isinstance( @@ -3465,7 +3499,7 @@ class CharsNotIn(Token): exact: int = 0, *, notChars: str = "", - ): + ) -> None: super().__init__() self.skipWhitespace = False self.notChars = not_chars or notChars @@ -3489,7 +3523,7 @@ class CharsNotIn(Token): self.minLen = exact self.errmsg = f"Expected {self.name}" - self.mayReturnEmpty = self.minLen == 0 + self._may_return_empty = self.minLen == 0 self.mayIndexError = False def _generateDefaultName(self) -> str: @@ -3552,7 +3586,9 @@ class White(Token): "\u3000": "<IDEOGRAPHIC_SPACE>", } - def __init__(self, ws: str = " \t\r\n", min: int = 1, max: int = 0, exact: int = 0): + def __init__( + self, ws: str = " \t\r\n", min: int = 1, max: int = 0, exact: int 
= 0 + ) -> None: super().__init__() self.matchWhite = ws self.set_whitespace_chars( @@ -3560,7 +3596,7 @@ class White(Token): copy_defaults=True, ) # self.leave_whitespace() - self.mayReturnEmpty = True + self._may_return_empty = True self.errmsg = f"Expected {self.name}" self.minLen = min @@ -3594,9 +3630,9 @@ class White(Token): class PositionToken(Token): - def __init__(self): + def __init__(self) -> None: super().__init__() - self.mayReturnEmpty = True + self._may_return_empty = True self.mayIndexError = False @@ -3605,7 +3641,7 @@ class GoToColumn(PositionToken): tabular report scraping. """ - def __init__(self, colno: int): + def __init__(self, colno: int) -> None: super().__init__() self.col = colno @@ -3657,7 +3693,7 @@ class LineStart(PositionToken): """ - def __init__(self): + def __init__(self) -> None: super().__init__() self.leave_whitespace() self.orig_whiteChars = set() | self.whiteChars @@ -3688,7 +3724,7 @@ class LineEnd(PositionToken): parse string """ - def __init__(self): + def __init__(self) -> None: super().__init__() self.whiteChars.discard("\n") self.set_whitespace_chars(self.whiteChars, copy_defaults=False) @@ -3711,7 +3747,7 @@ class StringStart(PositionToken): string """ - def __init__(self): + def __init__(self) -> None: super().__init__() self.set_name("start of text") @@ -3728,7 +3764,7 @@ class StringEnd(PositionToken): Matches if current position is at the end of the parse string """ - def __init__(self): + def __init__(self) -> None: super().__init__() self.set_name("end of text") @@ -3753,7 +3789,9 @@ class WordStart(PositionToken): a line. """ - def __init__(self, word_chars: str = printables, *, wordChars: str = printables): + def __init__( + self, word_chars: str = printables, *, wordChars: str = printables + ) -> None: wordChars = word_chars if wordChars == printables else wordChars super().__init__() self.wordChars = set(wordChars) @@ -3778,7 +3816,9 @@ class WordEnd(PositionToken): of a line. 
""" - def __init__(self, word_chars: str = printables, *, wordChars: str = printables): + def __init__( + self, word_chars: str = printables, *, wordChars: str = printables + ) -> None: wordChars = word_chars if wordChars == printables else wordChars super().__init__() self.wordChars = set(wordChars) @@ -3822,14 +3862,15 @@ class Tag(Token): - enthusiastic: True """ - def __init__(self, tag_name: str, value: Any = True): + def __init__(self, tag_name: str, value: Any = True) -> None: super().__init__() - self.mayReturnEmpty = True + self._may_return_empty = True self.mayIndexError = False self.leave_whitespace() self.tag_name = tag_name self.tag_value = value self.add_parse_action(self._add_tag) + self.show_in_diagram = False def _add_tag(self, tokens: ParseResults): tokens[self.tag_name] = self.tag_value @@ -3843,7 +3884,9 @@ class ParseExpression(ParserElement): post-processing parsed tokens. """ - def __init__(self, exprs: typing.Iterable[ParserElement], savelist: bool = False): + def __init__( + self, exprs: typing.Iterable[ParserElement], savelist: bool = False + ) -> None: super().__init__(savelist) self.exprs: list[ParserElement] if isinstance(exprs, _generatorType): @@ -3939,7 +3982,7 @@ class ParseExpression(ParserElement): ): self.exprs = other.exprs[:] + [self.exprs[1]] self._defaultName = None - self.mayReturnEmpty |= other.mayReturnEmpty + self._may_return_empty |= other.mayReturnEmpty self.mayIndexError |= other.mayIndexError other = self.exprs[-1] @@ -3951,7 +3994,7 @@ class ParseExpression(ParserElement): ): self.exprs = self.exprs[:-1] + other.exprs[:] self._defaultName = None - self.mayReturnEmpty |= other.mayReturnEmpty + self._may_return_empty |= other.mayReturnEmpty self.mayIndexError |= other.mayIndexError self.errmsg = f"Expected {self}" @@ -4028,7 +4071,7 @@ class And(ParseExpression): """ class _ErrorStop(Empty): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) 
self.leave_whitespace() @@ -4036,28 +4079,34 @@ class And(ParseExpression): return "-" def __init__( - self, exprs_arg: typing.Iterable[ParserElement], savelist: bool = True - ): - exprs: list[ParserElement] = list(exprs_arg) - if exprs and Ellipsis in exprs: - tmp: list[ParserElement] = [] - for i, expr in enumerate(exprs): - if expr is not Ellipsis: - tmp.append(expr) - continue + self, + exprs_arg: typing.Iterable[Union[ParserElement, str]], + savelist: bool = True, + ) -> None: + # instantiate exprs as a list, converting strs to ParserElements + exprs: list[ParserElement] = [ + self._literalStringClass(e) if isinstance(e, str) else e for e in exprs_arg + ] - if i < len(exprs) - 1: - skipto_arg: ParserElement = typing.cast( - ParseExpression, (Empty() + exprs[i + 1]) - ).exprs[-1] - tmp.append(SkipTo(skipto_arg)("_skipped*")) - continue + # convert any Ellipsis elements to SkipTo + if Ellipsis in exprs: + # Ellipsis cannot be the last element + if exprs[-1] is Ellipsis: raise Exception("cannot construct And with sequence ending in ...") - exprs[:] = tmp + + tmp: list[ParserElement] = [] + for cur_expr, next_expr in zip(exprs, exprs[1:]): + if cur_expr is Ellipsis: + tmp.append(SkipTo(next_expr)("_skipped*")) + else: + tmp.append(cur_expr) + + exprs[:-1] = tmp + super().__init__(exprs, savelist) if self.exprs: - self.mayReturnEmpty = all(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = all(e.mayReturnEmpty for e in self.exprs) if not isinstance(self.exprs[0], White): self.set_whitespace_chars( self.exprs[0].whiteChars, @@ -4067,7 +4116,7 @@ class And(ParseExpression): else: self.skipWhitespace = False else: - self.mayReturnEmpty = True + self._may_return_empty = True self.callPreparse = True def streamline(self) -> ParserElement: @@ -4117,7 +4166,7 @@ class And(ParseExpression): break cur = typing.cast(ParserElement, next_first) - self.mayReturnEmpty = all(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = all(e.mayReturnEmpty for e 
in self.exprs) return self def parseImpl(self, instring, loc, do_actions=True): @@ -4189,18 +4238,20 @@ class Or(ParseExpression): [['123'], ['3.1416'], ['789']] """ - def __init__(self, exprs: typing.Iterable[ParserElement], savelist: bool = False): + def __init__( + self, exprs: typing.Iterable[ParserElement], savelist: bool = False + ) -> None: super().__init__(exprs, savelist) if self.exprs: - self.mayReturnEmpty = any(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = any(e.mayReturnEmpty for e in self.exprs) self.skipWhitespace = all(e.skipWhitespace for e in self.exprs) else: - self.mayReturnEmpty = True + self._may_return_empty = True def streamline(self) -> ParserElement: super().streamline() if self.exprs: - self.mayReturnEmpty = any(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = any(e.mayReturnEmpty for e in self.exprs) self.saveAsList = any(e.saveAsList for e in self.exprs) self.skipWhitespace = all( e.skipWhitespace and not isinstance(e, White) for e in self.exprs @@ -4286,7 +4337,8 @@ class Or(ParseExpression): if maxException is not None: # infer from this check that all alternatives failed at the current position # so emit this collective error message instead of any single error message - if maxExcLoc == loc: + parse_start_loc = self.preParse(instring, loc) + if maxExcLoc == parse_start_loc: maxException.msg = self.errmsg or "" raise maxException @@ -4344,13 +4396,15 @@ class MatchFirst(ParseExpression): print(number.search_string("123 3.1416 789")) # Better -> [['123'], ['3.1416'], ['789']] """ - def __init__(self, exprs: typing.Iterable[ParserElement], savelist: bool = False): + def __init__( + self, exprs: typing.Iterable[ParserElement], savelist: bool = False + ) -> None: super().__init__(exprs, savelist) if self.exprs: - self.mayReturnEmpty = any(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = any(e.mayReturnEmpty for e in self.exprs) self.skipWhitespace = all(e.skipWhitespace for e in 
self.exprs) else: - self.mayReturnEmpty = True + self._may_return_empty = True def streamline(self) -> ParserElement: if self.streamlined: @@ -4359,13 +4413,13 @@ class MatchFirst(ParseExpression): super().streamline() if self.exprs: self.saveAsList = any(e.saveAsList for e in self.exprs) - self.mayReturnEmpty = any(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = any(e.mayReturnEmpty for e in self.exprs) self.skipWhitespace = all( e.skipWhitespace and not isinstance(e, White) for e in self.exprs ) else: self.saveAsList = False - self.mayReturnEmpty = True + self._may_return_empty = True return self def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: @@ -4393,7 +4447,8 @@ class MatchFirst(ParseExpression): if maxException is not None: # infer from this check that all alternatives failed at the current position # so emit this collective error message instead of any individual error message - if maxExcLoc == loc: + parse_start_loc = self.preParse(instring, loc) + if maxExcLoc == parse_start_loc: maxException.msg = self.errmsg or "" raise maxException @@ -4491,12 +4546,14 @@ class Each(ParseExpression): - size: 20 """ - def __init__(self, exprs: typing.Iterable[ParserElement], savelist: bool = True): + def __init__( + self, exprs: typing.Iterable[ParserElement], savelist: bool = True + ) -> None: super().__init__(exprs, savelist) if self.exprs: - self.mayReturnEmpty = all(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = all(e.mayReturnEmpty for e in self.exprs) else: - self.mayReturnEmpty = True + self._may_return_empty = True self.skipWhitespace = True self.initExprGroups = True self.saveAsList = True @@ -4511,9 +4568,9 @@ class Each(ParseExpression): def streamline(self) -> ParserElement: super().streamline() if self.exprs: - self.mayReturnEmpty = all(e.mayReturnEmpty for e in self.exprs) + self._may_return_empty = all(e.mayReturnEmpty for e in self.exprs) else: - self.mayReturnEmpty = True + 
self._may_return_empty = True return self def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: @@ -4612,7 +4669,7 @@ class ParseElementEnhance(ParserElement): post-processing parsed tokens. """ - def __init__(self, expr: Union[ParserElement, str], savelist: bool = False): + def __init__(self, expr: Union[ParserElement, str], savelist: bool = False) -> None: super().__init__(savelist) if isinstance(expr, str_type): expr_str = typing.cast(str, expr) @@ -4626,7 +4683,7 @@ class ParseElementEnhance(ParserElement): self.expr = expr if expr is not None: self.mayIndexError = expr.mayIndexError - self.mayReturnEmpty = expr.mayReturnEmpty + self._may_return_empty = expr.mayReturnEmpty self.set_whitespace_chars( expr.whiteChars, copy_defaults=expr.copyDefaultWhiteChars ) @@ -4724,20 +4781,20 @@ class IndentedBlock(ParseElementEnhance): """ class _Indent(Empty): - def __init__(self, ref_col: int): + def __init__(self, ref_col: int) -> None: super().__init__() self.errmsg = f"expected indent at column {ref_col}" self.add_condition(lambda s, l, t: col(l, s) == ref_col) class _IndentGreater(Empty): - def __init__(self, ref_col: int): + def __init__(self, ref_col: int) -> None: super().__init__() self.errmsg = f"expected indent at column greater than {ref_col}" self.add_condition(lambda s, l, t: col(l, s) > ref_col) def __init__( self, expr: ParserElement, *, recursive: bool = False, grouped: bool = True - ): + ) -> None: super().__init__(expr, savelist=True) # if recursive: # raise NotImplementedError("IndentedBlock with recursive is not implemented") @@ -4792,7 +4849,7 @@ class AtStringStart(ParseElementEnhance): # raises ParseException """ - def __init__(self, expr: Union[ParserElement, str]): + def __init__(self, expr: Union[ParserElement, str]) -> None: super().__init__(expr) self.callPreparse = False @@ -4825,7 +4882,7 @@ class AtLineStart(ParseElementEnhance): """ - def __init__(self, expr: Union[ParserElement, str]): + def __init__(self, expr: 
Union[ParserElement, str]) -> None: super().__init__(expr) self.callPreparse = False @@ -4858,9 +4915,9 @@ class FollowedBy(ParseElementEnhance): [['shape', 'SQUARE'], ['color', 'BLACK'], ['posn', 'upper left']] """ - def __init__(self, expr: Union[ParserElement, str]): + def __init__(self, expr: Union[ParserElement, str]) -> None: super().__init__(expr) - self.mayReturnEmpty = True + self._may_return_empty = True def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: # by using self._expr.parse and deleting the contents of the returned ParseResults list @@ -4901,10 +4958,10 @@ class PrecededBy(ParseElementEnhance): """ - def __init__(self, expr: Union[ParserElement, str], retreat: int = 0): + def __init__(self, expr: Union[ParserElement, str], retreat: int = 0) -> None: super().__init__(expr) self.expr = self.expr().leave_whitespace() - self.mayReturnEmpty = True + self._may_return_empty = True self.mayIndexError = False self.exact = False if isinstance(expr, str_type): @@ -5019,13 +5076,13 @@ class NotAny(ParseElementEnhance): integer = Word(nums) + ~Char(".") """ - def __init__(self, expr: Union[ParserElement, str]): + def __init__(self, expr: Union[ParserElement, str]) -> None: super().__init__(expr) # do NOT use self.leave_whitespace(), don't want to propagate to exprs # self.leave_whitespace() self.skipWhitespace = False - self.mayReturnEmpty = True + self._may_return_empty = True self.errmsg = f"Found unwanted token, {self.expr}" def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: @@ -5044,7 +5101,7 @@ class _MultipleMatch(ParseElementEnhance): stop_on: typing.Optional[Union[ParserElement, str]] = None, *, stopOn: typing.Optional[Union[ParserElement, str]] = None, - ): + ) -> None: super().__init__(expr) stopOn = stopOn or stop_on self.saveAsList = True @@ -5062,9 +5119,10 @@ class _MultipleMatch(ParseElementEnhance): def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: self_expr_parse = 
self.expr._parse self_skip_ignorables = self._skipIgnorables - check_ender = self.not_ender is not None - if check_ender: + check_ender = False + if self.not_ender is not None: try_not_ender = self.not_ender.try_parse + check_ender = True # must be at least one (but first see if we are the stopOn sentinel; # if so, fail) @@ -5165,9 +5223,9 @@ class ZeroOrMore(_MultipleMatch): stop_on: typing.Optional[Union[ParserElement, str]] = None, *, stopOn: typing.Optional[Union[ParserElement, str]] = None, - ): + ) -> None: super().__init__(expr, stopOn=stopOn or stop_on) - self.mayReturnEmpty = True + self._may_return_empty = True def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: try: @@ -5189,7 +5247,7 @@ class DelimitedList(ParseElementEnhance): max: typing.Optional[int] = None, *, allow_trailing_delim: bool = False, - ): + ) -> None: """Helper to define a delimited list of expressions - the delimiter defaults to ','. By default, the list elements and delimiters can have intervening whitespace, and comments, but this can be @@ -5296,11 +5354,11 @@ class Opt(ParseElementEnhance): def __init__( self, expr: Union[ParserElement, str], default: Any = __optionalNotMatched - ): + ) -> None: super().__init__(expr, savelist=False) self.saveAsList = self.expr.saveAsList self.defaultValue = default - self.mayReturnEmpty = True + self._may_return_empty = True def parseImpl(self, instring, loc, do_actions=True) -> ParseImplReturnType: self_expr = self.expr @@ -5401,11 +5459,11 @@ class SkipTo(ParseElementEnhance): fail_on: typing.Optional[Union[ParserElement, str]] = None, *, failOn: typing.Optional[Union[ParserElement, str]] = None, - ): + ) -> None: super().__init__(other) failOn = failOn or fail_on self.ignoreExpr = ignore - self.mayReturnEmpty = True + self._may_return_empty = True self.mayIndexError = False self.includeMatch = include self.saveAsList = False @@ -5512,7 +5570,9 @@ class Forward(ParseElementEnhance): parser created using ``Forward``. 
""" - def __init__(self, other: typing.Optional[Union[ParserElement, str]] = None): + def __init__( + self, other: typing.Optional[Union[ParserElement, str]] = None + ) -> None: self.caller_frame = traceback.extract_stack(limit=2)[0] super().__init__(other, savelist=False) # type: ignore[arg-type] self.lshift_line = None @@ -5529,7 +5589,7 @@ class Forward(ParseElementEnhance): self.expr = other self.streamlined = other.streamlined self.mayIndexError = self.expr.mayIndexError - self.mayReturnEmpty = self.expr.mayReturnEmpty + self._may_return_empty = self.expr.mayReturnEmpty self.set_whitespace_chars( self.expr.whiteChars, copy_defaults=self.expr.copyDefaultWhiteChars ) @@ -5648,7 +5708,7 @@ class Forward(ParseElementEnhance): try: new_loc, new_peek = super().parseImpl(instring, loc, False) except ParseException: - # we failed before getting any match – do not hide the error + # we failed before getting any match - do not hide the error if isinstance(prev_peek, Exception): raise new_loc, new_peek = prev_loc, prev_peek @@ -5703,17 +5763,20 @@ class Forward(ParseElementEnhance): def _generateDefaultName(self) -> str: # Avoid infinite recursion by setting a temporary _defaultName + save_default_name = self._defaultName self._defaultName = ": ..." # Use the string representation of main expression. - retString = "..." try: if self.expr is not None: - retString = str(self.expr)[:1000] + ret_string = str(self.expr)[:1000] else: - retString = "None" - finally: - return f"{type(self).__name__}: {retString}" + ret_string = "None" + except Exception: + ret_string = "..." + + self._defaultName = save_default_name + return f"{type(self).__name__}: {ret_string}" def copy(self) -> ParserElement: if self.expr is not None: @@ -5752,7 +5815,7 @@ class TokenConverter(ParseElementEnhance): Abstract subclass of :class:`ParseElementEnhance`, for converting parsed results. 
""" - def __init__(self, expr: Union[ParserElement, str], savelist=False): + def __init__(self, expr: Union[ParserElement, str], savelist=False) -> None: super().__init__(expr) # , savelist) self.saveAsList = False @@ -5783,7 +5846,7 @@ class Combine(TokenConverter): adjacent: bool = True, *, joinString: typing.Optional[str] = None, - ): + ) -> None: super().__init__(expr) joinString = joinString if joinString is not None else join_string # suppress whitespace-stripping in contained parse expressions, but re-enable it on the Combine itself @@ -5835,7 +5898,7 @@ class Group(TokenConverter): # -> ['fn', ['a', 'b', '100']] """ - def __init__(self, expr: ParserElement, aslist: bool = False): + def __init__(self, expr: ParserElement, aslist: bool = False) -> None: super().__init__(expr) self.saveAsList = True self._asPythonList = aslist @@ -5893,7 +5956,7 @@ class Dict(TokenConverter): See more examples at :class:`ParseResults` of accessing fields by results name. """ - def __init__(self, expr: ParserElement, asdict: bool = False): + def __init__(self, expr: ParserElement, asdict: bool = False) -> None: super().__init__(expr) self.saveAsList = True self._asPythonDict = asdict @@ -5969,7 +6032,7 @@ class Suppress(TokenConverter): (See also :class:`DelimitedList`.) """ - def __init__(self, expr: Union[ParserElement, str], savelist: bool = False): + def __init__(self, expr: Union[ParserElement, str], savelist: bool = False) -> None: if expr is ...: expr = _PendingSkip(NoMatch()) super().__init__(expr) @@ -6094,13 +6157,17 @@ def srange(s: str) -> str: - any combination of the above (``'aeiouy'``, ``'a-zA-Z0-9_$'``, etc.) 
""" - _expanded = lambda p: ( - p - if not isinstance(p, ParseResults) - else "".join(chr(c) for c in range(ord(p[0]), ord(p[1]) + 1)) - ) + + def _expanded(p): + if isinstance(p, ParseResults): + yield from (chr(c) for c in range(ord(p[0]), ord(p[1]) + 1)) + else: + yield p + try: - return "".join(_expanded(part) for part in _reBracketExpr.parse_string(s).body) + return "".join( + [c for part in _reBracketExpr.parse_string(s).body for c in _expanded(part)] + ) except Exception as e: return "" @@ -6156,11 +6223,17 @@ def autoname_elements() -> None: Utility to simplify mass-naming of parser elements, for generating railroad diagram with named subdiagrams. """ - calling_frame = sys._getframe(1) + + # guard against _getframe not being implemented in the current Python + getframe_fn = getattr(sys, "_getframe", lambda _: None) + calling_frame = getframe_fn(1) if calling_frame is None: return + + # find all locals in the calling frame that are ParserElements calling_frame = typing.cast(types.FrameType, calling_frame) for name, var in calling_frame.f_locals.items(): + # if no custom name defined, set the name to the var name if isinstance(var, ParserElement) and not var.customName: var.set_name(name) diff --git a/contrib/python/pyparsing/py3/pyparsing/diagram/__init__.py b/contrib/python/pyparsing/py3/pyparsing/diagram/__init__.py index 56526b741b..526cf3862a 100644 --- a/contrib/python/pyparsing/py3/pyparsing/diagram/__init__.py +++ b/contrib/python/pyparsing/py3/pyparsing/diagram/__init__.py @@ -120,7 +120,7 @@ class EachItem(railroad.Group): all_label = "[ALL]" - def __init__(self, *items): + def __init__(self, *items) -> None: choice_item = railroad.Choice(len(items) - 1, *items) one_or_more_item = railroad.OneOrMore(item=choice_item) super().__init__(one_or_more_item, label=self.all_label) @@ -131,7 +131,7 @@ class AnnotatedItem(railroad.Group): Simple subclass of Group that creates an annotation label """ - def __init__(self, label: str, item): + def __init__(self, 
label: str, item) -> None: super().__init__(item=item, label=f"[{label}]" if label else "") @@ -144,7 +144,7 @@ class EditablePartial(Generic[T]): # We need this here because the railroad constructors actually transform the data, so can't be called until the # entire tree is assembled - def __init__(self, func: Callable[..., T], args: list, kwargs: dict): + def __init__(self, func: Callable[..., T], args: list, kwargs: dict) -> None: self.func = func self.args = args self.kwargs = kwargs @@ -226,6 +226,7 @@ def to_railroad( vertical: int = 3, show_results_names: bool = False, show_groups: bool = False, + show_hidden: bool = False, ) -> list[NamedDiagram]: """ Convert a pyparsing element tree into a list of diagrams. This is the recommended entrypoint to diagram @@ -238,6 +239,8 @@ def to_railroad( included in the diagram :param show_groups - bool to indicate whether groups should be highlighted with an unlabeled surrounding box + :param show_hidden - bool to indicate whether internal elements that are typically hidden + should be shown """ # Convert the whole tree underneath the root lookup = ConverterState(diagram_kwargs=diagram_kwargs or {}) @@ -248,6 +251,7 @@ def to_railroad( vertical=vertical, show_results_names=show_results_names, show_groups=show_groups, + show_hidden=show_hidden, ) root_id = id(element) @@ -348,7 +352,7 @@ class ConverterState: Stores some state that persists between recursions into the element tree """ - def __init__(self, diagram_kwargs: typing.Optional[dict] = None): + def __init__(self, diagram_kwargs: typing.Optional[dict] = None) -> None: #: A dictionary mapping ParserElements to state relating to them self._element_diagram_states: dict[int, ElementState] = {} #: A dictionary mapping ParserElement IDs to subdiagrams generated from them @@ -453,6 +457,7 @@ def _apply_diagram_item_enhancements(fn): name_hint: str = None, show_results_names: bool = False, show_groups: bool = False, + show_hidden: bool = False, ) -> 
typing.Optional[EditablePartial]: ret = fn( element, @@ -463,6 +468,7 @@ def _apply_diagram_item_enhancements(fn): name_hint, show_results_names, show_groups, + show_hidden, ) # apply annotation for results name, if present @@ -555,6 +561,7 @@ def _to_diagram_element( name_hint=propagated_name, show_results_names=show_results_names, show_groups=show_groups, + show_hidden=show_hidden, ) # If the element isn't worth extracting, we always treat it as the first time we say it @@ -641,6 +648,7 @@ def _to_diagram_element( name_hint, show_results_names, show_groups, + show_hidden, ] return _to_diagram_element( (~element.not_ender.expr + element.expr)[1, ...].set_name(element.name), @@ -657,6 +665,7 @@ def _to_diagram_element( name_hint, show_results_names, show_groups, + show_hidden, ] return _to_diagram_element( (~element.not_ender.expr + element.expr)[...].set_name(element.name), @@ -707,6 +716,7 @@ def _to_diagram_element( index=i, show_results_names=show_results_names, show_groups=show_groups, + show_hidden=show_hidden, ) # Some elements don't need to be shown in the diagram diff --git a/contrib/python/pyparsing/py3/pyparsing/exceptions.py b/contrib/python/pyparsing/py3/pyparsing/exceptions.py index 57a1579d12..fe07a85585 100644 --- a/contrib/python/pyparsing/py3/pyparsing/exceptions.py +++ b/contrib/python/pyparsing/py3/pyparsing/exceptions.py @@ -52,7 +52,7 @@ class ParseBaseException(Exception): loc: int = 0, msg: typing.Optional[str] = None, elem=None, - ): + ) -> None: if msg is None: msg, pstr = pstr, "" @@ -87,7 +87,7 @@ class ParseBaseException(Exception): ret: list[str] = [] if isinstance(exc, ParseBaseException): ret.append(exc.line) - ret.append(f"{' ' * (exc.column - 1)}^") + ret.append(f"{'^':>{exc.column}}") ret.append(f"{type(exc).__name__}: {exc}") if depth <= 0 or exc.__traceback__ is None: @@ -272,12 +272,11 @@ class ParseException(ParseBaseException): try: integer.parse_string("ABC") except ParseException as pe: - print(pe) - print(f"column: 
{pe.column}") + print(pe, f"column: {pe.column}") prints:: - Expected integer (at char 0), (line:1, col:1) column: 1 + Expected integer, found 'ABC' (at char 0), (line:1, col:1) column: 1 """ @@ -307,7 +306,7 @@ class RecursiveGrammarException(Exception): Deprecated: only used by deprecated method ParserElement.validate. """ - def __init__(self, parseElementList): + def __init__(self, parseElementList) -> None: self.parseElementTrace = parseElementList def __str__(self) -> str: diff --git a/contrib/python/pyparsing/py3/pyparsing/helpers.py b/contrib/python/pyparsing/py3/pyparsing/helpers.py index f781e87132..7f62df8637 100644 --- a/contrib/python/pyparsing/py3/pyparsing/helpers.py +++ b/contrib/python/pyparsing/py3/pyparsing/helpers.py @@ -208,11 +208,9 @@ def one_of( if caseless: is_equal = lambda a, b: a.upper() == b.upper() masks = lambda a, b: b.upper().startswith(a.upper()) - parse_element_class = CaselessKeyword if asKeyword else CaselessLiteral else: is_equal = operator.eq masks = lambda a, b: b.startswith(a) - parse_element_class = Keyword if asKeyword else Literal symbols: list[str] if isinstance(strs, str_type): @@ -255,7 +253,8 @@ def one_of( if asKeyword: patt = rf"\b(?:{patt})\b" - ret = Regex(patt, flags=re_flags).set_name(" | ".join(symbols)) + ret = Regex(patt, flags=re_flags) + ret.set_name(" | ".join(re.escape(s) for s in symbols)) if caseless: # add parse action to return symbols as specified, not in random @@ -270,13 +269,21 @@ def one_of( "Exception creating Regex for one_of, building MatchFirst", stacklevel=2 ) - # last resort, just use MatchFirst + # last resort, just use MatchFirst of Token class corresponding to caseless + # and asKeyword settings + CASELESS = KEYWORD = True + parse_element_class = { + (CASELESS, KEYWORD): CaselessKeyword, + (CASELESS, not KEYWORD): CaselessLiteral, + (not CASELESS, KEYWORD): Keyword, + (not CASELESS, not KEYWORD): Literal, + }[(caseless, asKeyword)] return MatchFirst(parse_element_class(sym) for sym in 
symbols).set_name( " | ".join(symbols) ) -def dict_of(key: ParserElement, value: ParserElement) -> ParserElement: +def dict_of(key: ParserElement, value: ParserElement) -> Dict: """Helper to easily and clearly define a dictionary by specifying the respective patterns for the key and value. Takes care of defining the :class:`Dict`, :class:`ZeroOrMore`, and @@ -411,13 +418,16 @@ def locatedExpr(expr: ParserElement) -> ParserElement: ) +_NO_IGNORE_EXPR_GIVEN = NoMatch() + + def nested_expr( opener: Union[str, ParserElement] = "(", closer: Union[str, ParserElement] = ")", content: typing.Optional[ParserElement] = None, - ignore_expr: ParserElement = quoted_string(), + ignore_expr: ParserElement = _NO_IGNORE_EXPR_GIVEN, *, - ignoreExpr: ParserElement = quoted_string(), + ignoreExpr: ParserElement = _NO_IGNORE_EXPR_GIVEN, ) -> ParserElement: """Helper method for defining nested lists enclosed in opening and closing delimiters (``"("`` and ``")"`` are the default). @@ -487,7 +497,10 @@ def nested_expr( dec_to_hex (int) args: [['char', 'hchar']] """ if ignoreExpr != ignore_expr: - ignoreExpr = ignore_expr if ignoreExpr == quoted_string() else ignoreExpr + ignoreExpr = ignore_expr if ignoreExpr is _NO_IGNORE_EXPR_GIVEN else ignoreExpr + if ignoreExpr is _NO_IGNORE_EXPR_GIVEN: + ignoreExpr = quoted_string() + if opener == closer: raise ValueError("opening and closing strings cannot be the same") if content is None: @@ -504,11 +517,11 @@ def nested_expr( exact=1, ) ) - ).set_parse_action(lambda t: t[0].strip()) + ) else: content = empty.copy() + CharsNotIn( opener + closer + ParserElement.DEFAULT_WHITE_CHARS - ).set_parse_action(lambda t: t[0].strip()) + ) else: if ignoreExpr is not None: content = Combine( @@ -518,7 +531,7 @@ def nested_expr( + ~Literal(closer) + CharsNotIn(ParserElement.DEFAULT_WHITE_CHARS, exact=1) ) - ).set_parse_action(lambda t: t[0].strip()) + ) else: content = Combine( OneOrMore( @@ -526,11 +539,16 @@ def nested_expr( + ~Literal(closer) + 
CharsNotIn(ParserElement.DEFAULT_WHITE_CHARS, exact=1) ) - ).set_parse_action(lambda t: t[0].strip()) + ) else: raise ValueError( "opening and closing arguments must be strings if no content expression is given" ) + if ParserElement.DEFAULT_WHITE_CHARS: + content.set_parse_action( + lambda t: t[0].strip(ParserElement.DEFAULT_WHITE_CHARS) + ) + ret = Forward() if ignoreExpr is not None: ret <<= Group( @@ -691,7 +709,7 @@ def infix_notation( op_list: list[InfixNotationOperatorSpec], lpar: Union[str, ParserElement] = Suppress("("), rpar: Union[str, ParserElement] = Suppress(")"), -) -> ParserElement: +) -> Forward: """Helper method for constructing grammars of expressions made up of operators working in a precedence hierarchy. Operators may be unary or binary, left- or right-associative. Parse actions can also be diff --git a/contrib/python/pyparsing/py3/pyparsing/results.py b/contrib/python/pyparsing/py3/pyparsing/results.py index be834b7e60..956230352c 100644 --- a/contrib/python/pyparsing/py3/pyparsing/results.py +++ b/contrib/python/pyparsing/py3/pyparsing/results.py @@ -23,7 +23,7 @@ class _ParseResultsWithOffset: tup: tuple[ParseResults, int] __slots__ = ["tup"] - def __init__(self, p1: ParseResults, p2: int): + def __init__(self, p1: ParseResults, p2: int) -> None: self.tup: tuple[ParseResults, int] = (p1, p2) def __getitem__(self, i): diff --git a/contrib/python/pyparsing/py3/pyparsing/tools/__init__.py b/contrib/python/pyparsing/py3/pyparsing/tools/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/pyparsing/py3/pyparsing/tools/__init__.py diff --git a/contrib/python/pyparsing/py3/pyparsing/tools/cvt_pyparsing_pep8_names.py b/contrib/python/pyparsing/py3/pyparsing/tools/cvt_pyparsing_pep8_names.py new file mode 100644 index 0000000000..f4a8bd9f51 --- /dev/null +++ b/contrib/python/pyparsing/py3/pyparsing/tools/cvt_pyparsing_pep8_names.py @@ -0,0 +1,116 @@ +from functools import lru_cache +import pyparsing as pp + 
+ +@lru_cache(maxsize=None) +def camel_to_snake(s: str) -> str: + """ + Convert CamelCase to snake_case. + """ + return "".join("_" + c.lower() if c.isupper() else c for c in s).lstrip("_") + + +pre_pep8_method_names = """ +addCondition addParseAction anyCloseTag anyOpenTag asDict asList cStyleComment canParseNext conditionAsParseAction +convertToDate convertToDatetime convertToFloat convertToInteger countedArray cppStyleComment dblQuotedString +dblSlashComment defaultName dictOf disableMemoization downcaseTokens enableLeftRecursion enablePackrat getName +htmlComment ignoreWhitespace indentedBlock infixNotation inlineLiteralsUsing javaStyleComment leaveWhitespace +lineEnd lineStart locatedExpr matchOnlyAtCol matchPreviousExpr matchPreviousLiteral nestedExpr nullDebugAction oneOf +originalTextFor parseFile parseString parseWithTabs pythonStyleComment quotedString removeQuotes replaceWith +resetCache restOfLine runTests scanString searchString setBreak setDebug setDebugActions setDefaultWhitespaceChars +setFailAction setName setParseAction setResultsName setWhitespaceChars sglQuotedString stringEnd stringStart tokenMap +traceParseAction transformString tryParse unicodeString upcaseTokens withAttribute withClass +""".split() + +special_changes = { + "opAssoc": "OpAssoc", + "delimitedList": "DelimitedList", + "delimited_list": "DelimitedList", + "replaceHTMLEntity": "replace_html_entity", + "makeHTMLTags": "make_html_tags", + "makeXMLTags": "make_xml_tags", + "commonHTMLEntity": "common_html_entity", + "stripHTMLTags": "strip_html_tags", +} + +pre_pep8_arg_names = """parseAll maxMatches listAllMatches callDuringTry includeSeparators fullDump printResults +failureTests postParse matchString identChars maxMismatches initChars bodyChars asKeyword excludeChars asGroupList +asMatch quoteChar escChar escQuote unquoteResults endQuoteChar convertWhitespaceEscapes notChars wordChars stopOn +failOn joinString markerString intExpr useRegex asString ignoreExpr""".split() + 
+pre_pep8_method_name = pp.one_of(pre_pep8_method_names, as_keyword=True) +pre_pep8_method_name.set_parse_action(lambda t: camel_to_snake(t[0])) +special_pre_pep8_name = pp.one_of(special_changes, as_keyword=True) +special_pre_pep8_name.set_parse_action(lambda t: special_changes[t[0]]) +# only replace arg names if part of an arg list +pre_pep8_arg_name = pp.Regex( + rf"{pp.util.make_compressed_re(pre_pep8_arg_names)}\s*=" +) +pre_pep8_arg_name.set_parse_action(lambda t: camel_to_snake(t[0])) + +pep8_converter = pre_pep8_method_name | special_pre_pep8_name | pre_pep8_arg_name + +if __name__ == "__main__": + import argparse + from pathlib import Path + import sys + + argparser = argparse.ArgumentParser( + description = ( + "Utility to convert Python pyparsing scripts using legacy" + " camelCase names to use PEP8 snake_case names." + "\nBy default, this script will only show whether this script would make any changes." + ) + ) + argparser.add_argument("--verbose", "-v", action="store_true", help="Show unified diff for each source file") + argparser.add_argument("-vv", action="store_true", dest="verbose2", help="Show unified diff for each source file, plus names of scanned files with no changes") + argparser.add_argument("--update", "-u", action="store_true", help="Update source files in-place") + argparser.add_argument("--encoding", type=str, default="utf-8", help="Encoding of source files (default: utf-8)") + argparser.add_argument("--exit-zero-even-if-changed", "-exit0", action="store_true", help="Exit with status code 0 even if changes were made") + argparser.add_argument("source_filename", nargs="+", help="Source filenames or filename patterns of Python files to be converted") + args = argparser.parse_args() + + + def show_diffs(original, modified): + import difflib + + diff = difflib.unified_diff( + original.splitlines(), modified.splitlines(), lineterm="" + ) + sys.stdout.writelines(f"{diff_line}\n" for diff_line in diff) + + exit_status = 0 + + for 
filename_pattern in args.source_filename: + + for filename in Path().glob(filename_pattern): + if not Path(filename).is_file(): + continue + + try: + original_contents = Path(filename).read_text(encoding=args.encoding) + modified_contents = pep8_converter.transform_string( + original_contents + ) + + if modified_contents != original_contents: + if args.update: + Path(filename).write_text(modified_contents, encoding=args.encoding) + print(f"Converted {filename}") + else: + print(f"Found required changes in {filename}") + + if args.verbose: + show_diffs(original_contents, modified_contents) + print() + + exit_status = 1 + + else: + if args.verbose2: + print(f"No required changes in {filename}") + + except Exception as e: + print(f"Failed to convert {filename}: {type(e).__name__}: {e}") + + sys.exit(exit_status if not args.exit_zero_even_if_changed else 0) diff --git a/contrib/python/pyparsing/py3/pyparsing/util.py b/contrib/python/pyparsing/py3/pyparsing/util.py index 03a60d4fdd..1cb16e2e62 100644 --- a/contrib/python/pyparsing/py3/pyparsing/util.py +++ b/contrib/python/pyparsing/py3/pyparsing/util.py @@ -1,5 +1,6 @@ # util.py import contextlib +import re from functools import lru_cache, wraps import inspect import itertools @@ -193,7 +194,7 @@ class _GroupConsecutive: (3, iter(['p', 'q', 'r', 's'])) """ - def __init__(self): + def __init__(self) -> None: self.prev = 0 self.counter = itertools.count() self.value = -1 @@ -303,7 +304,11 @@ def _flatten(ll: Iterable) -> list: def make_compressed_re( - word_list: Iterable[str], max_level: int = 2, _level: int = 1 + word_list: Iterable[str], + max_level: int = 2, + *, + non_capturing_groups: bool = True, + _level: int = 1, ) -> str: """ Create a regular expression string from a list of words, collapsing by common @@ -320,15 +325,38 @@ def make_compressed_re( else: yield namelist[0][0], [namelist[0][1:]] + if _level == 1: + if not word_list: + raise ValueError("no words given to make_compressed_re()") + + if "" in 
word_list: + raise ValueError("word list cannot contain empty string") + else: + # internal recursive call, just return empty string if no words + if not word_list: + return "" + + # dedupe the word list + word_list = list({}.fromkeys(word_list)) + if max_level == 0: - return "|".join(sorted(word_list, key=len, reverse=True)) + if any(len(wd) > 1 for wd in word_list): + return "|".join( + sorted([re.escape(wd) for wd in word_list], key=len, reverse=True) + ) + else: + return f"[{''.join(_escape_regex_range_chars(wd) for wd in word_list)}]" ret = [] sep = "" + ncgroup = "?:" if non_capturing_groups else "" + for initial, suffixes in get_suffixes_from_common_prefixes(sorted(word_list)): ret.append(sep) sep = "|" + initial = re.escape(initial) + trailing = "" if "" in suffixes: trailing = "?" @@ -336,21 +364,33 @@ def make_compressed_re( if len(suffixes) > 1: if all(len(s) == 1 for s in suffixes): - ret.append(f"{initial}[{''.join(suffixes)}]{trailing}") + ret.append( + f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}" + ) else: if _level < max_level: suffix_re = make_compressed_re( - sorted(suffixes), max_level, _level + 1 + sorted(suffixes), + max_level, + non_capturing_groups=non_capturing_groups, + _level=_level + 1, ) - ret.append(f"{initial}({suffix_re}){trailing}") + ret.append(f"{initial}({ncgroup}{suffix_re}){trailing}") else: - suffixes.sort(key=len, reverse=True) - ret.append(f"{initial}({'|'.join(suffixes)}){trailing}") + if all(len(s) == 1 for s in suffixes): + ret.append( + f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}" + ) + else: + suffixes.sort(key=len, reverse=True) + ret.append( + f"{initial}({ncgroup}{'|'.join(re.escape(s) for s in suffixes)}){trailing}" + ) else: if suffixes: - suffix = suffixes[0] + suffix = re.escape(suffixes[0]) if len(suffix) > 1 and trailing: - ret.append(f"{initial}({suffix}){trailing}") + ret.append(f"{initial}({ncgroup}{suffix}){trailing}") else: 
ret.append(f"{initial}{suffix}{trailing}") else: diff --git a/contrib/python/pyparsing/py3/ya.make b/contrib/python/pyparsing/py3/ya.make index e229986ca6..a53ebf37ec 100644 --- a/contrib/python/pyparsing/py3/ya.make +++ b/contrib/python/pyparsing/py3/ya.make @@ -4,7 +4,7 @@ PY3_LIBRARY() PROVIDES(pyparsing) -VERSION(3.2.1) +VERSION(3.2.2) LICENSE(MIT) @@ -25,6 +25,8 @@ PY_SRCS( pyparsing/helpers.py pyparsing/results.py pyparsing/testing.py + pyparsing/tools/__init__.py + pyparsing/tools/cvt_pyparsing_pep8_names.py pyparsing/unicode.py pyparsing/util.py ) diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index b6911ce75e..904414722e 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.19.3 +Version: 3.20.1 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index 71cfb8fa72..fbc5d148f8 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.19.3) +VERSION(3.20.1) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_apis.py b/contrib/python/ydb/py3/ydb/_apis.py index fc28d0ceb2..fc6f16e287 100644 --- a/contrib/python/ydb/py3/ydb/_apis.py +++ b/contrib/python/ydb/py3/ydb/_apis.py @@ -115,6 +115,7 @@ class TopicService(object): DropTopic = "DropTopic" StreamRead = "StreamRead" StreamWrite = "StreamWrite" + UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction" class QueryService(object): diff --git a/contrib/python/ydb/py3/ydb/_errors.py b/contrib/python/ydb/py3/ydb/_errors.py index 17002d2574..1e2308ef39 100644 --- a/contrib/python/ydb/py3/ydb/_errors.py +++ b/contrib/python/ydb/py3/ydb/_errors.py @@ -5,6 +5,7 @@ from . 
import issues _errors_retriable_fast_backoff_types = [ issues.Unavailable, + issues.ClientInternalError, ] _errors_retriable_slow_backoff_types = [ issues.Aborted, diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py index 5b22c7cf86..0f8a0f03a7 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -141,6 +141,18 @@ class UpdateTokenResponse(IFromProto): ######################################################################################################################## +@dataclass +class TransactionIdentity(IToProto): + tx_id: str + session_id: str + + def to_proto(self) -> ydb_topic_pb2.TransactionIdentity: + return ydb_topic_pb2.TransactionIdentity( + id=self.tx_id, + session=self.session_id, + ) + + class StreamWriteMessage: @dataclass() class InitRequest(IToProto): @@ -199,6 +211,7 @@ class StreamWriteMessage: class WriteRequest(IToProto): messages: typing.List["StreamWriteMessage.WriteRequest.MessageData"] codec: int + tx_identity: Optional[TransactionIdentity] @dataclass class MessageData(IToProto): @@ -237,6 +250,9 @@ class StreamWriteMessage: proto = ydb_topic_pb2.StreamWriteMessage.WriteRequest() proto.codec = self.codec + if self.tx_identity is not None: + proto.tx.CopyFrom(self.tx_identity.to_proto()) + for message in self.messages: proto_mess = proto.messages.add() proto_mess.CopyFrom(message.to_proto()) @@ -297,6 +313,8 @@ class StreamWriteMessage: ) except ValueError: message_write_status = reason + elif proto_ack.HasField("written_in_tx"): + message_write_status = StreamWriteMessage.WriteResponse.WriteAck.StatusWrittenInTx() else: raise NotImplementedError("unexpected ack status") @@ -309,6 +327,9 @@ class StreamWriteMessage: class StatusWritten: offset: int + class StatusWrittenInTx: + pass + @dataclass class StatusSkipped: reason: 
"StreamWriteMessage.WriteResponse.WriteAck.StatusSkipped.Reason" @@ -1197,6 +1218,52 @@ class MeteringMode(int, IFromProto, IFromPublic, IToPublic): @dataclass +class UpdateOffsetsInTransactionRequest(IToProto): + tx: TransactionIdentity + topics: List[UpdateOffsetsInTransactionRequest.TopicOffsets] + consumer: str + + def to_proto(self): + return ydb_topic_pb2.UpdateOffsetsInTransactionRequest( + tx=self.tx.to_proto(), + consumer=self.consumer, + topics=list( + map( + UpdateOffsetsInTransactionRequest.TopicOffsets.to_proto, + self.topics, + ) + ), + ) + + @dataclass + class TopicOffsets(IToProto): + path: str + partitions: List[UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets] + + def to_proto(self): + return ydb_topic_pb2.UpdateOffsetsInTransactionRequest.TopicOffsets( + path=self.path, + partitions=list( + map( + UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.to_proto, + self.partitions, + ) + ), + ) + + @dataclass + class PartitionOffsets(IToProto): + partition_id: int + partition_offsets: List[OffsetsRange] + + def to_proto(self) -> ydb_topic_pb2.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets: + return ydb_topic_pb2.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets( + partition_id=self.partition_id, + partition_offsets=list(map(OffsetsRange.to_proto, self.partition_offsets)), + ) + + +@dataclass class CreateTopicRequest(IToProto, IFromPublic): path: str partitioning_settings: "PartitioningSettings" diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py index b48501aff2..74f06a086f 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py @@ -108,6 +108,9 @@ class PartitionSession: waiter = self._ack_waiters.popleft() waiter._finish_ok() + def _update_last_commited_offset_if_needed(self, offset: int): + self.committed_offset = max(self.committed_offset, offset) + 
def close(self): if self.closed: return @@ -211,3 +214,9 @@ class PublicBatch(ICommittable, ISessionAlive): self._bytes_size = self._bytes_size - new_batch._bytes_size return new_batch + + def _update_partition_offsets(self, tx, exc=None): + if exc is not None: + return + offsets = self._commit_get_offsets_range() + self._partition_session._update_last_commited_offset_if_needed(offsets.end) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index 7061b4e449..87012554ef 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -5,7 +5,7 @@ import concurrent.futures import gzip import typing from asyncio import Task -from collections import OrderedDict +from collections import defaultdict, OrderedDict from typing import Optional, Set, Dict, Union, Callable import ydb @@ -19,17 +19,24 @@ from . import topic_reader from .._grpc.grpcwrapper.common_utils import ( IGrpcWrapperAsyncIO, SupportedDriverType, + to_thread, GrpcWrapperAsyncIO, ) from .._grpc.grpcwrapper.ydb_topic import ( StreamReadMessage, UpdateTokenRequest, UpdateTokenResponse, + UpdateOffsetsInTransactionRequest, Codec, ) from .._errors import check_retriable_error import logging +from ..query.base import TxEvent + +if typing.TYPE_CHECKING: + from ..query.transaction import BaseQueryTxContext + logger = logging.getLogger(__name__) @@ -77,7 +84,7 @@ class PublicAsyncIOReader: ): self._loop = asyncio.get_running_loop() self._closed = False - self._reconnector = ReaderReconnector(driver, settings) + self._reconnector = ReaderReconnector(driver, settings, self._loop) self._parent = _parent async def __aenter__(self): @@ -88,8 +95,12 @@ class PublicAsyncIOReader: def __del__(self): if not self._closed: - task = self._loop.create_task(self.close(flush=False)) - topic_common.wrap_set_name_for_asyncio_task(task, task_name="close 
reader") + try: + logger.warning("Topic reader was not closed properly. Consider using method close().") + task = self._loop.create_task(self.close(flush=False)) + topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader") + except BaseException: + logger.warning("Something went wrong during reader close in __del__") async def wait_message(self): """ @@ -112,6 +123,23 @@ class PublicAsyncIOReader: max_messages=max_messages, ) + async def receive_batch_with_tx( + self, + tx: "BaseQueryTxContext", + max_messages: typing.Union[int, None] = None, + ) -> typing.Union[datatypes.PublicBatch, None]: + """ + Get one messages batch with tx from reader. + All messages in a batch from same partition. + + use asyncio.wait_for for wait with timeout. + """ + await self._reconnector.wait_message() + return self._reconnector.receive_batch_with_tx_nowait( + tx=tx, + max_messages=max_messages, + ) + async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: """ Block until receive new message @@ -165,11 +193,18 @@ class ReaderReconnector: _state_changed: asyncio.Event _stream_reader: Optional["ReaderStream"] _first_error: asyncio.Future[YdbError] + _tx_to_batches_map: Dict[str, typing.List[datatypes.PublicBatch]] - def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): + def __init__( + self, + driver: Driver, + settings: topic_reader.PublicReaderSettings, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): self._id = self._static_reader_reconnector_counter.inc_and_get() self._settings = settings self._driver = driver + self._loop = loop if loop is not None else asyncio.get_running_loop() self._background_tasks = set() self._state_changed = asyncio.Event() @@ -177,6 +212,8 @@ class ReaderReconnector: self._background_tasks.add(asyncio.create_task(self._connection_loop())) self._first_error = asyncio.get_running_loop().create_future() + self._tx_to_batches_map = dict() + async def _connection_loop(self): attempt = 0 
while True: @@ -190,6 +227,7 @@ class ReaderReconnector: if not retry_info.is_retriable: self._set_first_error(err) return + await asyncio.sleep(retry_info.sleep_timeout_seconds) attempt += 1 @@ -222,9 +260,87 @@ class ReaderReconnector: max_messages=max_messages, ) + def receive_batch_with_tx_nowait(self, tx: "BaseQueryTxContext", max_messages: Optional[int] = None): + batch = self._stream_reader.receive_batch_nowait( + max_messages=max_messages, + ) + + self._init_tx(tx) + + self._tx_to_batches_map[tx.tx_id].append(batch) + + tx._add_callback(TxEvent.AFTER_COMMIT, batch._update_partition_offsets, self._loop) + + return batch + def receive_message_nowait(self): return self._stream_reader.receive_message_nowait() + def _init_tx(self, tx: "BaseQueryTxContext"): + if tx.tx_id not in self._tx_to_batches_map: # Init tx callbacks + self._tx_to_batches_map[tx.tx_id] = [] + tx._add_callback(TxEvent.BEFORE_COMMIT, self._commit_batches_with_tx, self._loop) + tx._add_callback(TxEvent.AFTER_COMMIT, self._handle_after_tx_commit, self._loop) + tx._add_callback(TxEvent.AFTER_ROLLBACK, self._handle_after_tx_rollback, self._loop) + + async def _commit_batches_with_tx(self, tx: "BaseQueryTxContext"): + grouped_batches = defaultdict(lambda: defaultdict(list)) + for batch in self._tx_to_batches_map[tx.tx_id]: + grouped_batches[batch._partition_session.topic_path][batch._partition_session.partition_id].append(batch) + + request = UpdateOffsetsInTransactionRequest(tx=tx._tx_identity(), consumer=self._settings.consumer, topics=[]) + + for topic_path in grouped_batches: + topic_offsets = UpdateOffsetsInTransactionRequest.TopicOffsets(path=topic_path, partitions=[]) + for partition_id in grouped_batches[topic_path]: + partition_offsets = UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets( + partition_id=partition_id, + partition_offsets=[ + batch._commit_get_offsets_range() for batch in grouped_batches[topic_path][partition_id] + ], + ) + 
topic_offsets.partitions.append(partition_offsets) + request.topics.append(topic_offsets) + + try: + return await self._do_commit_batches_with_tx_call(request) + except BaseException: + exc = issues.ClientInternalError("Failed to update offsets in tx.") + tx._set_external_error(exc) + self._stream_reader._set_first_error(exc) + finally: + del self._tx_to_batches_map[tx.tx_id] + + async def _do_commit_batches_with_tx_call(self, request: UpdateOffsetsInTransactionRequest): + args = [ + request.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.UpdateOffsetsInTransaction, + topic_common.wrap_operation, + ] + + if asyncio.iscoroutinefunction(self._driver.__call__): + res = await self._driver(*args) + else: + res = await to_thread(self._driver, *args, executor=None) + + return res + + async def _handle_after_tx_rollback(self, tx: "BaseQueryTxContext", exc: Optional[BaseException]) -> None: + if tx.tx_id in self._tx_to_batches_map: + del self._tx_to_batches_map[tx.tx_id] + exc = issues.ClientInternalError("Reconnect due to transaction rollback") + self._stream_reader._set_first_error(exc) + + async def _handle_after_tx_commit(self, tx: "BaseQueryTxContext", exc: Optional[BaseException]) -> None: + if tx.tx_id in self._tx_to_batches_map: + del self._tx_to_batches_map[tx.tx_id] + + if exc is not None: + self._stream_reader._set_first_error( + issues.ClientInternalError("Reconnect due to transaction commit failed") + ) + def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.CommitAckWaiter: return self._stream_reader.commit(batch) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py index eda1d374fc..31f2889927 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py @@ -1,5 +1,6 @@ import asyncio import concurrent.futures +import logging import typing from typing import 
List, Union, Optional @@ -20,6 +21,11 @@ from ydb._topic_reader.topic_reader_asyncio import ( TopicReaderClosedError, ) +if typing.TYPE_CHECKING: + from ..query.transaction import BaseQueryTxContext + +logger = logging.getLogger(__name__) + class TopicReaderSync: _caller: CallFromSyncToAsync @@ -52,7 +58,12 @@ class TopicReaderSync: self._parent = _parent def __del__(self): - self.close(flush=False) + if not self._closed: + try: + logger.warning("Topic reader was not closed properly. Consider using method close().") + self.close(flush=False) + except BaseException: + logger.warning("Something went wrong during reader close in __del__") def __enter__(self): return self @@ -109,6 +120,31 @@ class TopicReaderSync: timeout, ) + def receive_batch_with_tx( + self, + tx: "BaseQueryTxContext", + *, + max_messages: typing.Union[int, None] = None, + max_bytes: typing.Union[int, None] = None, + timeout: Union[float, None] = None, + ) -> Union[PublicBatch, None]: + """ + Get one messages batch with tx from reader + It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. + + if no new message in timeout seconds (default - infinite): raise TimeoutError() + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only. + """ + self._check_closed() + + return self._caller.safe_call_with_result( + self._async_reader.receive_batch_with_tx( + tx=tx, + max_messages=max_messages, + ), + timeout, + ) + def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]): """ Put commit message to internal buffer. 
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py index aa5fe9749a..a3e407ed86 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py @@ -11,6 +11,7 @@ import typing import ydb.aio from .._grpc.grpcwrapper.ydb_topic import StreamWriteMessage +from .._grpc.grpcwrapper.ydb_topic import TransactionIdentity from .._grpc.grpcwrapper.common_utils import IToProto from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec from .. import connection @@ -53,8 +54,12 @@ class PublicWriteResult: class Skipped: pass + @dataclass(eq=True) + class WrittenInTx: + pass + -PublicWriteResultTypes = Union[PublicWriteResult.Written, PublicWriteResult.Skipped] +PublicWriteResultTypes = Union[PublicWriteResult.Written, PublicWriteResult.Skipped, PublicWriteResult.WrittenInTx] class WriterSettings(PublicWriterSettings): @@ -205,6 +210,7 @@ def default_serializer_message_content(data: Any) -> bytes: def messages_to_proto_requests( messages: List[InternalMessage], + tx_identity: Optional[TransactionIdentity], ) -> List[StreamWriteMessage.FromClient]: gropus = _slit_messages_for_send(messages) @@ -215,6 +221,7 @@ def messages_to_proto_requests( StreamWriteMessage.WriteRequest( messages=list(map(InternalMessage.to_message_data, group)), codec=group[0].codec, + tx_identity=tx_identity, ) ) res.append(req) @@ -239,6 +246,7 @@ _message_data_overhead = ( ), ], codec=20000, + tx_identity=None, ) ) .to_proto() diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py index 32d8fefe51..ec5b21661d 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py @@ -1,7 +1,6 @@ import asyncio import concurrent.futures import datetime -import functools import gzip import 
typing from collections import deque @@ -35,6 +34,7 @@ from .._grpc.grpcwrapper.ydb_topic import ( UpdateTokenRequest, UpdateTokenResponse, StreamWriteMessage, + TransactionIdentity, WriterMessagesFromServerToClient, ) from .._grpc.grpcwrapper.common_utils import ( @@ -43,6 +43,11 @@ from .._grpc.grpcwrapper.common_utils import ( GrpcWrapperAsyncIO, ) +from ..query.base import TxEvent + +if typing.TYPE_CHECKING: + from ..query.transaction import BaseQueryTxContext + logger = logging.getLogger(__name__) @@ -76,8 +81,12 @@ class WriterAsyncIO: def __del__(self): if self._closed or self._loop.is_closed(): return - - self._loop.call_soon(functools.partial(self.close, flush=False)) + try: + logger.warning("Topic writer was not closed properly. Consider using method close().") + task = self._loop.create_task(self.close(flush=False)) + topic_common.wrap_set_name_for_asyncio_task(task, task_name="close writer") + except BaseException: + logger.warning("Something went wrong during writer close in __del__") async def close(self, *, flush: bool = True): if self._closed: @@ -164,6 +173,57 @@ class WriterAsyncIO: return await self._reconnector.wait_init() +class TxWriterAsyncIO(WriterAsyncIO): + _tx: "BaseQueryTxContext" + + def __init__( + self, + tx: "BaseQueryTxContext", + driver: SupportedDriverType, + settings: PublicWriterSettings, + _client=None, + _is_implicit=False, + ): + self._tx = tx + self._loop = asyncio.get_running_loop() + self._closed = False + self._reconnector = WriterAsyncIOReconnector(driver=driver, settings=WriterSettings(settings), tx=self._tx) + self._parent = _client + self._is_implicit = _is_implicit + + # For some reason, creating partition could conflict with other session operations. + # Could be removed later. 
+ self._first_write = True + + tx._add_callback(TxEvent.BEFORE_COMMIT, self._on_before_commit, self._loop) + tx._add_callback(TxEvent.BEFORE_ROLLBACK, self._on_before_rollback, self._loop) + + async def write( + self, + messages: Union[Message, List[Message]], + ): + """ + send one or number of messages to server. + it put message to internal buffer + + For wait with timeout use asyncio.wait_for. + """ + if self._first_write: + self._first_write = False + return await super().write_with_ack(messages) + return await super().write(messages) + + async def _on_before_commit(self, tx: "BaseQueryTxContext"): + if self._is_implicit: + return + await self.close() + + async def _on_before_rollback(self, tx: "BaseQueryTxContext"): + if self._is_implicit: + return + await self.close(flush=False) + + class WriterAsyncIOReconnector: _closed: bool _loop: asyncio.AbstractEventLoop @@ -178,6 +238,7 @@ class WriterAsyncIOReconnector: _codec_selector_batch_num: int _codec_selector_last_codec: Optional[PublicCodec] _codec_selector_check_batches_interval: int + _tx: Optional["BaseQueryTxContext"] if typing.TYPE_CHECKING: _messages_for_encode: asyncio.Queue[List[InternalMessage]] @@ -195,7 +256,9 @@ class WriterAsyncIOReconnector: _stop_reason: asyncio.Future _init_info: Optional[PublicWriterInitInfo] - def __init__(self, driver: SupportedDriverType, settings: WriterSettings): + def __init__( + self, driver: SupportedDriverType, settings: WriterSettings, tx: Optional["BaseQueryTxContext"] = None + ): self._closed = False self._loop = asyncio.get_running_loop() self._driver = driver @@ -205,6 +268,7 @@ class WriterAsyncIOReconnector: self._init_info = None self._stream_connected = asyncio.Event() self._settings = settings + self._tx = tx self._codec_functions = { PublicCodec.RAW: lambda data: data, @@ -354,10 +418,12 @@ class WriterAsyncIOReconnector: # noinspection PyBroadException stream_writer = None try: + tx_identity = None if self._tx is None else self._tx._tx_identity() 
stream_writer = await WriterAsyncIOStream.create( self._driver, self._init_message, self._settings.update_token_interval, + tx_identity=tx_identity, ) try: if self._init_info is None: @@ -387,7 +453,7 @@ class WriterAsyncIOReconnector: done.pop().result() # need for raise exception - reason of stop task except issues.Error as err: err_info = check_retriable_error(err, retry_settings, attempt) - if not err_info.is_retriable: + if not err_info.is_retriable or self._tx is not None: # no retries in tx writer self._stop(err) return @@ -533,6 +599,8 @@ class WriterAsyncIOReconnector: result = PublicWriteResult.Skipped() elif isinstance(status, write_ack_msg.StatusWritten): result = PublicWriteResult.Written(offset=status.offset) + elif isinstance(status, write_ack_msg.StatusWrittenInTx): + result = PublicWriteResult.WrittenInTx() else: raise TopicWriterError("internal error - receive unexpected ack message.") message_future.set_result(result) @@ -597,10 +665,13 @@ class WriterAsyncIOStream: _update_token_event: asyncio.Event _get_token_function: Optional[Callable[[], str]] + _tx_identity: Optional[TransactionIdentity] + def __init__( self, update_token_interval: Optional[Union[int, float]] = None, get_token_function: Optional[Callable[[], str]] = None, + tx_identity: Optional[TransactionIdentity] = None, ): self._closed = False @@ -609,6 +680,8 @@ class WriterAsyncIOStream: self._update_token_event = asyncio.Event() self._update_token_task = None + self._tx_identity = tx_identity + async def close(self): if self._closed: return @@ -625,6 +698,7 @@ class WriterAsyncIOStream: driver: SupportedDriverType, init_request: StreamWriteMessage.InitRequest, update_token_interval: Optional[Union[int, float]] = None, + tx_identity: Optional[TransactionIdentity] = None, ) -> "WriterAsyncIOStream": stream = GrpcWrapperAsyncIO(StreamWriteMessage.FromServer.from_proto) @@ -634,6 +708,7 @@ class WriterAsyncIOStream: writer = WriterAsyncIOStream( 
update_token_interval=update_token_interval, get_token_function=creds.get_auth_token if creds else lambda: "", + tx_identity=tx_identity, ) await writer._start(stream, init_request) return writer @@ -680,7 +755,7 @@ class WriterAsyncIOStream: if self._closed: raise RuntimeError("Can not write on closed stream.") - for request in messages_to_proto_requests(messages): + for request in messages_to_proto_requests(messages, self._tx_identity): self._stream.write(request) async def _update_token_loop(self): diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py index a5193caf7c..954864c968 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging import typing from concurrent.futures import Future from typing import Union, List, Optional @@ -14,13 +15,23 @@ from .topic_writer import ( TopicWriterClosedError, ) -from .topic_writer_asyncio import WriterAsyncIO +from ..query.base import TxEvent + +from .topic_writer_asyncio import ( + TxWriterAsyncIO, + WriterAsyncIO, +) from .._topic_common.common import ( _get_shared_event_loop, TimeoutType, CallFromSyncToAsync, ) +if typing.TYPE_CHECKING: + from ..query.transaction import BaseQueryTxContext + +logger = logging.getLogger(__name__) + class WriterSync: _caller: CallFromSyncToAsync @@ -63,7 +74,12 @@ class WriterSync: raise def __del__(self): - self.close(flush=False) + if not self._closed: + try: + logger.warning("Topic writer was not closed properly. 
Consider using method close().") + self.close(flush=False) + except BaseException: + logger.warning("Something went wrong during writer close in __del__") def close(self, *, flush: bool = True, timeout: TimeoutType = None): if self._closed: @@ -122,3 +138,39 @@ class WriterSync: self._check_closed() return self._caller.unsafe_call_with_result(self._async_writer.write_with_ack(messages), timeout=timeout) + + +class TxWriterSync(WriterSync): + def __init__( + self, + tx: "BaseQueryTxContext", + driver: SupportedDriverType, + settings: PublicWriterSettings, + *, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None, + ): + + self._closed = False + + if eventloop: + loop = eventloop + else: + loop = _get_shared_event_loop() + + self._caller = CallFromSyncToAsync(loop) + + async def create_async_writer(): + return TxWriterAsyncIO(tx, driver, settings, _is_implicit=True) + + self._async_writer = self._caller.safe_call_with_result(create_async_writer(), None) + self._parent = _parent + + tx._add_callback(TxEvent.BEFORE_COMMIT, self._on_before_commit, None) + tx._add_callback(TxEvent.BEFORE_ROLLBACK, self._on_before_rollback, None) + + def _on_before_commit(self, tx: "BaseQueryTxContext"): + self.close() + + def _on_before_rollback(self, tx: "BaseQueryTxContext"): + self.close(flush=False) diff --git a/contrib/python/ydb/py3/ydb/aio/driver.py b/contrib/python/ydb/py3/ydb/aio/driver.py index 9cd6fd2b74..267997fbcc 100644 --- a/contrib/python/ydb/py3/ydb/aio/driver.py +++ b/contrib/python/ydb/py3/ydb/aio/driver.py @@ -62,4 +62,5 @@ class Driver(pool.ConnectionPool): async def stop(self, timeout=10): await self.table_client._stop_pool_if_needed(timeout=timeout) + self.topic_client.close() await super().stop(timeout=timeout) diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py index 947db65872..f1ca68d1cf 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/pool.py +++ 
b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -158,6 +158,8 @@ class QuerySessionPool: async def wrapped_callee(): async with self.checkout() as session: async with session.transaction(tx_mode=tx_mode) as tx: + if tx_mode.name in ["serializable_read_write", "snapshot_read_only"]: + await tx.begin() result = await callee(tx, *args, **kwargs) await tx.commit() return result @@ -213,12 +215,6 @@ class QuerySessionPool: async def __aexit__(self, exc_type, exc_val, exc_tb): await self.stop() - def __del__(self): - if self._should_stop.is_set() or self._loop.is_closed(): - return - - self._loop.call_soon(self.stop) - class SimpleQuerySessionCheckoutAsync: def __init__(self, pool: QuerySessionPool): diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py index 5b63a32b48..f0547e5f01 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py @@ -16,6 +16,28 @@ logger = logging.getLogger(__name__) class QueryTxContext(BaseQueryTxContext): + def __init__(self, driver, session_state, session, tx_mode): + """ + An object that provides a simple transaction context manager that allows statements execution + in a transaction. You don't have to open transaction explicitly, because context manager encapsulates + transaction control logic, and opens new transaction if: + + 1) By explicit .begin() method; + 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip + + This context manager is not thread-safe, so you should not manipulate on it concurrently. + + :param driver: A driver instance + :param session_state: A state of session + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). 
+ """ + super().__init__(driver, session_state, session, tx_mode) + self._init_callback_handler(base.CallbackHandlerMode.ASYNC) + async def __aenter__(self) -> "QueryTxContext": """ Enters a context manager and returns a transaction @@ -30,7 +52,7 @@ class QueryTxContext(BaseQueryTxContext): it is not finished explicitly """ await self._ensure_prev_stream_finished() - if self._tx_state._state == QueryTxStateEnum.BEGINED: + if self._tx_state._state == QueryTxStateEnum.BEGINED and self._external_error is None: # It's strictly recommended to close transactions directly # by using commit_tx=True flag while executing statement or by # .commit() or .rollback() methods, but here we trying to do best @@ -65,7 +87,9 @@ class QueryTxContext(BaseQueryTxContext): :return: A committed transaction or exception if commit is failed """ - if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): + self._check_external_error_set() + + if self._tx_state._should_skip(QueryTxStateEnum.COMMITTED): return if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: @@ -74,7 +98,13 @@ class QueryTxContext(BaseQueryTxContext): await self._ensure_prev_stream_finished() - await self._commit_call(settings) + try: + await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT) + await self._commit_call(settings) + await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None) + except BaseException as e: + await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e) + raise e async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls rollback on a transaction if it is open otherwise is no-op. 
If transaction execution @@ -84,7 +114,9 @@ class QueryTxContext(BaseQueryTxContext): :return: A committed transaction or exception if commit is failed """ - if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED): + self._check_external_error_set() + + if self._tx_state._should_skip(QueryTxStateEnum.ROLLBACKED): return if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: @@ -93,7 +125,13 @@ class QueryTxContext(BaseQueryTxContext): await self._ensure_prev_stream_finished() - await self._rollback_call(settings) + try: + await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK) + await self._rollback_call(settings) + await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None) + except BaseException as e: + await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e) + raise e async def execute( self, diff --git a/contrib/python/ydb/py3/ydb/driver.py b/contrib/python/ydb/py3/ydb/driver.py index 49bd223c90..3998aeeef5 100644 --- a/contrib/python/ydb/py3/ydb/driver.py +++ b/contrib/python/ydb/py3/ydb/driver.py @@ -288,4 +288,5 @@ class Driver(pool.ConnectionPool): def stop(self, timeout=10): self.table_client._stop_pool_if_needed(timeout=timeout) + self.topic_client.close() super().stop(timeout=timeout) diff --git a/contrib/python/ydb/py3/ydb/issues.py b/contrib/python/ydb/py3/ydb/issues.py index f38f99f925..4e76f5ed2b 100644 --- a/contrib/python/ydb/py3/ydb/issues.py +++ b/contrib/python/ydb/py3/ydb/issues.py @@ -178,6 +178,10 @@ class SessionPoolEmpty(Error, queue.Empty): status = StatusCode.SESSION_POOL_EMPTY +class ClientInternalError(Error): + status = StatusCode.CLIENT_INTERNAL_ERROR + + class UnexpectedGrpcMessage(Error): def __init__(self, message: str): super().__init__(message) diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py index 57a769bb1a..a5ebedd95b 100644 --- a/contrib/python/ydb/py3/ydb/query/base.py +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -1,6 +1,8 
@@ import abc +import asyncio import enum import functools +from collections import defaultdict import typing from typing import ( @@ -17,6 +19,10 @@ from .. import issues from .. import _utilities from .. import _apis +from ydb._topic_common.common import CallFromSyncToAsync, _get_shared_event_loop +from ydb._grpc.grpcwrapper.common_utils import to_thread + + if typing.TYPE_CHECKING: from .transaction import BaseQueryTxContext @@ -196,3 +202,64 @@ def wrap_execute_query_response( return convert.ResultSet.from_message(response_pb.result_set, settings) return None + + +class TxEvent(enum.Enum): + BEFORE_COMMIT = "BEFORE_COMMIT" + AFTER_COMMIT = "AFTER_COMMIT" + BEFORE_ROLLBACK = "BEFORE_ROLLBACK" + AFTER_ROLLBACK = "AFTER_ROLLBACK" + + +class CallbackHandlerMode(enum.Enum): + SYNC = "SYNC" + ASYNC = "ASYNC" + + +def _get_sync_callback(method: typing.Callable, loop: Optional[asyncio.AbstractEventLoop]): + if asyncio.iscoroutinefunction(method): + if loop is None: + loop = _get_shared_event_loop() + + def async_to_sync_callback(*args, **kwargs): + caller = CallFromSyncToAsync(loop) + return caller.safe_call_with_result(method(*args, **kwargs), 10) + + return async_to_sync_callback + return method + + +def _get_async_callback(method: typing.Callable): + if asyncio.iscoroutinefunction(method): + return method + + async def sync_to_async_callback(*args, **kwargs): + return await to_thread(method, *args, **kwargs, executor=None) + + return sync_to_async_callback + + +class CallbackHandler: + def _init_callback_handler(self, mode: CallbackHandlerMode) -> None: + self._callbacks = defaultdict(list) + self._callback_mode = mode + + def _execute_callbacks_sync(self, event_name: str, *args, **kwargs) -> None: + for callback in self._callbacks[event_name]: + callback(self, *args, **kwargs) + + async def _execute_callbacks_async(self, event_name: str, *args, **kwargs) -> None: + tasks = [asyncio.create_task(callback(self, *args, **kwargs)) for callback in 
self._callbacks[event_name]] + if not tasks: + return + await asyncio.gather(*tasks) + + def _prepare_callback( + self, callback: typing.Callable, loop: Optional[asyncio.AbstractEventLoop] + ) -> typing.Callable: + if self._callback_mode == CallbackHandlerMode.SYNC: + return _get_sync_callback(callback, loop) + return _get_async_callback(callback) + + def _add_callback(self, event_name: str, callback: typing.Callable, loop: Optional[asyncio.AbstractEventLoop]): + self._callbacks[event_name].append(self._prepare_callback(callback, loop)) diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index e3775c4dd1..b25f7db855 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -167,6 +167,8 @@ class QuerySessionPool: def wrapped_callee(): with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session: with session.transaction(tx_mode=tx_mode) as tx: + if tx_mode.name in ["serializable_read_write", "snapshot_read_only"]: + tx.begin() result = callee(tx, *args, **kwargs) tx.commit() return result @@ -224,9 +226,6 @@ class QuerySessionPool: def __exit__(self, exc_type, exc_val, exc_tb): self.stop() - def __del__(self): - self.stop() - class SimpleQuerySessionCheckout: def __init__(self, pool: QuerySessionPool, timeout: Optional[float]): diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py index 414401da4d..ae7642dbe2 100644 --- a/contrib/python/ydb/py3/ydb/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/query/transaction.py @@ -11,6 +11,7 @@ from .. 
import ( _apis, issues, ) +from .._grpc.grpcwrapper import ydb_topic as _ydb_topic from .._grpc.grpcwrapper import ydb_query as _ydb_query from ..connection import _RpcState as RpcState @@ -42,11 +43,23 @@ class QueryTxStateHelper(abc.ABC): QueryTxStateEnum.DEAD: [], } + _SKIP_TRANSITIONS = { + QueryTxStateEnum.NOT_INITIALIZED: [], + QueryTxStateEnum.BEGINED: [], + QueryTxStateEnum.COMMITTED: [QueryTxStateEnum.COMMITTED, QueryTxStateEnum.ROLLBACKED], + QueryTxStateEnum.ROLLBACKED: [QueryTxStateEnum.COMMITTED, QueryTxStateEnum.ROLLBACKED], + QueryTxStateEnum.DEAD: [], + } + @classmethod def valid_transition(cls, before: QueryTxStateEnum, after: QueryTxStateEnum) -> bool: return after in cls._VALID_TRANSITIONS[before] @classmethod + def should_skip(cls, before: QueryTxStateEnum, after: QueryTxStateEnum) -> bool: + return after in cls._SKIP_TRANSITIONS[before] + + @classmethod def terminal(cls, state: QueryTxStateEnum) -> bool: return len(cls._VALID_TRANSITIONS[state]) == 0 @@ -88,8 +101,8 @@ class QueryTxState: if QueryTxStateHelper.terminal(self._state): raise RuntimeError(f"Transaction is in terminal state: {self._state.value}") - def _already_in(self, target: QueryTxStateEnum) -> bool: - return self._state == target + def _should_skip(self, target: QueryTxStateEnum) -> bool: + return QueryTxStateHelper.should_skip(self._state, target) def _construct_tx_settings(tx_state: QueryTxState) -> _ydb_query.TransactionSettings: @@ -170,7 +183,7 @@ def wrap_tx_rollback_response( return tx -class BaseQueryTxContext: +class BaseQueryTxContext(base.CallbackHandler): def __init__(self, driver, session_state, session, tx_mode): """ An object that provides a simple transaction context manager that allows statements execution @@ -196,6 +209,7 @@ class BaseQueryTxContext: self._session_state = session_state self.session = session self._prev_stream = None + self._external_error = None @property def session_id(self) -> str: @@ -215,6 +229,19 @@ class BaseQueryTxContext: """ return 
self._tx_state.tx_id + def _tx_identity(self) -> _ydb_topic.TransactionIdentity: + if not self.tx_id: + raise RuntimeError("Unable to get tx identity without started tx.") + return _ydb_topic.TransactionIdentity(self.tx_id, self.session_id) + + def _set_external_error(self, exc: BaseException) -> None: + self._external_error = exc + + def _check_external_error_set(self): + if self._external_error is None: + return + raise issues.ClientInternalError("Transaction was failed by external error.") from self._external_error + def _begin_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) @@ -228,6 +255,7 @@ class BaseQueryTxContext: ) def _commit_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": + self._check_external_error_set() self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED) return self._driver( @@ -240,6 +268,7 @@ class BaseQueryTxContext: ) def _rollback_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": + self._check_external_error_set() self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED) return self._driver( @@ -262,6 +291,7 @@ class BaseQueryTxContext: settings: Optional[BaseRequestSettings], ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: self._tx_state._check_tx_ready_to_use() + self._check_external_error_set() request = base.create_execute_query_request( query=query, @@ -283,18 +313,41 @@ class BaseQueryTxContext: ) def _move_to_beginned(self, tx_id: str) -> None: - if self._tx_state._already_in(QueryTxStateEnum.BEGINED) or not tx_id: + if self._tx_state._should_skip(QueryTxStateEnum.BEGINED) or not tx_id: return self._tx_state._change_state(QueryTxStateEnum.BEGINED) self._tx_state.tx_id = tx_id def _move_to_commited(self) -> None: - if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): + if self._tx_state._should_skip(QueryTxStateEnum.COMMITTED): return 
self._tx_state._change_state(QueryTxStateEnum.COMMITTED) class QueryTxContext(BaseQueryTxContext): + def __init__(self, driver, session_state, session, tx_mode): + """ + An object that provides a simple transaction context manager that allows statements execution + in a transaction. You don't have to open transaction explicitly, because context manager encapsulates + transaction control logic, and opens new transaction if: + + 1) By explicit .begin() method; + 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip + + This context manager is not thread-safe, so you should not manipulate on it concurrently. + + :param driver: A driver instance + :param session_state: A state of session + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). 
+ """ + + super().__init__(driver, session_state, session, tx_mode) + self._init_callback_handler(base.CallbackHandlerMode.SYNC) + def __enter__(self) -> "BaseQueryTxContext": """ Enters a context manager and returns a transaction @@ -309,7 +362,7 @@ class QueryTxContext(BaseQueryTxContext): it is not finished explicitly """ self._ensure_prev_stream_finished() - if self._tx_state._state == QueryTxStateEnum.BEGINED: + if self._tx_state._state == QueryTxStateEnum.BEGINED and self._external_error is None: # It's strictly recommended to close transactions directly # by using commit_tx=True flag while executing statement or by # .commit() or .rollback() methods, but here we trying to do best @@ -345,7 +398,8 @@ class QueryTxContext(BaseQueryTxContext): :return: A committed transaction or exception if commit is failed """ - if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): + self._check_external_error_set() + if self._tx_state._should_skip(QueryTxStateEnum.COMMITTED): return if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: @@ -354,7 +408,13 @@ class QueryTxContext(BaseQueryTxContext): self._ensure_prev_stream_finished() - self._commit_call(settings) + try: + self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT) + self._commit_call(settings) + self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None) + except BaseException as e: # TODO: probably should be less wide + self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e) + raise e def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls rollback on a transaction if it is open otherwise is no-op. 
If transaction execution @@ -364,7 +424,8 @@ class QueryTxContext(BaseQueryTxContext): :return: A committed transaction or exception if commit is failed """ - if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED): + self._check_external_error_set() + if self._tx_state._should_skip(QueryTxStateEnum.ROLLBACKED): return if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: @@ -373,7 +434,13 @@ class QueryTxContext(BaseQueryTxContext): self._ensure_prev_stream_finished() - self._rollback_call(settings) + try: + self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK) + self._rollback_call(settings) + self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None) + except BaseException as e: # TODO: probably should be less wide + self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e) + raise e def execute( self, diff --git a/contrib/python/ydb/py3/ydb/table.py b/contrib/python/ydb/py3/ydb/table.py index 945e918767..ac73902f3c 100644 --- a/contrib/python/ydb/py3/ydb/table.py +++ b/contrib/python/ydb/py3/ydb/table.py @@ -545,6 +545,9 @@ class TableStats(object): def __init__(self): self.partitions = None self.store_size = 0 + self.rows_estimate = 0 + self.creation_time = None + self.modification_time = None def with_store_size(self, store_size): self.store_size = store_size @@ -554,6 +557,18 @@ class TableStats(object): self.partitions = partitions return self + def with_rows_estimate(self, rows_estimate): + self.rows_estimate = rows_estimate + return self + + def with_creation_time(self, creation_time): + self.creation_time = creation_time + return self + + def with_modification_time(self, modification_time): + self.modification_time = modification_time + return self + class ReadReplicasSettings(object): def __init__(self): @@ -1577,7 +1592,22 @@ class TableSchemeEntry(scheme.SchemeEntry): self.table_stats = None if table_stats is not None: + from ._grpc.grpcwrapper.common_utils import datetime_from_proto_timestamp + self.table_stats = 
TableStats() + if table_stats.creation_time: + self.table_stats = self.table_stats.with_creation_time( + datetime_from_proto_timestamp(table_stats.creation_time) + ) + + if table_stats.modification_time: + self.table_stats = self.table_stats.with_modification_time( + datetime_from_proto_timestamp(table_stats.modification_time) + ) + + if table_stats.rows_estimate != 0: + self.table_stats = self.table_stats.with_rows_estimate(table_stats.rows_estimate) + if table_stats.partitions != 0: self.table_stats = self.table_stats.with_partitions(table_stats.partitions) diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index 55f4ea04c5..a501f9d275 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -25,6 +25,8 @@ __all__ = [ "TopicWriteResult", "TopicWriter", "TopicWriterAsyncIO", + "TopicTxWriter", + "TopicTxWriterAsyncIO", "TopicWriterInitInfo", "TopicWriterMessage", "TopicWriterSettings", @@ -33,6 +35,7 @@ __all__ = [ import concurrent.futures import datetime from dataclasses import dataclass +import logging from typing import List, Union, Mapping, Optional, Dict, Callable from . 
import aio, Credentials, _apis, issues @@ -65,8 +68,10 @@ from ._topic_writer.topic_writer import ( # noqa: F401 PublicWriteResult as TopicWriteResult, ) +from ydb._topic_writer.topic_writer_asyncio import TxWriterAsyncIO as TopicTxWriterAsyncIO from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter +from ._topic_writer.topic_writer_sync import TxWriterSync as TopicTxWriter from ._topic_common.common import ( wrap_operation as _wrap_operation, @@ -88,6 +93,8 @@ from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401 PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings, ) +logger = logging.getLogger(__name__) + class TopicClientAsyncIO: _closed: bool @@ -108,7 +115,12 @@ class TopicClientAsyncIO: ) def __del__(self): - self.close() + if not self._closed: + try: + logger.warning("Topic client was not closed properly. Consider using method close().") + self.close() + except BaseException: + logger.warning("Something went wrong during topic client close in __del__") async def create_topic( self, @@ -276,6 +288,35 @@ class TopicClientAsyncIO: return TopicWriterAsyncIO(self._driver, settings, _client=self) + def tx_writer( + self, + tx, + topic, + *, + producer_id: Optional[str] = None, # default - random + session_metadata: Mapping[str, str] = None, + partition_id: Union[int, None] = None, + auto_seqno: bool = True, + auto_created_at: bool = True, + codec: Optional[TopicCodec] = None, # default mean auto-select + # encoders: map[codec_code] func(encoded_bytes)->decoded_bytes + # the func will be called from multiply threads in parallel. + encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, + # custom encoder executor for call builtin and custom decoders. If None - use shared executor pool. 
+ # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. + encoder_executor: Optional[concurrent.futures.Executor] = None, + ) -> TopicTxWriterAsyncIO: + args = locals().copy() + del args["self"] + del args["tx"] + + settings = TopicWriterSettings(**args) + + if not settings.encoder_executor: + settings.encoder_executor = self._executor + + return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self) + def close(self): if self._closed: return @@ -287,7 +328,7 @@ class TopicClientAsyncIO: if not self._closed: return - raise RuntimeError("Topic client closed") + raise issues.Error("Topic client closed") class TopicClient: @@ -310,7 +351,12 @@ class TopicClient: ) def __del__(self): - self.close() + if not self._closed: + try: + logger.warning("Topic client was not closed properly. Consider using method close().") + self.close() + except BaseException: + logger.warning("Something went wrong during topic client close in __del__") def create_topic( self, @@ -487,6 +533,36 @@ class TopicClient: return TopicWriter(self._driver, settings, _parent=self) + def tx_writer( + self, + tx, + topic, + *, + producer_id: Optional[str] = None, # default - random + session_metadata: Mapping[str, str] = None, + partition_id: Union[int, None] = None, + auto_seqno: bool = True, + auto_created_at: bool = True, + codec: Optional[TopicCodec] = None, # default mean auto-select + # encoders: map[codec_code] func(encoded_bytes)->decoded_bytes + # the func will be called from multiply threads in parallel. + encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, + # custom encoder executor for call builtin and custom decoders. If None - use shared executor pool. + # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. 
+ encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool + ) -> TopicWriter: + args = locals().copy() + del args["self"] + del args["tx"] + self._check_closed() + + settings = TopicWriterSettings(**args) + + if not settings.encoder_executor: + settings.encoder_executor = self._executor + + return TopicTxWriter(tx, self._driver, settings, _parent=self) + def close(self): if self._closed: return @@ -498,7 +574,7 @@ class TopicClient: if not self._closed: return - raise RuntimeError("Topic client closed") + raise issues.Error("Topic client closed") @dataclass diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 8bd658d49e..4a5c580f99 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.19.3" +VERSION = "3.20.1" |