aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-08-25 12:54:32 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-08-25 13:03:33 +0300
commit4a64a813e1d34e732f35d8a65147974f76395a6f (patch)
treea8da0dede5213f85e45b95047cfbdcf5427cf0b7
parente9bbee265681b79a9ef9795bdc84cf6996f9cfec (diff)
downloadydb-4a64a813e1d34e732f35d8a65147974f76395a6f.tar.gz
Intermediate changes
-rw-r--r--contrib/python/Twisted/py3/.dist-info/METADATA189
-rw-r--r--contrib/python/Twisted/py3/README.rst2
-rw-r--r--contrib/python/Twisted/py3/twisted/_version.py2
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/client/knownhosts.py98
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/endpoints.py78
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/insults/insults.py18
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/insults/window.py12
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/interfaces.py20
-rw-r--r--contrib/python/Twisted/py3/twisted/conch/manhole.py11
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_baseprocess.py12
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_posixstdio.py62
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_producer_helpers.py32
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_sslverify.py23
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_threadedselect.py465
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_win32stdio.py17
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/base.py83
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/cfreactor.py24
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/defer.py85
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/posixbase.py1
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/selectreactor.py61
-rw-r--r--contrib/python/Twisted/py3/twisted/logger/__init__.py3
-rw-r--r--contrib/python/Twisted/py3/twisted/logger/_json.py2
-rw-r--r--contrib/python/Twisted/py3/twisted/logger/_logger.py204
-rw-r--r--contrib/python/Twisted/py3/twisted/mail/imap4.py4
-rw-r--r--contrib/python/Twisted/py3/twisted/protocols/amp.py32
-rw-r--r--contrib/python/Twisted/py3/twisted/protocols/ftp.py220
-rw-r--r--contrib/python/Twisted/py3/twisted/python/_shellcomp.py4
-rw-r--r--contrib/python/Twisted/py3/twisted/python/compat.py31
-rw-r--r--contrib/python/Twisted/py3/twisted/python/deprecate.py10
-rw-r--r--contrib/python/Twisted/py3/twisted/python/failure.py105
-rw-r--r--contrib/python/Twisted/py3/twisted/spread/pb.py8
-rw-r--r--contrib/python/Twisted/py3/twisted/web/_flatten.py7
-rw-r--r--contrib/python/Twisted/py3/twisted/web/_http2.py11
-rw-r--r--contrib/python/Twisted/py3/twisted/web/_newclient.py201
-rw-r--r--contrib/python/Twisted/py3/twisted/web/_responses.py2
-rw-r--r--contrib/python/Twisted/py3/twisted/web/_template_util.py30
-rw-r--r--contrib/python/Twisted/py3/twisted/web/client.py2
-rw-r--r--contrib/python/Twisted/py3/twisted/web/http.py328
-rw-r--r--contrib/python/Twisted/py3/twisted/web/http_headers.py122
-rw-r--r--contrib/python/Twisted/py3/twisted/web/iweb.py7
-rw-r--r--contrib/python/Twisted/py3/twisted/web/resource.py68
-rw-r--r--contrib/python/Twisted/py3/twisted/web/server.py47
-rw-r--r--contrib/python/Twisted/py3/twisted/web/soap.py166
-rw-r--r--contrib/python/Twisted/py3/twisted/web/test/requesthelper.py4
-rw-r--r--contrib/python/Twisted/py3/twisted/web/util.py2
-rw-r--r--contrib/python/Twisted/py3/twisted/web/wsgi.py122
-rw-r--r--contrib/python/Twisted/py3/twisted/words/protocols/irc.py3
-rw-r--r--contrib/python/Twisted/py3/twisted/words/service.py4
-rw-r--r--contrib/python/Twisted/py3/ya.make3
-rw-r--r--contrib/python/hypothesis/py3/.dist-info/METADATA10
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/strategies/_internal/types.py44
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/version.py2
-rw-r--r--contrib/python/hypothesis/py3/ya.make2
-rw-r--r--yt/yt/client/hedging/config.cpp50
-rw-r--r--yt/yt/client/hedging/config.h40
-rw-r--r--yt/yt/client/hedging/counter.cpp12
-rw-r--r--yt/yt/client/hedging/hedging.cpp139
-rw-r--r--yt/yt/client/hedging/hedging.h80
-rw-r--r--yt/yt/client/hedging/hedging_executor.cpp79
-rw-r--r--yt/yt/client/hedging/hedging_executor.h120
-rw-r--r--yt/yt/client/hedging/penalty_provider.cpp62
-rw-r--r--yt/yt/client/hedging/penalty_provider.h2
-rw-r--r--yt/yt/client/hedging/private.cpp (renamed from yt/yt/client/hedging/logger.cpp)2
-rw-r--r--yt/yt/client/hedging/private.h (renamed from yt/yt/client/hedging/logger.h)0
-rw-r--r--yt/yt/client/hedging/public.h5
-rw-r--r--yt/yt/client/hedging/unittests/counters_ut.cpp32
-rw-r--r--yt/yt/client/hedging/unittests/hedging_ut.cpp110
-rw-r--r--yt/yt/client/hedging/unittests/helper.cpp37
-rw-r--r--yt/yt/client/hedging/unittests/helper.h19
-rw-r--r--yt/yt/client/hedging/unittests/hook.cpp20
-rw-r--r--yt/yt/client/hedging/unittests/penalty_provider_ut.cpp14
-rw-r--r--yt/yt/client/hedging/unittests/ya.make6
-rw-r--r--yt/yt/client/hedging/ya.make4
73 files changed, 2237 insertions, 1701 deletions
diff --git a/contrib/python/Twisted/py3/.dist-info/METADATA b/contrib/python/Twisted/py3/.dist-info/METADATA
index 2f5a63dbb8..22f3987e61 100644
--- a/contrib/python/Twisted/py3/.dist-info/METADATA
+++ b/contrib/python/Twisted/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
-Metadata-Version: 2.1
+Metadata-Version: 2.3
Name: Twisted
-Version: 24.3.0
+Version: 24.7.0
Summary: An asynchronous networking framework written in Python
Project-URL: Changelog, https://github.com/twisted/twisted/blob/HEAD/NEWS.rst
Project-URL: Documentation, https://docs.twistedmatrix.com/
@@ -23,23 +23,52 @@ Requires-Dist: attrs>=21.3.0
Requires-Dist: automat>=0.8.0
Requires-Dist: constantly>=15.1
Requires-Dist: hyperlink>=17.1.1
-Requires-Dist: incremental>=22.10.0
-Requires-Dist: twisted-iocpsupport<2,>=1.0.2; platform_system == 'Windows'
+Requires-Dist: incremental>=24.7.0
Requires-Dist: typing-extensions>=4.2.0
Requires-Dist: zope-interface>=5
Provides-Extra: all-non-platform
-Requires-Dist: twisted[conch,http2,serial,test,tls]; extra == 'all-non-platform'
+Requires-Dist: appdirs>=1.4.0; extra == 'all-non-platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'all-non-platform'
+Requires-Dist: cryptography>=3.3; extra == 'all-non-platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'all-non-platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'all-non-platform'
+Requires-Dist: hypothesis>=6.56; extra == 'all-non-platform'
+Requires-Dist: idna>=2.4; extra == 'all-non-platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'all-non-platform'
+Requires-Dist: pyhamcrest>=2; extra == 'all-non-platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'all-non-platform'
+Requires-Dist: pyserial>=3.0; extra == 'all-non-platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'all-non-platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'all-non-platform'
Provides-Extra: all_non_platform
-Requires-Dist: twisted[conch,http2,serial,test,tls]; extra == 'all_non_platform'
+Requires-Dist: appdirs>=1.4.0; extra == 'all_non_platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'all_non_platform'
+Requires-Dist: cryptography>=3.3; extra == 'all_non_platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'all_non_platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'all_non_platform'
+Requires-Dist: hypothesis>=6.56; extra == 'all_non_platform'
+Requires-Dist: idna>=2.4; extra == 'all_non_platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'all_non_platform'
+Requires-Dist: pyhamcrest>=2; extra == 'all_non_platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'all_non_platform'
+Requires-Dist: pyserial>=3.0; extra == 'all_non_platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'all_non_platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'all_non_platform'
Provides-Extra: conch
Requires-Dist: appdirs>=1.4.0; extra == 'conch'
Requires-Dist: bcrypt>=3.1.3; extra == 'conch'
Requires-Dist: cryptography>=3.3; extra == 'conch'
Provides-Extra: dev
-Requires-Dist: coverage<7,>=6b1; extra == 'dev'
+Requires-Dist: coverage~=7.5; extra == 'dev'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'dev'
+Requires-Dist: hypothesis>=6.56; extra == 'dev'
+Requires-Dist: pydoctor~=23.9.0; extra == 'dev'
Requires-Dist: pyflakes~=2.2; extra == 'dev'
+Requires-Dist: pyhamcrest>=2; extra == 'dev'
Requires-Dist: python-subunit~=1.4; extra == 'dev'
-Requires-Dist: twisted[dev-release]; extra == 'dev'
+Requires-Dist: sphinx-rtd-theme~=1.3; extra == 'dev'
+Requires-Dist: sphinx<7,>=6; extra == 'dev'
+Requires-Dist: towncrier~=23.6; extra == 'dev'
Requires-Dist: twistedchecker~=0.7; extra == 'dev'
Provides-Extra: dev-release
Requires-Dist: pydoctor~=23.9.0; extra == 'dev-release'
@@ -52,34 +81,132 @@ Requires-Dist: sphinx-rtd-theme~=1.3; extra == 'dev_release'
Requires-Dist: sphinx<7,>=6; extra == 'dev_release'
Requires-Dist: towncrier~=23.6; extra == 'dev_release'
Provides-Extra: gtk-platform
+Requires-Dist: appdirs>=1.4.0; extra == 'gtk-platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'gtk-platform'
+Requires-Dist: cryptography>=3.3; extra == 'gtk-platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'gtk-platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'gtk-platform'
+Requires-Dist: hypothesis>=6.56; extra == 'gtk-platform'
+Requires-Dist: idna>=2.4; extra == 'gtk-platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'gtk-platform'
Requires-Dist: pygobject; extra == 'gtk-platform'
-Requires-Dist: twisted[all-non-platform]; extra == 'gtk-platform'
+Requires-Dist: pyhamcrest>=2; extra == 'gtk-platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'gtk-platform'
+Requires-Dist: pyserial>=3.0; extra == 'gtk-platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'gtk-platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'gtk-platform'
Provides-Extra: gtk_platform
+Requires-Dist: appdirs>=1.4.0; extra == 'gtk_platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'gtk_platform'
+Requires-Dist: cryptography>=3.3; extra == 'gtk_platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'gtk_platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'gtk_platform'
+Requires-Dist: hypothesis>=6.56; extra == 'gtk_platform'
+Requires-Dist: idna>=2.4; extra == 'gtk_platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'gtk_platform'
Requires-Dist: pygobject; extra == 'gtk_platform'
-Requires-Dist: twisted[all-non-platform]; extra == 'gtk_platform'
+Requires-Dist: pyhamcrest>=2; extra == 'gtk_platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'gtk_platform'
+Requires-Dist: pyserial>=3.0; extra == 'gtk_platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'gtk_platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'gtk_platform'
Provides-Extra: http2
Requires-Dist: h2<5.0,>=3.0; extra == 'http2'
Requires-Dist: priority<2.0,>=1.1.0; extra == 'http2'
Provides-Extra: macos-platform
+Requires-Dist: appdirs>=1.4.0; extra == 'macos-platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'macos-platform'
+Requires-Dist: cryptography>=3.3; extra == 'macos-platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'macos-platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'macos-platform'
+Requires-Dist: hypothesis>=6.56; extra == 'macos-platform'
+Requires-Dist: idna>=2.4; extra == 'macos-platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'macos-platform'
+Requires-Dist: pyhamcrest>=2; extra == 'macos-platform'
Requires-Dist: pyobjc-core; extra == 'macos-platform'
Requires-Dist: pyobjc-framework-cfnetwork; extra == 'macos-platform'
Requires-Dist: pyobjc-framework-cocoa; extra == 'macos-platform'
-Requires-Dist: twisted[all-non-platform]; extra == 'macos-platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'macos-platform'
+Requires-Dist: pyserial>=3.0; extra == 'macos-platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'macos-platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'macos-platform'
Provides-Extra: macos_platform
+Requires-Dist: appdirs>=1.4.0; extra == 'macos_platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'macos_platform'
+Requires-Dist: cryptography>=3.3; extra == 'macos_platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'macos_platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'macos_platform'
+Requires-Dist: hypothesis>=6.56; extra == 'macos_platform'
+Requires-Dist: idna>=2.4; extra == 'macos_platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'macos_platform'
+Requires-Dist: pyhamcrest>=2; extra == 'macos_platform'
Requires-Dist: pyobjc-core; extra == 'macos_platform'
Requires-Dist: pyobjc-framework-cfnetwork; extra == 'macos_platform'
Requires-Dist: pyobjc-framework-cocoa; extra == 'macos_platform'
-Requires-Dist: twisted[all-non-platform]; extra == 'macos_platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'macos_platform'
+Requires-Dist: pyserial>=3.0; extra == 'macos_platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'macos_platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'macos_platform'
Provides-Extra: mypy
+Requires-Dist: appdirs>=1.4.0; extra == 'mypy'
+Requires-Dist: bcrypt>=3.1.3; extra == 'mypy'
+Requires-Dist: coverage~=7.5; extra == 'mypy'
+Requires-Dist: cryptography>=3.3; extra == 'mypy'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'mypy'
+Requires-Dist: h2<5.0,>=3.0; extra == 'mypy'
+Requires-Dist: hypothesis>=6.56; extra == 'mypy'
+Requires-Dist: idna>=2.4; extra == 'mypy'
Requires-Dist: mypy-zope~=1.0.3; extra == 'mypy'
Requires-Dist: mypy~=1.8; extra == 'mypy'
-Requires-Dist: twisted[all-non-platform,dev]; extra == 'mypy'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'mypy'
+Requires-Dist: pydoctor~=23.9.0; extra == 'mypy'
+Requires-Dist: pyflakes~=2.2; extra == 'mypy'
+Requires-Dist: pyhamcrest>=2; extra == 'mypy'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'mypy'
+Requires-Dist: pyserial>=3.0; extra == 'mypy'
+Requires-Dist: python-subunit~=1.4; extra == 'mypy'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'mypy'
+Requires-Dist: service-identity>=18.1.0; extra == 'mypy'
+Requires-Dist: sphinx-rtd-theme~=1.3; extra == 'mypy'
+Requires-Dist: sphinx<7,>=6; extra == 'mypy'
+Requires-Dist: towncrier~=23.6; extra == 'mypy'
+Requires-Dist: twistedchecker~=0.7; extra == 'mypy'
Requires-Dist: types-pyopenssl; extra == 'mypy'
Requires-Dist: types-setuptools; extra == 'mypy'
Provides-Extra: osx-platform
-Requires-Dist: twisted[macos-platform]; extra == 'osx-platform'
+Requires-Dist: appdirs>=1.4.0; extra == 'osx-platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'osx-platform'
+Requires-Dist: cryptography>=3.3; extra == 'osx-platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'osx-platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'osx-platform'
+Requires-Dist: hypothesis>=6.56; extra == 'osx-platform'
+Requires-Dist: idna>=2.4; extra == 'osx-platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'osx-platform'
+Requires-Dist: pyhamcrest>=2; extra == 'osx-platform'
+Requires-Dist: pyobjc-core; extra == 'osx-platform'
+Requires-Dist: pyobjc-framework-cfnetwork; extra == 'osx-platform'
+Requires-Dist: pyobjc-framework-cocoa; extra == 'osx-platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'osx-platform'
+Requires-Dist: pyserial>=3.0; extra == 'osx-platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'osx-platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'osx-platform'
Provides-Extra: osx_platform
-Requires-Dist: twisted[macos-platform]; extra == 'osx_platform'
+Requires-Dist: appdirs>=1.4.0; extra == 'osx_platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'osx_platform'
+Requires-Dist: cryptography>=3.3; extra == 'osx_platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'osx_platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'osx_platform'
+Requires-Dist: hypothesis>=6.56; extra == 'osx_platform'
+Requires-Dist: idna>=2.4; extra == 'osx_platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'osx_platform'
+Requires-Dist: pyhamcrest>=2; extra == 'osx_platform'
+Requires-Dist: pyobjc-core; extra == 'osx_platform'
+Requires-Dist: pyobjc-framework-cfnetwork; extra == 'osx_platform'
+Requires-Dist: pyobjc-framework-cocoa; extra == 'osx_platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'osx_platform'
+Requires-Dist: pyserial>=3.0; extra == 'osx_platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'osx_platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'osx_platform'
Provides-Extra: serial
Requires-Dist: pyserial>=3.0; extra == 'serial'
Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'serial'
@@ -92,11 +219,37 @@ Requires-Dist: idna>=2.4; extra == 'tls'
Requires-Dist: pyopenssl>=21.0.0; extra == 'tls'
Requires-Dist: service-identity>=18.1.0; extra == 'tls'
Provides-Extra: windows-platform
+Requires-Dist: appdirs>=1.4.0; extra == 'windows-platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'windows-platform'
+Requires-Dist: cryptography>=3.3; extra == 'windows-platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'windows-platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'windows-platform'
+Requires-Dist: hypothesis>=6.56; extra == 'windows-platform'
+Requires-Dist: idna>=2.4; extra == 'windows-platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'windows-platform'
+Requires-Dist: pyhamcrest>=2; extra == 'windows-platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'windows-platform'
+Requires-Dist: pyserial>=3.0; extra == 'windows-platform'
Requires-Dist: pywin32!=226; extra == 'windows-platform'
-Requires-Dist: twisted[all-non-platform]; extra == 'windows-platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'windows-platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'windows-platform'
+Requires-Dist: twisted-iocpsupport>=1.0.2; extra == 'windows-platform'
Provides-Extra: windows_platform
+Requires-Dist: appdirs>=1.4.0; extra == 'windows_platform'
+Requires-Dist: bcrypt>=3.1.3; extra == 'windows_platform'
+Requires-Dist: cryptography>=3.3; extra == 'windows_platform'
+Requires-Dist: cython-test-exception-raiser<2,>=1.0.2; extra == 'windows_platform'
+Requires-Dist: h2<5.0,>=3.0; extra == 'windows_platform'
+Requires-Dist: hypothesis>=6.56; extra == 'windows_platform'
+Requires-Dist: idna>=2.4; extra == 'windows_platform'
+Requires-Dist: priority<2.0,>=1.1.0; extra == 'windows_platform'
+Requires-Dist: pyhamcrest>=2; extra == 'windows_platform'
+Requires-Dist: pyopenssl>=21.0.0; extra == 'windows_platform'
+Requires-Dist: pyserial>=3.0; extra == 'windows_platform'
Requires-Dist: pywin32!=226; extra == 'windows_platform'
-Requires-Dist: twisted[all-non-platform]; extra == 'windows_platform'
+Requires-Dist: pywin32!=226; (platform_system == 'Windows') and extra == 'windows_platform'
+Requires-Dist: service-identity>=18.1.0; extra == 'windows_platform'
+Requires-Dist: twisted-iocpsupport>=1.0.2; extra == 'windows_platform'
Description-Content-Type: text/x-rst
Twisted
@@ -135,7 +288,7 @@ To install the latest version of Twisted using pip::
$ pip install twisted
-Additional instructions for installing this software are in `the installation instructions <https://github.com/twisted/twisted/blob/trunk/INSTALL.rst>`_.
+Additional instructions for installing this software are in `the installation instructions <https://docs.twisted.org/en/latest/installations.rst>`_.
Documentation and Support
diff --git a/contrib/python/Twisted/py3/README.rst b/contrib/python/Twisted/py3/README.rst
index ba458874ce..1d2f85648c 100644
--- a/contrib/python/Twisted/py3/README.rst
+++ b/contrib/python/Twisted/py3/README.rst
@@ -34,7 +34,7 @@ To install the latest version of Twisted using pip::
$ pip install twisted
-Additional instructions for installing this software are in `the installation instructions <INSTALL.rst>`_.
+Additional instructions for installing this software are in `the installation instructions <https://docs.twisted.org/en/latest/installations.rst>`_.
Documentation and Support
diff --git a/contrib/python/Twisted/py3/twisted/_version.py b/contrib/python/Twisted/py3/twisted/_version.py
index 1d58c477de..f1f493452d 100644
--- a/contrib/python/Twisted/py3/twisted/_version.py
+++ b/contrib/python/Twisted/py3/twisted/_version.py
@@ -7,5 +7,5 @@ Provides Twisted version information.
from incremental import Version
-__version__ = Version("Twisted", 24, 3, 0)
+__version__ = Version("Twisted", 24, 7, 0)
__all__ = ["__version__"]
diff --git a/contrib/python/Twisted/py3/twisted/conch/client/knownhosts.py b/contrib/python/Twisted/py3/twisted/conch/client/knownhosts.py
index 39bf10ba79..44118512bd 100644
--- a/contrib/python/Twisted/py3/twisted/conch/client/knownhosts.py
+++ b/contrib/python/Twisted/py3/twisted/conch/client/knownhosts.py
@@ -8,12 +8,14 @@ An implementation of the OpenSSH known_hosts database.
@since: 8.2
"""
+from __future__ import annotations
import hmac
import sys
from binascii import Error as DecodeError, a2b_base64, b2a_base64
from contextlib import closing
from hashlib import sha1
+from typing import IO, Callable, Literal
from zope.interface import implementer
@@ -21,8 +23,10 @@ from twisted.conch.error import HostKeyChanged, InvalidEntry, UserRejectedKey
from twisted.conch.interfaces import IKnownHostEntry
from twisted.conch.ssh.keys import BadKeyError, FingerprintFormats, Key
from twisted.internet import defer
+from twisted.internet.defer import Deferred
from twisted.logger import Logger
from twisted.python.compat import nativeString
+from twisted.python.filepath import FilePath
from twisted.python.randbytes import secureRandom
from twisted.python.util import FancyEqMixin
@@ -111,34 +115,33 @@ class PlainEntry(_BaseEntry):
file.
@ivar _hostnames: the list of all host-names associated with this entry.
- @type _hostnames: L{list} of L{bytes}
"""
- def __init__(self, hostnames, keyType, publicKey, comment):
- self._hostnames = hostnames
+ def __init__(
+ self, hostnames: list[bytes], keyType: bytes, publicKey: Key, comment: bytes
+ ):
+ self._hostnames: list[bytes] = hostnames
super().__init__(keyType, publicKey, comment)
@classmethod
- def fromString(cls, string):
+ def fromString(cls, string: bytes) -> PlainEntry:
"""
Parse a plain-text entry in a known_hosts file, and return a
corresponding L{PlainEntry}.
@param string: a space-separated string formatted like "hostname
- key-type base64-key-data comment".
-
- @type string: L{bytes}
+ key-type base64-key-data comment".
@raise DecodeError: if the key is not valid encoded as valid base64.
@raise InvalidEntry: if the entry does not have the right number of
- elements and is therefore invalid.
+ elements and is therefore invalid.
@raise BadKeyError: if the key, once decoded from base64, is not
- actually an SSH key.
+ actually an SSH key.
@return: an IKnownHostEntry representing the hostname and key in the
- input line.
+ input line.
@rtype: L{PlainEntry}
"""
@@ -146,30 +149,27 @@ class PlainEntry(_BaseEntry):
self = cls(hostnames.split(b","), keyType, key, comment)
return self
- def matchesHost(self, hostname):
+ def matchesHost(self, hostname: bytes | str) -> bool:
"""
Check to see if this entry matches a given hostname.
@param hostname: A hostname or IP address literal to check against this
entry.
- @type hostname: L{bytes}
@return: C{True} if this entry is for the given hostname or IP address,
C{False} otherwise.
- @rtype: L{bool}
"""
if isinstance(hostname, str):
hostname = hostname.encode("utf-8")
return hostname in self._hostnames
- def toString(self):
+ def toString(self) -> bytes:
"""
Implement L{IKnownHostEntry.toString} by recording the comma-separated
hostnames, key type, and base-64 encoded key.
@return: The string representation of this entry, with unhashed hostname
information.
- @rtype: L{bytes}
"""
fields = [
b",".join(self._hostnames),
@@ -256,33 +256,39 @@ class HashedEntry(_BaseEntry, FancyEqMixin):
compareAttributes = ("_hostSalt", "_hostHash", "keyType", "publicKey", "comment")
- def __init__(self, hostSalt, hostHash, keyType, publicKey, comment):
+ def __init__(
+ self,
+ hostSalt: bytes,
+ hostHash: bytes,
+ keyType: bytes,
+ publicKey: Key,
+ comment: bytes | None,
+ ) -> None:
self._hostSalt = hostSalt
self._hostHash = hostHash
super().__init__(keyType, publicKey, comment)
@classmethod
- def fromString(cls, string):
+ def fromString(cls, string: bytes) -> HashedEntry:
"""
Load a hashed entry from a string representing a line in a known_hosts
file.
@param string: A complete single line from a I{known_hosts} file,
formatted as defined by OpenSSH.
- @type string: L{bytes}
@raise DecodeError: if the key, the hostname, or the is not valid
encoded as valid base64
@raise InvalidEntry: if the entry does not have the right number of
- elements and is therefore invalid, or the host/hash portion contains
- more items than just the host and hash.
+ elements and is therefore invalid, or the host/hash portion
+ contains more items than just the host and hash.
@raise BadKeyError: if the key, once decoded from base64, is not
actually an SSH key.
- @return: The newly created L{HashedEntry} instance, initialized with the
- information from C{string}.
+ @return: The newly created L{HashedEntry} instance, initialized with
+ the information from C{string}.
"""
stuff, keyType, key, comment = _extractCommon(string)
saltAndHash = stuff[len(cls.MAGIC) :].split(b"|")
@@ -346,7 +352,7 @@ class KnownHostsFile:
@ivar _savePath: See C{savePath} parameter of L{__init__}.
"""
- def __init__(self, savePath):
+ def __init__(self, savePath: FilePath[str]) -> None:
"""
Create a new, empty KnownHostsFile.
@@ -356,12 +362,12 @@ class KnownHostsFile:
@param savePath: The L{FilePath} to which to save new entries.
@type savePath: L{FilePath}
"""
- self._added = []
+ self._added: list[IKnownHostEntry] = []
self._savePath = savePath
self._clobber = True
@property
- def savePath(self):
+ def savePath(self) -> FilePath[str]:
"""
@see: C{savePath} parameter of L{__init__}
"""
@@ -431,7 +437,9 @@ class KnownHostsFile:
raise HostKeyChanged(entry, path, line)
return False
- def verifyHostKey(self, ui, hostname, ip, key):
+ def verifyHostKey(
+ self, ui: ConsoleUI, hostname: bytes, ip: bytes, key: Key
+ ) -> Deferred[bool]:
"""
Verify the given host key for the given IP and host, asking for
confirmation from, and notifying, the given UI about changes to this
@@ -453,20 +461,21 @@ class KnownHostsFile:
"""
hhk = defer.execute(self.hasHostKey, hostname, key)
- def gotHasKey(result):
+ def gotHasKey(result: bool) -> bool | Deferred[bool]:
if result:
if not self.hasHostKey(ip, key):
- ui.warn(
- "Warning: Permanently added the %s host key for "
- "IP address '%s' to the list of known hosts."
- % (key.type(), nativeString(ip))
+ addMessage = (
+ f"Warning: Permanently added the {key.type()} host key"
+ f" for IP address '{ip.decode()}' to the list of known"
+ " hosts.\n"
)
+ ui.warn(addMessage.encode("utf-8"))
self.addHostKey(ip, key)
self.save()
return result
else:
- def promptResponse(response):
+ def promptResponse(response: bool) -> bool:
if response:
self.addHostKey(hostname, key)
self.addHostKey(ip, key)
@@ -475,7 +484,7 @@ class KnownHostsFile:
else:
raise UserRejectedKey()
- keytype = key.type()
+ keytype: str = key.type()
if keytype == "EC":
keytype = "ECDSA"
@@ -497,7 +506,7 @@ class KnownHostsFile:
return hhk.addCallback(gotHasKey)
- def addHostKey(self, hostname, key):
+ def addHostKey(self, hostname: bytes, key: Key) -> HashedEntry:
"""
Add a new L{HashedEntry} to the key database.
@@ -520,7 +529,7 @@ class KnownHostsFile:
self._added.append(entry)
return entry
- def save(self):
+ def save(self) -> None:
"""
Save this L{KnownHostsFile} to the path it was loaded from.
"""
@@ -528,11 +537,7 @@ class KnownHostsFile:
if not p.isdir():
p.makedirs()
- if self._clobber:
- mode = "wb"
- else:
- mode = "ab"
-
+ mode: Literal["a", "w"] = "w" if self._clobber else "a"
with self._savePath.open(mode) as hostsFileObj:
if self._added:
hostsFileObj.write(
@@ -542,7 +547,7 @@ class KnownHostsFile:
self._clobber = False
@classmethod
- def fromPath(cls, path):
+ def fromPath(cls, path: FilePath[str]) -> KnownHostsFile:
"""
Create a new L{KnownHostsFile}, potentially reading existing known
hosts information from the given file.
@@ -550,10 +555,8 @@ class KnownHostsFile:
@param path: A path object to use for both reading contents from and
later saving to. If no file exists at this path, it is not an
error; a L{KnownHostsFile} with no entries is returned.
- @type path: L{FilePath}
@return: A L{KnownHostsFile} initialized with entries from C{path}.
- @rtype: L{KnownHostsFile}
"""
knownHosts = cls(path)
knownHosts._clobber = False
@@ -566,7 +569,7 @@ class ConsoleUI:
console, to be used during key verification.
"""
- def __init__(self, opener):
+ def __init__(self, opener: Callable[[], IO[bytes]]):
"""
@param opener: A no-argument callable which should open a console
binary-mode file-like object to be used for reading and writing.
@@ -576,7 +579,7 @@ class ConsoleUI:
"""
self.opener = opener
- def prompt(self, text):
+ def prompt(self, text: bytes) -> Deferred[bool]:
"""
Write the given text as a prompt to the console output, then read a
result from the console input.
@@ -598,20 +601,19 @@ class ConsoleUI:
answer = f.readline().strip().lower()
if answer == b"yes":
return True
- elif answer == b"no":
+ elif answer in {b"no", b""}:
return False
else:
f.write(b"Please type 'yes' or 'no': ")
return d.addCallback(body)
- def warn(self, text):
+ def warn(self, text: bytes) -> None:
"""
Notify the user (non-interactively) of the provided text, by writing it
to the console.
@param text: Some information the user is to be made aware of.
- @type text: L{bytes}
"""
try:
with closing(self.opener()) as f:
diff --git a/contrib/python/Twisted/py3/twisted/conch/endpoints.py b/contrib/python/Twisted/py3/twisted/conch/endpoints.py
index f2ab315848..3269532acd 100644
--- a/contrib/python/Twisted/py3/twisted/conch/endpoints.py
+++ b/contrib/python/Twisted/py3/twisted/conch/endpoints.py
@@ -6,11 +6,19 @@
Endpoint implementations of various SSH interactions.
"""
-__all__ = ["AuthenticationFailed", "SSHCommandAddress", "SSHCommandClientEndpoint"]
+from __future__ import annotations
+
+__all__ = [
+ "AuthenticationFailed",
+ "SSHCommandAddress",
+ "SSHCommandClientEndpoint",
+]
import signal
+from io import BytesIO
from os.path import expanduser
from struct import unpack
+from typing import IO, Any
from zope.interface import Interface, implementer
@@ -689,44 +697,6 @@ class SSHCommandClientEndpoint:
return commandConnected
-class _ReadFile:
- """
- A weakly file-like object which can be used with L{KnownHostsFile} to
- respond in the negative to all prompts for decisions.
- """
-
- def __init__(self, contents):
- """
- @param contents: L{bytes} which will be returned from every C{readline}
- call.
- """
- self._contents = contents
-
- def write(self, data):
- """
- No-op.
-
- @param data: ignored
- """
-
- def readline(self, count=-1):
- """
- Always give back the byte string that this L{_ReadFile} was initialized
- with.
-
- @param count: ignored
-
- @return: A fixed byte-string.
- @rtype: L{bytes}
- """
- return self._contents
-
- def close(self):
- """
- No-op.
- """
-
-
@implementer(_ISSHConnectionCreator)
class _NewConnectionHelper:
"""
@@ -739,17 +709,17 @@ class _NewConnectionHelper:
def __init__(
self,
- reactor,
- hostname,
- port,
- command,
- username,
- keys,
- password,
- agentEndpoint,
- knownHosts,
- ui,
- tty=FilePath(b"/dev/tty"),
+ reactor: Any,
+ hostname: str,
+ port: int,
+ command: str,
+ username: str,
+ keys: str,
+ password: str,
+ agentEndpoint: str,
+ knownHosts: str | None,
+ ui: ConsoleUI | None,
+ tty: FilePath[bytes] | FilePath[str] = FilePath(b"/dev/tty"),
):
"""
@param tty: The path of the tty device to use in case C{ui} is L{None}.
@@ -773,9 +743,9 @@ class _NewConnectionHelper:
if ui is None:
ui = ConsoleUI(self._opener)
self.ui = ui
- self.tty = tty
+ self.tty: FilePath[bytes] | FilePath[str] = tty
- def _opener(self):
+ def _opener(self) -> IO[bytes]:
"""
Open the tty if possible, otherwise give back a file-like object from
which C{b"no"} can be read.
@@ -783,11 +753,11 @@ class _NewConnectionHelper:
For use as the opener argument to L{ConsoleUI}.
"""
try:
- return self.tty.open("rb+")
+ return self.tty.open("r+")
except BaseException:
# Give back a file-like object from which can be read a byte string
# that KnownHostsFile recognizes as rejecting some option (b"no").
- return _ReadFile(b"no")
+ return BytesIO(b"no")
@classmethod
def _knownHosts(cls):
diff --git a/contrib/python/Twisted/py3/twisted/conch/insults/insults.py b/contrib/python/Twisted/py3/twisted/conch/insults/insults.py
index 4640aab368..ff5edf45b0 100644
--- a/contrib/python/Twisted/py3/twisted/conch/insults/insults.py
+++ b/contrib/python/Twisted/py3/twisted/conch/insults/insults.py
@@ -431,23 +431,7 @@ _KEY_NAMES = (
"CONTROL",
)
-
-class _const:
- """
- @ivar name: A string naming this constant
- """
-
- def __init__(self, name: str) -> None:
- self.name = name
-
- def __repr__(self) -> str:
- return "[" + self.name + "]"
-
- def __bytes__(self) -> bytes:
- return ("[" + self.name + "]").encode("ascii")
-
-
-FUNCTION_KEYS = [_const(_name).__bytes__() for _name in _KEY_NAMES]
+FUNCTION_KEYS = [f"[{_name}]".encode("ascii") for _name in _KEY_NAMES]
@implementer(ITerminalTransport)
diff --git a/contrib/python/Twisted/py3/twisted/conch/insults/window.py b/contrib/python/Twisted/py3/twisted/conch/insults/window.py
index c93fae7b21..da0fc1e08e 100644
--- a/contrib/python/Twisted/py3/twisted/conch/insults/window.py
+++ b/contrib/python/Twisted/py3/twisted/conch/insults/window.py
@@ -6,6 +6,8 @@ Simple insults-based widget library
@author: Jp Calderone
"""
+from __future__ import annotations
+
import array
from twisted.conch.insults import helper, insults
@@ -47,7 +49,8 @@ class Widget:
focused = False
parent = None
dirty = False
- width = height = None
+ width: int | None = None
+ height: int | None = None
def repaint(self):
if not self.dirty:
@@ -109,7 +112,12 @@ class Widget:
name = keyID
if not isinstance(keyID, str):
name = name.decode("utf-8")
- func = getattr(self, "func_" + name, None)
+
+ # Peel off the square brackets added by the computed definition of
+ # twisted.conch.insults.insults.FUNCTION_KEYS.
+ methodName = "func_" + name[1:-1]
+
+ func = getattr(self, methodName, None)
if func is not None:
func(modifier)
diff --git a/contrib/python/Twisted/py3/twisted/conch/interfaces.py b/contrib/python/Twisted/py3/twisted/conch/interfaces.py
index 749fad2f96..965519b0ea 100644
--- a/contrib/python/Twisted/py3/twisted/conch/interfaces.py
+++ b/contrib/python/Twisted/py3/twisted/conch/interfaces.py
@@ -5,8 +5,15 @@
This module contains interfaces defined for the L{twisted.conch} package.
"""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
from zope.interface import Attribute, Interface
+if TYPE_CHECKING:
+ from twisted.conch.ssh.keys import Key
+
class IConchUser(Interface):
"""
@@ -363,16 +370,15 @@ class IKnownHostEntry(Interface):
@since: 8.2
"""
- def matchesKey(key):
+ def matchesKey(key: Key) -> bool:
"""
Return True if this entry matches the given Key object, False
otherwise.
@param key: The key object to match against.
- @type key: L{twisted.conch.ssh.keys.Key}
"""
- def matchesHost(hostname):
+ def matchesHost(hostname: bytes) -> bool:
"""
Return True if this entry matches the given hostname, False otherwise.
@@ -381,16 +387,12 @@ class IKnownHostEntry(Interface):
quad string.
@param hostname: The hostname to match against.
- @type hostname: L{str}
"""
- def toString():
+ def toString() -> bytes:
"""
-
@return: a serialized string representation of this entry, suitable for
- inclusion in a known_hosts file. (Newline not included.)
-
- @rtype: L{str}
+ inclusion in a known_hosts file. (Newline not included.)
"""
diff --git a/contrib/python/Twisted/py3/twisted/conch/manhole.py b/contrib/python/Twisted/py3/twisted/conch/manhole.py
index 5bf2f817a4..f552af5bbd 100644
--- a/contrib/python/Twisted/py3/twisted/conch/manhole.py
+++ b/contrib/python/Twisted/py3/twisted/conch/manhole.py
@@ -23,7 +23,6 @@ from typing import Type
from twisted.conch import recvline
from twisted.internet import defer
-from twisted.python.compat import _get_async_param
from twisted.python.htmlizer import TokenPrinter
from twisted.python.monkey import MonkeyPatcher
@@ -161,8 +160,7 @@ class ManholeInterpreter(code.InteractiveInterpreter):
del self._pendingDeferreds[id(obj)]
return failure
- def write(self, data, isAsync=None, **kwargs):
- isAsync = _get_async_param(isAsync, **kwargs)
+ def write(self, data, isAsync=None):
self.handler.addOutput(data, isAsync)
@@ -239,8 +237,7 @@ class Manhole(recvline.HistoricRecvLine):
w = self.terminal.lastWrite
return not w.endswith(b"\n") and not w.endswith(b"\x1bE")
- def addOutput(self, data, isAsync=None, **kwargs):
- isAsync = _get_async_param(isAsync, **kwargs)
+ def addOutput(self, data, isAsync=None):
if isAsync:
self.terminal.eraseLine()
self.terminal.cursorBackward(len(self.lineBuffer) + len(self.ps[self.pn]))
@@ -309,10 +306,6 @@ class VT102Writer:
s = b"".join(self.written)
return s.strip(b"\n").splitlines()[-1]
- if bytes == str:
- # Compat with Python 2.7
- __str__ = __bytes__
-
def lastColorizedLine(source):
"""
diff --git a/contrib/python/Twisted/py3/twisted/internet/_baseprocess.py b/contrib/python/Twisted/py3/twisted/internet/_baseprocess.py
index 83bc08fdc0..142273143a 100644
--- a/contrib/python/Twisted/py3/twisted/internet/_baseprocess.py
+++ b/contrib/python/Twisted/py3/twisted/internet/_baseprocess.py
@@ -9,11 +9,13 @@ L{IReactorProcess} implementations.
from typing import Optional
+from twisted.logger import Logger
from twisted.python.deprecate import getWarningMethod
from twisted.python.failure import Failure
-from twisted.python.log import err
from twisted.python.reflect import qual
+_log = Logger()
+
_missingProcessExited = (
"Since Twisted 8.2, IProcessProtocol.processExited "
"is required. %s must implement it."
@@ -39,10 +41,8 @@ class BaseProcess:
stacklevel=0,
)
else:
- try:
+ with _log.failuresHandled("while calling processExited:"):
processExited(Failure(reason))
- except BaseException:
- err(None, "unexpected error in processExited")
def processEnded(self, status):
"""
@@ -62,7 +62,5 @@ class BaseProcess:
reason = self._getReason(self.status)
proto = self.proto
self.proto = None
- try:
+ with _log.failuresHandled("while calling processEnded:"):
proto.processEnded(Failure(reason))
- except BaseException:
- err(None, "unexpected error in processEnded")
diff --git a/contrib/python/Twisted/py3/twisted/internet/_posixstdio.py b/contrib/python/Twisted/py3/twisted/internet/_posixstdio.py
index b7ef9cdac3..e99920ead3 100644
--- a/contrib/python/Twisted/py3/twisted/internet/_posixstdio.py
+++ b/contrib/python/Twisted/py3/twisted/internet/_posixstdio.py
@@ -10,11 +10,16 @@ Future Plans::
Maintainer: James Y Knight
"""
+from __future__ import annotations
from zope.interface import implementer
-from twisted.internet import error, interfaces, process
-from twisted.python import failure, log
+from twisted.internet import interfaces, process
+from twisted.internet.interfaces import IProtocol, IReactorFDSet
+from twisted.logger import Logger
+from twisted.python.failure import Failure
+
+_log = Logger()
@implementer(interfaces.IAddress)
@@ -34,10 +39,16 @@ class StandardIO:
disconnected = False
disconnecting = False
- def __init__(self, proto, stdin=0, stdout=1, reactor=None):
+ def __init__(
+ self,
+ proto: IProtocol,
+ stdin: int = 0,
+ stdout: int = 1,
+ reactor: IReactorFDSet | None = None,
+ ):
if reactor is None:
- from twisted.internet import reactor
- self.protocol = proto
+ from twisted.internet import reactor # type:ignore[assignment]
+ self.protocol: IProtocol = proto
self._writer = process.ProcessWriter(reactor, self, "write", stdout)
self._reader = process.ProcessReader(reactor, self, "read", stdin)
@@ -75,21 +86,16 @@ class StandardIO:
return PipeAddress()
# Callbacks from process.ProcessReader/ProcessWriter
- def childDataReceived(self, fd, data):
+ def childDataReceived(self, fd: str, data: bytes) -> None:
self.protocol.dataReceived(data)
- def childConnectionLost(self, fd, reason):
+ def childConnectionLost(self, fd: str, reason: Failure) -> None:
if self.disconnected:
return
-
- if reason.value.__class__ == error.ConnectionDone:
- # Normal close
- if fd == "read":
- self._readConnectionLost(reason)
- else:
- self._writeConnectionLost(reason)
+ if fd == "read":
+ self._readConnectionLost(reason)
else:
- self.connectionLost(reason)
+ self._writeConnectionLost(reason)
def connectionLost(self, reason):
self.disconnected = True
@@ -99,7 +105,7 @@ class StandardIO:
_writer = self._writer
protocol = self.protocol
self._reader = self._writer = None
- self.protocol = None
+ self.protocol = None # type:ignore[assignment]
if _writer is not None and not _writer.disconnected:
_writer.connectionLost(reason)
@@ -107,12 +113,10 @@ class StandardIO:
if _reader is not None and not _reader.disconnected:
_reader.connectionLost(reason)
- try:
+ with _log.failuresHandled("while calling stdio connectionLost:"):
protocol.connectionLost(reason)
- except BaseException:
- log.err()
- def _writeConnectionLost(self, reason):
+ def _writeConnectionLost(self, reason: Failure) -> None:
self._writer = None
if self.disconnecting:
self.connectionLost(reason)
@@ -120,21 +124,21 @@ class StandardIO:
p = interfaces.IHalfCloseableProtocol(self.protocol, None)
if p:
- try:
+ with _log.failuresHandled(
+ "while calling stdio writeConnectionLost:"
+ ) as wcl:
p.writeConnectionLost()
- except BaseException:
- log.err()
- self.connectionLost(failure.Failure())
+ if wcl.failed:
+ self.connectionLost(wcl.failure)
- def _readConnectionLost(self, reason):
+ def _readConnectionLost(self, reason: Failure) -> None:
self._reader = None
p = interfaces.IHalfCloseableProtocol(self.protocol, None)
if p:
- try:
+ with _log.failuresHandled("while calling stdio readConnectionLost:") as rcl:
p.readConnectionLost()
- except BaseException:
- log.err()
- self.connectionLost(failure.Failure())
+ if rcl.failed:
+ self.connectionLost(rcl.failure)
else:
self.connectionLost(reason)
diff --git a/contrib/python/Twisted/py3/twisted/internet/_producer_helpers.py b/contrib/python/Twisted/py3/twisted/internet/_producer_helpers.py
index c2136e0509..7583a9e459 100644
--- a/contrib/python/Twisted/py3/twisted/internet/_producer_helpers.py
+++ b/contrib/python/Twisted/py3/twisted/internet/_producer_helpers.py
@@ -1,4 +1,4 @@
-# -*- test-case-name: twisted.test.test_producer_helpers -*-
+# -*- test-case-name: twisted.protocols.test.test_tls,twisted.web.test.test_http2 -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
@@ -12,8 +12,9 @@ from zope.interface import implementer
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import cooperate
-from twisted.python import log
-from twisted.python.reflect import safe_str
+from twisted.logger import Logger
+
+_log = Logger()
# This module exports nothing public, it's for internal Twisted use only.
__all__: List[str] = []
@@ -60,26 +61,19 @@ class _PullToPush:
unregistered, which should result in streaming stopping.
"""
while True:
- try:
+ with _log.failuresHandled(
+ "while calling resumeProducing on {producer}", producer=self._producer
+ ) as op:
self._producer.resumeProducing()
- except BaseException:
- log.err(
- None,
- "%s failed, producing will be stopped:"
- % (safe_str(self._producer),),
- )
- try:
+ if op.failed:
+ with _log.failuresHandled(
+ "while calling unregisterProducer on {consumer}",
+ consumer=self._consumer,
+ ) as handlingop:
self._consumer.unregisterProducer()
+ if handlingop.failed:
# The consumer should now call stopStreaming() on us,
# thus stopping the streaming.
- except BaseException:
- # Since the consumer blew up, we may not have had
- # stopStreaming() called, so we just stop on our own:
- log.err(
- None,
- "%s failed to unregister producer:"
- % (safe_str(self._consumer),),
- )
self._finished = True
return
yield None
diff --git a/contrib/python/Twisted/py3/twisted/internet/_sslverify.py b/contrib/python/Twisted/py3/twisted/internet/_sslverify.py
index abb6bddf7c..2095fe69ec 100644
--- a/contrib/python/Twisted/py3/twisted/internet/_sslverify.py
+++ b/contrib/python/Twisted/py3/twisted/internet/_sslverify.py
@@ -28,13 +28,16 @@ from twisted.internet.interfaces import (
IOpenSSLClientConnectionCreator,
IOpenSSLContextFactory,
)
-from twisted.python import log, util
+from twisted.logger import Logger
from twisted.python.compat import nativeString
from twisted.python.deprecate import _mutuallyExclusiveArguments, deprecated
from twisted.python.failure import Failure
from twisted.python.randbytes import secureRandom
+from twisted.python.util import nameToLabel
from ._idna import _idnaBytes
+_log = Logger()
+
class TLSVersion(Names):
"""
@@ -344,7 +347,7 @@ class DistinguishedName(Dict[str, bytes]):
return set(mapping.values())
for k in sorted(uniqueValues(_x509names)):
- label = util.nameToLabel(k)
+ label = nameToLabel(k)
lablen = max(len(label), lablen)
v = getattr(self, k, None)
if v is not None:
@@ -431,8 +434,8 @@ class Certificate(CertBase):
def __repr__(self) -> str:
return "<{} Subject={} Issuer={}>".format(
self.__class__.__name__,
- self.getSubject().commonName,
- self.getIssuer().commonName,
+ self.getSubject().get("commonName", ""),
+ self.getIssuer().get("commonName", ""),
)
def __eq__(self, other: object) -> bool:
@@ -1057,13 +1060,13 @@ def _tolerateErrors(wrapped):
@rtype: L{callable}
"""
- def infoCallback(connection, where, ret):
- try:
- return wrapped(connection, where, ret)
- except BaseException:
- f = Failure()
- log.err(f, "Error during info_callback")
+ def infoCallback(connection: SSL.Connection, where: int, ret: int) -> object:
+ result = None
+ with _log.failuresHandled("Error during info_callback") as op:
+ result = wrapped(connection, where, ret)
+ if (f := op.failure) is not None:
connection.get_app_data().failVerification(f)
+ return result
return infoCallback
diff --git a/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py b/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py
index 8a53e4ca96..1c7db16b0a 100644
--- a/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py
+++ b/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py
@@ -10,14 +10,14 @@ arbitrary foreign event loop, such as those you find in GUI toolkits.
There are three things you'll need to do to use this reactor.
-Install the reactor at the beginning of your program, before importing
-the rest of Twisted::
+Install the reactor at the beginning of your program, before importing the rest
+of Twisted::
| from twisted.internet import _threadedselect
| _threadedselect.install()
-Interleave this reactor with your foreign event loop, at some point after
-your event loop is initialized::
+Interleave this reactor with your foreign event loop, at some point after your
+event loop is initialized::
| from twisted.internet import reactor
| reactor.interleave(foreignEventLoopWakerFunction)
@@ -31,68 +31,113 @@ reactor::
In order for Twisted to do its work in the main thread (the thread that
interleave is called from), a waker function is necessary. The waker function
-will be called from a "background" thread with one argument: func.
-The waker function's purpose is to call func() from the main thread.
-Many GUI toolkits ship with appropriate waker functions.
-Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
-older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
-These would be used in place of "foreignEventLoopWakerFunction" in the above
-example.
+will be called from a "background" thread with one argument: func. The waker
+function's purpose is to call func() from the main thread. Many GUI toolkits
+ship with appropriate waker functions. One example of this is wxPython's
+wx.callAfter (may be wxCallAfter in older versions of wxPython). These would
+be used in place of "foreignEventLoopWakerFunction" in the above example.
The other integration point at which the foreign event loop and this reactor
-must integrate is shutdown. In order to ensure clean shutdown of Twisted,
-you must allow for Twisted to come to a complete stop before quitting the
+must integrate is shutdown. In order to ensure clean shutdown of Twisted, you
+must allow for Twisted to come to a complete stop before quitting the
application. Typically, you will do this by setting up an after shutdown
trigger to stop your foreign event loop, and call reactor.stop() where you
would normally have initiated the shutdown procedure for the foreign event
-loop. Shutdown functions that could be used in place of
-"foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance
-with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
+loop. Shutdown functions that could be used in place of "foreignEventloopStop"
+would be the ExitMainLoop method of the wxApp instance with wxPython.
"""
+from __future__ import annotations
-import select
-import sys
from errno import EBADF, EINTR
-from functools import partial
from queue import Empty, Queue
from threading import Thread
+from typing import Any, Callable
from zope.interface import implementer
+from twisted._threads import ThreadWorker
from twisted.internet import posixbase
-from twisted.internet.interfaces import IReactorFDSet
-from twisted.internet.posixbase import _NO_FILEDESC, _NO_FILENO
-from twisted.internet.selectreactor import _select
-from twisted.python import failure, log, threadable
+from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor, IWriteDescriptor
+from twisted.internet.selectreactor import _preenDescriptors, _select
+from twisted.logger import Logger
+from twisted.python.log import callWithLogger as _callWithLogger
-
-def dictRemove(dct, value):
- try:
- del dct[value]
- except KeyError:
- pass
+_log = Logger()
def raiseException(e):
raise e
+def _threadsafeSelect(
+ timeout: float | None,
+ readmap: dict[int, IReadDescriptor],
+ writemap: dict[int, IWriteDescriptor],
+ handleResult: Callable[
+ [
+ list[int],
+ list[int],
+ dict[int, IReadDescriptor],
+ dict[int, IWriteDescriptor],
+ bool,
+ ],
+ None,
+ ],
+) -> None:
+ """
+ Invoke C{select}. This will be called in a non-main thread, so it is very
+ careful to work only on integers and avoid calling any application code.
+ """
+ preen = False
+ r = []
+ w = []
+ while 1:
+ readints = readmap.keys()
+ writeints = writemap.keys()
+ try:
+ result = _select(readints, writeints, [], timeout)
+ except ValueError:
+ # Possible problems with file descriptors that were passed:
+ # ValueError may indicate that a file descriptor has gone negative.
+ preen = True
+ break
+ except OSError as se:
+ # The select() system call encountered an error.
+ if se.args[0] == EINTR:
+ # EINTR is hard to replicate in tests using an actual select(),
+ # and I don't want to dedicate effort to testing this function
+ # when it needs to be refactored with selectreactor.
+
+ return # pragma: no cover
+ elif se.args[0] == EBADF:
+ preen = True
+ break
+ else:
+ # OK, I really don't know what's going on. Blow up. Never
+ # mind with the coverage here, since we are just trying to make
+ # sure we don't swallow an exception.
+ raise # pragma: no cover
+ else:
+ r, w, ignored = result
+ break
+ handleResult(r, w, readmap, writemap, preen)
+
+
@implementer(IReactorFDSet)
class ThreadedSelectReactor(posixbase.PosixReactorBase):
"""A threaded select() based reactor - runs on all POSIX platforms and on
Win32.
"""
- def __init__(self):
- threadable.init(1)
- self.reads = {}
- self.writes = {}
- self.toThreadQueue = Queue()
- self.toMainThread = Queue()
- self.workerThread = None
- self.mainWaker = None
+ def __init__(
+ self, waker: Callable[[Callable[[], None]], None] | None = None
+ ) -> None:
+ self.reads: set[IReadDescriptor] = set()
+ self.writes: set[IWriteDescriptor] = set()
posixbase.PosixReactorBase.__init__(self)
- self.addSystemEventTrigger("after", "shutdown", self._mainLoopShutdown)
+ self._selectorThread: ThreadWorker | None = None
+ self.mainWaker = waker
+ self._iterationQueue: Queue[Callable[[], None]] | None = None
def wakeUp(self):
# we want to wake up from any thread
@@ -103,205 +148,131 @@ class ThreadedSelectReactor(posixbase.PosixReactorBase):
self.wakeUp()
return tple
- def _sendToMain(self, msg, *args):
- self.toMainThread.put((msg, args))
- if self.mainWaker is not None:
- self.mainWaker()
-
- def _sendToThread(self, fn, *args):
- self.toThreadQueue.put((fn, args))
-
- def _preenDescriptorsInThread(self):
- log.msg("Malformed file descriptor found. Preening lists.")
- readers = self.reads.keys()
- writers = self.writes.keys()
- self.reads.clear()
- self.writes.clear()
- for selDict, selList in ((self.reads, readers), (self.writes, writers)):
- for selectable in selList:
- try:
- select.select([selectable], [selectable], [selectable], 0)
- except BaseException:
- log.msg("bad descriptor %s" % selectable)
- else:
- selDict[selectable] = 1
+ def _doReadOrWrite(self, selectable: object, method: str) -> None:
+ with _log.failuresHandled(
+ "while handling selectable {sel}", sel=selectable
+ ) as op:
+ why = getattr(selectable, method)()
+ if (fail := op.failure) is not None:
+ why = fail.value
+ if why:
+ self._disconnectSelectable(selectable, why, method == "doRead")
- def _workerInThread(self):
- try:
- while 1:
- fn, args = self.toThreadQueue.get()
- fn(*args)
- except SystemExit:
- pass # Exception indicates this thread should exit
- except BaseException:
- f = failure.Failure()
- self._sendToMain("Failure", f)
-
- def _doSelectInThread(self, timeout):
- """Run one iteration of the I/O monitor loop.
-
- This will run all selectables who had input or output readiness
- waiting for them.
- """
- reads = self.reads
- writes = self.writes
- while 1:
- try:
- r, w, ignored = _select(reads.keys(), writes.keys(), [], timeout)
- break
- except ValueError:
- # Possibly a file descriptor has gone negative?
- log.err()
- self._preenDescriptorsInThread()
- except TypeError:
- # Something *totally* invalid (object w/o fileno, non-integral
- # result) was passed
- log.err()
- self._preenDescriptorsInThread()
- except OSError as se:
- # select(2) encountered an error
- if se.args[0] in (0, 2):
- # windows does this if it got an empty list
- if (not reads) and (not writes):
- return
- else:
- raise
- elif se.args[0] == EINTR:
+ def _selectOnce(self, timeout: float | None, keepGoing: bool) -> None:
+ reads: dict[int, Any] = {}
+ writes: dict[int, Any] = {}
+ for isRead, fdmap, d in [
+ (True, self.reads, reads),
+ (False, self.writes, writes),
+ ]:
+ for each in fdmap: # type:ignore[attr-defined]
+ d[each.fileno()] = each
+
+ mainWaker = self.mainWaker
+ assert mainWaker is not None, (
+ "neither .interleave() nor .mainLoop() / .run() called, "
+ "but we are somehow running the reactor"
+ )
+
+ def callReadsAndWrites(
+ r: list[int],
+ w: list[int],
+ readmap: dict[int, IReadDescriptor],
+ writemap: dict[int, IWriteDescriptor],
+ preen: bool,
+ ) -> None:
+ @mainWaker
+ def onMainThread() -> None:
+ if preen:
+ _preenDescriptors(
+ self.reads, self.writes, self._disconnectSelectable
+ )
return
- elif se.args[0] == EBADF:
- self._preenDescriptorsInThread()
+ _drdw = self._doReadOrWrite
+
+ for readable in r:
+ rselectable = readmap[readable]
+ if rselectable in self.reads:
+ _callWithLogger(rselectable, _drdw, rselectable, "doRead")
+
+ for writable in w:
+ wselectable = writemap[writable]
+ if wselectable in self.writes:
+ _callWithLogger(wselectable, _drdw, wselectable, "doWrite")
+
+ self.runUntilCurrent()
+ if self._started and keepGoing:
+ # see coverage note in .interleave()
+ self._selectOnce(self.timeout(), True) # pragma: no cover
else:
- # OK, I really don't know what's going on. Blow up.
- raise
- self._sendToMain("Notify", r, w)
-
- def _process_Notify(self, r, w):
- reads = self.reads
- writes = self.writes
-
- _drdw = self._doReadOrWrite
- _logrun = log.callWithLogger
- for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)):
- for selectable in selectables:
- # if this was disconnected in another thread, kill it.
- if selectable not in dct:
- continue
- # This for pausing input when we're not ready for more.
- _logrun(selectable, _drdw, selectable, method, dct)
-
- def _process_Failure(self, f):
- f.raiseException()
-
- _doIterationInThread = _doSelectInThread
-
- def ensureWorkerThread(self):
- if self.workerThread is None or not self.workerThread.isAlive():
- self.workerThread = Thread(target=self._workerInThread)
- self.workerThread.start()
-
- def doThreadIteration(self, timeout):
- self._sendToThread(self._doIterationInThread, timeout)
- self.ensureWorkerThread()
- msg, args = self.toMainThread.get()
- getattr(self, "_process_" + msg)(*args)
-
- doIteration = doThreadIteration
-
- def _interleave(self):
- while self.running:
- self.runUntilCurrent()
- t2 = self.timeout()
- t = self.running and t2
- self._sendToThread(self._doIterationInThread, t)
- yield None
- msg, args = self.toMainThread.get_nowait()
- getattr(self, "_process_" + msg)(*args)
-
- def interleave(self, waker, *args, **kw):
+ self._cleanUpThread()
+
+ if self._selectorThread is None:
+ self._selectorThread = ThreadWorker(
+ lambda target: Thread(target=target).start(), Queue()
+ )
+ self._selectorThread.do(
+ lambda: _threadsafeSelect(timeout, reads, writes, callReadsAndWrites)
+ )
+
+ def _cleanUpThread(self) -> None:
+ """
+ Ensure that the selector thread is stopped.
"""
- interleave(waker) interleaves this reactor with the
- current application by moving the blocking parts of
- the reactor (select() in this case) to a separate
- thread. This is typically useful for integration with
- GUI applications which have their own event loop
- already running.
+ oldThread, self._selectorThread = self._selectorThread, None
+ if oldThread is not None:
+ oldThread.quit()
+
+ def interleave(
+ self,
+ waker: Callable[[Callable[[], None]], None],
+ installSignalHandlers: bool = True,
+ ) -> None:
+ """
+ interleave(waker) interleaves this reactor with the current application
+ by moving the blocking parts of the reactor (select() in this case) to
+ a separate thread. This is typically useful for integration with GUI
+ applications which have their own event loop already running.
See the module docstring for more information.
"""
- self.startRunning(*args, **kw)
- loop = self._interleave()
-
- def mainWaker(waker=waker, loop=loop):
- waker(partial(next, loop))
-
- self.mainWaker = mainWaker
- next(loop)
- self.ensureWorkerThread()
-
- def _mainLoopShutdown(self):
- self.mainWaker = None
- if self.workerThread is not None:
- self._sendToThread(raiseException, SystemExit)
- self.wakeUp()
- try:
- while 1:
- msg, args = self.toMainThread.get_nowait()
- except Empty:
- pass
- self.workerThread.join()
- self.workerThread = None
- try:
- while 1:
- fn, args = self.toThreadQueue.get_nowait()
- if fn is self._doIterationInThread:
- log.msg("Iteration is still in the thread queue!")
- elif fn is raiseException and args[0] is SystemExit:
- pass
- else:
- fn(*args)
- except Empty:
- pass
-
- def _doReadOrWrite(self, selectable, method, dict):
- try:
- why = getattr(selectable, method)()
- handfn = getattr(selectable, "fileno", None)
- if not handfn:
- why = _NO_FILENO
- elif handfn() == -1:
- why = _NO_FILEDESC
- except BaseException:
- why = sys.exc_info()[1]
- log.err()
- if why:
- self._disconnectSelectable(selectable, why, method == "doRead")
-
- def addReader(self, reader):
+ # TODO: This method is excluded from coverage because it only happens
+ # in the case where we are actually running on a foreign event loop,
+ # and twisted's test suite isn't set up that way. It would be nice to
+ # add some dedicated tests for ThreadedSelectReactor that covered this
+ # case.
+ self.mainWaker = waker # pragma: no cover
+ self.startRunning(installSignalHandlers) # pragma: no cover
+ self._selectOnce(0.0, True) # pragma: no cover
+
+ def addReader(self, reader: IReadDescriptor) -> None:
"""Add a FileDescriptor for notification of data available to read."""
- self._sendToThread(self.reads.__setitem__, reader, 1)
+ self.reads.add(reader)
self.wakeUp()
- def addWriter(self, writer):
+ def addWriter(self, writer: IWriteDescriptor) -> None:
"""Add a FileDescriptor for notification of data available to write."""
- self._sendToThread(self.writes.__setitem__, writer, 1)
+ self.writes.add(writer)
self.wakeUp()
- def removeReader(self, reader):
+ def removeReader(self, reader: IReadDescriptor) -> None:
"""Remove a Selectable for notification of data available to read."""
- self._sendToThread(dictRemove, self.reads, reader)
+ if reader in self.reads:
+ self.reads.remove(reader)
- def removeWriter(self, writer):
+ def removeWriter(self, writer: IWriteDescriptor) -> None:
"""Remove a Selectable for notification of data available to write."""
- self._sendToThread(dictRemove, self.writes, writer)
+ if writer in self.writes:
+ self.writes.remove(writer)
- def removeAll(self):
- return self._removeAll(self.reads, self.writes)
+ def removeAll(self) -> list[IReadDescriptor | IWriteDescriptor]:
+ return self._removeAll(self.reads, self.writes) # type:ignore[no-any-return]
- def getReaders(self):
- return list(self.reads.keys())
+ def getReaders(self) -> list[IReadDescriptor]:
+ return list(self.reads)
- def getWriters(self):
- return list(self.writes.keys())
+ def getWriters(self) -> list[IWriteDescriptor]:
+ return list(self.writes)
def stop(self):
"""
@@ -311,18 +282,52 @@ class ThreadedSelectReactor(posixbase.PosixReactorBase):
posixbase.PosixReactorBase.stop(self)
self.wakeUp()
- def run(self, installSignalHandlers=True):
- self.startRunning(installSignalHandlers=installSignalHandlers)
- self.mainLoop()
-
- def mainLoop(self):
- q = Queue()
- self.interleave(q.put)
- while self.running:
- try:
- q.get()()
- except StopIteration:
- break
+ def crash(self):
+ posixbase.PosixReactorBase.crash(self)
+ self.wakeUp()
+
+ # The following methods are mostly for test-suite support, to make
+ # ThreadedSelectReactor behave like another reactor you might call run()
+ # on.
+ def _testMainLoopSetup(self) -> None:
+ """
+ Mostly for compliance with L{IReactorCore} and usability with the
+ tests, set up a fake blocking main-loop; make the "foreign" main loop
+ we are interfacing with be C{self.mainLoop()}, that is reading from a
+ basic Queue.
+ """
+ self._iterationQueue = Queue()
+ self.mainWaker = self._iterationQueue.put
+
+ def _uninstallHandler(self) -> None:
+ """
+ Handle uninstallation to ensure that cleanup is properly performed by
+ ReactorBuilder tests.
+ """
+ super()._uninstallHandler()
+ self._cleanUpThread()
+
+ def iterate(self, timeout: float = 0.0) -> None:
+ if self._iterationQueue is None and self.mainWaker is None: # pragma: no branch
+ self._testMainLoopSetup()
+ self.wakeUp()
+ super().iterate(timeout)
+
+ def doIteration(self, timeout: float | None) -> None:
+ assert self._iterationQueue is not None
+ self._selectOnce(timeout, False)
+ try:
+ work = self._iterationQueue.get(timeout=timeout)
+ except Empty:
+ return
+ work()
+
+ def mainLoop(self) -> None:
+ """
+ This should not normally be run.
+ """
+ self._testMainLoopSetup()
+ super().mainLoop()
def install():
diff --git a/contrib/python/Twisted/py3/twisted/internet/_win32stdio.py b/contrib/python/Twisted/py3/twisted/internet/_win32stdio.py
index 104d65f348..f6a80403d0 100644
--- a/contrib/python/Twisted/py3/twisted/internet/_win32stdio.py
+++ b/contrib/python/Twisted/py3/twisted/internet/_win32stdio.py
@@ -20,8 +20,11 @@ from twisted.internet.interfaces import (
IPushProducer,
ITransport,
)
+from twisted.logger import Logger
from twisted.python.failure import Failure
+_log = Logger()
+
@implementer(IAddress)
class Win32PipeAddress:
@@ -66,14 +69,20 @@ class StandardIO(_pollingfile._PollingTimer):
self.proto.dataReceived(data)
def readConnectionLost(self):
- if IHalfCloseableProtocol.providedBy(self.proto):
- self.proto.readConnectionLost()
+ with _log.failuresHandled("read connection lost") as op:
+ if IHalfCloseableProtocol.providedBy(self.proto):
+ self.proto.readConnectionLost()
self.checkConnLost()
+ if not op.succeeded and not self.disconnecting:
+ self.loseConnection()
def writeConnectionLost(self):
- if IHalfCloseableProtocol.providedBy(self.proto):
- self.proto.writeConnectionLost()
+ with _log.failuresHandled("write connection lost") as op:
+ if IHalfCloseableProtocol.providedBy(self.proto):
+ self.proto.writeConnectionLost()
self.checkConnLost()
+ if not op.succeeded and not self.disconnecting:
+ self.loseConnection()
connsLost = 0
diff --git a/contrib/python/Twisted/py3/twisted/internet/base.py b/contrib/python/Twisted/py3/twisted/internet/base.py
index f039dfe5c4..c807f41873 100644
--- a/contrib/python/Twisted/py3/twisted/internet/base.py
+++ b/contrib/python/Twisted/py3/twisted/internet/base.py
@@ -56,8 +56,10 @@ from twisted.internet.interfaces import (
_ISupportsExitSignalCapturing,
)
from twisted.internet.protocol import ClientFactory
-from twisted.python import log, reflect
+from twisted.logger import Logger
+from twisted.python import reflect
from twisted.python.failure import Failure
+from twisted.python.log import callWithLogger as _callWithLogger
from twisted.python.runtime import platform, seconds as runtimeSeconds
from ._signals import SignalHandling, _WithoutSignalHandling, _WithSignalHandling
@@ -73,6 +75,14 @@ if platform.supportsThreads():
else:
ThreadPool = None # type: ignore[misc, assignment]
+_log = Logger()
+
+# Pre-allocate some static application-code failure logging handlers so that we
+# do not need to allocate them in performance-sensitive bits of code below.
+_topHandler = _log.failureHandler("Unexpected error in main loop")
+_threadCallHandler = _log.failureHandler("while calling from thread")
+_systemEventHandler = _log.failureHandler("While calling system event trigger handler")
+
@implementer(IDelayedCall)
class DelayedCall:
@@ -494,13 +504,11 @@ class _ThreePhaseEvent:
while self.before:
callable, args, kwargs = self.before.pop(0)
self.finishedBefore.append((callable, args, kwargs))
- try:
+ result = None
+ with _systemEventHandler:
result = callable(*args, **kwargs)
- except BaseException:
- log.err()
- else:
- if isinstance(result, Deferred):
- beforeResults.append(result)
+ if isinstance(result, Deferred):
+ beforeResults.append(result)
DeferredList(beforeResults).addCallback(self._continueFiring)
def _continueFiring(self, ignored: object) -> None:
@@ -512,10 +520,8 @@ class _ThreePhaseEvent:
for phase in self.during, self.after:
while phase:
callable, args, kwargs = phase.pop(0)
- try:
+ with _systemEventHandler:
callable(*args, **kwargs)
- except BaseException:
- log.err()
@implementer(IReactorPluggableNameResolver, IReactorPluggableResolver)
@@ -698,19 +704,13 @@ class ReactorBase(PluggableResolverMixin):
def mainLoop(self) -> None:
while self._started:
- try:
- while self._started:
- # Advance simulation time in delayed event
- # processors.
- self.runUntilCurrent()
- t2 = self.timeout()
- t = self.running and t2
- self.doIteration(t)
- except BaseException:
- log.msg("Unexpected error in main loop.")
- log.err()
- else:
- log.msg("Main loop terminated.") # type:ignore[unreachable]
+ with _topHandler:
+ # Advance simulation time in delayed event processors.
+ self.runUntilCurrent()
+ t2 = self.timeout()
+ t = self.running and t2
+ self.doIteration(t)
+ _log.info("Main loop terminated.")
# override in subclasses
@@ -815,7 +815,7 @@ class ReactorBase(PluggableResolverMixin):
@param number: See handler specification in L{signal.signal}
@param frame: See handler specification in L{signal.signal}
"""
- log.msg("Received SIGINT, shutting down.")
+ _log.info("Received SIGINT, shutting down.")
self.callFromThread(self.stop)
self._exitSignal = number
@@ -826,7 +826,7 @@ class ReactorBase(PluggableResolverMixin):
@param number: See handler specification in L{signal.signal}
@param frame: See handler specification in L{signal.signal}
"""
- log.msg("Received SIGBREAK, shutting down.")
+ _log.info("Received SIGBREAK, shutting down.")
self.callFromThread(self.stop)
self._exitSignal = number
@@ -837,7 +837,7 @@ class ReactorBase(PluggableResolverMixin):
@param number: See handler specification in L{signal.signal}
@param frame: See handler specification in L{signal.signal}
"""
- log.msg("Received SIGTERM, shutting down.")
+ _log.info("Received SIGTERM, shutting down.")
self.callFromThread(self.stop)
self._exitSignal = number
@@ -845,7 +845,7 @@ class ReactorBase(PluggableResolverMixin):
"""Disconnect every reader, and writer in the system."""
selectables = self.removeAll()
for reader in selectables:
- log.callWithLogger(
+ _callWithLogger(
reader, reader.connectionLost, Failure(main.CONNECTION_LOST)
)
@@ -1059,10 +1059,8 @@ class ReactorBase(PluggableResolverMixin):
count = 0
total = len(self.threadCallQueue)
for f, a, kw in self.threadCallQueue:
- try:
+ with _threadCallHandler:
f(*a, **kw)
- except BaseException:
- log.err()
count += 1
if count == total:
break
@@ -1085,21 +1083,20 @@ class ReactorBase(PluggableResolverMixin):
heappush(self._pendingTimedCalls, call)
continue
- try:
+ with _log.failuresHandled(
+ "while handling timed call {previous()}",
+ previous=lambda creator=call.creator: (
+ ""
+ if creator is None
+ else "\n"
+ + (" C: from a DelayedCall created here:\n")
+ + " C:"
+ + "".join(creator).rstrip().replace("\n", "\n C:")
+ + "\n"
+ ),
+ ):
call.called = 1
call.func(*call.args, **call.kw)
- except BaseException:
- log.err()
- if call.creator is not None:
- e = "\n"
- e += (
- " C: previous exception occurred in "
- + "a DelayedCall created here:\n"
- )
- e += " C:"
- e += "".join(call.creator).rstrip().replace("\n", "\n C:")
- e += "\n"
- log.msg(e)
if (
self._cancellations > 50
diff --git a/contrib/python/Twisted/py3/twisted/internet/cfreactor.py b/contrib/python/Twisted/py3/twisted/internet/cfreactor.py
index f686092e69..7556b41bf2 100644
--- a/contrib/python/Twisted/py3/twisted/internet/cfreactor.py
+++ b/contrib/python/Twisted/py3/twisted/internet/cfreactor.py
@@ -85,7 +85,7 @@ class _WakerPlus(_UnixWaker):
next timed iteration.
"""
result = super().doRead()
- self.reactor._scheduleSimulate(True)
+ self.reactor._scheduleSimulate()
return result
@@ -386,7 +386,7 @@ class CFReactor(PosixReactorBase):
# it's possible to use startRunning to *attach* a reactor to an
# already-running CFRunLoop, i.e. within a plugin for an application
# that doesn't otherwise use Twisted, rather than calling it via run().
- self._scheduleSimulate(force=True)
+ self._scheduleSimulate()
# [1]: readers & writers are still active in the loop, but arguably
# they should not be.
@@ -484,19 +484,10 @@ class CFReactor(PosixReactorBase):
CFRunLoopTimerInvalidate(self._currentSimulator)
self._currentSimulator = None
- def _scheduleSimulate(self, force: bool = False) -> None:
+ def _scheduleSimulate(self) -> None:
"""
Schedule a call to C{self.runUntilCurrent}. This will cancel the
currently scheduled call if it is already scheduled.
-
- @param force: Even if there are no timed calls, make sure that
- C{runUntilCurrent} runs immediately (in a 0-seconds-from-now
- C{CFRunLoopTimer}). This is necessary for calls which need to
- trigger behavior of C{runUntilCurrent} other than running timed
- calls, such as draining the thread call queue or calling C{crash()}
- when the appropriate flags are set.
-
- @type force: C{bool}
"""
self._stopSimulating()
if not self._started:
@@ -505,7 +496,10 @@ class CFReactor(PosixReactorBase):
# CFRunLoopTimers against the global CFRunLoop.
return
- timeout = 0.0 if force else self.timeout()
+ # runUntilCurrent acts on 3 things: _justStopped to process the
+ # side-effect of reactor.stop(), threadCallQueue to handle any calls
+ # from threads, and _pendingTimedCalls.
+ timeout = 0.0 if (self._justStopped or self.threadCallQueue) else self.timeout()
if timeout is None:
return
@@ -529,12 +523,12 @@ class CFReactor(PosixReactorBase):
self._scheduleSimulate()
return delayedCall
- def stop(self):
+ def stop(self) -> None:
"""
Implement L{IReactorCore.stop}.
"""
PosixReactorBase.stop(self)
- self._scheduleSimulate(True)
+ self._scheduleSimulate()
def crash(self):
"""
diff --git a/contrib/python/Twisted/py3/twisted/internet/defer.py b/contrib/python/Twisted/py3/twisted/internet/defer.py
index caafb52210..1c58baea7c 100644
--- a/contrib/python/Twisted/py3/twisted/internet/defer.py
+++ b/contrib/python/Twisted/py3/twisted/internet/defer.py
@@ -522,9 +522,6 @@ class Deferred(Awaitable[_SelfResultT]):
if errbackKeywords is None:
errbackKeywords = {} # type: ignore[unreachable]
- assert callable(callback)
- assert callable(errback)
-
self.callbacks.append(
(
(callback, callbackArgs, callbackKeywords),
@@ -873,7 +870,6 @@ class Deferred(Awaitable[_SelfResultT]):
@raise AlreadyCalledError: If L{callback} or L{errback} has already been
called on this L{Deferred}.
"""
- assert not isinstance(result, Deferred)
self._startRunCallbacks(result)
def errback(self, fail: Optional[Union[Failure, BaseException]] = None) -> None:
@@ -1094,30 +1090,33 @@ class Deferred(Awaitable[_SelfResultT]):
# expensive, so we avoid it unless self.debug is set.
current.result = Failure(captureVars=self.debug)
else:
- if isinstance(current.result, Deferred):
+ # isinstance() with Awaitable subclass is expensive:
+ if type(current.result) in _DEFERRED_SUBCLASSES:
+ # Can't use cast() cause it's in the performance hot path:
+ currentResult: Deferred[_SelfResultT] = current.result # type: ignore[assignment]
# The result is another Deferred. If it has a result,
# we can take it and keep going.
- resultResult = getattr(current.result, "result", _NO_RESULT)
+ resultResult = getattr(currentResult, "result", _NO_RESULT)
if (
resultResult is _NO_RESULT
- or isinstance(resultResult, Deferred)
- or current.result.paused
+ or type(resultResult) in _DEFERRED_SUBCLASSES
+ or currentResult.paused
):
# Nope, it didn't. Pause and chain.
current.pause()
- current._chainedTo = current.result
+ current._chainedTo = currentResult
# Note: current.result has no result, so it's not
# running its callbacks right now. Therefore we can
# append to the callbacks list directly instead of
# using addCallbacks.
- current.result.callbacks.append(current._continuation())
+ currentResult.callbacks.append(current._continuation())
break
else:
# Yep, it did. Steal it.
- current.result.result = None
+ currentResult.result = None
# Make sure _debugInfo's failure state is updated.
- if current.result._debugInfo is not None:
- current.result._debugInfo.failResult = None
+ if currentResult._debugInfo is not None:
+ currentResult._debugInfo.failResult = None
current.result = resultResult
if finished:
@@ -1326,6 +1325,14 @@ class Deferred(Awaitable[_SelfResultT]):
return _cancellableInlineCallbacks(coro)
raise NotACoroutineError(f"{coro!r} is not a coroutine")
+ def __init_subclass__(cls: Type[Deferred[Any]], **kwargs: Any):
+ # Whenever a subclass is created, record it in L{_DEFERRED_SUBCLASSES}
+ # so we can emulate C{isinstance()} more efficiently.
+ _DEFERRED_SUBCLASSES.append(cls)
+
+
+_DEFERRED_SUBCLASSES = [Deferred]
+
def ensureDeferred(
coro: Union[
@@ -1891,6 +1898,10 @@ class _DefGen_Return(BaseException):
self.value = value
+@deprecated(
+ Version("Twisted", 24, 7, 0),
+ replacement="standard return statement",
+)
def returnValue(val: object) -> NoReturn:
"""
Return val from a L{inlineCallbacks} generator.
@@ -2044,17 +2055,28 @@ def _inlineCallbacks(
# directly. returnValue itself consumes a stack frame, so the
# application code will have a tb_next, but it will *not* have a
# second tb_next.
+ #
+ # Note that there's one additional level due to returnValue being
+ # deprecated
assert appCodeTrace.tb_next is not None
- if appCodeTrace.tb_next.tb_next:
+ assert appCodeTrace.tb_next.tb_next is not None
+ if appCodeTrace.tb_next.tb_next.tb_next:
# If returnValue was invoked non-local to the frame which it is
# exiting, identify the frame that ultimately invoked
# returnValue so that we can warn the user, as this behavior is
# confusing.
+ #
+ # Note that there's one additional level due to returnValue being
+ # deprecated
ultimateTrace = appCodeTrace
assert ultimateTrace is not None
assert ultimateTrace.tb_next is not None
- while ultimateTrace.tb_next.tb_next:
+
+ # Note that there's one additional level due to returnValue being
+ # deprecated
+ assert ultimateTrace.tb_next.tb_next is not None
+ while ultimateTrace.tb_next.tb_next.tb_next:
ultimateTrace = ultimateTrace.tb_next
assert ultimateTrace is not None
@@ -2090,6 +2112,9 @@ def _inlineCallbacks(
status.deferred.callback(callbackValue)
return
+ if iscoroutine(result) or inspect.isgenerator(result):
+ result = _cancellableInlineCallbacks(result)
+
if isinstance(result, Deferred):
# a deferred was yielded, get the result.
result.addBoth(_gotResultInlineCallbacks, waiting, gen, status, context)
@@ -2204,17 +2229,15 @@ def inlineCallbacks(
Your inlineCallbacks-enabled generator will return a L{Deferred} object, which
will result in the return value of the generator (or will fail with a
- failure object if your generator raises an unhandled exception). Note that
- you can't use C{return result} to return a value; use C{returnValue(result)}
- instead. Falling off the end of the generator, or simply using C{return}
- will cause the L{Deferred} to have a result of L{None}.
+ failure object if your generator raises an unhandled exception). Inside
+ the generator simply use C{return result} to return a value.
- Be aware that L{returnValue} will not accept a L{Deferred} as a parameter.
+ Be aware that generator must not return a L{Deferred}.
If you believe the thing you'd like to return could be a L{Deferred}, do
this::
result = yield result
- returnValue(result)
+ return result
The L{Deferred} returned from your deferred generator may errback if your
generator raised an exception::
@@ -2224,23 +2247,27 @@ def inlineCallbacks(
thing = yield makeSomeRequestResultingInDeferred()
if thing == 'I love Twisted':
# will become the result of the Deferred
- returnValue('TWISTED IS GREAT!')
+ return 'TWISTED IS GREAT!'
else:
# will trigger an errback
raise Exception('DESTROY ALL LIFE')
- It is possible to use the C{return} statement instead of L{returnValue}::
-
- @inlineCallbacks
- def loadData(url):
- response = yield makeRequest(url)
- return json.loads(response)
-
You can cancel the L{Deferred} returned from your L{inlineCallbacks}
generator before it is fired by your generator completing (either by
reaching its end, a C{return} statement, or by calling L{returnValue}).
A C{CancelledError} will be raised from the C{yield}ed L{Deferred} that
has been cancelled if that C{Deferred} does not otherwise suppress it.
+
+ C{inlineCallbacks} behaves very similarly to coroutines. Since Twisted 24.7.0
+ it is possible to rewrite functions using C{inlineCallbacks} to C{async def}
+ in piecewise manner and be mostly compatible to existing code.
+
+ The rewrite process is simply replacing C{inlineCallbacks} decorator with
+ C{async def} and all C{yield} occurrences in the function body with C{await}.
+ The function will no longer return a C{Deferred} but a awaitable coroutine.
+ This return value will obviously not have C{Deferred} methods such as
+ C{addCallback}, but it will be possible to C{yield} it in other code based
+ on C{inlineCallbacks}.
"""
@wraps(f)
diff --git a/contrib/python/Twisted/py3/twisted/internet/posixbase.py b/contrib/python/Twisted/py3/twisted/internet/posixbase.py
index ed218b37f3..3be65752a2 100644
--- a/contrib/python/Twisted/py3/twisted/internet/posixbase.py
+++ b/contrib/python/Twisted/py3/twisted/internet/posixbase.py
@@ -39,7 +39,6 @@ from ._signals import (
)
# Exceptions that doSelect might return frequently
-_NO_FILENO = error.ConnectionFdescWentAway("Handler has no fileno method")
_NO_FILEDESC = error.ConnectionFdescWentAway("File descriptor lost")
diff --git a/contrib/python/Twisted/py3/twisted/internet/selectreactor.py b/contrib/python/Twisted/py3/twisted/internet/selectreactor.py
index 199dc40671..ee233f7f10 100644
--- a/contrib/python/Twisted/py3/twisted/internet/selectreactor.py
+++ b/contrib/python/Twisted/py3/twisted/internet/selectreactor.py
@@ -5,18 +5,18 @@
"""
Select reactor
"""
-
+from __future__ import annotations
import select
import sys
from errno import EBADF, EINTR
from time import sleep
-from typing import Type
+from typing import Callable, Type, TypeVar
from zope.interface import implementer
from twisted.internet import posixbase
-from twisted.internet.interfaces import IReactorFDSet
+from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor, IWriteDescriptor
from twisted.python import log
from twisted.python.runtime import platformType
@@ -52,6 +52,36 @@ except ImportError:
else:
_extraBase = _ThreadedWin32EventsMixin
+_T = TypeVar("_T")
+
+
+def _onePreen(
+ toPreen: list[_T],
+ preenInto: set[_T],
+ disconnect: Callable[[_T, Exception, bool], None],
+) -> None:
+ preenInto.clear()
+ for selectable in toPreen:
+ try:
+ select.select([selectable], [selectable], [selectable], 0)
+ except Exception as e:
+ log.msg("bad descriptor %s" % selectable)
+ disconnect(selectable, e, False)
+ else:
+ preenInto.add(selectable)
+
+
+def _preenDescriptors(
+ reads: set[IReadDescriptor],
+ writes: set[IWriteDescriptor],
+ disconnect: Callable[[IReadDescriptor | IWriteDescriptor, Exception, bool], None],
+) -> None:
+ log.msg("Malformed file descriptor found. Preening lists.")
+ readers: list[IReadDescriptor] = list(reads)
+ writers: list[IWriteDescriptor] = list(writes)
+ _onePreen(readers, reads, disconnect)
+ _onePreen(writers, writes, disconnect)
+
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase): # type: ignore[misc,valid-type]
@@ -65,29 +95,16 @@ class SelectReactor(posixbase.PosixReactorBase, _extraBase): # type: ignore[mis
checked for writability.
"""
- def __init__(self):
+ def __init__(self) -> None:
"""
Initialize file descriptor tracking dictionaries and the base class.
"""
- self._reads = set()
- self._writes = set()
+ self._reads: set[IReadDescriptor] = set()
+ self._writes: set[IWriteDescriptor] = set()
posixbase.PosixReactorBase.__init__(self)
- def _preenDescriptors(self):
- log.msg("Malformed file descriptor found. Preening lists.")
- readers = list(self._reads)
- writers = list(self._writes)
- self._reads.clear()
- self._writes.clear()
- for selSet, selList in ((self._reads, readers), (self._writes, writers)):
- for selectable in selList:
- try:
- select.select([selectable], [selectable], [selectable], 0)
- except Exception as e:
- log.msg("bad descriptor %s" % selectable)
- self._disconnectSelectable(selectable, e, False)
- else:
- selSet.add(selectable)
+ def _preenDescriptors(self) -> None:
+ _preenDescriptors(self._reads, self._writes, self._disconnectSelectable)
def doSelect(self, timeout):
"""
@@ -136,7 +153,7 @@ class SelectReactor(posixbase.PosixReactorBase, _extraBase): # type: ignore[mis
for selectable in selectables:
# if this was disconnected in another thread, kill it.
# ^^^^ --- what the !@#*? serious! -exarkun
- if selectable not in fdset:
+ if selectable not in fdset: # type:ignore[operator]
continue
# This for pausing input when we're not ready for more.
_logrun(selectable, _drdw, selectable, method)
diff --git a/contrib/python/Twisted/py3/twisted/logger/__init__.py b/contrib/python/Twisted/py3/twisted/logger/__init__.py
index 62f2f71f4e..32a91cd0b0 100644
--- a/contrib/python/Twisted/py3/twisted/logger/__init__.py
+++ b/contrib/python/Twisted/py3/twisted/logger/__init__.py
@@ -56,6 +56,7 @@ __all__ = [
"LogEvent",
# From ._logger
"Logger",
+ "Operation",
"_loggerFor",
# From ._observer
"LogPublisher",
@@ -102,7 +103,7 @@ from ._format import (
from ._interfaces import ILogObserver, LogEvent
-from ._logger import Logger, _loggerFor
+from ._logger import Logger, _loggerFor, Operation
from ._observer import LogPublisher
diff --git a/contrib/python/Twisted/py3/twisted/logger/_json.py b/contrib/python/Twisted/py3/twisted/logger/_json.py
index 0cc05ce501..aa837ded75 100644
--- a/contrib/python/Twisted/py3/twisted/logger/_json.py
+++ b/contrib/python/Twisted/py3/twisted/logger/_json.py
@@ -55,7 +55,7 @@ def failureFromJSON(failureDict: JSONDict) -> Failure:
f = Failure.__new__(Failure)
typeInfo = failureDict["type"]
failureDict["type"] = type(typeInfo["__name__"], (), typeInfo)
- f.__dict__ = failureDict
+ f.__setstate__(failureDict)
return f
diff --git a/contrib/python/Twisted/py3/twisted/logger/_logger.py b/contrib/python/Twisted/py3/twisted/logger/_logger.py
index cc428d87af..b7f3e2e121 100644
--- a/contrib/python/Twisted/py3/twisted/logger/_logger.py
+++ b/contrib/python/Twisted/py3/twisted/logger/_logger.py
@@ -6,8 +6,11 @@
Logger class.
"""
+from __future__ import annotations
+
from time import time
-from typing import Any, Optional, cast
+from types import TracebackType
+from typing import Any, Callable, ContextManager, Optional, Protocol, cast
from twisted.python.compat import currentframe
from twisted.python.failure import Failure
@@ -15,6 +18,87 @@ from ._interfaces import ILogObserver, LogTrace
from ._levels import InvalidLogLevelError, LogLevel
+class Operation(Protocol):
+ """
+ An L{Operation} represents the success (or lack thereof) of code performed
+ within the body of the L{Logger.failureHandler} context manager.
+ """
+
+ @property
+ def succeeded(self) -> bool:
+ """
+ Did the operation succeed? C{True} iff the code completed without
+ raising an exception; C{False} while the code is running and C{False}
+ if it raises an exception.
+ """
+
+ @property
+ def failure(self) -> Failure | None:
+ """
+ Did the operation raise an exception? If so, this L{Failure} is that
+ exception.
+ """
+
+ @property
+ def failed(self) -> bool:
+ """
+ Did the operation fail? C{True} iff the code raised an exception;
+ C{False} while the code is running and C{False} if it completed without
+ error.
+ """
+
+
+class _FailCtxMgr:
+ succeeded: bool = False
+ failure: Failure | None = None
+
+ def __init__(self, fail: Callable[[Failure], None]) -> None:
+ self._fail = fail
+
+ @property
+ def failed(self) -> bool:
+ return self.failure is not None
+
+ def __enter__(self) -> Operation:
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ /,
+ ) -> bool:
+ if exc_type is not None:
+ failure = Failure()
+ self.failure = failure
+ self._fail(failure)
+ else:
+ self.succeeded = True
+ return True
+
+
+class _FastFailCtxMgr:
+ def __init__(self, fail: Callable[[Failure], None]) -> None:
+ self._fail = fail
+
+ def __enter__(self) -> None:
+ pass
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ /,
+ ) -> bool:
+ if exc_type is not None:
+ failure = Failure()
+ self.failure = failure
+ self._fail(failure)
+ return True
+
+
class Logger:
"""
A L{Logger} emits log messages to an observer. You should instantiate it
@@ -164,25 +248,32 @@ class Logger:
d.addErrback(lambda f: log.failure("While frobbing {knob}",
f, knob=knob))
- This method is generally meant to capture unexpected exceptions in
- code; an exception that is caught and handled somehow should be logged,
- if appropriate, via L{Logger.error} instead. If some unknown exception
+ This method is meant to capture unexpected exceptions in code; an
+ exception that is caught and handled somehow should be logged, if
+ appropriate, via L{Logger.error} instead. If some unknown exception
occurs and your code doesn't know how to handle it, as in the above
- example, then this method provides a means to describe the failure in
- nerd-speak. This is done at L{LogLevel.critical} by default, since no
- corrective guidance can be offered to an user/administrator, and the
- impact of the condition is unknown.
+ example, then this method provides a means to describe the failure.
+ This is done at L{LogLevel.critical} by default, since no corrective
+ guidance can be offered to an user/administrator, and the impact of the
+ condition is unknown.
@param format: a message format using new-style (PEP 3101) formatting.
The logging event (which is a L{dict}) is used to render this
format string.
+
@param failure: a L{Failure} to log. If L{None}, a L{Failure} is
created from the exception in flight.
+
@param level: a L{LogLevel} to use.
+
@param kwargs: additional key/value pairs to include in the event.
Note that values which are later mutated may result in
non-deterministic behavior from observers that schedule work for
later execution.
+
+ @see: L{Logger.failureHandler}
+
+ @see: L{Logger.failuresHandled}
"""
if failure is None:
failure = Failure()
@@ -264,6 +355,101 @@ class Logger:
"""
self.emit(LogLevel.critical, format, **kwargs)
+ def failuresHandled(
+ self, format: str, level: LogLevel = LogLevel.critical, **kwargs: object
+ ) -> ContextManager[Operation]:
+ """
+ Run some application code, logging a failure and emitting a traceback
+ in the event that any of it fails, but continuing on. For example::
+
+ log = Logger(...)
+
+ def frameworkCode() -> None:
+ with log.failuresHandled("While frobbing {knob}:", knob=knob) as op:
+ frob(knob)
+ if op.succeeded:
+ log.info("frobbed {knob} successfully", knob=knob)
+
+ This method is meant to capture unexpected exceptions from application
+ code; an exception that is caught and handled somehow should be logged,
+ if appropriate, via L{Logger.error} instead. If some unknown exception
+ occurs and your code doesn't know how to handle it, as in the above
+ example, then this method provides a means to describe the failure.
+ This is done at L{LogLevel.critical} by default, since no corrective
+ guidance can be offered to an user/administrator, and the impact of the
+ condition is unknown.
+
+ @param format: a message format using new-style (PEP 3101) formatting.
+ The logging event (which is a L{dict}) is used to render this
+ format string.
+
+ @param level: a L{LogLevel} to use.
+
+ @param kwargs: additional key/value pairs to include in the event, if
+ it is emitted. Note that values which are later mutated may result
+ in non-deterministic behavior from observers that schedule work for
+ later execution.
+
+ @see: L{Logger.failure}
+ @see: L{Logger.failureHandler}
+
+ @return: A context manager which yields an L{Operation} which will have
+ either its C{succeeded} or C{failed} attribute set to C{True} upon
+ completion of the code within the code within the C{with} block.
+ """
+ return _FailCtxMgr(lambda f: self.failure(format, f, level, **kwargs))
+
+ def failureHandler(
+ self,
+ staticMessage: str,
+ level: LogLevel = LogLevel.critical,
+ ) -> ContextManager[None]:
+ """
+ For performance-sensitive frameworks that needs to handle potential
+ failures from frequently-called application code, and do not need to
+ include detailed structured information about the failure nor inspect
+ the result of the operation, this method returns a context manager that
+ will log exceptions and continue, that can be shared across multiple
+ invocations. It should be instantiated at module scope to avoid
+ additional object creations.
+
+ For example::
+
+ log = Logger(...)
+ ignoringFrobErrors = log.failureHandler("while frobbing:")
+
+ def hotLoop() -> None:
+ with ignoringFrobErrors:
+ frob()
+
+ This method is meant to capture unexpected exceptions from application
+ code; an exception that is caught and handled somehow should be logged,
+ if appropriate, via L{Logger.error} instead. If some unknown exception
+ occurs and your code doesn't know how to handle it, as in the above
+ example, then this method provides a means to describe the failure in
+ nerd-speak. This is done at L{LogLevel.critical} by default, since no
+ corrective guidance can be offered to an user/administrator, and the
+ impact of the condition is unknown.
+
+ @param format: a message format using new-style (PEP 3101) formatting.
+ The logging event (which is a L{dict}) is used to render this
+ format string.
+
+ @param level: a L{LogLevel} to use.
+
+ @see: L{Logger.failure}
+
+ @return: A context manager which does not return a value, but will
+ always exit from exceptions.
+ """
+ return _FastFailCtxMgr(lambda f: self.failure(staticMessage, f, level))
+
_log = Logger()
-_loggerFor = lambda obj: _log.__get__(obj, obj.__class__)
+
+
+def _loggerFor(obj: object) -> Logger:
+ """
+ Get a L{Logger} instance attached to the given class.
+ """
+ return _log.__get__(obj, obj.__class__)
diff --git a/contrib/python/Twisted/py3/twisted/mail/imap4.py b/contrib/python/Twisted/py3/twisted/mail/imap4.py
index 032624e3db..9a9f140795 100644
--- a/contrib/python/Twisted/py3/twisted/mail/imap4.py
+++ b/contrib/python/Twisted/py3/twisted/mail/imap4.py
@@ -80,7 +80,6 @@ from twisted.mail.interfaces import (
from twisted.protocols import basic, policies
from twisted.python import log, text
from twisted.python.compat import (
- _get_async_param,
_matchingString,
iterbytes,
nativeString,
@@ -1076,8 +1075,7 @@ class IMAP4Server(basic.LineReceiver, policies.TimeoutMixin):
def sendNegativeResponse(self, tag=None, message=b""):
self._respond(b"NO", tag, message)
- def sendUntaggedResponse(self, message, isAsync=None, **kwargs):
- isAsync = _get_async_param(isAsync, **kwargs)
+ def sendUntaggedResponse(self, message, isAsync=None):
if not isAsync or (self.blocked is None):
self._respond(message, None, None)
else:
diff --git a/contrib/python/Twisted/py3/twisted/protocols/amp.py b/contrib/python/Twisted/py3/twisted/protocols/amp.py
index ac28a92bfa..8b80982d2a 100644
--- a/contrib/python/Twisted/py3/twisted/protocols/amp.py
+++ b/contrib/python/Twisted/py3/twisted/protocols/amp.py
@@ -222,8 +222,9 @@ from twisted.internet.error import ConnectionClosed, ConnectionLost, PeerVerifyE
from twisted.internet.interfaces import IFileDescriptorReceiver
from twisted.internet.main import CONNECTION_LOST
from twisted.internet.protocol import Protocol
+from twisted.logger import Logger
from twisted.protocols.basic import Int16StringReceiver, StatefulStringProtocol
-from twisted.python import filepath, log
+from twisted.python import filepath
from twisted.python._tzhelper import (
UTC as utc,
FixedOffsetTimeZone as _FixedOffsetTZInfo,
@@ -301,7 +302,7 @@ __all__ = [
"parseString",
]
-
+_log = Logger()
_T_Callable = TypeVar("_T_Callable", bound=Callable[..., object])
@@ -997,7 +998,8 @@ class BoxDispatcher:
answerBox[ANSWER] = box[ASK]
return answerBox
- def formatError(error):
+ def formatError(error: Failure) -> AmpBox:
+ errorBox: AmpBox
if error.check(RemoteAmpError):
code = error.value.errorCode
desc = error.value.description
@@ -1009,7 +1011,7 @@ class BoxDispatcher:
errorBox = AmpBox()
else:
errorBox = QuitBox()
- log.err(error) # here is where server-side logging happens
+ _log.failure("while receiving response to command", error)
# if the error isn't handled
code = UNKNOWN_ERROR_CODE
desc = b"Unknown Error"
@@ -2512,16 +2514,16 @@ class BinaryBoxProtocol(
return None
return Certificate.peerFromTransport(self.transport)
- def unhandledError(self, failure):
+ def unhandledError(self, failure: Failure) -> None:
"""
The buck stops here. This error was completely unhandled, time to
terminate the connection.
"""
- log.err(
- failure,
+ _log.failure(
"Amp server or network failure unhandled by client application. "
"Dropping connection! To avoid, add errbacks to ALL remote "
"commands!",
+ failure,
)
if self.transport is not None:
self.transport.loseConnection()
@@ -2600,9 +2602,11 @@ class AMP(BinaryBoxProtocol, BoxDispatcher, CommandLocator, SimpleStringLocator)
# Save these so we can emit a similar log message in L{connectionLost}.
self._transportPeer = transport.getPeer()
self._transportHost = transport.getHost()
- log.msg(
- "%s connection established (HOST:%s PEER:%s)"
- % (self.__class__.__name__, self._transportHost, self._transportPeer)
+ _log.info(
+ "{cls} connection established (HOST:{host} PEER:{peer})",
+ cls=self.__class__.__name__,
+ host=self._transportHost,
+ peer=self._transportPeer,
)
BinaryBoxProtocol.makeConnection(self, transport)
@@ -2610,9 +2614,11 @@ class AMP(BinaryBoxProtocol, BoxDispatcher, CommandLocator, SimpleStringLocator)
"""
Emit a helpful log message when the connection is lost.
"""
- log.msg(
- "%s connection lost (HOST:%s PEER:%s)"
- % (self.__class__.__name__, self._transportHost, self._transportPeer)
+ _log.info(
+ "{cls} connection lost (HOST:{host} PEER:{peer})",
+ cls=self.__class__.__name__,
+ host=self._transportHost,
+ peer=self._transportPeer,
)
BinaryBoxProtocol.connectionLost(self, reason)
self.transport = None
diff --git a/contrib/python/Twisted/py3/twisted/protocols/ftp.py b/contrib/python/Twisted/py3/twisted/protocols/ftp.py
index ad2f506c98..8ac115c7fa 100644
--- a/contrib/python/Twisted/py3/twisted/protocols/ftp.py
+++ b/contrib/python/Twisted/py3/twisted/protocols/ftp.py
@@ -6,10 +6,10 @@
An FTP protocol implementation
"""
+# System Imports
import errno
import fnmatch
-
-# System Imports
+import ipaddress
import os
import re
import stat
@@ -41,6 +41,7 @@ FILE_STATUS_OK_OPEN_DATA_CNX = "150"
CMD_OK = "200.1"
TYPE_SET_OK = "200.2"
ENTERING_PORT_MODE = "200.3"
+EPSV_ALL_OK = "200.4"
CMD_NOT_IMPLMNTD_SUPERFLUOUS = "202"
SYS_STATUS_OR_HELP_REPLY = "211.1"
FEAT_OK = "211.2"
@@ -80,8 +81,10 @@ SYNTAX_ERR = "500"
SYNTAX_ERR_IN_ARGS = "501"
CMD_NOT_IMPLMNTD = "502.1"
OPTS_NOT_IMPLEMENTED = "502.2"
+PASV_IPV6_NOT_IMPLEMENTED = "502.3"
BAD_CMD_SEQ = "503"
CMD_NOT_IMPLMNTD_FOR_PARAM = "504"
+UNSUPPORTED_NETWORK_PROTOCOL = "522"
NOT_LOGGED_IN = "530.1" # v1 of code 530 - please log in
AUTH_FAILURE = "530.2" # v2 of code 530 - authorization failure
NEED_ACCT_FOR_STOR = "532"
@@ -110,6 +113,7 @@ RESPONSE = {
CMD_OK: "200 Command OK",
TYPE_SET_OK: "200 Type set to %s.",
ENTERING_PORT_MODE: "200 PORT OK",
+ EPSV_ALL_OK: "200 EPSV ALL OK",
CMD_NOT_IMPLMNTD_SUPERFLUOUS: "202 Command not implemented, "
"superfluous at this site",
SYS_STATUS_OR_HELP_REPLY: "211 System status reply",
@@ -127,8 +131,8 @@ RESPONSE = {
CLOSING_DATA_CNX: "226 Abort successful",
TXFR_COMPLETE_OK: "226 Transfer Complete.",
ENTERING_PASV_MODE: "227 Entering Passive Mode (%s).",
- # Where is EPSV defined in the RFCs?
- ENTERING_EPSV_MODE: "229 Entering Extended Passive Mode " "(|||%s|).",
+ # RFC 2428 section 3
+ ENTERING_EPSV_MODE: "229 Entering Extended Passive Mode (|||%s|).",
USR_LOGGED_IN_PROCEED: "230 User logged in, proceed",
GUEST_LOGGED_IN_PROCEED: "230 Anonymous login ok, access " "restrictions apply.",
# i.e. CWD completed OK
@@ -159,8 +163,11 @@ RESPONSE = {
SYNTAX_ERR_IN_ARGS: "501 syntax error in argument(s) %s.",
CMD_NOT_IMPLMNTD: "502 Command '%s' not implemented",
OPTS_NOT_IMPLEMENTED: "502 Option '%s' not implemented.",
+ PASV_IPV6_NOT_IMPLEMENTED: "502 PASV available only for IPv4 (use EPSV instead)",
BAD_CMD_SEQ: "503 Incorrect sequence of commands: " "%s",
CMD_NOT_IMPLMNTD_FOR_PARAM: "504 Not implemented for parameter " "'%s'.",
+ # RFC 2428 section 2
+ UNSUPPORTED_NETWORK_PROTOCOL: "522 Network protocol not supported, use (%s)",
NOT_LOGGED_IN: "530 Please login with USER and PASS.",
AUTH_FAILURE: "530 Sorry, Authentication failed.",
NEED_ACCT_FOR_STOR: "532 Need an account for storing " "files",
@@ -178,6 +185,16 @@ RESPONSE = {
}
+# IANA address family numbers
+# (https://www.iana.org/assignments/address-family-numbers).
+# We only handle IP and IP6 at the moment.
+#
+# If these are needed by other parts of Twisted then they should be moved
+# somewhere more central, filled out, and exported.
+_AFNUM_IP = 1
+_AFNUM_IP6 = 2
+
+
class InvalidPath(Exception):
"""
Internal exception used to signify an error during parsing a path.
@@ -363,6 +380,14 @@ class CmdNotImplementedForArgError(FTPCmdError):
errorCode = CMD_NOT_IMPLMNTD_FOR_PARAM
+class PASVIPv6NotImplementedError(FTPCmdError):
+ """
+ Raised when PASV is used with IPv6.
+ """
+
+ errorCode = PASV_IPV6_NOT_IMPLEMENTED
+
+
class FTPError(Exception):
pass
@@ -387,6 +412,14 @@ class AuthorizationError(FTPCmdError):
errorCode = AUTH_FAILURE
+class UnsupportedNetworkProtocolError(FTPCmdError):
+ """
+ Raised when the client requests an unsupported network protocol.
+ """
+
+ errorCode = UNSUPPORTED_NETWORK_PROTOCOL
+
+
def debugDeferred(self, *_):
log.msg("debugDeferred(): %s" % str(_), debug=True)
@@ -718,6 +751,11 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
@ivar listenFactory: A callable with the signature of
L{twisted.internet.interfaces.IReactorTCP.listenTCP} which will be used
to create Ports for passive connections (mainly for testing).
+ @ivar _epsvAll: If true, "EPSV ALL" was received from the client,
+ requiring the server to reject all data connection setup commands
+ other than EPSV. See RFC 2428.
+ @ivar _supportedNetworkProtocols: A collection of network protocol
+ numbers supported by the EPRT and EPSV commands.
@ivar passivePortRange: iterator used as source of passive port numbers.
@type passivePortRange: C{iterator}
@@ -749,6 +787,8 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
dtpPort = None
dtpInstance = None
binary = True
+ _epsvAll = False
+ _supportedNetworkProtocols = (_AFNUM_IP, _AFNUM_IP6)
PUBLIC_COMMANDS = ["FEAT", "QUIT"]
FEATURES = ["FEAT", "MDTM", "PASV", "SIZE", "TYPE A;I"]
@@ -786,6 +826,7 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
if hasattr(self.shell, "logout") and self.shell.logout is not None:
self.shell.logout()
self.shell = None
+ self._epsvAll = False
self.transport = None
def timeoutConnection(self):
@@ -794,8 +835,7 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
def lineReceived(self, line):
self.resetTimeout()
self.pauseProducing()
- if bytes != str:
- line = line.decode(self._encoding)
+ line = line.decode(self._encoding)
def processFailed(err):
if err.check(FTPCmdError):
@@ -873,14 +913,21 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
else:
return BAD_CMD_SEQ, "RNTO required after RNFR"
- def getDTPPort(self, factory):
+ def getDTPPort(self, factory, interface=""):
"""
Return a port for passive access, using C{self.passivePortRange}
attribute.
+
+ @param factory: the protocol factory to connect to the port.
+ @type factory: L{twisted.internet.protocol.ServerFactory}
+
+ @param interface: the local IPv4 or IPv6 address to which to bind;
+ defaults to "", i.e. all IPv4 addresses.
+ @type interface: C{str}
"""
for portn in self.passivePortRange:
try:
- dtpPort = self.listenFactory(portn, factory)
+ dtpPort = self.listenFactory(portn, factory, interface=interface)
except error.CannotListenError:
continue
else:
@@ -952,6 +999,28 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
response to this command includes the host and port address this
server is listening on.
"""
+ if self._epsvAll:
+ return defer.fail(BadCmdSequenceError("may not send PASV after EPSV ALL"))
+
+ host = self.transport.getHost().host
+ try:
+ address = ipaddress.IPv6Address(host)
+ except ipaddress.AddressValueError:
+ pass
+ else:
+ if address.ipv4_mapped is not None:
+ # IPv4-mapped addresses are usable, but we need to make sure
+ # they're encoded as IPv4 in the response.
+ host = str(address.ipv4_mapped)
+ else:
+ # There's no standard defining the behaviour of PASV with
+ # IPv6, so just claim it as unimplemented. (Some servers
+ # return something like '0,0,0,0' in the host part of the
+ # response in order that at least clients that ignore the
+ # host part can work, and if it becomes necessary then we
+ # could do that too.)
+ return defer.fail(PASVIPv6NotImplementedError())
+
# if we have a DTP port set up, lose it.
if self.dtpFactory is not None:
# cleanupDTP sets dtpFactory to none. Later we'll do
@@ -961,12 +1030,81 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
self.dtpFactory.setTimeout(self.dtpTimeout)
self.dtpPort = self.getDTPPort(self.dtpFactory)
- host = self.transport.getHost().host
port = self.dtpPort.getHost().port
self.reply(ENTERING_PASV_MODE, encodeHostPort(host, port))
return self.dtpFactory.deferred.addCallback(lambda ign: None)
+ def _validateNetworkProtocol(self, protocol):
+ """
+ Validate the network protocol requested in an EPRT or EPSV command.
+
+ For now we just hardcode the protocols we support, since this layer
+ doesn't have a good way to discover that.
+
+ @param protocol: An address family number. See RFC 2428 section 2.
+ @type protocol: L{str}
+
+ @raise FTPCmdError: If validation fails.
+ """
+ # We can't actually honour an explicit network protocol request
+ # (violating a SHOULD in RFC 2428 section 3), but let's at least
+ # validate it.
+ try:
+ protocol = int(protocol)
+ except ValueError:
+ raise CmdArgSyntaxError(protocol)
+ if protocol not in self._supportedNetworkProtocols:
+ raise UnsupportedNetworkProtocolError(
+ ",".join(str(p) for p in self._supportedNetworkProtocols)
+ )
+
+ def ftp_EPSV(self, protocol=""):
+ """
+ Extended request for a passive connection.
+
+ As described by U{RFC 2428 section
+ 3<https://tools.ietf.org/html/rfc2428#section-3>}::
+
+ The EPSV command requests that a server listen on a data port
+ and wait for a connection. The EPSV command takes an optional
+ argument. The response to this command includes only the TCP
+ port number of the listening connection. The format of the
+ response, however, is similar to the argument of the EPRT
+ command. This allows the same parsing routines to be used for
+ both commands. In addition, the format leaves a place holder
+ for the network protocol and/or network address, which may be
+ needed in the EPSV response in the future.
+ """
+ if protocol == "ALL":
+ self._epsvAll = True
+ return EPSV_ALL_OK
+ elif protocol:
+ try:
+ self._validateNetworkProtocol(protocol)
+ except FTPCmdError:
+ return defer.fail()
+
+ # if we have a DTP port set up, lose it.
+ if self.dtpFactory is not None:
+ # cleanupDTP sets dtpFactory to none. Later we'll do
+ # cleanup here or something.
+ self.cleanupDTP()
+ self.dtpFactory = DTPFactory(pi=self)
+ self.dtpFactory.setTimeout(self.dtpTimeout)
+ if not protocol or protocol == _AFNUM_IP6:
+ interface = "::"
+ else:
+ interface = ""
+ self.dtpPort = self.getDTPPort(self.dtpFactory, interface=interface)
+
+ port = self.dtpPort.getHost().port
+ self.reply(ENTERING_EPSV_MODE, port)
+ return self.dtpFactory.deferred.addCallback(lambda ign: None)
+
def ftp_PORT(self, address):
+ if self._epsvAll:
+ return defer.fail(BadCmdSequenceError("may not send PORT after EPSV ALL"))
+
addr = tuple(map(int, address.split(",")))
ip = "%d.%d.%d.%d" % tuple(addr[:4])
port = addr[4] << 8 | addr[5]
@@ -988,6 +1126,48 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
return self.dtpFactory.deferred.addCallbacks(connected, connFailed)
+ def ftp_EPRT(self, extendedAddress):
+ """
+ Extended request for a data connection.
+
+ As described by U{RFC 2428 section
+ 2<https://tools.ietf.org/html/rfc2428#section-2>}::
+
+ The EPRT command allows for the specification of an extended
+ address for the data connection. The extended address MUST
+ consist of the network protocol as well as the network and
+ transport addresses.
+ """
+ if self._epsvAll:
+ return defer.fail(BadCmdSequenceError("may not send EPRT after EPSV ALL"))
+
+ try:
+ protocol, ip, port = decodeExtendedAddress(extendedAddress)
+ except ValueError:
+ return defer.fail(CmdArgSyntaxError(extendedAddress))
+ if protocol:
+ try:
+ self._validateNetworkProtocol(protocol)
+ except FTPCmdError:
+ return defer.fail()
+
+ # if we have a DTP port set up, lose it.
+ if self.dtpFactory is not None:
+ self.cleanupDTP()
+
+ self.dtpFactory = DTPFactory(pi=self, peerHost=self.transport.getPeer().host)
+ self.dtpFactory.setTimeout(self.dtpTimeout)
+ self.dtpPort = reactor.connectTCP(ip, port, self.dtpFactory)
+
+ def connected(ignored):
+ return ENTERING_PORT_MODE
+
+ def connFailed(err):
+ err.trap(PortConnectionError)
+ return CANT_OPEN_DATA_CNX
+
+ return self.dtpFactory.deferred.addCallbacks(connected, connFailed)
+
def _encodeName(self, name):
"""
Encode C{name} to be sent over the wire.
@@ -1176,7 +1356,7 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
@return: a L{Deferred} which will be fired when the transfer is done.
"""
if self.dtpInstance is None:
- raise BadCmdSequenceError("PORT or PASV required before RETR")
+ raise BadCmdSequenceError("PORT, PASV, EPRT, or EPSV required before RETR")
try:
newsegs = toSegments(self.workingDirectory, path)
@@ -1250,7 +1430,7 @@ class FTP(basic.LineReceiver, policies.TimeoutMixin):
pathname does not already exist.
"""
if self.dtpInstance is None:
- raise BadCmdSequenceError("PORT or PASV required before STOR")
+ raise BadCmdSequenceError("PORT, PASV, EPRT, or EPSV required before STOR")
try:
newsegs = toSegments(self.workingDirectory, path)
@@ -2421,6 +2601,18 @@ def encodeHostPort(host, port):
return ",".join(numbers)
+def decodeExtendedAddress(address):
+ """
+ Decode an FTP protocol/address/port combination, using the syntax
+ defined in RFC 2428 section 2.
+
+ @return: a 3-tuple of (protocol, host, port).
+ """
+ delim = address[0]
+ protocol, host, port, _ = address[1:].split(delim)
+ return protocol, host, int(port)
+
+
def _unwrapFirstError(failure):
failure.trap(defer.FirstError)
return failure.value.subFailure
@@ -2611,8 +2803,7 @@ class FTPClientBasic(basic.LineReceiver):
(Private) Parses the response messages from the FTP server.
"""
# Add this line to the current response
- if bytes != str:
- line = line.decode(self._encoding)
+ line = line.decode(self._encoding)
if self.debug:
log.msg("--> %s" % line)
@@ -3177,8 +3368,7 @@ class FTPFileListProtocol(basic.LineReceiver):
self.files = []
def lineReceived(self, line):
- if bytes != str:
- line = line.decode(self._encoding)
+ line = line.decode(self._encoding)
d = self.parseDirectoryLine(line)
if d is None:
self.unknownLine(line)
diff --git a/contrib/python/Twisted/py3/twisted/python/_shellcomp.py b/contrib/python/Twisted/py3/twisted/python/_shellcomp.py
index e36620210b..9c9a46a8d4 100644
--- a/contrib/python/Twisted/py3/twisted/python/_shellcomp.py
+++ b/contrib/python/Twisted/py3/twisted/python/_shellcomp.py
@@ -603,7 +603,9 @@ class ZshArgumentsGenerator:
obj = getattr(self.options, "opt_%s" % longMangled, None)
if obj is not None:
descr = descrFromDoc(obj)
- if descr is not None:
+ # On Python3.13 we have an empty string instead of None,
+ # for missing description.
+ if descr:
return descr
return longname # we really ought to have a good description to use
diff --git a/contrib/python/Twisted/py3/twisted/python/compat.py b/contrib/python/Twisted/py3/twisted/python/compat.py
index 5766ccf648..fc1e5752f3 100644
--- a/contrib/python/Twisted/py3/twisted/python/compat.py
+++ b/contrib/python/Twisted/py3/twisted/python/compat.py
@@ -27,7 +27,6 @@ import os
import platform
import socket
import urllib.parse as urllib_parse
-import warnings
from collections.abc import Sequence
from functools import reduce
from html import escape
@@ -497,35 +496,6 @@ def _constructMethod(cls, name, self):
return _MethodType(func, self)
-def _get_async_param(isAsync=None, **kwargs):
- """
- Provide a backwards-compatible way to get async param value that does not
- cause a syntax error under Python 3.7.
-
- @param isAsync: isAsync param value (should default to None)
- @type isAsync: L{bool}
-
- @param kwargs: keyword arguments of the caller (only async is allowed)
- @type kwargs: L{dict}
-
- @raise TypeError: Both isAsync and async specified.
-
- @return: Final isAsync param value
- @rtype: L{bool}
- """
- if "async" in kwargs:
- warnings.warn(
- "'async' keyword argument is deprecated, please use isAsync",
- DeprecationWarning,
- stacklevel=2,
- )
- if isAsync is None and "async" in kwargs:
- isAsync = kwargs.pop("async")
- if kwargs:
- raise TypeError
- return bool(isAsync)
-
-
def _pypy3BlockingHack():
"""
Work around U{https://foss.heptapod.net/pypy/pypy/-/issues/3051}
@@ -645,6 +615,5 @@ __all__ = [
"intern",
"unichr",
"raw_input",
- "_get_async_param",
"Sequence",
]
diff --git a/contrib/python/Twisted/py3/twisted/python/deprecate.py b/contrib/python/Twisted/py3/twisted/python/deprecate.py
index c5df556f70..aba096d59c 100644
--- a/contrib/python/Twisted/py3/twisted/python/deprecate.py
+++ b/contrib/python/Twisted/py3/twisted/python/deprecate.py
@@ -47,7 +47,8 @@ To deprecate properties you can use::
'''
-To mark module-level attributes as being deprecated you can use::
+While it's best to avoid this as it adds performance overhead to *any* usage of
+the module, to mark module-level attributes as being deprecated you can use::
badAttribute = "someValue"
@@ -258,8 +259,13 @@ def _appendToDocstring(thingWithDoc, textToAppend):
elif len(docstringLines) == 1:
docstringLines.extend(["", textToAppend, ""])
else:
- spaces = docstringLines.pop()
+ trailer = docstringLines[-1]
+ spaces = ""
+ if not trailer.strip():
+ # Deal with differences between Python 3.13 and older versions.
+ spaces = docstringLines.pop()
docstringLines.extend(["", spaces + textToAppend, spaces])
+ docstringLines = [l.lstrip(" ") for l in docstringLines]
thingWithDoc.__doc__ = "\n".join(docstringLines)
diff --git a/contrib/python/Twisted/py3/twisted/python/failure.py b/contrib/python/Twisted/py3/twisted/python/failure.py
index c006d555e5..d253ffad74 100644
--- a/contrib/python/Twisted/py3/twisted/python/failure.py
+++ b/contrib/python/Twisted/py3/twisted/python/failure.py
@@ -11,6 +11,7 @@ Asynchronous-friendly error mechanism.
See L{Failure}.
"""
+from __future__ import annotations
# System Imports
import builtins
@@ -18,6 +19,7 @@ import copy
import inspect
import linecache
import sys
+from functools import partial
from inspect import getmro
from io import StringIO
from typing import Callable, NoReturn, TypeVar
@@ -249,6 +251,7 @@ class Failure(BaseException):
pickled = 0
stack = None
+ _parents = None
# The opcode of "yield" in Python bytecode. We need this in
# _findFailure in order to identify whether an exception was
@@ -288,9 +291,6 @@ class Failure(BaseException):
self.type = self.value = tb = None
self.captureVars = captureVars
- if isinstance(exc_value, str) and exc_type is None:
- raise TypeError("Strings are not supported by Failure")
-
stackOffset = 0
if exc_value is None:
@@ -365,7 +365,6 @@ class Failure(BaseException):
# with bareword "except:"s. This premature exception
# catching means tracebacks generated here don't tend to show
# what called upon the PB object.
-
while f:
if captureVars:
localz = f.f_locals.copy()
@@ -417,11 +416,22 @@ class Failure(BaseException):
)
)
tb = tb.tb_next
+
+ @property
+ def parents(self):
+ if self._parents is not None:
+ return self._parents
+
if inspect.isclass(self.type) and issubclass(self.type, Exception):
parentCs = getmro(self.type)
- self.parents = list(map(reflect.qual, parentCs))
+ self._parents = list(map(reflect.qual, parentCs))
else:
- self.parents = [self.type]
+ self._parents = [self.type]
+ return self._parents
+
+ @parents.setter
+ def parents(self, parents):
+ self._parents = parents
def _extrapolate(self, otherFailure):
"""
@@ -452,6 +462,26 @@ class Failure(BaseException):
frames.extend(self.frames)
self.frames = frames
+ @staticmethod
+ def _withoutTraceback(value: BaseException) -> Failure:
+ """
+ Create a L{Failure} for an exception without a traceback.
+
+ By restricting the inputs significantly, this constructor runs much
+ faster.
+ """
+ result = Failure.__new__(Failure)
+ global count
+ count += 1
+ result.captureVars = False
+ result.count = count
+ result.frames = []
+ result.stack = [] # type: ignore
+ result.value = value
+ result.type = value.__class__
+ result.tb = None
+ return result
+
def trap(self, *errorTypes):
"""
Trap this failure if its type is in a predetermined list.
@@ -587,30 +617,28 @@ class Failure(BaseException):
def __str__(self) -> str:
return "[Failure instance: %s]" % self.getBriefTraceback()
+ def __setstate__(self, state):
+ state["_parents"] = state.pop("parents")
+ self.__dict__.update(state)
+
def __getstate__(self):
- """Avoid pickling objects in the traceback."""
- if self.pickled:
- return self.__dict__
- c = self.__dict__.copy()
+ """
+ Avoid pickling objects in the traceback.
- c["frames"] = [
- [
- v[0],
- v[1],
- v[2],
- _safeReprVars(v[3]),
- _safeReprVars(v[4]),
- ]
- for v in self.frames
- ]
+ This is not called direclty by pickle, since C{BaseException}
+ implements reduce; instead, pickle calls C{Failure.__reduce__} which
+ then calls this API.
+ """
+ # Make sure _parents field is populated:
+ _ = self.parents
- # Added 2003-06-23. See comment above in __init__
- c["tb"] = None
+ c = self.__dict__.copy()
- if self.stack is not None:
- # XXX: This is a band-aid. I can't figure out where these
- # (failure.stack is None) instances are coming from.
- c["stack"] = [
+ # Backwards compatibility with old code, e.g. for Perspective Broker:
+ c["parents"] = c.pop("_parents")
+
+ if self.captureVars:
+ c["frames"] = [
[
v[0],
v[1],
@@ -618,12 +646,35 @@ class Failure(BaseException):
_safeReprVars(v[3]),
_safeReprVars(v[4]),
]
- for v in self.stack
+ for v in self.frames
]
+ # Added 2003-06-23. See comment above in __init__
+ c["tb"] = None
+
+ if self.stack is not None:
+ # XXX: This is a band-aid. I can't figure out where these
+ # (failure.stack is None) instances are coming from.
+ if self.captureVars:
+ c["stack"] = [
+ [
+ v[0],
+ v[1],
+ v[2],
+ _safeReprVars(v[3]),
+ _safeReprVars(v[4]),
+ ]
+ for v in self.stack
+ ]
+
c["pickled"] = 1
return c
+ def __reduce__(self):
+ # BaseException implements a __reduce__ (in C, technically), so we need
+ # to override this to get pickling working.
+ return (partial(Failure.__new__, Failure), (), self.__getstate__())
+
def cleanFailure(self):
"""
Remove references to other objects, replacing them with strings.
diff --git a/contrib/python/Twisted/py3/twisted/spread/pb.py b/contrib/python/Twisted/py3/twisted/spread/pb.py
index dcf545015d..1a58dc6c59 100644
--- a/contrib/python/Twisted/py3/twisted/spread/pb.py
+++ b/contrib/python/Twisted/py3/twisted/spread/pb.py
@@ -449,7 +449,11 @@ class CopyableFailure(failure.Failure, Copyable):
Collect state related to the exception which occurred, discarding
state which cannot reasonably be serialized.
"""
+ # Make sure self._parents is populated:
+ _ = self.parents
+
state = self.__dict__.copy()
+ state["parents"] = state.pop("_parents")
state["tb"] = None
state["frames"] = []
state["stack"] = []
@@ -481,6 +485,10 @@ class CopiedFailure(RemoteCopy, failure.Failure):
@type traceback: C{str}
"""
+ def setCopyableState(self, state):
+ state["_parents"] = state.pop("parents")
+ return super().setCopyableState(state)
+
def printTraceback(self, file=None, elideFrameworkCode=0, detail="default"):
if file is None:
file = log.logfile
diff --git a/contrib/python/Twisted/py3/twisted/web/_flatten.py b/contrib/python/Twisted/py3/twisted/web/_flatten.py
index 87a8bf2dfb..12691b87fa 100644
--- a/contrib/python/Twisted/py3/twisted/web/_flatten.py
+++ b/contrib/python/Twisted/py3/twisted/web/_flatten.py
@@ -418,7 +418,6 @@ async def _flattenTree(
while stack:
try:
- frame = stack[-1].gi_frame
element = next(stack[-1])
if isinstance(element, Deferred):
# Before suspending flattening for an unknown amount of time,
@@ -428,11 +427,11 @@ async def _flattenTree(
except StopIteration:
stack.pop()
except Exception as e:
- stack.pop()
roots = []
for generator in stack:
- roots.append(generator.gi_frame.f_locals["root"])
- roots.append(frame.f_locals["root"])
+ if generator.gi_frame is not None:
+ roots.append(generator.gi_frame.f_locals["root"])
+ stack.pop()
raise FlattenerError(e, roots, extract_tb(exc_info()[2]))
else:
stack.append(element)
diff --git a/contrib/python/Twisted/py3/twisted/web/_http2.py b/contrib/python/Twisted/py3/twisted/web/_http2.py
index 24c24fc0ff..f048c7335e 100644
--- a/contrib/python/Twisted/py3/twisted/web/_http2.py
+++ b/contrib/python/Twisted/py3/twisted/web/_http2.py
@@ -1073,10 +1073,15 @@ class H2Stream:
@type reason: L{bytes}
@param headers: The HTTP response headers.
- @type headers: Any iterable of two-tuples of L{bytes}, representing header
- names and header values.
+ @type headers: L{twisted.web.http_headers.Headers}
"""
- self._conn.writeHeaders(version, code, reason, headers, self.streamID)
+ self._conn.writeHeaders(
+ version,
+ code,
+ reason,
+ [(k, v) for (k, values) in headers.getAllRawHeaders() for v in values],
+ self.streamID,
+ )
def requestDone(self, request):
"""
diff --git a/contrib/python/Twisted/py3/twisted/web/_newclient.py b/contrib/python/Twisted/py3/twisted/web/_newclient.py
index 6fd1ac21ba..a151bdae05 100644
--- a/contrib/python/Twisted/py3/twisted/web/_newclient.py
+++ b/contrib/python/Twisted/py3/twisted/web/_newclient.py
@@ -26,17 +26,14 @@ Various other classes in this module support this usage:
response.
"""
+from __future__ import annotations
+
import re
+from typing import TYPE_CHECKING, Optional
from zope.interface import implementer
-from twisted.internet.defer import (
- CancelledError,
- Deferred,
- fail,
- maybeDeferred,
- succeed,
-)
+from twisted.internet.defer import CancelledError, Deferred, fail, succeed
from twisted.internet.error import ConnectionDone
from twisted.internet.interfaces import IConsumer, IPushProducer
from twisted.internet.protocol import Protocol
@@ -45,7 +42,6 @@ from twisted.protocols.basic import LineReceiver
from twisted.python.compat import networkString
from twisted.python.components import proxyForInterface
from twisted.python.failure import Failure
-from twisted.python.reflect import fullyQualifiedName
from twisted.web.http import (
NO_CONTENT,
NOT_MODIFIED,
@@ -183,21 +179,6 @@ class RequestNotSent(Exception):
"""
-def _callAppFunction(function):
- """
- Call C{function}. If it raises an exception, log it with a minimal
- description of the source.
-
- @return: L{None}
- """
- try:
- function()
- except BaseException:
- _moduleLog.failure(
- "Unexpected exception from {name}", name=fullyQualifiedName(function)
- )
-
-
class HTTPParser(LineReceiver):
"""
L{HTTPParser} handles the parsing side of HTTP processing. With a suitable
@@ -207,6 +188,10 @@ class HTTPParser(LineReceiver):
@ivar headers: All of the non-connection control message headers yet
received.
+ @ivar connHeaders: All of the connection control message headers yet
+ received. See L{CONNECTION_CONTROL_HEADERS} and
+ L{isConnectionControlHeader}.
+
@ivar state: State indicator for the response parsing state machine. One
of C{STATUS}, C{HEADER}, C{BODY}, C{DONE}.
@@ -342,6 +327,15 @@ class HTTPParser(LineReceiver):
self.switchToBodyMode(None)
+_ignoreDecoderErrors = _moduleLog.failureHandler("while interacting with body decoder:")
+_ignoreStopProducerStopWriting = _moduleLog.failureHandler(
+ "while calling stopProducing() in stopWriting():"
+)
+_ignoreStopProducerWrite = _moduleLog.failureHandler(
+ "while calling stopProducing() in write():"
+)
+
+
class HTTPClientParser(HTTPParser):
"""
An HTTP parser which only handles HTTP responses.
@@ -367,7 +361,7 @@ class HTTPClientParser(HTTPParser):
b"chunked": _ChunkedTransferDecoder,
}
- bodyDecoder = None
+ bodyDecoder: _IdentityTransferDecoder | None = None
_log = Logger()
def __init__(self, request, finisher):
@@ -389,6 +383,11 @@ class HTTPClientParser(HTTPParser):
b'HTTP/1.1'. Returns (protocol, major, minor). Will raise ValueError
on bad syntax.
"""
+ # Vast majority of the time this will be the response, so just
+ # immediately return the result:
+ if strversion == b"HTTP/1.1":
+ return (b"HTTP", 1, 1)
+
try:
proto, strnumber = strversion.split(b"/")
major, minor = strnumber.split(b".")
@@ -497,18 +496,9 @@ class HTTPClientParser(HTTPParser):
# allow the transfer decoder to set the response object's
# length attribute.
else:
- contentLengthHeaders = self.connHeaders.getRawHeaders(b"content-length")
- if contentLengthHeaders is None:
- contentLength = None
- elif len(contentLengthHeaders) == 1:
- contentLength = int(contentLengthHeaders[0])
+ contentLength = _contentLength(self.connHeaders)
+ if contentLength is not None:
self.response.length = contentLength
- else:
- # "HTTP Message Splitting" or "HTTP Response Smuggling"
- # potentially happening. Or it's just a buggy server.
- raise ValueError(
- "Too many Content-Length headers; " "response is invalid"
- )
if contentLength == 0:
self._finished(self.clearLineBuffer())
@@ -539,9 +529,14 @@ class HTTPClientParser(HTTPParser):
self._responseDeferred.callback(self.response)
del self._responseDeferred
- def connectionLost(self, reason):
+ def connectionLost(self, reason: Failure | None = None) -> None:
if self.bodyDecoder is not None:
- try:
+ # Handle exceptions from both the body decoder itself and the
+ # various invocations of _bodyDataFinished; treat them all as
+ # application code. The response is part of the HTTP server and
+ # really shouldn't raise exceptions, but maybe there's some buggy
+ # application code somewhere making things difficult.
+ with _ignoreDecoderErrors:
try:
self.bodyDecoder.noMoreData()
except PotentialDataLoss:
@@ -552,12 +547,6 @@ class HTTPClientParser(HTTPParser):
)
else:
self.response._bodyDataFinished()
- except BaseException:
- # Handle exceptions from both the except suites and the else
- # suite. Those functions really shouldn't raise exceptions,
- # but maybe there's some buggy application code somewhere
- # making things difficult.
- self._log.failure("")
elif self.state != DONE:
if self._everReceivedData:
exceptionClass = ResponseFailed
@@ -589,7 +578,7 @@ _VALID_METHOD = re.compile(
b"~",
b"\x30-\x39",
b"\x41-\x5a",
- b"\x61-\x7A",
+ b"\x61-\x7a",
),
),
),
@@ -645,6 +634,77 @@ def _ensureValidURI(uri):
raise ValueError(f"Invalid URI {uri!r}")
+def _decint(data: bytes) -> int:
+ """
+ Parse a decimal integer of the form C{1*DIGIT}, i.e. consisting only of
+ decimal digits. The integer may be embedded in whitespace (space and
+ horizontal tab). This differs from the built-in L{int()} function by
+ disallowing a leading C{+} character and various forms of whitespace
+ (note that we sanitize linear whitespace in header values in
+ L{twisted.web.http_headers.Headers}).
+
+ @param data: Value to parse.
+
+ @returns: A non-negative integer.
+
+ @raises ValueError: When I{value} contains non-decimal characters.
+ """
+ data = data.strip(b" \t")
+ if not data.isdigit():
+ raise ValueError(f"Value contains non-decimal digits: {data!r}")
+ return int(data)
+
+
+def _contentLength(connHeaders: Headers) -> Optional[int]:
+ """
+ Parse the I{Content-Length} connection header.
+
+ Two forms of duplicates are permitted. Header repetition:
+
+ Content-Length: 42
+ Content-Length: 42
+
+ And field value repetition:
+
+ Content-Length: 42, 42
+
+ Duplicates are only permitted if they have the same decimal value
+ (so C{7, 007} are also permitted).
+
+ @param connHeaders: Connection headers per L{HTTPParser.connHeaders}
+
+ @returns: A non-negative number of octets, or L{None} when there is
+ no I{Content-Length} header.
+
+ @raises ValueError: when there are conflicting headers, a header value
+ isn't an integer, or a header value is negative.
+
+ @see: U{https://datatracker.ietf.org/doc/html/rfc9110#section-8.6}
+ """
+ headers = connHeaders.getRawHeaders(b"content-length")
+ if headers is None:
+ return None
+
+ if len(headers) > 1:
+ fieldValues = b",".join(headers)
+ else:
+ [fieldValues] = headers
+
+ if b"," in fieldValues:
+ # Duplicates of the form b'42, 42' are allowed.
+ values = {_decint(v) for v in fieldValues.split(b",")}
+ if len(values) != 1:
+ # "HTTP Message Splitting" or "HTTP Response Smuggling"
+ # potentially happening. Or it's just a buggy server.
+ raise ValueError(
+ f"Invalid response: conflicting Content-Length headers: {fieldValues!r}"
+ )
+ [value] = values
+ else:
+ value = _decint(fieldValues)
+ return value
+
+
@implementer(IClientRequest)
class Request:
"""
@@ -929,12 +989,13 @@ class Request:
self._writeToEmptyBodyContentLength(transport)
else:
self._writeHeaders(transport, None)
+ return succeed(None)
elif self.bodyProducer.length is UNKNOWN_LENGTH:
return self._writeToBodyProducerChunked(transport)
else:
return self._writeToBodyProducerContentLength(transport)
- def stopWriting(self):
+ def stopWriting(self) -> None:
"""
Stop writing this request to the transport. This can only be called
after C{writeTo} and before the L{Deferred} returned by C{writeTo}
@@ -944,7 +1005,8 @@ class Request:
"""
# If bodyProducer is None, then the Deferred returned by writeTo has
# fired already and this method cannot be called.
- _callAppFunction(self.bodyProducer.stopProducing)
+ with _ignoreStopProducerStopWriting:
+ self.bodyProducer.stopProducing()
class LengthEnforcingConsumer:
@@ -1001,7 +1063,8 @@ class LengthEnforcingConsumer:
# we still have _finished which we can use to report the error to a
# better place than the direct caller of this method (some
# arbitrary application code).
- _callAppFunction(self._producer.stopProducing)
+ with _ignoreStopProducerWrite:
+ self._producer.stopProducing()
self._finished.errback(WrongBodyLength("too many bytes written"))
self._allowNoMoreWrites()
@@ -1034,9 +1097,10 @@ def makeStatefulDispatcher(name, template):
@return: The dispatcher function.
"""
+ pfx = f"_{name}_"
def dispatcher(self, *args, **kwargs):
- func = getattr(self, "_" + name + "_" + self._state, None)
+ func = getattr(self, f"{pfx}{self._state}", None)
if func is None:
raise RuntimeError(f"{self!r} has no {name} method in state {self._state}")
return func(*args, **kwargs)
@@ -1270,7 +1334,9 @@ class Response:
"""
self._state = "DEFERRED_CLOSE"
if reason is None:
- reason = Failure(ResponseDone("Response body fully received"))
+ reason = Failure._withoutTraceback(
+ ResponseDone("Response body fully received")
+ )
self._reason = reason
def _bodyDataFinished_CONNECTED(self, reason=None):
@@ -1278,7 +1344,9 @@ class Response:
Disconnect the protocol and move to the C{'FINISHED'} state.
"""
if reason is None:
- reason = Failure(ResponseDone("Response body fully received"))
+ reason = Failure._withoutTraceback(
+ ResponseDone("Response body fully received")
+ )
self._bodyProtocol.connectionLost(reason)
self._bodyProtocol = None
self._state = "FINISHED"
@@ -1468,11 +1536,11 @@ class HTTP11ClientProtocol(Protocol):
"""
_state = "QUIESCENT"
- _parser = None
- _finishedRequest = None
- _currentRequest = None
+ _parser: HTTPClientParser | None = None
+ _finishedRequest: Deferred[Response] | None = None
+ _currentRequest: Request | None = None
_transportProxy = None
- _responseDeferred = None
+ _responseDeferred: Deferred[Response] | None = None
_log = Logger()
def __init__(self, quiescentCallback=lambda c: None):
@@ -1506,7 +1574,10 @@ class HTTP11ClientProtocol(Protocol):
return fail(RequestNotSent())
self._state = "TRANSMITTING"
- _requestDeferred = maybeDeferred(request.writeTo, self.transport)
+ try:
+ _requestDeferred = request.writeTo(self.transport)
+ except BaseException:
+ _requestDeferred = fail()
def cancelRequest(ign):
# Explicitly cancel the request's deferred if it's still trying to
@@ -1550,7 +1621,7 @@ class HTTP11ClientProtocol(Protocol):
return self._finishedRequest
- def _finishResponse(self, rest):
+ def _finishResponse(self, rest: bytes) -> None:
"""
Called by an L{HTTPClientParser} to indicate that it has parsed a
complete response.
@@ -1562,10 +1633,16 @@ class HTTP11ClientProtocol(Protocol):
_finishResponse = makeStatefulDispatcher("finishResponse", _finishResponse)
- def _finishResponse_WAITING(self, rest):
+ def _finishResponse_WAITING(self, rest: bytes) -> None:
# Currently the rest parameter is ignored. Don't forget to use it if
# we ever add support for pipelining. And maybe check what trailers
# mean.
+ if TYPE_CHECKING:
+ assert self._responseDeferred is not None
+ assert self._finishedRequest is not None
+ assert self._currentRequest is not None
+ assert self.transport is not None
+
if self._state == "WAITING":
self._state = "QUIESCENT"
else:
@@ -1590,20 +1667,20 @@ class HTTP11ClientProtocol(Protocol):
or self._state != "QUIESCENT"
or not self._currentRequest.persistent
):
- self._giveUp(Failure(reason))
+ self._giveUp(Failure._withoutTraceback(reason))
else:
# Just in case we had paused the transport, resume it before
# considering it quiescent again.
- self.transport.resumeProducing()
+ producer: IPushProducer = self.transport # type:ignore[assignment]
+ producer.resumeProducing()
# We call the quiescent callback first, to ensure connection gets
# added back to connection pool before we finish the request.
- try:
+ with _moduleLog.failuresHandled("while invoking quiescent callback:") as op:
self._quiescentCallback(self)
- except BaseException:
+ if op.failed:
# If callback throws exception, just log it and disconnect;
# keeping persistent connections around is an optimisation:
- self._log.failure("")
self.transport.loseConnection()
self._disconnectParser(reason)
diff --git a/contrib/python/Twisted/py3/twisted/web/_responses.py b/contrib/python/Twisted/py3/twisted/web/_responses.py
index 2b93229350..5d87fdc597 100644
--- a/contrib/python/Twisted/py3/twisted/web/_responses.py
+++ b/contrib/python/Twisted/py3/twisted/web/_responses.py
@@ -46,6 +46,7 @@ REQUEST_URI_TOO_LONG = 414
UNSUPPORTED_MEDIA_TYPE = 415
REQUESTED_RANGE_NOT_SATISFIABLE = 416
EXPECTATION_FAILED = 417
+IM_A_TEAPOT = 418
INTERNAL_SERVER_ERROR = 500
NOT_IMPLEMENTED = 501
@@ -98,6 +99,7 @@ RESPONSES = {
UNSUPPORTED_MEDIA_TYPE: b"Unsupported Media Type",
REQUESTED_RANGE_NOT_SATISFIABLE: b"Requested Range not satisfiable",
EXPECTATION_FAILED: b"Expectation Failed",
+ IM_A_TEAPOT: b"I'm a teapot",
# 500
INTERNAL_SERVER_ERROR: b"Internal Server Error",
NOT_IMPLEMENTED: b"Not Implemented",
diff --git a/contrib/python/Twisted/py3/twisted/web/_template_util.py b/contrib/python/Twisted/py3/twisted/web/_template_util.py
index 230c33f3e8..501941ad12 100644
--- a/contrib/python/Twisted/py3/twisted/web/_template_util.py
+++ b/contrib/python/Twisted/py3/twisted/web/_template_util.py
@@ -92,7 +92,7 @@ def redirectTo(URL: bytes, request: IRequest) -> bytes:
</body>
</html>
""" % {
- b"url": URL
+ b"url": escape(URL.decode("utf-8")).encode("utf-8")
}
return content
@@ -118,34 +118,6 @@ class Redirect(resource.Resource):
return self
-# FIXME: This is totally broken, see https://twistedmatrix.com/trac/ticket/9838
-class ChildRedirector(Redirect):
- isLeaf = False
-
- def __init__(self, url):
- # XXX is this enough?
- if (
- (url.find("://") == -1)
- and (not url.startswith(".."))
- and (not url.startswith("/"))
- ):
- raise ValueError(
- (
- "It seems you've given me a redirect (%s) that is a child of"
- " myself! That's not good, it'll cause an infinite redirect."
- )
- % url
- )
- Redirect.__init__(self, url)
-
- def getChild(self, name, request):
- newUrl = self.url
- if not newUrl.endswith("/"):
- newUrl += "/"
- newUrl += name
- return ChildRedirector(newUrl)
-
-
class ParentRedirect(resource.Resource):
"""
Redirect to the nearest directory and strip any query string.
diff --git a/contrib/python/Twisted/py3/twisted/web/client.py b/contrib/python/Twisted/py3/twisted/web/client.py
index e66b0cf317..b06f1bef28 100644
--- a/contrib/python/Twisted/py3/twisted/web/client.py
+++ b/contrib/python/Twisted/py3/twisted/web/client.py
@@ -1530,7 +1530,7 @@ class ContentDecoderAgent:
return response
-_canonicalHeaderName = Headers()._canonicalNameCaps
+_canonicalHeaderName = Headers()._encodeName
_defaultSensitiveHeaders = frozenset(
[
b"Authorization",
diff --git a/contrib/python/Twisted/py3/twisted/web/http.py b/contrib/python/Twisted/py3/twisted/web/http.py
index 1c598380ac..e80f6cb365 100644
--- a/contrib/python/Twisted/py3/twisted/web/http.py
+++ b/contrib/python/Twisted/py3/twisted/web/http.py
@@ -31,6 +31,7 @@ also useful for HTTP clients (such as the chunked encoding parser).
it, as in the HTTP 1.1 chunked I{Transfer-Encoding} (RFC 7230 section 4.1).
This limits how much data may be buffered when decoding the line.
"""
+
from __future__ import annotations
__all__ = [
@@ -69,6 +70,7 @@ __all__ = [
"UNSUPPORTED_MEDIA_TYPE",
"REQUESTED_RANGE_NOT_SATISFIABLE",
"EXPECTATION_FAILED",
+ "IM_A_TEAPOT",
"INTERNAL_SERVER_ERROR",
"NOT_IMPLEMENTED",
"BAD_GATEWAY",
@@ -108,9 +110,17 @@ import tempfile
import time
import warnings
from email import message_from_bytes
-from email.message import EmailMessage
-from io import BytesIO
-from typing import AnyStr, Callable, Dict, List, Optional, Tuple
+from email.message import EmailMessage, Message
+from io import BufferedIOBase, BytesIO, TextIOWrapper
+from typing import (
+ AnyStr,
+ Callable,
+ Dict,
+ List,
+ Optional,
+ Protocol as TypingProtocol,
+ Tuple,
+)
from urllib.parse import (
ParseResultBytes,
unquote_to_bytes as unquote,
@@ -124,13 +134,14 @@ from incremental import Version
from twisted.internet import address, interfaces, protocol
from twisted.internet._producer_helpers import _PullToPush
from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IProtocol
+from twisted.internet.interfaces import IAddress, IDelayedCall, IProtocol, IReactorTime
+from twisted.internet.protocol import Protocol
from twisted.logger import Logger
from twisted.protocols import basic, policies
from twisted.python import log
from twisted.python.compat import nativeString, networkString
from twisted.python.components import proxyForInterface
-from twisted.python.deprecate import deprecated
+from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
from twisted.python.failure import Failure
from twisted.web._responses import (
ACCEPTED,
@@ -144,6 +155,7 @@ from twisted.web._responses import (
GATEWAY_TIMEOUT,
GONE,
HTTP_VERSION_NOT_SUPPORTED,
+ IM_A_TEAPOT,
INSUFFICIENT_STORAGE_SPACE,
INTERNAL_SERVER_ERROR,
LENGTH_REQUIRED,
@@ -224,6 +236,58 @@ weekdayname_lower = [name.lower() for name in weekdayname]
monthname_lower = [name and name.lower() for name in monthname]
+def _parseRequestLine(line: bytes) -> tuple[bytes, bytes, bytes]:
+ """
+ Parse an HTTP request line, which looks like:
+
+ GET /foo/bar HTTP/1.1
+
+ This function attempts to validate the well-formedness of
+ the line. RFC 9112 section 3 provides this ABNF:
+
+ request-line = method SP request-target SP HTTP-version
+
+ We allow any method that is a valid token:
+
+ method = token
+ token = 1*tchar
+ tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*"
+ / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~"
+ / DIGIT / ALPHA
+
+ We allow any non-empty request-target that contains only printable
+ ASCII characters (no whitespace).
+
+ The RFC defines HTTP-version like this:
+
+ HTTP-version = HTTP-name "/" DIGIT "." DIGIT
+ HTTP-name = %s"HTTP"
+
+ However, this function is more strict than the RFC: we only allow
+ HTTP versions of 1.0 and 1.1, as later versions of HTTP don't use
+ a request line.
+
+ @returns: C{(method, request, version)} three-tuple
+
+ @raises: L{ValueError} when malformed
+ """
+ method, request, version = line.split(b" ")
+
+ if not _istoken(method):
+ raise ValueError("Invalid method")
+
+ for c in request:
+ if c <= 32 or c > 176:
+ raise ValueError("Invalid request-target")
+ if request == b"":
+ raise ValueError("Empty request-target")
+
+ if version != b"HTTP/1.1" and version != b"HTTP/1.0":
+ raise ValueError("Invalid version")
+
+ return method, request, version
+
+
def _parseContentType(line: bytes) -> bytes:
"""
Parse the Content-Type header.
@@ -251,11 +315,16 @@ def _getMultiPartArgs(content: bytes, ctype: bytes) -> dict[bytes, list[bytes]]:
if not msg.is_multipart():
raise _MultiPartParseException("Not a multipart.")
- for part in msg.get_payload():
- name = part.get_param("name", header="content-disposition")
+ part: Message
+ # "per Python docs, a list of Message objects when is_multipart() is True,
+ # or a string when is_multipart() is False"
+ for part in msg.get_payload(): # type:ignore[assignment]
+ name: str | None = part.get_param(
+ "name", header="content-disposition"
+ ) # type:ignore[assignment]
if not name:
continue
- payload = part.get_payload(decode=True)
+ payload: bytes = part.get_payload(decode=True) # type:ignore[assignment]
result[name.encode("utf8")] = [payload]
return result
@@ -378,7 +447,7 @@ def stringToDatetime(dateString):
@type dateString: C{bytes}
"""
- parts = nativeString(dateString).split()
+ parts = dateString.decode("ascii").split()
if not parts[0][0:3].lower() in weekdayname_lower:
# Weekday is stupid. Might have been omitted.
@@ -438,6 +507,20 @@ def toChunk(data):
return (networkString(f"{len(data):x}"), b"\r\n", data, b"\r\n")
+def _istoken(b: bytes) -> bool:
+ """
+ Is the string a token per RFC 9110 section 5.6.2?
+ """
+ for c in b:
+ if c not in (
+ b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" # ALPHA
+ b"0123456789" # DIGIT
+ b"!#$%^'*+-.^_`|~"
+ ):
+ return False
+ return b != b""
+
+
def _ishexdigits(b: bytes) -> bool:
"""
Is the string case-insensitively hexidecimal?
@@ -764,6 +847,14 @@ class HTTPClient(basic.LineReceiver):
self.setLineMode(rest)
+deprecatedModuleAttribute(
+ Version("Twisted", 24, 7, 0),
+ "Use twisted.web.client.Agent instead.",
+ __name__,
+ HTTPClient.__name__,
+)
+
+
# response codes that must have empty bodies
NO_BODY_CODES = (204, 304)
@@ -1193,7 +1284,6 @@ class Request:
version = self.clientproto
code = b"%d" % (self.code,)
reason = self.code_message
- headers = []
# if we don't have a content length, we send data in
# chunked mode, so that we can support pipelining in
@@ -1204,7 +1294,7 @@ class Request:
and self.method != b"HEAD"
and self.code not in NO_BODY_CODES
):
- headers.append((b"Transfer-Encoding", b"chunked"))
+ self.responseHeaders.setRawHeaders("Transfer-Encoding", [b"chunked"])
self.chunked = 1
if self.lastModified is not None:
@@ -1221,14 +1311,10 @@ class Request:
if self.etag is not None:
self.responseHeaders.setRawHeaders(b"ETag", [self.etag])
- for name, values in self.responseHeaders.getAllRawHeaders():
- for value in values:
- headers.append((name, value))
+ if self.cookies:
+ self.responseHeaders.setRawHeaders(b"Set-Cookie", self.cookies)
- for cookie in self.cookies:
- headers.append((b"Set-Cookie", cookie))
-
- self.channel.writeHeaders(version, code, reason, headers)
+ self.channel.writeHeaders(version, code, reason, self.responseHeaders)
# if this is a "HEAD" request, we shouldn't return any data
if self.method == b"HEAD":
@@ -1356,19 +1442,15 @@ class Request:
cookie += b"; SameSite=" + sameSite
self.cookies.append(cookie)
- def setResponseCode(self, code, message=None):
+ def setResponseCode(self, code: int, message: Optional[bytes] = None) -> None:
"""
Set the HTTP response code.
@type code: L{int}
@type message: L{bytes}
"""
- if not isinstance(code, int):
- raise TypeError("HTTP response code must be int or long")
self.code = code
- if message:
- if not isinstance(message, bytes):
- raise TypeError("HTTP response status message must be bytes")
+ if message is not None:
self.code_message = message
else:
self.code_message = RESPONSES.get(code, b"Unknown Status")
@@ -2000,16 +2082,21 @@ class _ChunkedTransferDecoder:
@returns: C{False}, as there is either insufficient data to continue,
or no data remains.
"""
- if (
- self._receivedTrailerHeadersSize + len(self._buffer)
- > self._maxTrailerHeadersSize
- ):
- raise _MalformedChunkedDataError("Trailer headers data is too long.")
-
eolIndex = self._buffer.find(b"\r\n", self._start)
if eolIndex == -1:
# Still no end of network line marker found.
+ #
+ # Check if we've run up against the trailer size limit: if the next
+ # read contains the terminating CRLF then we'll have this many bytes
+ # of trailers (including the CRLFs).
+ minTrailerSize = (
+ self._receivedTrailerHeadersSize
+ + len(self._buffer)
+ + (1 if self._buffer.endswith(b"\r") else 2)
+ )
+ if minTrailerSize > self._maxTrailerHeadersSize:
+ raise _MalformedChunkedDataError("Trailer headers data is too long.")
# Continue processing more data.
return False
@@ -2019,6 +2106,8 @@ class _ChunkedTransferDecoder:
del self._buffer[0 : eolIndex + 2]
self._start = 0
self._receivedTrailerHeadersSize += eolIndex + 2
+ if self._receivedTrailerHeadersSize > self._maxTrailerHeadersSize:
+ raise _MalformedChunkedDataError("Trailer headers data is too long.")
return True
# eolIndex in this part of code is equal to 0
@@ -2268,13 +2357,15 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
)
self._networkProducer.registerProducer(self, True)
+ def dataReceived(self, data):
+ self.resetTimeout()
+ basic.LineReceiver.dataReceived(self, data)
+
def lineReceived(self, line):
"""
Called for each line from request until the end of headers when
it enters binary mode.
"""
- self.resetTimeout()
-
self._receivedHeaderSize += len(line)
if self._receivedHeaderSize > self.totalHeadersSize:
self._respondToBadRequestAndDisconnect()
@@ -2302,14 +2393,9 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
self.__first_line = 0
- parts = line.split()
- if len(parts) != 3:
- self._respondToBadRequestAndDisconnect()
- return
- command, request, version = parts
try:
- command.decode("ascii")
- except UnicodeDecodeError:
+ command, request, version = _parseRequestLine(line)
+ except ValueError:
self._respondToBadRequestAndDisconnect()
return
@@ -2342,8 +2428,8 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
self.__header = line
def _finishRequestBody(self, data):
- self.allContentReceived()
self._dataBuffer.append(data)
+ self.allContentReceived()
def _maybeChooseTransferDecoder(self, header, data):
"""
@@ -2410,7 +2496,8 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
self._respondToBadRequestAndDisconnect()
return False
- if not header or header[-1:].isspace():
+ # Header names must be tokens, per RFC 9110 section 5.1.
+ if not _istoken(header):
self._respondToBadRequestAndDisconnect()
return False
@@ -2420,12 +2507,7 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
if not self._maybeChooseTransferDecoder(header, data):
return False
- reqHeaders = self.requests[-1].requestHeaders
- values = reqHeaders.getRawHeaders(header)
- if values is not None:
- values.append(data)
- else:
- reqHeaders.setRawHeaders(header, [data])
+ self.requests[-1].requestHeaders.addRawHeader(header, data)
self._receivedHeaderCount += 1
if self._receivedHeaderCount > self.maxHeaders:
@@ -2498,8 +2580,6 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
self._networkProducer.pauseProducing()
return
- self.resetTimeout()
-
try:
self._transferDecoder.dataReceived(data)
except _MalformedChunkedDataError:
@@ -2638,8 +2718,7 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
return False
def writeHeaders(self, version, code, reason, headers):
- """
- Called by L{Request} objects to write a complete set of HTTP headers to
+ """Called by L{Request} objects to write a complete set of HTTP headers to
a transport.
@param version: The HTTP version in use.
@@ -2652,19 +2731,25 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
@type reason: L{bytes}
@param headers: The headers to write to the transport.
- @type headers: L{twisted.web.http_headers.Headers}
- """
- sanitizedHeaders = Headers()
- for name, value in headers:
- sanitizedHeaders.addRawHeader(name, value)
-
- responseLine = version + b" " + code + b" " + reason + b"\r\n"
- headerSequence = [responseLine]
- headerSequence.extend(
- name + b": " + value + b"\r\n"
- for name, values in sanitizedHeaders.getAllRawHeaders()
- for value in values
- )
+ @type headers: L{twisted.web.http_headers.Headers}, or (for backwards
+ compatibility purposes only) any iterable of two-tuples of
+ L{bytes}, representing header names and header values. The latter
+ option is not actually used by Twisted.
+
+ """
+ if not isinstance(headers, Headers):
+ # Turn into Headers instance for security reasons, to make sure we
+ # quite and sanitize everything. This variant should be removed
+ # eventually, it's only here for backwards compatibility.
+ sanitizedHeaders = Headers()
+ for name, value in headers:
+ sanitizedHeaders.addRawHeader(name, value)
+ headers = sanitizedHeaders
+
+ headerSequence = [version, b" ", code, b" ", reason, b"\r\n"]
+ for name, values in headers.getAllRawHeaders():
+ for value in values:
+ headerSequence.extend((name, b": ", value, b"\r\n"))
headerSequence.append(b"\r\n")
self.transport.writeSequence(headerSequence)
@@ -3138,11 +3223,9 @@ class _GenericHTTPChannelProtocol(proxyForInterface(IProtocol, "_channel")): #
using.
"""
if self._negotiatedProtocol is None:
- try:
- negotiatedProtocol = self._channel.transport.negotiatedProtocol
- except AttributeError:
- # Plaintext HTTP, always HTTP/1.1
- negotiatedProtocol = b"http/1.1"
+ negotiatedProtocol = getattr(
+ self._channel.transport, "negotiatedProtocol", b"http/1.1"
+ )
if negotiatedProtocol is None:
negotiatedProtocol = b"http/1.1"
@@ -3191,6 +3274,21 @@ def _genericHTTPChannelProtocolFactory(self):
return _GenericHTTPChannelProtocol(HTTPChannel())
+class _MinimalLogFile(TypingProtocol):
+ def write(self, data: str, /) -> object:
+ """
+ Write some data.
+ """
+
+ def close(self) -> None:
+ """
+ Close the file.
+ """
+
+
+value: type[_MinimalLogFile] = TextIOWrapper
+
+
class HTTPFactory(protocol.ServerFactory):
"""
Factory for HTTP server.
@@ -3221,11 +3319,16 @@ class HTTPFactory(protocol.ServerFactory):
protocol = _genericHTTPChannelProtocolFactory # type: ignore[assignment]
logPath = None
+ _logFile: _MinimalLogFile | None = None
- timeOut = _REQUEST_TIMEOUT
+ timeOut: int | float | None = _REQUEST_TIMEOUT
def __init__(
- self, logPath=None, timeout=_REQUEST_TIMEOUT, logFormatter=None, reactor=None
+ self,
+ logPath: str | bytes | None = None,
+ timeout: int | float = _REQUEST_TIMEOUT,
+ logFormatter: IAccessLogFormatter | None = None,
+ reactor: IReactorTime | None = None,
):
"""
@param logPath: File path to which access log messages will be written
@@ -3245,9 +3348,9 @@ class HTTPFactory(protocol.ServerFactory):
timeouts and compute logging timestamps. Defaults to the global
reactor.
"""
- if not reactor:
- from twisted.internet import reactor
- self.reactor = reactor
+ if reactor is None:
+ from twisted.internet import reactor # type:ignore[assignment]
+ self.reactor: IReactorTime = reactor # type:ignore[assignment]
if logPath is not None:
logPath = os.path.abspath(logPath)
@@ -3258,17 +3361,48 @@ class HTTPFactory(protocol.ServerFactory):
self._logFormatter = logFormatter
# For storing the cached log datetime and the callback to update it
- self._logDateTime = None
- self._logDateTimeCall = None
+ self._logDateTime: str | None = None
+ self._logDateTimeCall: IDelayedCall | None = None
+
+ logFile = property()
+ """
+ A file (object with C{write(data: str)} and C{close()} methods) that will
+ be used for logging HTTP requests and responses in the standard U{Combined
+ Log Format <https://en.wikipedia.org/wiki/Common_Log_Format>} .
+
+ @note: for backwards compatibility purposes, this may be I{set} to an
+ object with a C{write(data: bytes)} method, but these will be detected
+ (by checking if it's an instance of L{BufferedIOBase}) and replaced
+ with a L{TextIOWrapper} when retrieved by getting the attribute again.
+ """
+
+ @logFile.getter
+ def _get_logFile(self) -> _MinimalLogFile:
+ if self._logFile is None:
+ raise AttributeError("no log file present")
+ return self._logFile
+
+ @_get_logFile.setter
+ def _set_logFile(self, newLogFile: BufferedIOBase | _MinimalLogFile) -> None:
+ if isinstance(newLogFile, BufferedIOBase):
+ newLogFile = TextIOWrapper(
+ newLogFile, # type:ignore[arg-type]
+ "utf-8",
+ write_through=True,
+ newline="\n",
+ )
+ self._logFile = newLogFile
+
+ logFile = _set_logFile
- def _updateLogDateTime(self):
+ def _updateLogDateTime(self) -> None:
"""
Update log datetime periodically, so we aren't always recalculating it.
"""
self._logDateTime = datetimeToLogString(self.reactor.seconds())
self._logDateTimeCall = self.reactor.callLater(1, self._updateLogDateTime)
- def buildProtocol(self, addr):
+ def buildProtocol(self, addr: IAddress) -> Protocol | None:
p = protocol.ServerFactory.buildProtocol(self, addr)
# This is a bit of a hack to ensure that the HTTPChannel timeouts
@@ -3276,53 +3410,45 @@ class HTTPFactory(protocol.ServerFactory):
# ideally be resolved by passing the reactor more generally to the
# HTTPChannel, but that won't work for the TimeoutMixin until we fix
# https://twistedmatrix.com/trac/ticket/8488
- p.callLater = self.reactor.callLater
+ p.callLater = self.reactor.callLater # type:ignore[union-attr]
# timeOut needs to be on the Protocol instance cause
# TimeoutMixin expects it there
- p.timeOut = self.timeOut
+ p.timeOut = self.timeOut # type:ignore[union-attr]
return p
- def startFactory(self):
+ def startFactory(self) -> None:
"""
Set up request logging if necessary.
"""
if self._logDateTimeCall is None:
self._updateLogDateTime()
- if self.logPath:
- self.logFile = self._openLogFile(self.logPath)
- else:
- self.logFile = log.logfile
+ self._logFile = self._openLogFile(self.logPath) if self.logPath else log.logfile
- def stopFactory(self):
- if hasattr(self, "logFile"):
- if self.logFile != log.logfile:
- self.logFile.close()
- del self.logFile
+ def stopFactory(self) -> None:
+ if self._logFile is not None:
+ if self._logFile != log.logfile:
+ self._logFile.close()
+ self._logFile = None
if self._logDateTimeCall is not None and self._logDateTimeCall.active():
self._logDateTimeCall.cancel()
self._logDateTimeCall = None
- def _openLogFile(self, path):
+ def _openLogFile(self, path: str | bytes) -> _MinimalLogFile:
"""
Override in subclasses, e.g. to use L{twisted.python.logfile}.
"""
- f = open(path, "ab", 1)
- return f
+ return open(path, "a", 1, newline="\n")
- def log(self, request):
+ def log(self, request: Request) -> None:
"""
Write a line representing C{request} to the access log file.
@param request: The request object about which to log.
- @type request: L{Request}
"""
- try:
- logFile = self.logFile
- except AttributeError:
- pass
- else:
+ logFile = self._logFile
+ if logFile is not None:
line = self._logFormatter(self._logDateTime, request) + "\n"
- logFile.write(line.encode("utf8"))
+ logFile.write(line)
diff --git a/contrib/python/Twisted/py3/twisted/web/http_headers.py b/contrib/python/Twisted/py3/twisted/web/http_headers.py
index f810f4bc2c..8b1d41adb6 100644
--- a/contrib/python/Twisted/py3/twisted/web/http_headers.py
+++ b/contrib/python/Twisted/py3/twisted/web/http_headers.py
@@ -6,9 +6,9 @@
An API for storing HTTP header names and values.
"""
-from collections.abc import Sequence as _Sequence
from typing import (
AnyStr,
+ ClassVar,
Dict,
Iterator,
List,
@@ -26,17 +26,6 @@ from twisted.python.compat import cmp, comparable
_T = TypeVar("_T")
-def _dashCapitalize(name: bytes) -> bytes:
- """
- Return a byte string which is capitalized using '-' as a word separator.
-
- @param name: The name of the header to capitalize.
-
- @return: The given header capitalized using '-' as a word separator.
- """
- return b"-".join([word.capitalize() for word in name.split(b"-")])
-
-
def _sanitizeLinearWhitespace(headerComponent: bytes) -> bytes:
r"""
Replace linear whitespace (C{\n}, C{\r\n}, C{\r}) in a header key
@@ -65,13 +54,17 @@ class Headers:
and values as opaque byte strings.
@cvar _caseMappings: A L{dict} that maps lowercase header names
- to their canonicalized representation.
+ to their canonicalized representation, for headers with unconventional
+ capitalization.
+
+ @cvar _canonicalHeaderCache: A L{dict} that maps header names to their
+ canonicalized representation.
@ivar _rawHeaders: A L{dict} mapping header names as L{bytes} to L{list}s of
header values as L{bytes}.
"""
- _caseMappings = {
+ _caseMappings: ClassVar[Dict[bytes, bytes]] = {
b"content-md5": b"Content-MD5",
b"dnt": b"DNT",
b"etag": b"ETag",
@@ -81,6 +74,12 @@ class Headers:
b"x-xss-protection": b"X-XSS-Protection",
}
+ _canonicalHeaderCache: ClassVar[Dict[Union[bytes, str], bytes]] = {}
+
+ _MAX_CACHED_HEADERS: ClassVar[int] = 10_000
+
+ __slots__ = ["_rawHeaders"]
+
def __init__(
self,
rawHeaders: Optional[Mapping[AnyStr, Sequence[AnyStr]]] = None,
@@ -112,16 +111,36 @@ class Headers:
def _encodeName(self, name: Union[str, bytes]) -> bytes:
"""
- Encode the name of a header (eg 'Content-Type') to an ISO-8859-1 encoded
- bytestring if required.
+ Encode the name of a header (eg 'Content-Type') to an ISO-8859-1
+ encoded bytestring if required. It will be canonicalized and
+ whitespace-sanitized.
@param name: A HTTP header name
@return: C{name}, encoded if required, lowercased
"""
- if isinstance(name, str):
- return name.lower().encode("iso-8859-1")
- return name.lower()
+ if canonicalName := self._canonicalHeaderCache.get(name, None):
+ return canonicalName
+
+ bytes_name = name.encode("iso-8859-1") if isinstance(name, str) else name
+
+ if bytes_name.lower() in self._caseMappings:
+ # Some headers have special capitalization:
+ result = self._caseMappings[bytes_name.lower()]
+ else:
+ result = _sanitizeLinearWhitespace(
+ b"-".join([word.capitalize() for word in bytes_name.split(b"-")])
+ )
+
+ # In general, we should only see a very small number of header
+ # variations in the real world, so caching them is fine. However, an
+ # attacker could generate infinite header variations to fill up RAM, so
+ # we cap how many we cache. The performance degradation from lack of
+ # caching won't be that bad, and legit traffic won't hit it.
+ if len(self._canonicalHeaderCache) < self._MAX_CACHED_HEADERS:
+ self._canonicalHeaderCache[name] = result
+
+ return result
def copy(self):
"""
@@ -151,21 +170,9 @@ class Headers:
"""
self._rawHeaders.pop(self._encodeName(name), None)
- @overload
- def setRawHeaders(self, name: Union[str, bytes], values: Sequence[bytes]) -> None:
- ...
-
- @overload
- def setRawHeaders(self, name: Union[str, bytes], values: Sequence[str]) -> None:
- ...
-
- @overload
def setRawHeaders(
self, name: Union[str, bytes], values: Sequence[Union[str, bytes]]
) -> None:
- ...
-
- def setRawHeaders(self, name: Union[str, bytes], values: object) -> None:
"""
Sets the raw representation of the given header.
@@ -179,29 +186,7 @@ class Headers:
@return: L{None}
"""
- if not isinstance(values, _Sequence):
- raise TypeError(
- "Header entry %r should be sequence but found "
- "instance of %r instead" % (name, type(values))
- )
-
- if not isinstance(name, (bytes, str)):
- raise TypeError(
- f"Header name is an instance of {type(name)!r}, not bytes or str"
- )
-
- for count, value in enumerate(values):
- if not isinstance(value, (bytes, str)):
- raise TypeError(
- "Header value at position %s is an instance of %r, not "
- "bytes or str"
- % (
- count,
- type(value),
- )
- )
-
- _name = _sanitizeLinearWhitespace(self._encodeName(name))
+ _name = self._encodeName(name)
encodedValues: List[bytes] = []
for v in values:
if isinstance(v, str):
@@ -220,20 +205,7 @@ class Headers:
@param value: The value to set for the named header.
"""
- if not isinstance(name, (bytes, str)):
- raise TypeError(
- f"Header name is an instance of {type(name)!r}, not bytes or str"
- )
-
- if not isinstance(value, (bytes, str)):
- raise TypeError(
- "Header value is an instance of %r, not "
- "bytes or str" % (type(value),)
- )
-
- self._rawHeaders.setdefault(
- _sanitizeLinearWhitespace(self._encodeName(name)), []
- ).append(
+ self._rawHeaders.setdefault(self._encodeName(name), []).append(
_sanitizeLinearWhitespace(
value.encode("utf8") if isinstance(value, str) else value
)
@@ -277,19 +249,7 @@ class Headers:
object, as L{bytes}. The keys are capitalized in canonical
capitalization.
"""
- for k, v in self._rawHeaders.items():
- yield self._canonicalNameCaps(k), v
-
- def _canonicalNameCaps(self, name: bytes) -> bytes:
- """
- Return the canonical name for the given header.
-
- @param name: The all-lowercase header name to capitalize in its
- canonical form.
-
- @return: The canonical name of the header.
- """
- return self._caseMappings.get(name, _dashCapitalize(name))
+ return iter(self._rawHeaders.items())
__all__ = ["Headers"]
diff --git a/contrib/python/Twisted/py3/twisted/web/iweb.py b/contrib/python/Twisted/py3/twisted/web/iweb.py
index 1aeb152fd9..040b916c73 100644
--- a/contrib/python/Twisted/py3/twisted/web/iweb.py
+++ b/contrib/python/Twisted/py3/twisted/web/iweb.py
@@ -9,6 +9,7 @@ Interface definitions for L{twisted.web}.
L{IBodyProducer.length} to indicate that the length of the entity
body is not known in advance.
"""
+
from typing import TYPE_CHECKING, Callable, List, Optional
from zope.interface import Attribute, Interface
@@ -595,15 +596,15 @@ class IResponse(Interface):
L{IPushProducer}. The protocol's C{connectionLost} method will be
called with:
- - ResponseDone, which indicates that all bytes from the response
+ - L{ResponseDone}, which indicates that all bytes from the response
have been successfully delivered.
- - PotentialDataLoss, which indicates that it cannot be determined
+ - L{PotentialDataLoss}, which indicates that it cannot be determined
if the entire response body has been delivered. This only occurs
when making requests to HTTP servers which do not set
I{Content-Length} or a I{Transfer-Encoding} in the response.
- - ResponseFailed, which indicates that some bytes from the response
+ - L{ResponseFailed}, which indicates that some bytes from the response
were lost. The C{reasons} attribute of the exception may provide
more specific indications as to why.
"""
diff --git a/contrib/python/Twisted/py3/twisted/web/resource.py b/contrib/python/Twisted/py3/twisted/web/resource.py
index 456db72d12..56595d2995 100644
--- a/contrib/python/Twisted/py3/twisted/web/resource.py
+++ b/contrib/python/Twisted/py3/twisted/web/resource.py
@@ -28,7 +28,7 @@ from incremental import Version
from twisted.python.compat import nativeString
from twisted.python.components import proxyForInterface
-from twisted.python.deprecate import deprecatedModuleAttribute
+from twisted.python.deprecate import deprecated
from twisted.python.reflect import prefixedMethodNames
from twisted.web._responses import FORBIDDEN, NOT_FOUND
from twisted.web.error import UnsupportedMethod
@@ -294,15 +294,9 @@ def _computeAllowedMethods(resource):
return allowedMethods
-class _UnsafeErrorPage(Resource):
+class _UnsafeErrorPageBase(Resource):
"""
- L{_UnsafeErrorPage}, publicly available via the deprecated alias
- C{ErrorPage}, is a resource which responds with a particular
- (parameterized) status and a body consisting of HTML containing some
- descriptive text. This is useful for rendering simple error pages.
-
- Deprecated in Twisted 22.10.0 because it permits HTML injection; use
- L{twisted.web.pages.errorPage} instead.
+ Base class for deprecated error page resources.
@ivar template: A native string which will have a dictionary interpolated
into it to generate the response body. The dictionary has the following
@@ -355,7 +349,26 @@ class _UnsafeErrorPage(Resource):
return self
-class _UnsafeNoResource(_UnsafeErrorPage):
+class _UnsafeErrorPage(_UnsafeErrorPageBase):
+ """
+ L{_UnsafeErrorPage}, publicly available via the deprecated alias
+ C{ErrorPage}, is a resource which responds with a particular
+ (parameterized) status and a body consisting of HTML containing some
+ descriptive text. This is useful for rendering simple error pages.
+
+ Deprecated in Twisted 22.10.0 because it permits HTML injection; use
+ L{twisted.web.pages.errorPage} instead.
+ """
+
+ @deprecated(
+ Version("Twisted", 22, 10, 0),
+ "Use twisted.web.pages.errorPage instead, which properly escapes HTML.",
+ )
+ def __init__(self, status, brief, detail):
+ _UnsafeErrorPageBase.__init__(self, status, brief, detail)
+
+
+class _UnsafeNoResource(_UnsafeErrorPageBase):
"""
L{_UnsafeNoResource}, publicly available via the deprecated alias
C{NoResource}, is a specialization of L{_UnsafeErrorPage} which
@@ -365,11 +378,15 @@ class _UnsafeNoResource(_UnsafeErrorPage):
L{twisted.web.pages.notFound} instead.
"""
+ @deprecated(
+ Version("Twisted", 22, 10, 0),
+ "Use twisted.web.pages.notFound instead, which properly escapes HTML.",
+ )
def __init__(self, message="Sorry. No luck finding that resource."):
- _UnsafeErrorPage.__init__(self, NOT_FOUND, "No Such Resource", message)
+ _UnsafeErrorPageBase.__init__(self, NOT_FOUND, "No Such Resource", message)
-class _UnsafeForbiddenResource(_UnsafeErrorPage):
+class _UnsafeForbiddenResource(_UnsafeErrorPageBase):
"""
L{_UnsafeForbiddenResource}, publicly available via the deprecated alias
C{ForbiddenResource} is a specialization of L{_UnsafeErrorPage} which
@@ -379,8 +396,12 @@ class _UnsafeForbiddenResource(_UnsafeErrorPage):
L{twisted.web.pages.forbidden} instead.
"""
+ @deprecated(
+ Version("Twisted", 22, 10, 0),
+ "Use twisted.web.pages.forbidden instead, which properly escapes HTML.",
+ )
def __init__(self, message="Sorry, resource is forbidden."):
- _UnsafeErrorPage.__init__(self, FORBIDDEN, "Forbidden Resource", message)
+ _UnsafeErrorPageBase.__init__(self, FORBIDDEN, "Forbidden Resource", message)
# Deliberately undocumented public aliases. See GHSA-vg46-2rrj-3647.
@@ -388,27 +409,6 @@ ErrorPage = _UnsafeErrorPage
NoResource = _UnsafeNoResource
ForbiddenResource = _UnsafeForbiddenResource
-deprecatedModuleAttribute(
- Version("Twisted", 22, 10, 0),
- "Use twisted.web.pages.errorPage instead, which properly escapes HTML.",
- __name__,
- "ErrorPage",
-)
-
-deprecatedModuleAttribute(
- Version("Twisted", 22, 10, 0),
- "Use twisted.web.pages.notFound instead, which properly escapes HTML.",
- __name__,
- "NoResource",
-)
-
-deprecatedModuleAttribute(
- Version("Twisted", 22, 10, 0),
- "Use twisted.web.pages.forbidden instead, which properly escapes HTML.",
- __name__,
- "ForbiddenResource",
-)
-
class _IEncodingResource(Interface):
"""
diff --git a/contrib/python/Twisted/py3/twisted/web/server.py b/contrib/python/Twisted/py3/twisted/web/server.py
index 6392a3168a..cfcefad7f3 100644
--- a/contrib/python/Twisted/py3/twisted/web/server.py
+++ b/contrib/python/Twisted/py3/twisted/web/server.py
@@ -25,19 +25,23 @@ from urllib.parse import quote as _quote
from zope.interface import implementer
-from incremental import Version
-
from twisted import copyright
from twisted.internet import address, interfaces
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from twisted.logger import Logger
from twisted.python import components, failure, reflect
from twisted.python.compat import nativeString, networkString
-from twisted.python.deprecate import deprecatedModuleAttribute
from twisted.spread.pb import Copyable, ViewPoint
from twisted.web import http, iweb, resource, util
from twisted.web.error import UnsupportedMethod
-from twisted.web.http import unquote
+from twisted.web.http import (
+ NO_CONTENT,
+ NOT_MODIFIED,
+ HTTPFactory,
+ Request as _HTTPRequest,
+ datetimeToString,
+ unquote,
+)
NOT_DONE_YET = 1
@@ -51,23 +55,6 @@ __all__ = [
"GzipEncoderFactory",
]
-
-# backwards compatibility
-deprecatedModuleAttribute(
- Version("Twisted", 12, 1, 0),
- "Please use twisted.web.http.datetimeToString instead",
- "twisted.web.server",
- "date_time_string",
-)
-deprecatedModuleAttribute(
- Version("Twisted", 12, 1, 0),
- "Please use twisted.web.http.stringToDatetime instead",
- "twisted.web.server",
- "string_date_time",
-)
-date_time_string = http.datetimeToString
-string_date_time = http.stringToDatetime
-
# Support for other methods may be implemented on a per-resource basis.
supportedMethods = (b"GET", b"HEAD", b"POST")
@@ -112,7 +99,7 @@ class Request(Copyable, http.Request, components.Componentized):
_log = Logger()
def __init__(self, *args, **kw):
- http.Request.__init__(self, *args, **kw)
+ _HTTPRequest.__init__(self, *args, **kw)
components.Componentized.__init__(self)
def getStateToCopyFor(self, issuer):
@@ -187,7 +174,7 @@ class Request(Copyable, http.Request, components.Componentized):
try:
getContentFile = self.channel.site.getContentFile
except AttributeError:
- http.Request.gotLength(self, length)
+ _HTTPRequest.gotLength(self, length)
else:
self.content = getContentFile(length)
@@ -206,7 +193,7 @@ class Request(Copyable, http.Request, components.Componentized):
# set various default headers
self.setHeader(b"server", version)
- self.setHeader(b"date", http.datetimeToString())
+ self.setHeader(b"date", datetimeToString())
# Resource Identification
self.prepath = []
@@ -240,7 +227,7 @@ class Request(Copyable, http.Request, components.Componentized):
# NOT_MODIFIED and NO_CONTENT responses. We also omit it if there
# is a Content-Length header set to 0, as empty bodies don't need
# a content-type.
- needsCT = self.code not in (http.NOT_MODIFIED, http.NO_CONTENT)
+ needsCT = self.code not in (NOT_MODIFIED, NO_CONTENT)
contentType = self.responseHeaders.getRawHeaders(b"content-type")
contentLength = self.responseHeaders.getRawHeaders(b"content-length")
contentLengthZero = contentLength and (contentLength[0] == b"0")
@@ -263,17 +250,17 @@ class Request(Copyable, http.Request, components.Componentized):
if not self._inFakeHead:
if self._encoder:
data = self._encoder.encode(data)
- http.Request.write(self, data)
+ _HTTPRequest.write(self, data)
def finish(self):
"""
- Override C{http.Request.finish} for possible encoding.
+ Override L{twisted.web.http.Request.finish} for possible encoding.
"""
if self._encoder:
data = self._encoder.finish()
if data:
- http.Request.write(self, data)
- return http.Request.finish(self)
+ _HTTPRequest.write(self, data)
+ return _HTTPRequest.finish(self)
def render(self, resrc):
"""
@@ -768,7 +755,7 @@ version = networkString(f"TwistedWeb/{copyright.version}")
@implementer(interfaces.IProtocolNegotiationFactory)
-class Site(http.HTTPFactory):
+class Site(HTTPFactory):
"""
A web site: manage log, sessions, and resources.
diff --git a/contrib/python/Twisted/py3/twisted/web/soap.py b/contrib/python/Twisted/py3/twisted/web/soap.py
deleted file mode 100644
index cc44b41e2a..0000000000
--- a/contrib/python/Twisted/py3/twisted/web/soap.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# -*- test-case-name: twisted.web.test.test_soap -*-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""
-SOAP support for twisted.web.
-
-Requires SOAPpy 0.10.1 or later.
-
-Maintainer: Itamar Shtull-Trauring
-
-Future plans:
-SOAPContext support of some kind.
-Pluggable method lookup policies.
-"""
-
-# SOAPpy
-import SOAPpy
-
-from twisted.internet import defer
-
-# twisted imports
-from twisted.web import client, resource, server
-
-
-class SOAPPublisher(resource.Resource):
- """Publish SOAP methods.
-
- By default, publish methods beginning with 'soap_'. If the method
- has an attribute 'useKeywords', it well get the arguments passed
- as keyword args.
- """
-
- isLeaf = 1
-
- # override to change the encoding used for responses
- encoding = "UTF-8"
-
- def lookupFunction(self, functionName):
- """Lookup published SOAP function.
-
- Override in subclasses. Default behaviour - publish methods
- starting with soap_.
-
- @return: callable or None if not found.
- """
- return getattr(self, "soap_%s" % functionName, None)
-
- def render(self, request):
- """Handle a SOAP command."""
- data = request.content.read()
-
- p, header, body, attrs = SOAPpy.parseSOAPRPC(data, 1, 1, 1)
-
- methodName, args, kwargs = p._name, p._aslist, p._asdict
-
- # deal with changes in SOAPpy 0.11
- if callable(args):
- args = args()
- if callable(kwargs):
- kwargs = kwargs()
-
- function = self.lookupFunction(methodName)
-
- if not function:
- self._methodNotFound(request, methodName)
- return server.NOT_DONE_YET
- else:
- if hasattr(function, "useKeywords"):
- keywords = {}
- for k, v in kwargs.items():
- keywords[str(k)] = v
- d = defer.maybeDeferred(function, **keywords)
- else:
- d = defer.maybeDeferred(function, *args)
-
- d.addCallback(self._gotResult, request, methodName)
- d.addErrback(self._gotError, request, methodName)
- return server.NOT_DONE_YET
-
- def _methodNotFound(self, request, methodName):
- response = SOAPpy.buildSOAP(
- SOAPpy.faultType(
- "%s:Client" % SOAPpy.NS.ENV_T, "Method %s not found" % methodName
- ),
- encoding=self.encoding,
- )
- self._sendResponse(request, response, status=500)
-
- def _gotResult(self, result, request, methodName):
- if not isinstance(result, SOAPpy.voidType):
- result = {"Result": result}
- response = SOAPpy.buildSOAP(
- kw={"%sResponse" % methodName: result}, encoding=self.encoding
- )
- self._sendResponse(request, response)
-
- def _gotError(self, failure, request, methodName):
- e = failure.value
- if isinstance(e, SOAPpy.faultType):
- fault = e
- else:
- fault = SOAPpy.faultType(
- "%s:Server" % SOAPpy.NS.ENV_T, "Method %s failed." % methodName
- )
- response = SOAPpy.buildSOAP(fault, encoding=self.encoding)
- self._sendResponse(request, response, status=500)
-
- def _sendResponse(self, request, response, status=200):
- request.setResponseCode(status)
-
- if self.encoding is not None:
- mimeType = 'text/xml; charset="%s"' % self.encoding
- else:
- mimeType = "text/xml"
- request.setHeader("Content-type", mimeType)
- request.setHeader("Content-length", str(len(response)))
- request.write(response)
- request.finish()
-
-
-class Proxy:
- """A Proxy for making remote SOAP calls.
-
- Pass the URL of the remote SOAP server to the constructor.
-
- Use proxy.callRemote('foobar', 1, 2) to call remote method
- 'foobar' with args 1 and 2, proxy.callRemote('foobar', x=1)
- will call foobar with named argument 'x'.
- """
-
- # at some point this should have encoding etc. kwargs
- def __init__(self, url, namespace=None, header=None):
- self.url = url
- self.namespace = namespace
- self.header = header
-
- def _cbGotResult(self, result):
- result = SOAPpy.parseSOAPRPC(result)
- if hasattr(result, "Result"):
- return result.Result
- elif len(result) == 1:
- ## SOAPpy 0.11.6 wraps the return results in a containing structure.
- ## This check added to make Proxy behaviour emulate SOAPProxy, which
- ## flattens the structure by default.
- ## This behaviour is OK because even singleton lists are wrapped in
- ## another singleton structType, which is almost always useless.
- return result[0]
- else:
- return result
-
- def callRemote(self, method, *args, **kwargs):
- payload = SOAPpy.buildSOAP(
- args=args,
- kw=kwargs,
- method=method,
- header=self.header,
- namespace=self.namespace,
- )
- return client.getPage(
- self.url,
- postdata=payload,
- method="POST",
- headers={"content-type": "text/xml", "SOAPAction": method},
- ).addCallback(self._cbGotResult)
diff --git a/contrib/python/Twisted/py3/twisted/web/test/requesthelper.py b/contrib/python/Twisted/py3/twisted/web/test/requesthelper.py
index a3b0904427..d5c8918b30 100644
--- a/contrib/python/Twisted/py3/twisted/web/test/requesthelper.py
+++ b/contrib/python/Twisted/py3/twisted/web/test/requesthelper.py
@@ -118,6 +118,10 @@ class DummyChannel:
pass
def writeHeaders(self, version, code, reason, headers):
+ if isinstance(headers, Headers):
+ headers = [
+ (k, v) for (k, values) in headers.getAllRawHeaders() for v in values
+ ]
response_line = version + b" " + code + b" " + reason + b"\r\n"
headerSequence = [response_line]
headerSequence.extend(name + b": " + value + b"\r\n" for name, value in headers)
diff --git a/contrib/python/Twisted/py3/twisted/web/util.py b/contrib/python/Twisted/py3/twisted/web/util.py
index 3135f05cd9..756c870480 100644
--- a/contrib/python/Twisted/py3/twisted/web/util.py
+++ b/contrib/python/Twisted/py3/twisted/web/util.py
@@ -9,7 +9,6 @@ An assortment of web server-related utilities.
__all__ = [
"redirectTo",
"Redirect",
- "ChildRedirector",
"ParentRedirect",
"DeferredResource",
"FailureElement",
@@ -24,7 +23,6 @@ __all__ = [
from ._template_util import (
_PRE,
- ChildRedirector,
DeferredResource,
FailureElement,
ParentRedirect,
diff --git a/contrib/python/Twisted/py3/twisted/web/wsgi.py b/contrib/python/Twisted/py3/twisted/web/wsgi.py
index 43227f40e3..e979d30416 100644
--- a/contrib/python/Twisted/py3/twisted/web/wsgi.py
+++ b/contrib/python/Twisted/py3/twisted/web/wsgi.py
@@ -8,6 +8,7 @@ U{Python Web Server Gateway Interface v1.0.1<http://www.python.org/dev/peps/pep-
from collections.abc import Sequence
from sys import exc_info
+from typing import List, Union
from warnings import warn
from zope.interface import implementer
@@ -19,79 +20,49 @@ from twisted.web.http import INTERNAL_SERVER_ERROR
from twisted.web.resource import IResource
from twisted.web.server import NOT_DONE_YET
-# PEP-3333 -- which has superseded PEP-333 -- states that, in both Python 2
-# and Python 3, text strings MUST be represented using the platform's native
-# string type, limited to characters defined in ISO-8859-1. Byte strings are
-# used only for values read from wsgi.input, passed to write() or yielded by
-# the application.
+
+# PEP-3333 -- which has superseded PEP-333 -- states that text strings MUST
+# be represented using the platform's native string type, limited to
+# characters defined in ISO-8859-1. Byte strings are used only for values
+# read from wsgi.input, passed to write() or yielded by the application.
#
# Put another way:
#
-# - In Python 2, all text strings and binary data are of type str/bytes and
-# NEVER of type unicode. Whether the strings contain binary data or
-# ISO-8859-1 text depends on context.
-#
-# - In Python 3, all text strings are of type str, and all binary data are of
+# - All text strings are of type str, and all binary data are of
# type bytes. Text MUST always be limited to that which can be encoded as
# ISO-8859-1, U+0000 to U+00FF inclusive.
#
# The following pair of functions -- _wsgiString() and _wsgiStringToBytes() --
# are used to make Twisted's WSGI support compliant with the standard.
-if str is bytes:
-
- def _wsgiString(string): # Python 2.
- """
- Convert C{string} to an ISO-8859-1 byte string, if it is not already.
-
- @type string: C{str}/C{bytes} or C{unicode}
- @rtype: C{str}/C{bytes}
-
- @raise UnicodeEncodeError: If C{string} contains non-ISO-8859-1 chars.
- """
- if isinstance(string, str):
- return string
- else:
- return string.encode("iso-8859-1")
-
- def _wsgiStringToBytes(string): # Python 2.
- """
- Return C{string} as is; a WSGI string is a byte string in Python 2.
-
- @type string: C{str}/C{bytes}
- @rtype: C{str}/C{bytes}
- """
- return string
+def _wsgiString(string: Union[str, bytes]) -> str:
+ """
+ Convert C{string} to a WSGI "bytes-as-unicode" string.
-else:
+ If it's a byte string, decode as ISO-8859-1. If it's a Unicode string,
+ round-trip it to bytes and back using ISO-8859-1 as the encoding.
- def _wsgiString(string): # Python 3.
- """
- Convert C{string} to a WSGI "bytes-as-unicode" string.
+ @type string: C{str} or C{bytes}
+ @rtype: C{str}
- If it's a byte string, decode as ISO-8859-1. If it's a Unicode string,
- round-trip it to bytes and back using ISO-8859-1 as the encoding.
+ @raise UnicodeEncodeError: If C{string} contains non-ISO-8859-1 chars.
+ """
+ if isinstance(string, str):
+ return string.encode("iso-8859-1").decode("iso-8859-1")
+ else:
+ return string.decode("iso-8859-1")
- @type string: C{str} or C{bytes}
- @rtype: C{str}
- @raise UnicodeEncodeError: If C{string} contains non-ISO-8859-1 chars.
- """
- if isinstance(string, str):
- return string.encode("iso-8859-1").decode("iso-8859-1")
- else:
- return string.decode("iso-8859-1")
-
- def _wsgiStringToBytes(string): # Python 3.
- """
- Convert C{string} from a WSGI "bytes-as-unicode" string to an
- ISO-8859-1 byte string.
+def _wsgiStringToBytes(string: str) -> bytes:
+ """
+ Convert C{string} from a WSGI "bytes-as-unicode" string to an
+ ISO-8859-1 byte string.
- @type string: C{str}
- @rtype: C{bytes}
+ @type string: C{str}
+ @rtype: C{bytes}
- @raise UnicodeEncodeError: If C{string} contains non-ISO-8859-1 chars.
- """
- return string.encode("iso-8859-1")
+ @raise UnicodeEncodeError: If C{string} contains non-ISO-8859-1 chars.
+ """
+ return string.encode("iso-8859-1")
class _ErrorStream:
@@ -108,7 +79,7 @@ class _ErrorStream:
_log = Logger()
- def write(self, data):
+ def write(self, data: str) -> None:
"""
Generate an event for the logging system with the given bytes as the
message.
@@ -117,27 +88,19 @@ class _ErrorStream:
@type data: str
- @raise TypeError: On Python 3, if C{data} is not a native string. On
- Python 2 a warning will be issued.
+ @raise TypeError: if C{data} is not a native string.
"""
if not isinstance(data, str):
- if str is bytes:
- warn(
- "write() argument should be str, not %r (%s)"
- % (data, type(data).__name__),
- category=UnicodeWarning,
- )
- else:
- raise TypeError(
- "write() argument must be str, not %r (%s)"
- % (data, type(data).__name__)
- )
+ raise TypeError(
+ "write() argument must be str, not %r (%s)"
+ % (data, type(data).__name__)
+ )
# Note that in old style, message was a tuple. logger._legacy
# will overwrite this value if it is not properly formatted here.
self._log.error(data, system="wsgi", isError=True, message=(data,))
- def writelines(self, iovec):
+ def writelines(self, iovec: List[str]) -> None:
"""
Join the given lines and pass them to C{write} to be handled in the
usual way.
@@ -147,8 +110,7 @@ class _ErrorStream:
@param iovec: A C{list} of C{'\\n'}-terminated C{str} which will be
logged.
- @raise TypeError: On Python 3, if C{iovec} contains any non-native
- strings. On Python 2 a warning will be issued.
+ @raise TypeError: if C{iovec} contains any non-native strings.
"""
self.write("".join(iovec))
@@ -287,9 +249,11 @@ class _WSGIResponse:
# All keys and values need to be native strings, i.e. of type str in
# *both* Python 2 and Python 3, so says PEP-3333.
+ remotePeer = request.getClientAddress()
self.environ = {
"REQUEST_METHOD": _wsgiString(request.method),
- "REMOTE_ADDR": _wsgiString(request.getClientAddress().host),
+ "REMOTE_ADDR": _wsgiString(remotePeer.host),
+ "REMOTE_PORT": _wsgiString(str(remotePeer.port)),
"SCRIPT_NAME": _wsgiString(scriptName),
"PATH_INFO": _wsgiString(pathInfo),
"QUERY_STRING": _wsgiString(queryString),
@@ -357,8 +321,7 @@ class _WSGIResponse:
raise excInfo[1].with_traceback(excInfo[2])
# PEP-3333 mandates that status should be a native string. In practice
- # this is mandated by Twisted's HTTP implementation too, so we enforce
- # on both Python 2 and Python 3.
+ # this is mandated by Twisted's HTTP implementation too.
if not isinstance(status, str):
raise TypeError(
"status must be str, not {!r} ({})".format(
@@ -535,6 +498,9 @@ class WSGIResource:
An L{IResource} implementation which delegates responsibility for all
resources hierarchically inferior to it to a WSGI application.
+ The C{environ} argument passed to the application, includes the
+ C{REMOTE_PORT} key to complement the C{REMOTE_ADDR} key.
+
@ivar _reactor: An L{IReactorThreads} provider which will be passed on to
L{_WSGIResponse} to schedule calls in the I/O thread.
diff --git a/contrib/python/Twisted/py3/twisted/words/protocols/irc.py b/contrib/python/Twisted/py3/twisted/words/protocols/irc.py
index c4ec04579f..2906fa5627 100644
--- a/contrib/python/Twisted/py3/twisted/words/protocols/irc.py
+++ b/contrib/python/Twisted/py3/twisted/words/protocols/irc.py
@@ -2650,8 +2650,7 @@ class IRCClient(basic.LineReceiver):
basic.LineReceiver.dataReceived(self, data)
def lineReceived(self, line):
- if bytes != str and isinstance(line, bytes):
- # decode bytes from transport to unicode
+ if isinstance(line, bytes):
line = line.decode("utf-8")
line = lowDequote(line)
diff --git a/contrib/python/Twisted/py3/twisted/words/service.py b/contrib/python/Twisted/py3/twisted/words/service.py
index d65a425f94..78189e1c7c 100644
--- a/contrib/python/Twisted/py3/twisted/words/service.py
+++ b/contrib/python/Twisted/py3/twisted/words/service.py
@@ -979,9 +979,7 @@ class PBGroup(pb.Referenceable):
class PBGroupReference(pb.RemoteReference):
def unjellyFor(self, unjellier, unjellyList):
clsName, name, ref = unjellyList
- self.name = name
- if bytes != str and isinstance(self.name, bytes):
- self.name = self.name.decode("utf-8")
+ self.name = name.decode("utf-8")
return pb.RemoteReference.unjellyFor(self, unjellier, [clsName, ref])
def leave(self, reason=None):
diff --git a/contrib/python/Twisted/py3/ya.make b/contrib/python/Twisted/py3/ya.make
index 47590a4139..d74fcea5c5 100644
--- a/contrib/python/Twisted/py3/ya.make
+++ b/contrib/python/Twisted/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(24.3.0)
+VERSION(24.7.0)
LICENSE(MIT)
@@ -416,7 +416,6 @@ PY_SRCS(
twisted/web/rewrite.py
twisted/web/script.py
twisted/web/server.py
- twisted/web/soap.py
twisted/web/static.py
twisted/web/sux.py
twisted/web/tap.py
diff --git a/contrib/python/hypothesis/py3/.dist-info/METADATA b/contrib/python/hypothesis/py3/.dist-info/METADATA
index 1e09939db8..8b3d585d4b 100644
--- a/contrib/python/hypothesis/py3/.dist-info/METADATA
+++ b/contrib/python/hypothesis/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: hypothesis
-Version: 6.110.1
+Version: 6.110.2
Summary: A library for property-based testing
Home-page: https://hypothesis.works
Author: David R. MacIver and Zac Hatfield-Dodds
@@ -41,10 +41,10 @@ Requires-Dist: exceptiongroup>=1.0.0; python_version < "3.11"
Provides-Extra: all
Requires-Dist: black>=19.10b0; extra == "all"
Requires-Dist: click>=7.0; extra == "all"
-Requires-Dist: crosshair-tool>=0.0.65; extra == "all"
+Requires-Dist: crosshair-tool>=0.0.66; extra == "all"
Requires-Dist: django>=3.2; extra == "all"
Requires-Dist: dpcontracts>=0.4; extra == "all"
-Requires-Dist: hypothesis-crosshair>=0.0.11; extra == "all"
+Requires-Dist: hypothesis-crosshair>=0.0.12; extra == "all"
Requires-Dist: lark>=0.10.1; extra == "all"
Requires-Dist: libcst>=0.3.16; extra == "all"
Requires-Dist: numpy>=1.17.3; extra == "all"
@@ -63,8 +63,8 @@ Requires-Dist: rich>=9.0.0; extra == "cli"
Provides-Extra: codemods
Requires-Dist: libcst>=0.3.16; extra == "codemods"
Provides-Extra: crosshair
-Requires-Dist: hypothesis-crosshair>=0.0.11; extra == "crosshair"
-Requires-Dist: crosshair-tool>=0.0.65; extra == "crosshair"
+Requires-Dist: hypothesis-crosshair>=0.0.12; extra == "crosshair"
+Requires-Dist: crosshair-tool>=0.0.66; extra == "crosshair"
Provides-Extra: dateutil
Requires-Dist: python-dateutil>=1.4; extra == "dateutil"
Provides-Extra: django
diff --git a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/types.py b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/types.py
index 85051cfdbe..13e01b0b05 100644
--- a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/types.py
+++ b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/types.py
@@ -194,6 +194,12 @@ except AttributeError: # pragma: no cover
extended_get_origin = get_origin # type: ignore
+# Used on `TypeVar` objects with no default:
+NoDefaults = (
+ getattr(typing, "NoDefault", object()),
+ getattr(typing_extensions, "NoDefault", object()),
+)
+
# We use this variable to be sure that we are working with a type from `typing`:
typing_root_type = (typing._Final, typing._GenericAlias) # type: ignore
@@ -440,9 +446,9 @@ __EVAL_TYPE_TAKES_TYPE_PARAMS = (
)
-def _try_import_forward_ref(thing, bound, *, type_params): # pragma: no cover
+def _try_import_forward_ref(thing, typ, *, type_params): # pragma: no cover
"""
- Tries to import a real bound type from ``TypeVar`` bound to a ``ForwardRef``.
+ Tries to import a real bound or default type from ``ForwardRef`` in ``TypeVar``.
This function is very "magical" to say the least, please don't use it.
This function fully covered, but is excluded from coverage
@@ -452,13 +458,13 @@ def _try_import_forward_ref(thing, bound, *, type_params): # pragma: no cover
kw = {"globalns": vars(sys.modules[thing.__module__]), "localns": None}
if __EVAL_TYPE_TAKES_TYPE_PARAMS:
kw["type_params"] = type_params
- return typing._eval_type(bound, **kw)
+ return typing._eval_type(typ, **kw)
except (KeyError, AttributeError, NameError):
# We fallback to `ForwardRef` instance, you can register it as a type as well:
# >>> from typing import ForwardRef
# >>> from hypothesis import strategies as st
# >>> st.register_type_strategy(ForwardRef('YourType'), your_strategy)
- return bound
+ return typ
def from_typing_type(thing):
@@ -1082,25 +1088,39 @@ def resolve_Callable(thing):
@register(typing.TypeVar)
+@register("TypeVar", module=typing_extensions)
def resolve_TypeVar(thing):
type_var_key = f"typevar={thing!r}"
- if getattr(thing, "__bound__", None) is not None:
- bound = thing.__bound__
- if isinstance(bound, typing.ForwardRef):
+ bound = getattr(thing, "__bound__", None)
+ default = getattr(thing, "__default__", NoDefaults[0])
+ original_strategies = []
+
+ def resolve_strategies(typ):
+ if isinstance(typ, typing.ForwardRef):
# TODO: on Python 3.13 and later, we should work out what type_params
# could be part of this type, and pass them in here.
- bound = _try_import_forward_ref(thing, bound, type_params=())
- strat = unwrap_strategies(st.from_type(bound))
+ typ = _try_import_forward_ref(thing, typ, type_params=())
+ strat = unwrap_strategies(st.from_type(typ))
if not isinstance(strat, OneOfStrategy):
- return strat
- # The bound was a union, or we resolved it as a union of subtypes,
+ original_strategies.append(strat)
+ else:
+ original_strategies.extend(strat.original_strategies)
+
+ if bound is not None:
+ resolve_strategies(bound)
+ if default not in NoDefaults: # pragma: no cover
+ # Coverage requires 3.13 or `typing_extensions` package.
+ resolve_strategies(default)
+
+ if original_strategies:
+ # The bound / default was a union, or we resolved it as a union of subtypes,
# so we need to unpack the strategy to ensure consistency across uses.
# This incantation runs a sampled_from over the strategies inferred for
# each part of the union, wraps that in shared so that we only generate
# from one type per testcase, and flatmaps that back to instances.
return st.shared(
- st.sampled_from(strat.original_strategies), key=type_var_key
+ st.sampled_from(original_strategies), key=type_var_key
).flatmap(lambda s: s)
builtin_scalar_types = [type(None), bool, int, float, str, bytes]
diff --git a/contrib/python/hypothesis/py3/hypothesis/version.py b/contrib/python/hypothesis/py3/hypothesis/version.py
index e02ca3db87..0c7cb254d5 100644
--- a/contrib/python/hypothesis/py3/hypothesis/version.py
+++ b/contrib/python/hypothesis/py3/hypothesis/version.py
@@ -8,5 +8,5 @@
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
-__version_info__ = (6, 110, 1)
+__version_info__ = (6, 110, 2)
__version__ = ".".join(map(str, __version_info__))
diff --git a/contrib/python/hypothesis/py3/ya.make b/contrib/python/hypothesis/py3/ya.make
index 44835a36ca..8113ee865a 100644
--- a/contrib/python/hypothesis/py3/ya.make
+++ b/contrib/python/hypothesis/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(6.110.1)
+VERSION(6.110.2)
LICENSE(MPL-2.0)
diff --git a/yt/yt/client/hedging/config.cpp b/yt/yt/client/hedging/config.cpp
index 9c25a6e13f..715b6599e0 100644
--- a/yt/yt/client/hedging/config.cpp
+++ b/yt/yt/client/hedging/config.cpp
@@ -1,65 +1,25 @@
#include "config.h"
-#include "counter.h"
-#include "rpc.h"
-
namespace NYT::NClient::NHedging::NRpc {
////////////////////////////////////////////////////////////////////////////////
-void TClientConfig::Register(TRegistrar registrar)
+void TConnectionWithPenaltyConfig::Register(TRegistrar registrar)
{
- registrar.Parameter("connection", &TThis::Connection);
- registrar.Parameter("initial_penalty", &TThis::InitialPenalty);
+ registrar.Parameter("initial_penalty", &TThis::InitialPenalty)
+ .Optional();
}
-THedgingClientOptions::TClientOptions::TClientOptions(
- NApi::IClientPtr client,
- const std::string& clusterName,
- TDuration initialPenalty,
- TCounterPtr counter)
- : Client(std::move(client))
- , ClusterName(clusterName)
- , InitialPenalty(initialPenalty)
- , Counter(std::move(counter))
-{ }
-
-THedgingClientOptions::TClientOptions::TClientOptions(
- NApi::IClientPtr client,
- TDuration initialPenalty,
- TCounterPtr counter)
- : TClientOptions(client, "default", initialPenalty, counter)
-{ }
-
void THedgingClientOptions::Register(TRegistrar registrar)
{
- registrar.Parameter("client_configs", &TThis::ClientConfigs)
- .Default();
+ registrar.Parameter("connections", &TThis::Connections)
+ .NonEmpty();
registrar.Parameter("ban_penalty", &TThis::BanPenalty)
.Default(TDuration::MilliSeconds(1));
registrar.Parameter("ban_duration", &TThis::BanDuration)
.Default(TDuration::MilliSeconds(50));
registrar.Parameter("tags", &TThis::Tags)
.Default();
-
- registrar.Postprocessor([] (TThis* config) {
- NProfiling::TTagSet counterTagSet;
-
- for (const auto& [tagName, tagValue] : config->Tags) {
- counterTagSet.AddTag(NProfiling::TTag(tagName, tagValue));
- }
-
- config->Clients.reserve(config->Clients.size());
- for (const auto& client : config->ClientConfigs) {
- THROW_ERROR_EXCEPTION_UNLESS(client->Connection->ClusterUrl, "\"cluster_url\" must be set");
- auto clusterUrl = client->Connection->ClusterUrl.value();
- config->Clients.emplace_back(
- CreateClient(client->Connection),
- clusterUrl,
- client->InitialPenalty,
- New<TCounter>(counterTagSet.WithTag(NProfiling::TTag("yt_cluster", clusterUrl))));
- }
- });
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/hedging/config.h b/yt/yt/client/hedging/config.h
index 6e6cc41697..23c11d1ba3 100644
--- a/yt/yt/client/hedging/config.h
+++ b/yt/yt/client/hedging/config.h
@@ -12,58 +12,36 @@ namespace NYT::NClient::NHedging::NRpc {
////////////////////////////////////////////////////////////////////////////////
-struct TClientConfig
- : public virtual NYTree::TYsonStruct
+struct TConnectionWithPenaltyConfig
+ : public virtual NApi::NRpcProxy::TConnectionConfig
{
- NApi::NRpcProxy::TConnectionConfigPtr Connection;
TDuration InitialPenalty;
- REGISTER_YSON_STRUCT(TClientConfig);
+ REGISTER_YSON_STRUCT(TConnectionWithPenaltyConfig);
static void Register(TRegistrar registrar);
};
-DEFINE_REFCOUNTED_TYPE(TClientConfig)
////////////////////////////////////////////////////////////////////////////////
//! The options for hedging client.
+//! TODO(bulatman) Rename to `THedgingClientConfig`.
struct THedgingClientOptions
- : public virtual NYTree::TYsonStructLite
+ : public virtual NYTree::TYsonStruct
{
- std::vector<TClientConfigPtr> ClientConfigs;
-
- struct TClientOptions
- {
- TClientOptions(
- NApi::IClientPtr client,
- const std::string& clusterName,
- TDuration initialPenalty,
- TCounterPtr counter = {});
-
- TClientOptions(
- NApi::IClientPtr client,
- TDuration initialPenalty,
- TCounterPtr counter = {});
-
- NApi::IClientPtr Client;
- std::string ClusterName;
- TDuration InitialPenalty;
- TCounterPtr Counter;
- };
-
+ std::vector<TIntrusivePtr<TConnectionWithPenaltyConfig>> Connections;
TDuration BanPenalty;
TDuration BanDuration;
THashMap<TString, TString> Tags;
- // This parameter is set on postprocessor.
- TVector<TClientOptions> Clients;
-
- REGISTER_YSON_STRUCT_LITE(THedgingClientOptions);
+ REGISTER_YSON_STRUCT(THedgingClientOptions);
static void Register(TRegistrar registrar);
};
+DEFINE_REFCOUNTED_TYPE(THedgingClientOptions)
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NClient::NHedging::NRpc
diff --git a/yt/yt/client/hedging/counter.cpp b/yt/yt/client/hedging/counter.cpp
index f8a88038f4..2b96cc366d 100644
--- a/yt/yt/client/hedging/counter.cpp
+++ b/yt/yt/client/hedging/counter.cpp
@@ -16,18 +16,15 @@ TCounter::TCounter(const NProfiling::TRegistry& registry)
, EffectivePenalty(registry.TimeGauge("/effective_penalty"))
, ExternalPenalty(registry.TimeGauge("/external_penalty"))
, RequestDuration(registry.TimeHistogram("/request_duration", TDuration::MilliSeconds(1), TDuration::MilliSeconds(70)))
-{
-}
+{ }
TCounter::TCounter(const TString& clusterName)
: TCounter(HedgingClientProfiler.WithTag("yt_cluster", clusterName))
-{
-}
+{ }
TCounter::TCounter(const NProfiling::TTagSet& tagSet)
: TCounter(HedgingClientProfiler.WithTags(tagSet))
-{
-}
+{ }
////////////////////////////////////////////////////////////////////////////////
@@ -43,8 +40,7 @@ TLagPenaltyProviderCounters::TLagPenaltyProviderCounters(const NProfiling::TRegi
TLagPenaltyProviderCounters::TLagPenaltyProviderCounters(const TString& tablePath, const TVector<TString>& clusterNames)
: TLagPenaltyProviderCounters(LagPenaltyProviderProfiler.WithTag("table", tablePath), clusterNames)
-{
-}
+{ }
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index d78aa78231..f8c6420e92 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -1,9 +1,10 @@
#include "hedging.h"
#include "cache.h"
+#include "config.h"
#include "counter.h"
-#include "logger.h"
#include "rpc.h"
+#include "private.h"
#include <yt/yt/client/api/client.h>
#include <yt/yt/client/api/queue_transaction.h>
@@ -22,9 +23,6 @@
#include <yt/yt_proto/yt/client/hedging/proto/config.pb.h>
-#include <library/cpp/iterator/enumerate.h>
-#include <library/cpp/iterator/zip.h>
-
#include <util/datetime/base.h>
#include <util/generic/va_args.h>
@@ -40,11 +38,11 @@ namespace {
using namespace NYT;
using namespace NApi;
using namespace NYPath;
-using namespace NProfiling;
+using namespace NYTree;
////////////////////////////////////////////////////////////////////////////////
-using TClientBuilder = std::function<NApi::IClientPtr(const TConfig&)>;
+using TClientFactory = std::function<NApi::IClientPtr(const NApi::NRpcProxy::TConnectionConfigPtr&)>;
#define RETRYABLE_METHOD(ReturnType, MethodName, Args) \
ReturnType MethodName(Y_METHOD_USED_ARGS_DECLARATION(Args)) override { \
@@ -59,15 +57,15 @@ class THedgingClient
: public IClient
{
public:
- THedgingClient(const THedgingClientOptions& options, const IPenaltyProviderPtr& penaltyProvider)
- : Executor_(New<THedgingExecutor>(options, penaltyProvider))
+ THedgingClient(THedgingExecutorPtr hedgingExecutor)
+ : Executor_(std::move(hedgingExecutor))
{ }
// IClientBase methods.
// Supported methods.
IConnectionPtr GetConnection() override
{
- return Executor_->GetConnection();
+ return Executor_->GetClient(0)->GetConnection();
}
std::optional<TStringBuf> GetClusterName(bool fetchIfNull = true) override
@@ -98,7 +96,7 @@ public:
UNSUPPORTED_METHOD(TFuture<ITransactionPtr>, StartTransaction, (NTransactionClient::ETransactionType, const TTransactionStartOptions&));
UNSUPPORTED_METHOD(TFuture<ITableWriterPtr>, CreateTableWriter, (const TRichYPath&, const TTableWriterOptions&));
UNSUPPORTED_METHOD(TFuture<void>, SetNode, (const TYPath&, const NYson::TYsonString&, const TSetNodeOptions&));
- UNSUPPORTED_METHOD(TFuture<void>, MultisetAttributesNode, (const TYPath&, const NYTree::IMapNodePtr&, const TMultisetAttributesNodeOptions&));
+ UNSUPPORTED_METHOD(TFuture<void>, MultisetAttributesNode, (const TYPath&, const IMapNodePtr&, const TMultisetAttributesNodeOptions&));
UNSUPPORTED_METHOD(TFuture<void>, RemoveNode, (const TYPath&, const TRemoveNodeOptions&));
UNSUPPORTED_METHOD(TFuture<NCypressClient::TNodeId>, CreateNode, (const TYPath&, NObjectClient::EObjectType, const TCreateNodeOptions&));
UNSUPPORTED_METHOD(TFuture<TLockNodeResult>, LockNode, (const TYPath&, NCypressClient::ELockMode, const TLockNodeOptions&));
@@ -152,10 +150,10 @@ public:
UNSUPPORTED_METHOD(TFuture<TPutFileToCacheResult>, PutFileToCache, (const TYPath&, const TString&, const TPutFileToCacheOptions&));
UNSUPPORTED_METHOD(TFuture<void>, AddMember, (const TString&, const TString&, const TAddMemberOptions&));
UNSUPPORTED_METHOD(TFuture<void>, RemoveMember, (const TString&, const TString&, const TRemoveMemberOptions&));
- UNSUPPORTED_METHOD(TFuture<TCheckPermissionResponse>, CheckPermission, (const TString&, const TYPath&, NYTree::EPermission, const TCheckPermissionOptions&));
- UNSUPPORTED_METHOD(TFuture<TCheckPermissionByAclResult>, CheckPermissionByAcl, (const std::optional<TString>&, NYTree::EPermission, NYTree::INodePtr, const TCheckPermissionByAclOptions&));
- UNSUPPORTED_METHOD(TFuture<void>, TransferAccountResources, (const TString&, const TString&, NYTree::INodePtr, const TTransferAccountResourcesOptions&));
- UNSUPPORTED_METHOD(TFuture<void>, TransferPoolResources, (const TString&, const TString&, const TString&, NYTree::INodePtr, const TTransferPoolResourcesOptions&));
+ UNSUPPORTED_METHOD(TFuture<TCheckPermissionResponse>, CheckPermission, (const TString&, const TYPath&, EPermission, const TCheckPermissionOptions&));
+ UNSUPPORTED_METHOD(TFuture<TCheckPermissionByAclResult>, CheckPermissionByAcl, (const std::optional<TString>&, EPermission, INodePtr, const TCheckPermissionByAclOptions&));
+ UNSUPPORTED_METHOD(TFuture<void>, TransferAccountResources, (const TString&, const TString&, INodePtr, const TTransferAccountResourcesOptions&));
+ UNSUPPORTED_METHOD(TFuture<void>, TransferPoolResources, (const TString&, const TString&, const TString&, INodePtr, const TTransferPoolResourcesOptions&));
UNSUPPORTED_METHOD(TFuture<NScheduler::TOperationId>, StartOperation, (NScheduler::EOperationType, const NYson::TYsonString&, const TStartOperationOptions&));
UNSUPPORTED_METHOD(TFuture<void>, AbortOperation, (const NScheduler::TOperationIdOrAlias&, const TAbortOperationOptions&));
UNSUPPORTED_METHOD(TFuture<void>, SuspendOperation, (const NScheduler::TOperationIdOrAlias&, const TSuspendOperationOptions&));
@@ -236,75 +234,118 @@ private:
const THedgingExecutorPtr Executor_;
};
+NApi::IClientPtr DoCreateHedgingClient(
+ const THedgingClientOptionsPtr& config,
+ const IPenaltyProviderPtr& penaltyProvider,
+ TClientFactory clientFactory)
+{
+ NProfiling::TTagSet counterTagSet;
+ for (const auto& [tagName, tagValue] : config->Tags) {
+ counterTagSet.AddTag(NProfiling::TTag(tagName, tagValue));
+ }
+
+ std::vector<THedgingExecutor::TNode> executorNodes;
+ executorNodes.reserve(config->Connections.size());
+ for (auto& connectionConfig : config->Connections) {
+ YT_VERIFY(connectionConfig->ClusterUrl);
+ const auto& clusterName = connectionConfig->ClusterName.value_or(*connectionConfig->ClusterUrl);
+ executorNodes.push_back({
+ .Client = clientFactory(connectionConfig),
+ .Counter = New<TCounter>(counterTagSet.WithTag(NProfiling::TTag("yt_cluster", clusterName))),
+ .ClusterName = clusterName,
+ .InitialPenalty = connectionConfig->InitialPenalty,
+ });
+ }
+
+ return New<THedgingClient>(
+ New<THedgingExecutor>(executorNodes, config->BanPenalty, config->BanDuration, penaltyProvider));
+}
+
} // namespace
////////////////////////////////////////////////////////////////////////////////
-NApi::IClientPtr CreateHedgingClient(const THedgingClientOptions& options)
+NApi::IClientPtr CreateHedgingClient(const THedgingExecutorPtr& hedgingExecutor)
{
- return New<THedgingClient>(options, CreateDummyPenaltyProvider());
+ return New<THedgingClient>(hedgingExecutor);
}
NApi::IClientPtr CreateHedgingClient(
- const THedgingClientOptions& options,
+ const THedgingClientOptionsPtr& config,
const IPenaltyProviderPtr& penaltyProvider)
{
- return New<THedgingClient>(options, penaltyProvider);
-}
-
-NApi::IClientPtr CreateHedgingClient(const THedgingClientConfig& config)
-{
- return CreateHedgingClient(GetHedgingClientOptions(config));
+ return DoCreateHedgingClient(
+ config,
+ penaltyProvider,
+ [] (const NApi::NRpcProxy::TConnectionConfigPtr& connectionConfig) {
+ return CreateClient(connectionConfig);
+ });
}
-NApi::IClientPtr CreateHedgingClient(const THedgingClientConfig& config, const IClientsCachePtr& clientsCache)
+NApi::IClientPtr CreateHedgingClient(const THedgingClientOptionsPtr& config)
{
- return CreateHedgingClient(GetHedgingClientOptions(config, clientsCache));
+ return CreateHedgingClient(config, CreateDummyPenaltyProvider());
}
NApi::IClientPtr CreateHedgingClient(
- const THedgingClientConfig& config,
+ const THedgingClientOptionsPtr& config,
const IClientsCachePtr& clientsCache,
const IPenaltyProviderPtr& penaltyProvider)
{
- return CreateHedgingClient(GetHedgingClientOptions(config, clientsCache), penaltyProvider);
+ return DoCreateHedgingClient(
+ config,
+ penaltyProvider,
+ [&] (const NApi::NRpcProxy::TConnectionConfigPtr& connectionConfig) {
+ YT_VERIFY(connectionConfig->ClusterUrl);
+ return clientsCache->GetClient(*connectionConfig->ClusterUrl);
+ });
}
-THedgingClientOptions GetHedgingClientOptions(const THedgingClientConfig& config, TClientBuilder clientBuilder)
+NApi::IClientPtr CreateHedgingClient(const THedgingClientOptionsPtr& config, const IClientsCachePtr& clientsCache)
{
- THedgingClientOptions options;
- options.BanPenalty = TDuration::MilliSeconds(config.GetBanPenalty());
- options.BanDuration = TDuration::MilliSeconds(config.GetBanDuration());
+ return CreateHedgingClient(config, clientsCache, CreateDummyPenaltyProvider());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+THedgingClientOptionsPtr GetHedgingClientConfig(const THedgingClientConfig& protoConfig)
+{
+ auto config = New<THedgingClientOptions>();
+ config->BanPenalty = TDuration::MilliSeconds(protoConfig.GetBanPenalty());
+ config->BanDuration = TDuration::MilliSeconds(protoConfig.GetBanDuration());
NProfiling::TTagSet counterTagSet;
- for (const auto& [tagName, tagValue] : config.GetTags()) {
- counterTagSet.AddTag(NProfiling::TTag(tagName, tagValue));
+ for (const auto& [tagName, tagValue] : protoConfig.GetTags()) {
+ config->Tags.emplace(tagName, tagValue);
}
- options.Clients.reserve(config.GetClients().size());
- for (const auto& client : config.GetClients()) {
- options.Clients.emplace_back(
- clientBuilder(client.GetClientConfig()),
- client.GetClientConfig().GetClusterName(),
- TDuration::MilliSeconds(client.GetInitialPenalty()),
- New<TCounter>(counterTagSet.WithTag(NProfiling::TTag("yt_cluster", client.GetClientConfig().GetClusterName()))));
+ config->Connections.reserve(protoConfig.GetClients().size());
+ for (const auto& client : protoConfig.GetClients()) {
+ auto connectionConfig = ConvertTo<NYT::TIntrusivePtr<TConnectionWithPenaltyConfig>>(
+ GetConnectionConfig(client.GetClientConfig()));
+ connectionConfig->InitialPenalty = TDuration::MilliSeconds(client.GetInitialPenalty());
+ config->Connections.push_back(std::move(connectionConfig));
}
- return options;
+ return config;
}
-THedgingClientOptions GetHedgingClientOptions(const THedgingClientConfig& config)
+NApi::IClientPtr CreateHedgingClient(const THedgingClientConfig& config)
{
- return GetHedgingClientOptions(config, [] (const auto& clientConfig) {
- return CreateClient(clientConfig);
- });
+ return CreateHedgingClient(GetHedgingClientConfig(config));
}
-THedgingClientOptions GetHedgingClientOptions(const THedgingClientConfig& config, const IClientsCachePtr& clientsCache)
+NApi::IClientPtr CreateHedgingClient(const THedgingClientConfig& config, const IClientsCachePtr& clientsCache)
+{
+ return CreateHedgingClient(GetHedgingClientConfig(config), clientsCache);
+}
+
+NApi::IClientPtr CreateHedgingClient(
+ const THedgingClientConfig& config,
+ const IClientsCachePtr& clientsCache,
+ const IPenaltyProviderPtr& penaltyProvider)
{
- return GetHedgingClientOptions(config, [clientsCache] (const auto& clientConfig) {
- return clientsCache->GetClient(clientConfig.GetClusterName());
- });
+ return CreateHedgingClient(GetHedgingClientConfig(config), clientsCache, penaltyProvider);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/hedging/hedging.h b/yt/yt/client/hedging/hedging.h
index 02dbe84561..23c127e040 100644
--- a/yt/yt/client/hedging/hedging.h
+++ b/yt/yt/client/hedging/hedging.h
@@ -9,24 +9,26 @@
#include <yt/yt/client/cache/public.h>
-// @brief HedgingClient is a wrapper for several YT-clients with ability
-// to retry asynchronously the same request with different underlying-clients.
-// HedgingClient implements IClient interface and supports methods that do not change state of data on YT.
-// Currently supported methods: LookupRows, VersionedLookupRows, SelectRows, ExplainQuery,
-// CreateTableReader, GetNode, ListNode, NodeExists, CreateFileReader
-//
-// For initial configuration every YT-client needs an InitialPenalty value.
-// This value is used to determine in which order YT-clients will be used.
-//
-// MinInitialPenalty - minimal retry timeout value out of all YT-clients.
-// EffectivePenalty - is a delay value for starting a request with a corresponding YT-client.
-// For every client this value is calculated as: InitialPenalty - MinInitialPenalty.
-//
-// If any of the clients responses with a success result: requests to other YT-clients are cancelled.
-//
-// If any of the clients responses with an error: it's InitialPenalty is
-// increased by BanPenalty value for the next BanDuration time interval.
-// Both BanPenalty and BanDuration values are set in MultiClientCluster config.
+/*! HedgingClient is a wrapper for several YT-clients with ability
+ * to retry asynchronously the same request with different underlying-clients.
+ * HedgingClient implements IClient interface and supports methods that do not change state of data on YT.
+ * Currently supported methods: LookupRows, VersionedLookupRows, SelectRows, ExplainQuery,
+ * CreateTableReader, GetNode, ListNode, NodeExists, CreateFileReader
+ *
+ * For initial configuration every YT-client needs an InitialPenalty value.
+ * This value is used to determine in which order YT-clients will be used.
+ *
+ * MinInitialPenalty - minimal retry timeout value out of all YT-clients.
+ * EffectivePenalty - is a delay value for starting a request with a corresponding YT-client.
+ * For every client this value is calculated as: InitialPenalty - MinInitialPenalty.
+ *
+ * If any of the clients responses with a success result: requests to other YT-clients are cancelled.
+ *
+ * If any of the clients responses with an error: it's InitialPenalty is
+ * increased by BanPenalty value for the next BanDuration time interval.
+ * Both BanPenalty and BanDuration values are set in MultiClientCluster config.
+ */
+
namespace NYT::NClient::NHedging::NRpc {
////////////////////////////////////////////////////////////////////////////////
@@ -36,34 +38,40 @@ using NCache::IClientsCachePtr;
// from config.proto
class THedgingClientConfig;
-// @brief Options for hedging client.
-// from hedging_executor.h
-struct THedgingClientOptions;
-
////////////////////////////////////////////////////////////////////////////////
-// @brief Method for creating HedgingClient with given options.
-NApi::IClientPtr CreateHedgingClient(const THedgingClientOptions& options);
+NApi::IClientPtr CreateHedgingClient(const THedgingExecutorPtr& hedgingExecutor);
+
+//! Method for creating HedgingClient with given options.
+NApi::IClientPtr CreateHedgingClient(const THedgingClientOptionsPtr& config);
+
+//! Method for creating HedgingClient with given options and ability to use penalty updater policy.
+//! Currently for experimental usage.
+NApi::IClientPtr CreateHedgingClient(const THedgingClientOptionsPtr& config, const IPenaltyProviderPtr& penaltyProvider);
+
+//! Method for creating HedgingClient with given rpc clients config and preinitialized clients.
+NApi::IClientPtr CreateHedgingClient(const THedgingClientOptionsPtr& config, const IClientsCachePtr& clientsCache);
-// @brief Method for creating HedgingClient with given options and ability to use penalty updater policy.
-// Currently for experimental usage.
-NApi::IClientPtr CreateHedgingClient(const THedgingClientOptions& options, const IPenaltyProviderPtr& penaltyProvider);
+//! Method for creating HedgingClient with given rpc clients config, preinitialized clients and PenaltyProvider.
+NApi::IClientPtr CreateHedgingClient(
+ const THedgingClientOptionsPtr& config,
+ const IClientsCachePtr& clientsCache,
+ const IPenaltyProviderPtr& penaltyProvider);
-// @brief Method for creating HedgingClient with given rpc clients config.
+// The following methods should be moved to `ads/bsyeti/libs/ytex/client`.
+
+//! Method for creating HedgingClient options from given config and preinitialized clients.
+THedgingClientOptionsPtr GetHedgingClientConfig(const THedgingClientConfig& config);
+
+//! Method for creating HedgingClient with given rpc clients config.
NApi::IClientPtr CreateHedgingClient(const THedgingClientConfig& config);
-// @brief Method for creating HedgingClient with given rpc clients config and preinitialized clients.
+//! Method for creating HedgingClient with given rpc clients config and preinitialized clients.
NApi::IClientPtr CreateHedgingClient(const THedgingClientConfig& config, const IClientsCachePtr& clientsCache);
-// @brief Method for creating HedgingClient with given rpc clients config, preinitialized clients and PenaltyProvider.
+//! Method for creating HedgingClient with given rpc clients config, preinitialized clients and PenaltyProvider.
NApi::IClientPtr CreateHedgingClient(const THedgingClientConfig& config, const IClientsCachePtr& clientsCache, const IPenaltyProviderPtr& penaltyProvider);
-// @brief Method for creating HedgingClient options from given config and preinitialized clients.
-THedgingClientOptions GetHedgingClientOptions(const THedgingClientConfig& config, const IClientsCachePtr& clientsCache);
-
-// @brief Method for creating HedgingClient options from given config.
-THedgingClientOptions GetHedgingClientOptions(const THedgingClientConfig& config);
-
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NClient::NHedging::NRpc
diff --git a/yt/yt/client/hedging/hedging_executor.cpp b/yt/yt/client/hedging/hedging_executor.cpp
index 3459dfa19e..5541464e70 100644
--- a/yt/yt/client/hedging/hedging_executor.cpp
+++ b/yt/yt/client/hedging/hedging_executor.cpp
@@ -1,6 +1,6 @@
#include "hedging_executor.h"
-#include "logger.h"
+#include "private.h"
#include <yt/yt/core/logging/log.h>
@@ -9,72 +9,61 @@ namespace NYT::NClient::NHedging::NRpc {
////////////////////////////////////////////////////////////////////////////////
-THedgingExecutor::THedgingExecutor(const THedgingClientOptions& options, const IPenaltyProviderPtr& penaltyProvider)
- : BanPenalty_(NProfiling::DurationToCpuDuration(options.BanPenalty))
- , BanDuration_(NProfiling::DurationToCpuDuration(options.BanDuration))
+THedgingExecutor::THedgingExecutor(
+ const std::vector<TNode>& nodes,
+ TDuration banPenalty,
+ TDuration banDuration,
+ const IPenaltyProviderPtr& penaltyProvider)
+ : BanPenalty_(banPenalty)
+ , BanDuration_(banDuration)
, PenaltyProvider_(penaltyProvider)
{
- Y_ENSURE(!options.Clients.empty(), "Clients should not be empty!");
- for (const auto& clientOptions : options.Clients) {
- Y_ENSURE(clientOptions.Client, "Client pointer should be valid!");
- Clients_.emplace_back(
- clientOptions.Client,
- NProfiling::DurationToCpuDuration(clientOptions.InitialPenalty),
- clientOptions.Counter ? clientOptions.Counter : New<TCounter>(clientOptions.Client->GetConnection()->GetClusterId()),
- clientOptions.ClusterName);
+ Nodes_.reserve(nodes.size());
+ for (const auto& node : nodes) {
+ Nodes_.push_back({node});
}
}
-NApi::IConnectionPtr THedgingExecutor::GetConnection()
+NApi::IClientPtr THedgingExecutor::GetClient(int index)
{
- return Clients_[0].Client->GetConnection();
+ return Nodes_.at(index).Client;
}
void THedgingExecutor::OnFinishRequest(
- size_t clientIndex,
+ int index,
TDuration effectivePenalty,
- NProfiling::TCpuDuration adaptivePenalty,
- NProfiling::TCpuDuration externalPenalty,
- NProfiling::TCpuInstant start,
+ TDuration adaptivePenalty,
+ TDuration externalPenalty,
+ TInstant start,
const TError& error)
{
- auto& clientInfo = Clients_[clientIndex];
+ auto& node = Nodes_[index];
if (error.IsOK()) {
if (adaptivePenalty) {
TGuard guard(SpinLock_);
- clientInfo.BanUntil = Max<NProfiling::TCpuInstant>();
- clientInfo.AdaptivePenalty = 0;
+ node.BanUntil = TInstant::Max();
+ node.AdaptivePenalty = TDuration::Zero();
}
- clientInfo.Counter->SuccessRequestCount.Increment();
- clientInfo.Counter->RequestDuration.Record(NProfiling::CpuDurationToDuration(NProfiling::GetCpuInstant() - start));
+ node.Counter->SuccessRequestCount.Increment();
+ node.Counter->RequestDuration.Record(TInstant::Now() - start);
} else if (effectivePenalty && (error.GetCode() == EErrorCode::Canceled || error.GetCode() == EErrorCode::FutureCombinerShortcut)) {
- clientInfo.Counter->CancelRequestCount.Increment();
+ node.Counter->CancelRequestCount.Increment();
} else {
- with_lock (SpinLock_) {
- clientInfo.BanUntil = NProfiling::GetCpuInstant() + BanDuration_;
- clientInfo.AdaptivePenalty += BanPenalty_;
+ auto banUntil = TInstant::Now() + BanDuration_;
+ {
+ TGuard guard(SpinLock_);
+ node.BanUntil = banUntil;
+ node.AdaptivePenalty += BanPenalty_;
}
- clientInfo.Counter->ErrorRequestCount.Increment();
- YT_LOG_WARNING("client#%v failed with error %v", clientIndex, error);
+ node.Counter->ErrorRequestCount.Increment();
+ YT_LOG_WARNING(error, "Cluster banned (ClusterName: %v, Deadline: %v)",
+ node.ClusterName,
+ banUntil);
}
- clientInfo.Counter->EffectivePenalty.Update(effectivePenalty);
- clientInfo.Counter->ExternalPenalty.Update(NProfiling::CpuDurationToDuration(externalPenalty));
+ node.Counter->EffectivePenalty.Update(effectivePenalty);
+ node.Counter->ExternalPenalty.Update(externalPenalty);
}
-THedgingExecutor::TEntry::TEntry(
- NApi::IClientPtr client,
- NProfiling::TCpuDuration initialPenalty,
- TCounterPtr counter,
- const std::string& clusterName)
- : Client(std::move(client))
- , ClusterName(clusterName)
- , AdaptivePenalty(0)
- , InitialPenalty(initialPenalty)
- , ExternalPenalty(0)
- , BanUntil(Max<NProfiling::TCpuInstant>())
- , Counter(std::move(counter))
-{ }
-
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NClient::NHedging::NRpc
diff --git a/yt/yt/client/hedging/hedging_executor.h b/yt/yt/client/hedging/hedging_executor.h
index cd07651e6f..dd0b4f5c8e 100644
--- a/yt/yt/client/hedging/hedging_executor.h
+++ b/yt/yt/client/hedging/hedging_executor.h
@@ -1,107 +1,115 @@
#pragma once
-#include "config.h"
#include "counter.h"
#include "penalty_provider.h"
#include "public.h"
#include <yt/yt/client/api/client.h>
-#include <yt/yt/core/profiling/timing.h>
-
#include <yt/yt/core/rpc/dispatcher.h>
-#include <library/cpp/iterator/enumerate.h>
+#include <library/cpp/yt/threading/spin_lock.h>
#include <util/datetime/base.h>
#include <util/generic/string.h>
-#include <util/generic/vector.h>
-#include <util/system/spinlock.h>
+#include <vector>
namespace NYT::NClient::NHedging::NRpc {
////////////////////////////////////////////////////////////////////////////////
-DECLARE_REFCOUNTED_CLASS(THedgingExecutor)
-
class THedgingExecutor final
{
public:
- THedgingExecutor(const THedgingClientOptions& options, const IPenaltyProviderPtr& penaltyProvider);
+ struct TNode
+ {
+ NApi::IClientPtr Client;
+ TCounterPtr Counter;
+ std::string ClusterName;
+ TDuration InitialPenalty;
+ };
+
+ THedgingExecutor(
+ const std::vector<TNode>& nodes,
+ TDuration banPenalty,
+ TDuration banDuration,
+ const IPenaltyProviderPtr& penaltyProvider);
- NApi::IConnectionPtr GetConnection();
+ NApi::IClientPtr GetClient(int index);
template <typename T>
TFuture<T> DoWithHedging(TCallback<TFuture<T>(NApi::IClientPtr)> callback)
{
- auto now = NProfiling::GetCpuInstant();
- auto clients = [&] {
+ auto now = TInstant::Now();
+ auto nodes = [&] {
TGuard guard(SpinLock_);
- for (auto& client : Clients_) {
- if (client.BanUntil < now) {
- client.AdaptivePenalty = 0;
+ for (auto& node : Nodes_) {
+ if (node.BanUntil < now) {
+ node.AdaptivePenalty = TDuration::Zero();
}
}
-
- return Clients_;
+ return Nodes_;
}();
- NProfiling::TCpuDuration minInitialPenalty = Max<i64>();
- for (auto& client : clients) {
- client.ExternalPenalty = PenaltyProvider_->Get(client.ClusterName);
- NProfiling::TCpuDuration currentInitialPenalty = client.InitialPenalty + client.AdaptivePenalty + client.ExternalPenalty;
- minInitialPenalty = Min(minInitialPenalty, currentInitialPenalty);
+ auto minInitialPenalty = TDuration::Max();
+ for (auto& node : nodes) {
+ node.ExternalPenalty = PenaltyProvider_->Get(node.ClusterName);
+ auto currentInitialPenalty = node.InitialPenalty + node.AdaptivePenalty + node.ExternalPenalty;
+ minInitialPenalty = std::min(minInitialPenalty, currentInitialPenalty);
}
- TVector<TFuture<T>> futures(Reserve(clients.size()));
- for (auto [i, client] : Enumerate(clients)) {
- TDuration effectivePenalty = NProfiling::CpuDurationToDuration(client.InitialPenalty + client.AdaptivePenalty + client.ExternalPenalty - minInitialPenalty);
- if (effectivePenalty) {
- auto delayedFuture = NConcurrency::TDelayedExecutor::MakeDelayed(effectivePenalty, NYT::NRpc::TDispatcher::Get()->GetHeavyInvoker());
- futures.push_back(delayedFuture.Apply(BIND(callback, client.Client)));
+ std::vector<TFuture<T>> futures;
+ futures.reserve(nodes.size());
+ for (int i = 0; i != std::ssize(nodes); ++i) {
+ const auto& node = nodes[i];
+ auto penalty = node.InitialPenalty + node.AdaptivePenalty + node.ExternalPenalty - minInitialPenalty;
+ if (penalty) {
+ auto delayedFuture = NConcurrency::TDelayedExecutor::MakeDelayed(
+ penalty,
+ NYT::NRpc::TDispatcher::Get()->GetHeavyInvoker());
+ futures.push_back(delayedFuture.Apply(BIND(callback, node.Client)));
} else {
- futures.push_back(callback(client.Client));
+ futures.push_back(callback(node.Client));
}
- futures.back().Subscribe(BIND(&THedgingExecutor::OnFinishRequest, MakeWeak(this), i, effectivePenalty, client.AdaptivePenalty, client.ExternalPenalty, now));
+ futures.back().Subscribe(BIND(
+ &THedgingExecutor::OnFinishRequest,
+ MakeWeak(this),
+ i,
+ penalty,
+ node.AdaptivePenalty,
+ node.ExternalPenalty,
+ now));
}
return AnySucceeded(std::move(futures));
}
private:
- void OnFinishRequest(
- size_t clientIndex,
- TDuration effectivePenalty,
- NProfiling::TCpuDuration adaptivePenalty,
- NProfiling::TCpuDuration externalPenalty,
- NProfiling::TCpuInstant start,
- const TError& r);
-
- struct TEntry
+ struct TNodeExtended
+ : TNode
{
- TEntry(
- NApi::IClientPtr client,
- NProfiling::TCpuDuration initialPenalty,
- TCounterPtr counter,
- const std::string& clusterName);
-
- NApi::IClientPtr Client;
- std::string ClusterName;
- NProfiling::TCpuDuration AdaptivePenalty;
- NProfiling::TCpuDuration InitialPenalty;
- NProfiling::TCpuDuration ExternalPenalty;
- NProfiling::TCpuInstant BanUntil;
- TCounterPtr Counter;
+ TDuration AdaptivePenalty = TDuration::Zero();
+ TDuration ExternalPenalty = TDuration::Zero();
+ TInstant BanUntil = TInstant::Max();
};
- TVector<TEntry> Clients_;
- NProfiling::TCpuDuration BanPenalty_;
- NProfiling::TCpuDuration BanDuration_;
+ std::vector<TNodeExtended> Nodes_;
+ TDuration BanPenalty_;
+ TDuration BanDuration_;
IPenaltyProviderPtr PenaltyProvider_;
- TSpinLock SpinLock_;
+
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
+
+ void OnFinishRequest(
+ int index,
+ TDuration effectivePenalty,
+ TDuration adaptivePenalty,
+ TDuration externalPenalty,
+ TInstant start,
+ const TError& error);
};
DEFINE_REFCOUNTED_TYPE(THedgingExecutor)
diff --git a/yt/yt/client/hedging/penalty_provider.cpp b/yt/yt/client/hedging/penalty_provider.cpp
index 8021ee0f2d..f4b717fffb 100644
--- a/yt/yt/client/hedging/penalty_provider.cpp
+++ b/yt/yt/client/hedging/penalty_provider.cpp
@@ -1,7 +1,7 @@
#include "penalty_provider.h"
#include "counter.h"
-#include "logger.h"
+#include "private.h"
#include "public.h"
#include <yt/yt_proto/yt/client/hedging/proto/config.pb.h>
@@ -14,8 +14,6 @@
#include <yt/yt/core/misc/error.h>
-#include <yt/yt/core/profiling/timing.h>
-
#include <yt/yt/core/rpc/dispatcher.h>
#include <util/generic/hash.h>
@@ -33,9 +31,9 @@ class TDummyLagProvider
: public IPenaltyProvider
{
public:
- NProfiling::TCpuDuration Get(const std::string& /*cluster*/) override
+ TDuration Get(const std::string& /*cluster*/) override
{
- return 0;
+ return TDuration::Zero();
}
};
@@ -48,7 +46,7 @@ public:
TLagPenaltyProvider(const TReplicationLagPenaltyProviderConfig& config, NApi::IClientPtr client)
: TablePath_(config.GetTablePath())
, MaxTabletLag_(TDuration::Seconds(config.GetMaxTabletLag()))
- , LagPenalty_(NProfiling::DurationToCpuDuration(TDuration::MilliSeconds(config.GetLagPenalty())))
+ , LagPenalty_(TDuration::MilliSeconds(config.GetLagPenalty()))
, MaxTabletsWithLagFraction_(config.GetMaxTabletsWithLagFraction())
, Client_(client)
, ClearPenaltiesOnErrors_(config.GetClearPenaltiesOnErrors())
@@ -59,17 +57,16 @@ public:
BIND(&TLagPenaltyProvider::UpdateCurrentLagPenalty, MakeWeak(this)),
TDuration::Seconds(config.GetCheckPeriod())))
{
- Y_ENSURE(Executor_);
- Y_ENSURE(Client_);
+ YT_VERIFY(Executor_);
+ YT_VERIFY(Client_);
for (const auto& cluster : config.GetReplicaClusters()) {
auto [_, inserted] = ReplicaClusters_.try_emplace(cluster);
- Y_ENSURE(inserted, "Replica cluster " << cluster << " is listed twice");
+ THROW_ERROR_EXCEPTION_UNLESS(inserted, "Replica cluster %v is listed twice", cluster);
}
GetNodeOptions_.Timeout = TDuration::Seconds(5);
GetNodeOptions_.ReadFrom = NApi::EMasterChannelKind::Cache;
-
Executor_->Start();
}
@@ -93,7 +90,10 @@ public:
auto cluster = row.second->AsMap()->GetChildOrThrow("cluster_name")->AsString()->GetValue();
if (auto* info = ReplicaClusters_.FindPtr(cluster)) {
info->ReplicaId = NTabletClient::TTableReplicaId::FromString(row.first);
- YT_LOG_INFO("Found ReplicaId %v for table %v in cluster %v", info->ReplicaId, TablePath_, cluster);
+ YT_LOG_INFO("Found replica (ReplicaId: %v, Cluster: %v, Table: %v)",
+ info->ReplicaId,
+ cluster,
+ TablePath_);
};
}
CheckAllReplicaIdsPresent().ThrowOnError();
@@ -129,15 +129,16 @@ public:
return tabletsWithLag;
}
- NProfiling::TCpuDuration CalculateLagPenalty(const ui64 tabletsCount, const ui64 tabletsWithLag)
+ TDuration CalculateLagPenalty(const ui64 tabletsCount, const ui64 tabletsWithLag)
{
- return tabletsWithLag >= tabletsCount * MaxTabletsWithLagFraction_ ? LagPenalty_ : 0;
+ return tabletsWithLag >= tabletsCount * MaxTabletsWithLagFraction_ ? LagPenalty_ : TDuration::Zero();
}
void UpdateCurrentLagPenalty()
{
try {
- YT_LOG_INFO("Start penalty updater check for: %v", TablePath_);
+ YT_LOG_INFO("Start penalty updater check (Table: %v)",
+ TablePath_);
if (!CheckAllReplicaIdsPresent().IsOK()) {
UpdateReplicaIds();
@@ -151,38 +152,41 @@ public:
for (auto& [cluster, info] : ReplicaClusters_) {
Y_ASSERT(info.ReplicaId);
auto curTabletsWithLag = tabletsWithLag.Value(info.ReplicaId, 0);
- NProfiling::TCpuDuration newLagPenalty = CalculateLagPenalty(tabletsCount, curTabletsWithLag);
- info.CurrentLagPenalty.store(newLagPenalty, std::memory_order::relaxed);
+ auto newLagPenalty = CalculateLagPenalty(tabletsCount, curTabletsWithLag);
+ info.CurrentLagPenalty.store(newLagPenalty.GetValue(), std::memory_order::relaxed);
Counters_->LagTabletsCount.at(cluster).Update(curTabletsWithLag);
- YT_LOG_INFO(
- "Finish penalty updater check (%v: %v/%v tablets lagging => penalty %v ms) for: %v",
- cluster, curTabletsWithLag, tabletsCount,
- NProfiling::CpuDurationToDuration(newLagPenalty).MilliSeconds(),
- TablePath_);
+ YT_LOG_INFO("Finish penalty updater check (Cluster: %v: Table: %v, TabletsWithLag: %v/%v, Penalty: %v)",
+ cluster,
+ TablePath_,
+ curTabletsWithLag,
+ tabletsCount,
+ newLagPenalty);
}
Counters_->SuccessRequestCount.Increment();
} catch (const std::exception& err) {
Counters_->ErrorRequestCount.Increment();
-
- YT_LOG_ERROR("Lag penalty updater for %v failed: %v", TablePath_, err.what());
+ YT_LOG_ERROR(err, "Cannot calculate lag (Table: %v)",
+ TablePath_);
if (ClearPenaltiesOnErrors_) {
for (auto& [cluster, info] : ReplicaClusters_) {
info.CurrentLagPenalty.store(0, std::memory_order::relaxed);
- YT_LOG_INFO("Clearing penalty for cluster %v and table %v", cluster, TablePath_);
+ YT_LOG_INFO("Clearing penalty (Cluster: %v, Table: %v)",
+ cluster,
+ TablePath_);
}
}
}
}
- NProfiling::TCpuDuration Get(const std::string& cluster) override
+ TDuration Get(const std::string& cluster) override
{
if (const auto* info = ReplicaClusters_.FindPtr(cluster)) {
- return info->CurrentLagPenalty.load(std::memory_order::relaxed);
+ return TDuration::FromValue(info->CurrentLagPenalty.load(std::memory_order::relaxed));
}
- return 0;
+ return TDuration::Zero();
}
~TLagPenaltyProvider()
@@ -194,14 +198,14 @@ private:
struct TReplicaInfo
{
NTabletClient::TTableReplicaId ReplicaId;
- std::atomic<NProfiling::TCpuDuration> CurrentLagPenalty = 0;
+ std::atomic<ui64> CurrentLagPenalty = 0;
};
const TString TablePath_;
THashMap<std::string, TReplicaInfo> ReplicaClusters_;
const TDuration MaxTabletLag_;
- const NProfiling::TCpuDuration LagPenalty_;
+ const TDuration LagPenalty_;
const float MaxTabletsWithLagFraction_;
NApi::IClientPtr Client_;
const bool ClearPenaltiesOnErrors_;
diff --git a/yt/yt/client/hedging/penalty_provider.h b/yt/yt/client/hedging/penalty_provider.h
index 4c4d04e975..32071c908b 100644
--- a/yt/yt/client/hedging/penalty_provider.h
+++ b/yt/yt/client/hedging/penalty_provider.h
@@ -15,7 +15,7 @@ namespace NYT::NClient::NHedging::NRpc {
struct IPenaltyProvider
: public TRefCounted
{
- virtual NProfiling::TCpuDuration Get(const std::string& cluster) = 0;
+ virtual TDuration Get(const std::string& cluster) = 0;
};
DEFINE_REFCOUNTED_TYPE(IPenaltyProvider)
diff --git a/yt/yt/client/hedging/logger.cpp b/yt/yt/client/hedging/private.cpp
index 97dd0f6fb4..fa9cdb06a8 100644
--- a/yt/yt/client/hedging/logger.cpp
+++ b/yt/yt/client/hedging/private.cpp
@@ -1,4 +1,4 @@
-#include "logger.h"
+#include "private.h"
namespace NYT::NClient::NHedging::NRpc {
diff --git a/yt/yt/client/hedging/logger.h b/yt/yt/client/hedging/private.h
index d4771e3331..d4771e3331 100644
--- a/yt/yt/client/hedging/logger.h
+++ b/yt/yt/client/hedging/private.h
diff --git a/yt/yt/client/hedging/public.h b/yt/yt/client/hedging/public.h
index 1e976eb392..ee569cc5bd 100644
--- a/yt/yt/client/hedging/public.h
+++ b/yt/yt/client/hedging/public.h
@@ -13,10 +13,13 @@ using NCache::IClientsCachePtr;
DECLARE_REFCOUNTED_STRUCT(TCounter)
DECLARE_REFCOUNTED_STRUCT(TLagPenaltyProviderCounters)
-DECLARE_REFCOUNTED_STRUCT(TClientConfig)
+// TODO(bulatman) Rename to THedgingClientConfig.
+DECLARE_REFCOUNTED_STRUCT(THedgingClientOptions)
DECLARE_REFCOUNTED_STRUCT(IPenaltyProvider)
+DECLARE_REFCOUNTED_CLASS(THedgingExecutor)
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NClient::NHedging::NRpc
diff --git a/yt/yt/client/hedging/unittests/counters_ut.cpp b/yt/yt/client/hedging/unittests/counters_ut.cpp
index 667cc0813b..db6c0ca37f 100644
--- a/yt/yt/client/hedging/unittests/counters_ut.cpp
+++ b/yt/yt/client/hedging/unittests/counters_ut.cpp
@@ -1,24 +1,19 @@
+#include "helper.h"
+
#include <yt/yt/client/hedging/counter.h>
#include <yt/yt/client/hedging/hedging.h>
+#include <yt/yt/client/hedging/hedging_executor.h>
#include <yt/yt/client/unittests/mock/client.h>
#include <yt/yt/core/concurrency/scheduler.h>
-#include <yt/yt/core/actions/cancelable_context.h>
-
#include <yt/yt/library/profiling/sensor.h>
#include <yt/yt/library/profiling/testing.h>
#include <yt/yt/library/profiling/solomon/registry.h>
-#include <library/cpp/iterator/zip.h>
-
#include <library/cpp/testing/gtest/gtest.h>
-#include <util/generic/vector.h>
-
-#include <util/string/join.h>
-
namespace NYT::NClient::NHedging::NRpc {
using ::testing::_;
@@ -37,20 +32,19 @@ const auto SleepQuantum = TDuration::MilliSeconds(100);
#define EXPECT_DURATION_NEAR(a, b) EXPECT_NEAR(a.MilliSeconds(), b.MilliSeconds(), 1)
+
NApi::IClientPtr CreateTestHedgingClient(
- std::initializer_list<NApi::IClientPtr> clients,
- std::initializer_list<TCounterPtr> counters,
+ std::vector<NApi::IClientPtr> clients,
+ std::vector<TCounterPtr> counters,
TDuration banDuration = SleepQuantum * 5)
{
- THedgingClientOptions options;
- options.BanPenalty = SleepQuantum * 2;
- options.BanDuration = banDuration;
- std::initializer_list<TDuration> initialPenalties = {TDuration::Zero(), SleepQuantum};
-
- for (auto [client, initialPenalty, counter] : Zip(clients, initialPenalties, counters)) {
- options.Clients.emplace_back(client, initialPenalty, counter);
- }
- return CreateHedgingClient(options);
+ return NTest::CreateTestHedgingClient(
+ clients,
+ counters,
+ {TDuration::Zero(), SleepQuantum},
+ CreateDummyPenaltyProvider(),
+ SleepQuantum * 2,
+ banDuration);
}
} // namespace
diff --git a/yt/yt/client/hedging/unittests/hedging_ut.cpp b/yt/yt/client/hedging/unittests/hedging_ut.cpp
index 3d3d28cda7..e2e49d46b2 100644
--- a/yt/yt/client/hedging/unittests/hedging_ut.cpp
+++ b/yt/yt/client/hedging/unittests/hedging_ut.cpp
@@ -1,3 +1,5 @@
+#include "helper.h"
+
#include <yt/yt/client/hedging/cache.h>
#include <yt/yt/client/hedging/counter.h>
#include <yt/yt/client/hedging/hedging.h>
@@ -41,19 +43,17 @@ public:
NApi::IClientPtr CreateTestHedgingClient(
TDuration banPenalty,
TDuration banDuration,
- std::initializer_list<NApi::IClientPtr> clients,
- std::initializer_list<TDuration> initialPenalties = {TDuration::Zero(), SleepQuantum},
+ std::vector<NApi::IClientPtr> clients,
+ std::vector<TDuration> initialPenalties = {TDuration::Zero(), SleepQuantum},
const IPenaltyProviderPtr& penaltyProvider = CreateDummyPenaltyProvider())
{
- THedgingClientOptions options;
- options.BanPenalty = banPenalty;
- options.BanDuration = banDuration;
- size_t clientId = 0;
- for (auto [client, initialPenalty] : Zip(clients, initialPenalties)) {
- auto currCliendId = "seneca-" + ToString(++clientId);
- options.Clients.emplace_back(client, currCliendId, initialPenalty, New<TCounter>(currCliendId));
+ std::vector<TCounterPtr> counters;
+ counters.reserve(clients.size());
+ for (int i = 0; i != std::ssize(clients); ++i) {
+ counters.push_back(New<TCounter>(ToString(i)));
}
- return CreateHedgingClient(options, penaltyProvider);
+ return NTest::CreateTestHedgingClient(
+ clients, counters, initialPenalties, penaltyProvider, banPenalty, banDuration);
}
IPenaltyProviderPtr CreateReplicationLagPenaltyProvider(
@@ -86,7 +86,7 @@ TEST(THedgingClientTest, GetResultFromClientWithMinEffectivePenalty)
{
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
@@ -102,7 +102,7 @@ TEST(THedgingClientTest, GetResultFromClientWithMinEffectivePenalty)
auto queryResult = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that query result is from first client, because it's effective initial penalty is minimal.
- EXPECT_TRUE(queryResult.IsOK());
+ ASSERT_TRUE(queryResult.IsOK());
EXPECT_EQ(queryResult.Value().AsStringBuf(), clientResult1.AsStringBuf());
}
@@ -110,8 +110,8 @@ TEST(THedgingClientTest, GetclientResult2WhenFirstClientHasFailed)
{
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
@@ -128,7 +128,7 @@ TEST(THedgingClientTest, GetclientResult2WhenFirstClientHasFailed)
auto queryResult = NConcurrency::WaitFor(client->ListNode(path));
// Check that query result is from second client, because first client returned failure and got banned.
- EXPECT_TRUE(queryResult.IsOK());
+ ASSERT_TRUE(queryResult.IsOK());
EXPECT_EQ(queryResult.Value().AsStringBuf(), clientResult2.AsStringBuf());
}
@@ -136,8 +136,8 @@ TEST(THedgingClientTest, GetclientResult1AfterBanTimeHasElapsed)
{
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
@@ -156,14 +156,14 @@ TEST(THedgingClientTest, GetclientResult1AfterBanTimeHasElapsed)
auto queryResult1 = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that first query result is from second client, because first client returned failure and got banned.
- EXPECT_TRUE(queryResult1.IsOK());
+ ASSERT_TRUE(queryResult1.IsOK());
EXPECT_EQ(queryResult1.Value().AsStringBuf(), clientResult2.AsStringBuf());
NConcurrency::TDelayedExecutor::WaitForDuration(banDuration);
auto queryResult2 = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that second query result is from first client, because ban time has elapsed and it's effective initial penalty is minimal again.
- EXPECT_TRUE(queryResult2.IsOK());
+ ASSERT_TRUE(queryResult2.IsOK());
EXPECT_EQ(queryResult2.Value().AsStringBuf(), clientResult1.AsStringBuf());
}
@@ -171,8 +171,8 @@ TEST(THedgingClientTest, GetclientResult2WhenFirstClientIsBanned)
{
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
@@ -189,12 +189,12 @@ TEST(THedgingClientTest, GetclientResult2WhenFirstClientIsBanned)
auto queryResult1 = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that first query result is from second client, because first client returned failure and got banned.
- EXPECT_TRUE(queryResult1.IsOK());
+ ASSERT_TRUE(queryResult1.IsOK());
EXPECT_EQ(queryResult1.Value().AsStringBuf(), clientResult2.AsStringBuf());
auto queryResult2 = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that second query result is from second client, because first client is still banned.
- EXPECT_TRUE(queryResult2.IsOK());
+ ASSERT_TRUE(queryResult2.IsOK());
EXPECT_EQ(queryResult2.Value().AsStringBuf(), clientResult2.AsStringBuf());
}
@@ -202,8 +202,8 @@ TEST(THedgingClientTest, GetclientResult2WhenFirstClientIsSleeping)
{
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
@@ -220,7 +220,7 @@ TEST(THedgingClientTest, GetclientResult2WhenFirstClientIsSleeping)
auto queryResult = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that query result is from second client, because first client is sleeping.
- EXPECT_TRUE(queryResult.IsOK());
+ ASSERT_TRUE(queryResult.IsOK());
EXPECT_EQ(queryResult.Value().AsStringBuf(), clientResult2.AsStringBuf());
}
@@ -228,8 +228,8 @@ TEST(THedgingClientTest, FirstClientIsBannedBecauseResponseWasCancelled)
{
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
@@ -247,7 +247,7 @@ TEST(THedgingClientTest, FirstClientIsBannedBecauseResponseWasCancelled)
auto queryResult1 = NConcurrency::WaitFor(client->ListNode(path));
// Check that query result is from second client, because first client is sleeping.
- EXPECT_TRUE(queryResult1.IsOK());
+ ASSERT_TRUE(queryResult1.IsOK());
EXPECT_EQ(queryResult1.Value().AsStringBuf(), clientResult2.AsStringBuf());
// Wait for finish of all requests
@@ -255,7 +255,7 @@ TEST(THedgingClientTest, FirstClientIsBannedBecauseResponseWasCancelled)
auto queryResult2 = NConcurrency::WaitFor(client->ListNode(path));
// Check that second query result is from second client, because first client was cancelled and got banned.
- EXPECT_TRUE(queryResult2.IsOK());
+ ASSERT_TRUE(queryResult2.IsOK());
EXPECT_EQ(queryResult2.Value().AsStringBuf(), clientResult2.AsStringBuf());
}
@@ -263,9 +263,9 @@ TEST(THedgingClientTest, AmnestyBanPenaltyIfClientSucceeded)
{
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
- NYson::TYsonString thirdClientResult(TStringBuf("ThirdClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
+ NYson::TYsonString thirdClientResult("ThirdClientData"_sb);
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
@@ -289,7 +289,7 @@ TEST(THedgingClientTest, AmnestyBanPenaltyIfClientSucceeded)
auto queryResult1 = NConcurrency::WaitFor(client->ListNode(path));
// Check that query result is from second client, because first client finished with an error.
- EXPECT_TRUE(queryResult1.IsOK());
+ ASSERT_TRUE(queryResult1.IsOK());
EXPECT_EQ(queryResult1.Value().AsStringBuf(), clientResult2.AsStringBuf());
// Wait for finish of all requests
@@ -297,7 +297,7 @@ TEST(THedgingClientTest, AmnestyBanPenaltyIfClientSucceeded)
auto queryResult2 = NConcurrency::WaitFor(client->ListNode(path));
// Check that second query result is from first client, because other clients were sleeping.
- EXPECT_TRUE(queryResult2.IsOK());
+ ASSERT_TRUE(queryResult2.IsOK());
EXPECT_EQ(queryResult2.Value().AsStringBuf(), clientResult1.AsStringBuf());
// Wait for finish of all requests
@@ -305,7 +305,7 @@ TEST(THedgingClientTest, AmnestyBanPenaltyIfClientSucceeded)
auto queryResult3 = NConcurrency::WaitFor(client->ListNode(path));
// Check that third query result is from first client again, because it's penalty was amnestied.
- EXPECT_TRUE(queryResult3.IsOK());
+ ASSERT_TRUE(queryResult3.IsOK());
EXPECT_EQ(queryResult3.Value().AsStringBuf(), clientResult1.AsStringBuf());
}
@@ -315,8 +315,8 @@ TEST(THedgingClientTest, MultiThread)
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
EXPECT_CALL(*mockClient1, ListNode(path, _)).WillRepeatedly([=] (const NYPath::TYPath&, const NApi::TListNodeOptions& options) {
if (options.Timeout) {
@@ -365,8 +365,8 @@ TEST(THedgingClientTest, ResponseFromSecondClientWhenFirstHasReplicationLag)
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
EXPECT_CALL(*mockClient1, ListNode(path, _))
.WillRepeatedly(Return(MakeFuture(clientResult1)));
@@ -380,15 +380,14 @@ TEST(THedgingClientTest, ResponseFromSecondClientWhenFirstHasReplicationLag)
auto queryResult = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that query result is from first client, because it's effective initial penalty is minimal.
- EXPECT_TRUE(queryResult.IsOK());
+ ASSERT_TRUE(queryResult.IsOK());
EXPECT_EQ(queryResult.Value().AsStringBuf(), clientResult1.AsStringBuf());
- TString cluster = "seneca-1";
auto maxTabletLag = TDuration::Seconds(10);
auto lagPenalty = 2 * SleepQuantum;
- NYson::TYsonString replicasResult(TStringBuf("{\"575f-131-40502c5-201b420f\" = {\"cluster_name\" = \"seneca-1\"}}"));
- NYson::TYsonString tabletCountResult(TStringBuf("1"));
+ NYson::TYsonString replicasResult("{\"575f-131-40502c5-201b420f\" = {\"cluster_name\" = \"cluster-0\"}}"_sb);
+ NYson::TYsonString tabletCountResult("1"_sb);
auto mockClient3 = New<TStrictMockClient>();
@@ -410,7 +409,7 @@ TEST(THedgingClientTest, ResponseFromSecondClientWhenFirstHasReplicationLag)
auto penaltyProvier = CreateReplicationLagPenaltyProvider(
path,
- cluster,
+ "cluster-0",
maxTabletLag,
lagPenalty,
mockClient3);
@@ -426,7 +425,7 @@ TEST(THedgingClientTest, ResponseFromSecondClientWhenFirstHasReplicationLag)
auto queryResultWithReplicationLagPolicy = NConcurrency::WaitFor(hedgingClientWithPenaltyProvider->ListNode(path));
// Check that query result is from second client, because first client received penalty updater because of replication lag.
- EXPECT_TRUE(queryResultWithReplicationLagPolicy.IsOK());
+ ASSERT_TRUE(queryResultWithReplicationLagPolicy.IsOK());
EXPECT_EQ(queryResultWithReplicationLagPolicy.Value().AsStringBuf(), clientResult2.AsStringBuf());
}
@@ -434,7 +433,7 @@ TEST(THedgingClientTest, CreatingHedgingClientWithPreinitializedClients)
{
const TString clusterName = "test_cluster";
NYPath::TYPath path = "/test/1234";
- NYson::TYsonString clientResult(TStringBuf("ClientData"));
+ NYson::TYsonString clientResult("ClientData"_sb);
auto mockClient = New<TStrictMockClient>();
EXPECT_CALL(*mockClient, ListNode(path, _))
@@ -454,7 +453,7 @@ TEST(THedgingClientTest, CreatingHedgingClientWithPreinitializedClients)
auto queryResult = NConcurrency::WaitFor(hedgingClient->ListNode(path));
// Check that query result is from preinitialized client.
- EXPECT_TRUE(queryResult.IsOK());
+ ASSERT_TRUE(queryResult.IsOK());
EXPECT_EQ(queryResult.Value().AsStringBuf(), clientResult.AsStringBuf());
}
@@ -464,20 +463,19 @@ TEST(THedgingClientTest, ResponseFromFirstClientWhenReplicationLagUpdaterFails)
auto mockClient1 = New<TStrictMockClient>();
auto mockClient2 = New<TStrictMockClient>();
- NYson::TYsonString clientResult1(TStringBuf("FirstClientData"));
- NYson::TYsonString clientResult2(TStringBuf("SecondClientData"));
+ NYson::TYsonString clientResult1("FirstClientData"_sb);
+ NYson::TYsonString clientResult2("SecondClientData"_sb);
EXPECT_CALL(*mockClient1, ListNode(path, _))
.WillRepeatedly(Return(MakeFuture(clientResult1)));
EXPECT_CALL(*mockClient2, ListNode(path, _))
.WillRepeatedly(Return(MakeFuture(clientResult2)));
- TString cluster = "seneca-1";
auto maxTabletLag = TDuration::Seconds(10);
auto lagPenalty = 2 * SleepQuantum;
- NYson::TYsonString replicasResult(TStringBuf("{\"575f-131-40502c5-201b420f\" = {\"cluster_name\" = \"seneca-1\"}}"));
- NYson::TYsonString tabletCountResult(TStringBuf("1"));
+ NYson::TYsonString replicasResult("{\"575f-131-40502c5-201b420f\" = {\"cluster_name\" = \"cluster-0\"}}"_sb);
+ NYson::TYsonString tabletCountResult("1"_sb);
auto mockClient3 = New<TStrictMockClient>();
@@ -499,7 +497,7 @@ TEST(THedgingClientTest, ResponseFromFirstClientWhenReplicationLagUpdaterFails)
auto penaltyProvider = CreateReplicationLagPenaltyProvider(
path,
- cluster,
+ "cluster-0",
maxTabletLag,
lagPenalty,
mockClient3,
@@ -516,14 +514,14 @@ TEST(THedgingClientTest, ResponseFromFirstClientWhenReplicationLagUpdaterFails)
auto queryResultWithReplicationLagPolicy = NConcurrency::WaitFor(hedgingClientWithPenaltyProvider->ListNode(path));
// Check that query result is from second client, because first client received penalty because of replication lag.
- EXPECT_TRUE(queryResultWithReplicationLagPolicy.IsOK());
+ ASSERT_TRUE(queryResultWithReplicationLagPolicy.IsOK());
EXPECT_EQ(queryResultWithReplicationLagPolicy.Value().AsStringBuf(), clientResult2.AsStringBuf());
Sleep(2 * CheckPeriod);
auto queryResultWithCleanedPenalty = NConcurrency::WaitFor(hedgingClientWithPenaltyProvider->ListNode(path));
// Check that query result is from first client, because replication lag was cleaned.
- EXPECT_TRUE(queryResultWithCleanedPenalty.IsOK());
+ ASSERT_TRUE(queryResultWithCleanedPenalty.IsOK());
EXPECT_EQ(queryResultWithCleanedPenalty.Value().AsStringBuf(), clientResult1.AsStringBuf());
}
diff --git a/yt/yt/client/hedging/unittests/helper.cpp b/yt/yt/client/hedging/unittests/helper.cpp
new file mode 100644
index 0000000000..b7b1bc3f33
--- /dev/null
+++ b/yt/yt/client/hedging/unittests/helper.cpp
@@ -0,0 +1,37 @@
+#include "helper.h"
+
+#include <yt/yt/client/hedging/counter.h>
+#include <yt/yt/client/hedging/hedging.h>
+#include <yt/yt/client/hedging/hedging_executor.h>
+
+#include <vector>
+
+namespace NYT::NClient::NHedging::NRpc::NTest {
+
+NApi::IClientPtr CreateTestHedgingClient(
+ std::vector<NApi::IClientPtr> clients,
+ std::vector<TCounterPtr> counters,
+ std::vector<TDuration> initialPenalties,
+ const IPenaltyProviderPtr& penaltyProvider,
+ TDuration banPenalty,
+ TDuration banDuration)
+{
+ YT_VERIFY(clients.size() == counters.size());
+ YT_VERIFY(clients.size() == initialPenalties.size());
+
+ std::vector<THedgingExecutor::TNode> executorNodes;
+ executorNodes.reserve(clients.size());
+ for (int i = 0; i != std::ssize(clients); ++i) {
+ executorNodes.push_back({
+ .Client = clients[i],
+ .Counter = counters[i],
+ .ClusterName = Format("cluster-%v", i),
+ .InitialPenalty = initialPenalties[i],
+ });
+ }
+ return CreateHedgingClient(
+ New<THedgingExecutor>(executorNodes, banPenalty, banDuration, penaltyProvider));
+}
+
+
+} // NYT::NClient::NHedging::NTest
diff --git a/yt/yt/client/hedging/unittests/helper.h b/yt/yt/client/hedging/unittests/helper.h
new file mode 100644
index 0000000000..8e7003041a
--- /dev/null
+++ b/yt/yt/client/hedging/unittests/helper.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <yt/yt/client/hedging/public.h>
+
+#include <util/datetime/base.h>
+
+#include <vector>
+
+namespace NYT::NClient::NHedging::NRpc::NTest {
+
+NApi::IClientPtr CreateTestHedgingClient(
+ std::vector<NApi::IClientPtr> clients,
+ std::vector<TCounterPtr> counters,
+ std::vector<TDuration> initialPenalties,
+ const IPenaltyProviderPtr& penaltyProvider,
+ TDuration banPenalty,
+ TDuration banDuration);
+
+} // NYT::NClient::NHedging::NTest
diff --git a/yt/yt/client/hedging/unittests/hook.cpp b/yt/yt/client/hedging/unittests/hook.cpp
deleted file mode 100644
index d7c527999b..0000000000
--- a/yt/yt/client/hedging/unittests/hook.cpp
+++ /dev/null
@@ -1,20 +0,0 @@
-#include <yt/yt/core/logging/log_manager.h>
-
-#include <yt/yt/core/misc/shutdown.h>
-
-#include <library/cpp/testing/hook/hook.h>
-
-Y_TEST_HOOK_BEFORE_RUN(TEST_YT_SETUP)
-{
- NYT::NLogging::TLogManager::Get()->ConfigureFromEnv();
-}
-
-Y_TEST_HOOK_AFTER_RUN(TEST_YT_TEARDOWN)
-{
- NYT::Shutdown();
-#ifdef _asan_enabled_
- // Wait for some time to ensure background cleanup is somewhat complete.
- Sleep(TDuration::Seconds(1));
- NYT::TRefCountedTrackerFacade::Dump();
-#endif
-}
diff --git a/yt/yt/client/hedging/unittests/penalty_provider_ut.cpp b/yt/yt/client/hedging/unittests/penalty_provider_ut.cpp
index db1b90a01b..9a0e6a21d2 100644
--- a/yt/yt/client/hedging/unittests/penalty_provider_ut.cpp
+++ b/yt/yt/client/hedging/unittests/penalty_provider_ut.cpp
@@ -75,7 +75,7 @@ TEST(TLagPenaltyProviderTest, UpdateExternalPenaltyWhenReplicaHasLag)
auto PenaltyProviderPtr = CreateReplicationLagPenaltyProvider(config, client);
Sleep(2 * CheckPeriod);
- EXPECT_EQ(PenaltyProviderPtr->Get(cluster), NProfiling::DurationToCpuDuration(TDuration::MilliSeconds(config.GetLagPenalty())));
+ EXPECT_EQ(PenaltyProviderPtr->Get(cluster), TDuration::MilliSeconds(config.GetLagPenalty()));
}
TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenReplicaHasNoLag)
@@ -108,7 +108,7 @@ TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenReplicaHasNoLag)
auto PenaltyProviderPtr = CreateReplicationLagPenaltyProvider(config, client);
Sleep(2 * CheckPeriod);
- EXPECT_EQ(PenaltyProviderPtr->Get(cluster), 0);
+ EXPECT_EQ(PenaltyProviderPtr->Get(cluster), TDuration::Zero());
}
TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenGetReplicaIdFailed)
@@ -126,7 +126,7 @@ TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenGetReplicaIdFailed)
auto PenaltyProviderPtr = CreateReplicationLagPenaltyProvider(config, client);
Sleep(2 * CheckPeriod);
- EXPECT_EQ(PenaltyProviderPtr->Get(cluster), 0);
+ EXPECT_EQ(PenaltyProviderPtr->Get(cluster), TDuration::Zero());
}
TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenGetTabletsCountFailed)
@@ -149,7 +149,7 @@ TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenGetTabletsCountFailed)
auto PenaltyProviderPtr = CreateReplicationLagPenaltyProvider(config, client);
Sleep(2 * CheckPeriod);
- EXPECT_EQ(PenaltyProviderPtr->Get(cluster), 0);
+ EXPECT_EQ(PenaltyProviderPtr->Get(cluster), TDuration::Zero());
}
TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenGetTabletsInfoFailed)
@@ -177,7 +177,7 @@ TEST(TLagPenaltyProviderTest, DoNotUpdatePenaltyWhenGetTabletsInfoFailed)
auto PenaltyProviderPtr = CreateReplicationLagPenaltyProvider(config, client);
Sleep(2 * CheckPeriod);
- EXPECT_EQ(PenaltyProviderPtr->Get(cluster), 0);
+ EXPECT_EQ(PenaltyProviderPtr->Get(cluster), TDuration::Zero());
}
TEST(TLagPenaltyProviderTest, ClearPenaltiesAfterError)
@@ -213,10 +213,10 @@ TEST(TLagPenaltyProviderTest, ClearPenaltiesAfterError)
auto PenaltyProviderPtr = CreateReplicationLagPenaltyProvider(config, client);
Sleep(CheckPeriod);
- EXPECT_EQ(PenaltyProviderPtr->Get(cluster), NProfiling::DurationToCpuDuration(TDuration::MilliSeconds(config.GetLagPenalty())));
+ EXPECT_EQ(PenaltyProviderPtr->Get(cluster), TDuration::MilliSeconds(config.GetLagPenalty()));
Sleep(2 * CheckPeriod);
- EXPECT_EQ(PenaltyProviderPtr->Get(cluster), 0);
+ EXPECT_EQ(PenaltyProviderPtr->Get(cluster), TDuration::Zero());
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/hedging/unittests/ya.make b/yt/yt/client/hedging/unittests/ya.make
index 5e85f5f24e..f37e50216e 100644
--- a/yt/yt/client/hedging/unittests/ya.make
+++ b/yt/yt/client/hedging/unittests/ya.make
@@ -5,11 +5,11 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
EXPLICIT_DATA()
SRCS(
+ helper.cpp
+
counters_ut.cpp
hedging_ut.cpp
penalty_provider_ut.cpp
-
- GLOBAL hook.cpp
)
INCLUDE(${ARCADIA_ROOT}/yt/opensource.inc)
@@ -17,10 +17,10 @@ INCLUDE(${ARCADIA_ROOT}/yt/opensource.inc)
PEERDIR(
library/cpp/iterator
library/cpp/testing/common
- library/cpp/testing/hook
yt/yt/client/hedging
yt/yt/client/unittests/mock
yt/yt/core
+ yt/yt/core/test_framework
yt/yt/library/profiling/solomon
)
diff --git a/yt/yt/client/hedging/ya.make b/yt/yt/client/hedging/ya.make
index 3dbe8af3d5..ec442d3f47 100644
--- a/yt/yt/client/hedging/ya.make
+++ b/yt/yt/client/hedging/ya.make
@@ -7,7 +7,7 @@ SRCS(
counter.cpp
hedging.cpp
hedging_executor.cpp
- logger.cpp
+ private.cpp
penalty_provider.cpp
)
@@ -17,8 +17,6 @@ PEERDIR(
yt/yt/client/cache
yt/yt/library/profiling
yt/yt_proto/yt/client/hedging
-
- library/cpp/iterator
)
END()