diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-05 08:51:15 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-05 09:00:32 +0300 |
commit | 18c72bb588bd1bf332582b98058548f02e183e5d (patch) | |
tree | 21d752e5113b9d58276698044f3c53a6e3a636b5 | |
parent | 74819c4157bd388a7d429c870ea4b343a282dafa (diff) | |
download | ydb-18c72bb588bd1bf332582b98058548f02e183e5d.tar.gz |
Intermediate changes
31 files changed, 277 insertions, 91 deletions
diff --git a/contrib/python/zope.interface/py3/.dist-info/METADATA b/contrib/python/zope.interface/py3/.dist-info/METADATA index f0bfe5f585..551c582ac8 100644 --- a/contrib/python/zope.interface/py3/.dist-info/METADATA +++ b/contrib/python/zope.interface/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: zope.interface -Version: 6.1 +Version: 6.2 Summary: Interfaces for Python Home-page: https://github.com/zopefoundation/zope.interface Author: Zope Foundation and Contributors @@ -75,6 +75,15 @@ For detailed documentation, please see https://zopeinterface.readthedocs.io/en/l Changes ========= +6.2 (2024-02-16) +================ + +- Add preliminary support for Python 3.13 as of 3.13a3. + +- Add support to use the pipe (``|``) syntax for ``typing.Union``. + (`#280 <https://github.com/zopefoundation/zope.interface/issues/280>`_) + + 6.1 (2023-10-05) ================ diff --git a/contrib/python/zope.interface/py3/ya.make b/contrib/python/zope.interface/py3/ya.make index f11d0d940c..514772e2ba 100644 --- a/contrib/python/zope.interface/py3/ya.make +++ b/contrib/python/zope.interface/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(6.1) +VERSION(6.2) LICENSE(ZPL-2.1) diff --git a/contrib/python/zope.interface/py3/zope/interface/__init__.py b/contrib/python/zope.interface/py3/zope/interface/__init__.py index 17a272f1da..8be812dd6c 100644 --- a/contrib/python/zope.interface/py3/zope/interface/__init__.py +++ b/contrib/python/zope.interface/py3/zope/interface/__init__.py @@ -53,11 +53,14 @@ __docformat__ = 'restructuredtext' from zope.interface.interface import Interface from zope.interface.interface import _wire + # Need to actually get the interface elements to implement the right interfaces _wire() del _wire from zope.interface.declarations import Declaration +# The following are to make spec pickles cleaner +from zope.interface.declarations import Provides from zope.interface.declarations import alsoProvides from zope.interface.declarations import classImplements from zope.interface.declarations import classImplementsFirst @@ -72,20 +75,14 @@ from zope.interface.declarations import named from zope.interface.declarations import noLongerProvides from zope.interface.declarations import providedBy from zope.interface.declarations import provider - from zope.interface.exceptions import Invalid - from zope.interface.interface import Attribute from zope.interface.interface import interfacemethod from zope.interface.interface import invariant from zope.interface.interface import taggedValue - -# The following are to make spec pickles cleaner -from zope.interface.declarations import Provides - - from zope.interface.interfaces import IInterfaceDeclaration + moduleProvides(IInterfaceDeclaration) __all__ = ('Interface', 'Attribute') + tuple(IInterfaceDeclaration) diff --git a/contrib/python/zope.interface/py3/zope/interface/_flatten.py b/contrib/python/zope.interface/py3/zope/interface/_flatten.py index a80c2de49a..68b490db1c 100644 --- a/contrib/python/zope.interface/py3/zope/interface/_flatten.py +++ b/contrib/python/zope.interface/py3/zope/interface/_flatten.py @@ -17,6 +17,7 @@ See Adapter class. """ from zope.interface import Declaration + def _flatten(implements, include_None=0): try: diff --git a/contrib/python/zope.interface/py3/zope/interface/_zope_interface_coptimizations.c b/contrib/python/zope.interface/py3/zope/interface/_zope_interface_coptimizations.c index 91899283c0..2cf453a7e6 100644 --- a/contrib/python/zope.interface/py3/zope/interface/_zope_interface_coptimizations.c +++ b/contrib/python/zope.interface/py3/zope/interface/_zope_interface_coptimizations.c @@ -1977,7 +1977,6 @@ static struct PyMethodDef m_methods[] = { {NULL, (PyCFunction)NULL, 0, NULL} /* sentinel */ }; -#if PY_MAJOR_VERSION >= 3 static char module_doc[] = "C optimizations for zope.interface\n\n"; static struct PyModuleDef _zic_module = { @@ -1991,20 +1990,14 @@ static struct PyModuleDef _zic_module = { NULL, NULL }; -#endif static PyObject * init(void) { PyObject *m; -#if PY_MAJOR_VERSION < 3 -#define DEFINE_STRING(S) \ - if(! (str ## S = PyString_FromString(# S))) return NULL -#else #define DEFINE_STRING(S) \ if(! (str ## S = PyUnicode_FromString(# S))) return NULL -#endif DEFINE_STRING(__dict__); DEFINE_STRING(__implemented__); @@ -2054,13 +2047,7 @@ init(void) if (PyType_Ready(&VerifyingBase) < 0) return NULL; - #if PY_MAJOR_VERSION < 3 - /* Create the module and add the functions */ - m = Py_InitModule3("_zope_interface_coptimizations", m_methods, - "C optimizations for zope.interface\n\n"); - #else m = PyModule_Create(&_zic_module); - #endif if (m == NULL) return NULL; @@ -2084,17 +2071,10 @@ init(void) } PyMODINIT_FUNC -#if PY_MAJOR_VERSION < 3 -init_zope_interface_coptimizations(void) -{ - init(); -} -#else PyInit__zope_interface_coptimizations(void) { return init(); } -#endif #ifdef __clang__ #pragma clang diagnostic pop diff --git a/contrib/python/zope.interface/py3/zope/interface/adapter.py b/contrib/python/zope.interface/py3/zope/interface/adapter.py index dbff0d19da..b02f19d562 100644 --- a/contrib/python/zope.interface/py3/zope/interface/adapter.py +++ b/contrib/python/zope.interface/py3/zope/interface/adapter.py @@ -16,14 +16,14 @@ import itertools import weakref +from zope.interface import Interface from zope.interface import implementer from zope.interface import providedBy -from zope.interface import Interface from zope.interface import ro -from zope.interface.interfaces import IAdapterRegistry - from zope.interface._compat import _normalize_name from zope.interface._compat import _use_c_impl +from zope.interface.interfaces import IAdapterRegistry + __all__ = [ 'AdapterRegistry', diff --git a/contrib/python/zope.interface/py3/zope/interface/advice.py b/contrib/python/zope.interface/py3/zope/interface/advice.py index 54e356e672..5a8e377737 100644 --- a/contrib/python/zope.interface/py3/zope/interface/advice.py +++ b/contrib/python/zope.interface/py3/zope/interface/advice.py @@ -27,6 +27,7 @@ Visit the PEAK home page at http://peak.telecommunity.com for more information. from types import FunctionType + __all__ = [ 'determineMetaclass', 'getFrameInfo', @@ -36,6 +37,7 @@ __all__ = [ import sys + def getFrameInfo(frame): """Return (kind,module,locals,globals) for a frame diff --git a/contrib/python/zope.interface/py3/zope/interface/common/__init__.py b/contrib/python/zope.interface/py3/zope/interface/common/__init__.py index 56f4566a24..f308f9edd6 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/__init__.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/__init__.py @@ -13,11 +13,12 @@ import itertools from types import FunctionType -from zope.interface import classImplements from zope.interface import Interface -from zope.interface.interface import fromFunction +from zope.interface import classImplements from zope.interface.interface import InterfaceClass from zope.interface.interface import _decorator_non_return +from zope.interface.interface import fromFunction + __all__ = [ # Nothing public here. diff --git a/contrib/python/zope.interface/py3/zope/interface/common/builtins.py b/contrib/python/zope.interface/py3/zope/interface/common/builtins.py index 17090e4a79..6e13e06482 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/builtins.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/builtins.py @@ -19,10 +19,10 @@ that they implement the appropriate interface. """ from zope.interface import classImplements - from zope.interface.common import collections -from zope.interface.common import numbers from zope.interface.common import io +from zope.interface.common import numbers + __all__ = [ 'IList', diff --git a/contrib/python/zope.interface/py3/zope/interface/common/collections.py b/contrib/python/zope.interface/py3/zope/interface/common/collections.py index c549028268..3c751c05c8 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/collections.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/collections.py @@ -31,17 +31,17 @@ is, ``verifyObject(ISequence, list()))`` will pass, for example), a few might no """ import sys - from abc import ABCMeta -from collections import abc from collections import OrderedDict -from collections import UserList from collections import UserDict +from collections import UserList from collections import UserString +from collections import abc from zope.interface.common import ABCInterface from zope.interface.common import optional + # pylint:disable=inherit-non-class, # pylint:disable=no-self-argument,no-method-argument # pylint:disable=unexpected-special-method-signature diff --git a/contrib/python/zope.interface/py3/zope/interface/common/idatetime.py b/contrib/python/zope.interface/py3/zope/interface/common/idatetime.py index 82f0059c85..aeda07aa0e 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/idatetime.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/idatetime.py @@ -14,9 +14,14 @@ This module is called idatetime because if it were called datetime the import of the real datetime would fail. """ -from datetime import timedelta, date, datetime, time, tzinfo - -from zope.interface import Interface, Attribute +from datetime import date +from datetime import datetime +from datetime import time +from datetime import timedelta +from datetime import tzinfo + +from zope.interface import Attribute +from zope.interface import Interface from zope.interface import classImplements diff --git a/contrib/python/zope.interface/py3/zope/interface/common/interfaces.py b/contrib/python/zope.interface/py3/zope/interface/common/interfaces.py index 70bd294f35..688e323a97 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/interfaces.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/interfaces.py @@ -16,6 +16,7 @@ from zope.interface import Interface from zope.interface import classImplements + class IException(Interface): "Interface for `Exception`" classImplements(Exception, IException) diff --git a/contrib/python/zope.interface/py3/zope/interface/common/io.py b/contrib/python/zope.interface/py3/zope/interface/common/io.py index 0d6f3badfc..89f1ba8034 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/io.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/io.py @@ -23,6 +23,7 @@ import io as abc from zope.interface.common import ABCInterface + # pylint:disable=inherit-non-class, # pylint:disable=no-member diff --git a/contrib/python/zope.interface/py3/zope/interface/common/mapping.py b/contrib/python/zope.interface/py3/zope/interface/common/mapping.py index d04333357f..eb9a2900b7 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/mapping.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/mapping.py @@ -31,6 +31,7 @@ interfaces in this module. from zope.interface import Interface from zope.interface.common import collections + class IItemMapping(Interface): """Simplest readable mapping object """ diff --git a/contrib/python/zope.interface/py3/zope/interface/common/sequence.py b/contrib/python/zope.interface/py3/zope/interface/common/sequence.py index 5edc73dc6a..738f76d42a 100644 --- a/contrib/python/zope.interface/py3/zope/interface/common/sequence.py +++ b/contrib/python/zope.interface/py3/zope/interface/common/sequence.py @@ -33,6 +33,7 @@ __docformat__ = 'restructuredtext' from zope.interface import Interface from zope.interface.common import collections + class IMinimalSequence(collections.IIterable): """Most basic sequence interface. diff --git a/contrib/python/zope.interface/py3/zope/interface/declarations.py b/contrib/python/zope.interface/py3/zope/interface/declarations.py index 61e2543929..87e625203c 100644 --- a/contrib/python/zope.interface/py3/zope/interface/declarations.py +++ b/contrib/python/zope.interface/py3/zope/interface/declarations.py @@ -27,17 +27,18 @@ There are three flavors of declarations: __docformat__ = 'restructuredtext' import sys +import weakref from types import FunctionType from types import MethodType from types import ModuleType -import weakref +from zope.interface._compat import _use_c_impl from zope.interface.interface import Interface from zope.interface.interface import InterfaceClass -from zope.interface.interface import SpecificationBase -from zope.interface.interface import Specification from zope.interface.interface import NameAndModuleComparisonMixin -from zope.interface._compat import _use_c_impl +from zope.interface.interface import Specification +from zope.interface.interface import SpecificationBase + __all__ = [ # None. The public APIs of this module are diff --git a/contrib/python/zope.interface/py3/zope/interface/document.py b/contrib/python/zope.interface/py3/zope/interface/document.py index 84cfaa0b71..2595c569d2 100644 --- a/contrib/python/zope.interface/py3/zope/interface/document.py +++ b/contrib/python/zope.interface/py3/zope/interface/document.py @@ -18,6 +18,7 @@ interface as structured text. """ import zope.interface + __all__ = [ 'asReStructuredText', 'asStructuredText', diff --git a/contrib/python/zope.interface/py3/zope/interface/exceptions.py b/contrib/python/zope.interface/py3/zope/interface/exceptions.py index d5c234a6da..0612eb230e 100644 --- a/contrib/python/zope.interface/py3/zope/interface/exceptions.py +++ b/contrib/python/zope.interface/py3/zope/interface/exceptions.py @@ -191,7 +191,7 @@ class BrokenMethodImplementation(_TargetInvalid): def __implementation_str(impl): # It could be a callable or some arbitrary object, we don't # know yet. - import inspect # Inspect is a heavy-weight dependency, lots of imports + import inspect # Inspect is a heavy-weight dependency, lots of imports try: sig = inspect.signature formatsig = str diff --git a/contrib/python/zope.interface/py3/zope/interface/interface.py b/contrib/python/zope.interface/py3/zope/interface/interface.py index 1bd6f9e818..8143daf3dd 100644 --- a/contrib/python/zope.interface/py3/zope/interface/interface.py +++ b/contrib/python/zope.interface/py3/zope/interface/interface.py @@ -15,14 +15,16 @@ """ # pylint:disable=protected-access import sys -from types import MethodType -from types import FunctionType import weakref +from types import FunctionType +from types import MethodType +from typing import Union +from zope.interface import ro from zope.interface._compat import _use_c_impl from zope.interface.exceptions import Invalid from zope.interface.ro import ro as calculate_ro -from zope.interface import ro + __all__ = [ # Most of the public API from this module is directly exported @@ -941,6 +943,14 @@ class InterfaceClass(_InterfaceClassBase): def __reduce__(self): return self.__name__ + def __or__(self, other): + """Allow type hinting syntax: Interface | None.""" + return Union[self, other] + + def __ror__(self, other): + """Allow type hinting syntax: None | Interface.""" + return Union[other, self] + Interface = InterfaceClass("Interface", __module__='zope.interface') # Interface is the only member of its own SRO. Interface._calculate_sro = lambda: (Interface,) @@ -1121,8 +1131,9 @@ def _wire(): # pylint:disable=wrong-import-position from zope.interface.declarations import implementedBy from zope.interface.declarations import providedBy -from zope.interface.exceptions import InvalidInterface from zope.interface.exceptions import BrokenImplementation +from zope.interface.exceptions import InvalidInterface + # This ensures that ``Interface`` winds up in the flattened() # list of the immutable declaration. It correctly overrides changed() diff --git a/contrib/python/zope.interface/py3/zope/interface/interfaces.py b/contrib/python/zope.interface/py3/zope/interface/interfaces.py index 2b67ce1a9e..0d315cb6a5 100644 --- a/contrib/python/zope.interface/py3/zope/interface/interfaces.py +++ b/contrib/python/zope.interface/py3/zope/interface/interfaces.py @@ -15,9 +15,10 @@ """ __docformat__ = 'restructuredtext' +from zope.interface.declarations import implementer from zope.interface.interface import Attribute from zope.interface.interface import Interface -from zope.interface.declarations import implementer + __all__ = [ 'ComponentLookupError', diff --git a/contrib/python/zope.interface/py3/zope/interface/registry.py b/contrib/python/zope.interface/py3/zope/interface/registry.py index 292499dbec..a6a24fdd80 100644 --- a/contrib/python/zope.interface/py3/zope/interface/registry.py +++ b/contrib/python/zope.interface/py3/zope/interface/registry.py @@ -15,27 +15,28 @@ """ from collections import defaultdict + try: from zope.event import notify except ImportError: # pragma: no cover def notify(*arg, **kw): pass -from zope.interface.interfaces import ISpecification +from zope.interface.adapter import AdapterRegistry +from zope.interface.declarations import implementedBy +from zope.interface.declarations import implementer +from zope.interface.declarations import implementer_only +from zope.interface.declarations import providedBy +from zope.interface.interface import Interface from zope.interface.interfaces import ComponentLookupError from zope.interface.interfaces import IAdapterRegistration from zope.interface.interfaces import IComponents from zope.interface.interfaces import IHandlerRegistration +from zope.interface.interfaces import ISpecification from zope.interface.interfaces import ISubscriptionAdapterRegistration from zope.interface.interfaces import IUtilityRegistration from zope.interface.interfaces import Registered from zope.interface.interfaces import Unregistered -from zope.interface.interface import Interface -from zope.interface.declarations import implementedBy -from zope.interface.declarations import implementer -from zope.interface.declarations import implementer_only -from zope.interface.declarations import providedBy -from zope.interface.adapter import AdapterRegistry __all__ = [ # Components is public API, but diff --git a/contrib/python/zope.interface/py3/zope/interface/ro.py b/contrib/python/zope.interface/py3/zope/interface/ro.py index 17468e9231..52986483c2 100644 --- a/contrib/python/zope.interface/py3/zope/interface/ro.py +++ b/contrib/python/zope.interface/py3/zope/interface/ro.py @@ -454,6 +454,7 @@ class _TrackingC3(C3): if self.leaf not in bad_iros: if bad_iros == (): import weakref + # This is a race condition, but it doesn't matter much. bad_iros = C3.BAD_IROS = weakref.WeakKeyDictionary() bad_iros[self.leaf] = t = ( @@ -527,6 +528,7 @@ class _ROComparison: def _generate_report(self): if self._c3_report is None: import difflib + # The opcodes we get describe how to turn 'a' into 'b'. So # the old one (legacy) needs to be first ('a') matcher = difflib.SequenceMatcher(None, self.legacy_ro, self.c3_ro) diff --git a/contrib/python/zope.interface/py3/zope/interface/verify.py b/contrib/python/zope.interface/py3/zope/interface/verify.py index 0ab0b3f96b..0894d2d2f7 100644 --- a/contrib/python/zope.interface/py3/zope/interface/verify.py +++ b/contrib/python/zope.interface/py3/zope/interface/verify.py @@ -23,8 +23,10 @@ from zope.interface.exceptions import BrokenMethodImplementation from zope.interface.exceptions import DoesNotImplement from zope.interface.exceptions import Invalid from zope.interface.exceptions import MultipleInvalid +from zope.interface.interface import Method +from zope.interface.interface import fromFunction +from zope.interface.interface import fromMethod -from zope.interface.interface import fromMethod, fromFunction, Method __all__ = [ 'verifyObject', diff --git a/yt/yt/core/bus/tcp/client.cpp b/yt/yt/core/bus/tcp/client.cpp index 6fe8a08856..770375ddfe 100644 --- a/yt/yt/core/bus/tcp/client.cpp +++ b/yt/yt/core/bus/tcp/client.cpp @@ -141,9 +141,11 @@ class TTcpBusClient public: TTcpBusClient( TBusClientConfigPtr config, - IPacketTranscoderFactory* packetTranscoderFactory) + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker) : Config_(std::move(config)) , PacketTranscoderFactory_(packetTranscoderFactory) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) { if (Config_->Address) { EndpointDescription_ = *Config_->Address; @@ -204,7 +206,8 @@ public: Config_->UnixDomainSocketPath, std::move(handler), std::move(poller), - PacketTranscoderFactory_); + PacketTranscoderFactory_, + MemoryUsageTracker_); connection->Start(); return New<TTcpClientBusProxy>(std::move(connection)); @@ -215,6 +218,8 @@ private: IPacketTranscoderFactory* const PacketTranscoderFactory_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; + TString EndpointDescription_; IAttributeDictionaryPtr EndpointAttributes_; }; @@ -223,9 +228,13 @@ private: IBusClientPtr CreateBusClient( TBusClientConfigPtr config, - IPacketTranscoderFactory* packetTranscoderFactory) + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker) { - return New<TTcpBusClient>(std::move(config), packetTranscoderFactory); + return New<TTcpBusClient>( + std::move(config), + packetTranscoderFactory, + std::move(memoryUsageTracker)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/bus/tcp/client.h b/yt/yt/core/bus/tcp/client.h index 5b38dd1d51..bebb0b3209 100644 --- a/yt/yt/core/bus/tcp/client.h +++ b/yt/yt/core/bus/tcp/client.h @@ -11,7 +11,8 @@ namespace NYT::NBus { //! Initializes a new client for communicating with a given address. IBusClientPtr CreateBusClient( TBusClientConfigPtr config, - IPacketTranscoderFactory* packetTranscoderFactory = GetYTPacketTranscoderFactory()); + IPacketTranscoderFactory* packetTranscoderFactory = GetYTPacketTranscoderFactory(), + IMemoryUsageTrackerPtr memoryUsageTracker = nullptr); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index 07904d9fbe..8fd3321ab5 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -97,6 +97,50 @@ void TTcpConnection::TPacket::EnableCancel(TTcpConnectionPtr connection) //////////////////////////////////////////////////////////////////////////////// +TTcpConnection::TBlobWithMemoryUsageGuard::TBlobWithMemoryUsageGuard( + TBlob&& blob, + TMemoryUsageTrackerGuard&& guard) + : Blob_(std::move(blob)) + , Guard_(std::move(guard)) +{ } + +char* TTcpConnection::TBlobWithMemoryUsageGuard::Begin() +{ + return Blob_.Begin(); +} + +char* TTcpConnection::TBlobWithMemoryUsageGuard::End() +{ + return Blob_.End(); +} + +size_t TTcpConnection::TBlobWithMemoryUsageGuard::Capacity() +{ + return Blob_.Capacity(); +} + +i64 TTcpConnection::TBlobWithMemoryUsageGuard::Size() +{ + return Blob_.Size(); +} + +void TTcpConnection::TBlobWithMemoryUsageGuard::Append(TRef ref) +{ + Blob_.Append(ref); +} + +void TTcpConnection::TBlobWithMemoryUsageGuard::Clear() +{ + Blob_.Clear(); +} + +void TTcpConnection::TBlobWithMemoryUsageGuard::Reserve(i64 size) +{ + Blob_.Reserve(size); +} + +//////////////////////////////////////////////////////////////////////////////// + TTcpConnection::TTcpConnection( TBusConfigPtr config, EConnectionType connectionType, @@ -110,7 +154,8 @@ TTcpConnection::TTcpConnection( const std::optional<TString>& unixDomainSocketPath, IMessageHandlerPtr handler, IPollerPtr poller, - IPacketTranscoderFactory* packetTranscoderFactory) + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker) : Config_(std::move(config)) , ConnectionType_(connectionType) , Id_(id) @@ -137,6 +182,7 @@ TTcpConnection::TTcpConnection( , WriteStallTimeout_(NProfiling::DurationToCpuDuration(Config_->WriteStallTimeout)) , EncryptionMode_(Config_->EncryptionMode) , VerificationMode_(Config_->VerificationMode) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) { } TTcpConnection::~TTcpConnection() @@ -211,7 +257,14 @@ void TTcpConnection::Start() } TTcpDispatcher::TImpl::Get()->RegisterConnection(this); - InitBuffers(); + + auto result = InitBuffers(); + + if (!result.IsOK()) { + YT_LOG_ERROR(result, "I/O buffers allocation error"); + Abort(TError(NBus::EErrorCode::TransportError, "I/O buffers allocation error")); + return; + } switch (ConnectionType_) { case EConnectionType::Client: @@ -548,12 +601,34 @@ bool TTcpConnection::AbortIfNetworkingDisabled() return true; } -void TTcpConnection::InitBuffers() +TError TTcpConnection::InitBuffers() { - ReadBuffer_ = TBlob(GetRefCountedTypeCookie<TTcpConnectionReadBufferTag>(), ReadBufferSize, /*initializeStorage*/ false); + auto readBufferGuardOrError = MemoryUsageTracker_ + ? TMemoryUsageTrackerGuard::TryAcquire(MemoryUsageTracker_, ReadBufferSize) + : TMemoryUsageTrackerGuard(); + auto writeBufferGuardOrError = MemoryUsageTracker_ + ? TMemoryUsageTrackerGuard::TryAcquire(MemoryUsageTracker_, WriteBufferSize) + : TMemoryUsageTrackerGuard(); - WriteBuffers_.push_back(std::make_unique<TBlob>(GetRefCountedTypeCookie<TTcpConnectionWriteBufferTag>())); + if (!readBufferGuardOrError.IsOK()) { + return readBufferGuardOrError.Wrap(); + } + + if (!writeBufferGuardOrError.IsOK()) { + return writeBufferGuardOrError.Wrap(); + } + + ReadBuffer_ = TBlobWithMemoryUsageGuard( + TBlob(GetRefCountedTypeCookie<TTcpConnectionReadBufferTag>(), ReadBufferSize, /*initializeStorage*/ false), + std::move(readBufferGuardOrError.Value()) + ); + + WriteBuffers_.push_back(std::make_unique<TBlobWithMemoryUsageGuard>( + TBlob(GetRefCountedTypeCookie<TTcpConnectionWriteBufferTag>()), + std::move(writeBufferGuardOrError.Value()))); WriteBuffers_[0]->Reserve(WriteBufferSize); + + return TError(); } int TTcpConnection::GetSocketPort() @@ -1445,10 +1520,18 @@ bool TTcpConnection::MaybeEncodeFragments() if (buffer->Size() + fragment.Size() > buffer->Capacity()) { // Make sure we never reallocate. flushCoalesced(); - WriteBuffers_.push_back(std::make_unique<TBlob>(GetRefCountedTypeCookie<TTcpConnectionWriteBufferTag>())); + + auto writeBufferGuard = MemoryUsageTracker_ + ? TMemoryUsageTrackerGuard::Acquire(MemoryUsageTracker_, WriteBufferSize) + : TMemoryUsageTrackerGuard(); + WriteBuffers_.push_back(std::make_unique<TBlobWithMemoryUsageGuard>( + TBlob(GetRefCountedTypeCookie<TTcpConnectionWriteBufferTag>()), + std::move(writeBufferGuard))); + buffer = WriteBuffers_.back().get(); buffer->Reserve(std::max(WriteBufferSize, fragment.Size())); } + buffer->Append(fragment); coalescedSize += fragment.Size(); }; diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index 0f5d758152..37ac4a4ba9 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -20,6 +20,7 @@ #include <yt/yt/core/misc/mpsc_stack.h> #include <yt/yt/core/misc/ring_queue.h> #include <yt/yt/core/misc/atomic_ptr.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> #include <yt/yt/core/net/public.h> @@ -86,7 +87,8 @@ public: const std::optional<TString>& unixDomainSocketPath, IMessageHandlerPtr handler, NConcurrency::IPollerPtr poller, - IPacketTranscoderFactory* packetTranscoderFactory); + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker); ~TTcpConnection(); @@ -181,6 +183,39 @@ private: void EnableCancel(TTcpConnectionPtr connection); }; + class TBlobWithMemoryUsageGuard { + public: + TBlobWithMemoryUsageGuard( + TBlob&& blob, + TMemoryUsageTrackerGuard&& guard); + + TBlobWithMemoryUsageGuard() = default; + TBlobWithMemoryUsageGuard(const TBlobWithMemoryUsageGuard& other) = delete; + TBlobWithMemoryUsageGuard(TBlobWithMemoryUsageGuard&& other) = default; + ~TBlobWithMemoryUsageGuard() = default; + + TBlobWithMemoryUsageGuard& operator=(const TBlobWithMemoryUsageGuard& other) = delete; + TBlobWithMemoryUsageGuard& operator=(TBlobWithMemoryUsageGuard&& other) = default; + + char* Begin(); + + char* End(); + + size_t Capacity(); + + i64 Size(); + + void Append(TRef ref); + + void Clear(); + + void Reserve(i64 size); + + private: + TBlob Blob_; + TMemoryUsageTrackerGuard Guard_; + }; + using TPacketPtr = TIntrusivePtr<TPacket>; const TBusConfigPtr Config_; @@ -239,7 +274,7 @@ private: std::unique_ptr<IPacketDecoder> Decoder_; const NProfiling::TCpuDuration ReadStallTimeout_; std::atomic<NProfiling::TCpuInstant> LastIncompleteReadTime_ = std::numeric_limits<NProfiling::TCpuInstant>::max(); - TBlob ReadBuffer_; + TBlobWithMemoryUsageGuard ReadBuffer_; TRingQueue<TPacketPtr> QueuedPackets_; TRingQueue<TPacketPtr> EncodedPackets_; @@ -248,7 +283,7 @@ private: std::unique_ptr<IPacketEncoder> Encoder_; const NProfiling::TCpuDuration WriteStallTimeout_; std::atomic<NProfiling::TCpuInstant> LastIncompleteWriteTime_ = std::numeric_limits<NProfiling::TCpuInstant>::max(); - std::vector<std::unique_ptr<TBlob>> WriteBuffers_; + std::vector<std::unique_ptr<TBlobWithMemoryUsageGuard>> WriteBuffers_; TRingQueue<TRef> EncodedFragments_; TRingQueue<size_t> EncodedPacketSizes_; @@ -277,6 +312,8 @@ private: const EEncryptionMode EncryptionMode_; const EVerificationMode VerificationMode_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; + NYTree::IAttributeDictionaryPtr PeerAttributes_; size_t MaxFragmentsPerWrite_ = 256; @@ -289,7 +326,7 @@ private: bool AbortIfNetworkingDisabled(); void AbortSslSession(); - void InitBuffers(); + TError InitBuffers(); int GetSocketPort(); diff --git a/yt/yt/core/bus/tcp/packet.cpp b/yt/yt/core/bus/tcp/packet.cpp index a6cd02de4f..bfd59be96d 100644 --- a/yt/yt/core/bus/tcp/packet.cpp +++ b/yt/yt/core/bus/tcp/packet.cpp @@ -160,13 +160,17 @@ class TPacketDecoder , public TPacketTranscoderBase<TPacketDecoder> { public: - TPacketDecoder(const NLogging::TLogger& logger, bool verifyChecksum) + TPacketDecoder( + const NLogging::TLogger& logger, + bool verifyChecksum, + IMemoryUsageTrackerPtr memoryUsageTracker) : TPacketTranscoderBase(logger) , Allocator_( PacketDecoderChunkSize, TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio, GetRefCountedTypeCookie<TPacketDecoderTag>()) , VerifyChecksum_(verifyChecksum) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) { Restart(); } @@ -201,6 +205,7 @@ public: Phase_ = EPacketPhase::FixedHeader; PacketSize_ = 0; Parts_.clear(); + MemoryGuards_.clear(); PartIndex_ = -1; Message_.Reset(); @@ -243,11 +248,14 @@ private: TChunkedMemoryAllocator Allocator_; std::vector<TSharedRef> Parts_; + std::vector<TMemoryUsageTrackerGuard> MemoryGuards_; size_t PacketSize_ = 0; const bool VerifyChecksum_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; + bool EndFixedHeaderPhase() { if (FixedHeader_.Signature != PacketSignature) { @@ -350,6 +358,10 @@ private: } else if (partSize == 0) { Parts_.push_back(TSharedRef::MakeEmpty()); } else { + if (MemoryUsageTracker_) { + MemoryGuards_.push_back(TMemoryUsageTrackerGuard::Acquire(MemoryUsageTracker_, partSize)); + } + auto part = Allocator_.AllocateAligned(partSize); BeginPhase(EPacketPhase::MessagePart, part.Begin(), part.Size()); Parts_.push_back(std::move(part)); @@ -494,14 +506,19 @@ private: //////////////////////////////////////////////////////////////////////////////// -struct TPacketTranscoderFactory +class TPacketTranscoderFactory : public IPacketTranscoderFactory { +public: + TPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker) + : MemoryUsageTracker_(std::move(memoryUsageTracker)) + { } + std::unique_ptr<IPacketDecoder> CreateDecoder( const NLogging::TLogger& logger, bool verifyChecksum) const override { - return std::make_unique<TPacketDecoder>(logger, verifyChecksum); + return std::make_unique<TPacketDecoder>(logger, verifyChecksum, MemoryUsageTracker_); } std::unique_ptr<IPacketEncoder> CreateEncoder( @@ -509,13 +526,16 @@ struct TPacketTranscoderFactory { return std::make_unique<TPacketEncoder>(logger); } + +private: + const IMemoryUsageTrackerPtr MemoryUsageTracker_; }; //////////////////////////////////////////////////////////////////////////////// -IPacketTranscoderFactory* GetYTPacketTranscoderFactory() +IPacketTranscoderFactory* GetYTPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker) { - return LeakySingleton<TPacketTranscoderFactory>(); + return LeakySingleton<TPacketTranscoderFactory>(std::move(memoryUsageTracker)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/bus/tcp/packet.h b/yt/yt/core/bus/tcp/packet.h index 05e4693128..8182fbae40 100644 --- a/yt/yt/core/bus/tcp/packet.h +++ b/yt/yt/core/bus/tcp/packet.h @@ -2,6 +2,8 @@ #include "private.h" +#include <yt/yt/core/misc/memory_usage_tracker.h> + namespace NYT::NBus { //////////////////////////////////////////////////////////////////////////////// @@ -76,7 +78,7 @@ struct IPacketTranscoderFactory //////////////////////////////////////////////////////////////////////////////// -IPacketTranscoderFactory* GetYTPacketTranscoderFactory(); +IPacketTranscoderFactory* GetYTPacketTranscoderFactory(IMemoryUsageTrackerPtr memoryUsageTracker = nullptr); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/bus/tcp/server.cpp b/yt/yt/core/bus/tcp/server.cpp index aa46ee9068..ddc15d7889 100644 --- a/yt/yt/core/bus/tcp/server.cpp +++ b/yt/yt/core/bus/tcp/server.cpp @@ -43,11 +43,13 @@ public: TBusServerConfigPtr config, IPollerPtr poller, IMessageHandlerPtr handler, - IPacketTranscoderFactory* packetTranscoderFactory) + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker) : Config_(std::move(config)) , Poller_(std::move(poller)) , Handler_(std::move(handler)) , PacketTranscoderFactory_(std::move(packetTranscoderFactory)) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) { YT_VERIFY(Config_); YT_VERIFY(Poller_); @@ -123,6 +125,8 @@ protected: IPacketTranscoderFactory* const PacketTranscoderFactory_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, ControlSpinLock_); SOCKET ServerSocket_ = INVALID_SOCKET; @@ -253,7 +257,8 @@ protected: std::nullopt, Handler_, std::move(poller), - PacketTranscoderFactory_); + PacketTranscoderFactory_, + MemoryUsageTracker_); { auto guard = WriterGuard(ConnectionsSpinLock_); @@ -347,12 +352,14 @@ public: TBusServerConfigPtr config, IPollerPtr poller, IMessageHandlerPtr handler, - IPacketTranscoderFactory* packetTranscoderFactory) + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker) : TTcpBusServerBase( std::move(config), std::move(poller), std::move(handler), - packetTranscoderFactory) + packetTranscoderFactory, + std::move(memoryUsageTracker)) { } private: @@ -390,9 +397,11 @@ class TTcpBusServerProxy public: explicit TTcpBusServerProxy( TBusServerConfigPtr config, - IPacketTranscoderFactory* packetTranscoderFactory) + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker) : Config_(std::move(config)) , PacketTranscoderFactory_(packetTranscoderFactory) + , MemoryUsageTracker_(std::move(memoryUsageTracker)) { YT_VERIFY(Config_); } @@ -408,7 +417,8 @@ public: Config_, TTcpDispatcher::TImpl::Get()->GetAcceptorPoller(), std::move(handler), - PacketTranscoderFactory_); + PacketTranscoderFactory_, + MemoryUsageTracker_); Server_.Store(server); server->Start(); @@ -428,6 +438,8 @@ private: IPacketTranscoderFactory* const PacketTranscoderFactory_; + const IMemoryUsageTrackerPtr MemoryUsageTracker_; + TAtomicIntrusivePtr<TServer> Server_; }; @@ -467,7 +479,8 @@ private: IBusServerPtr CreateBusServer( TBusServerConfigPtr config, - IPacketTranscoderFactory* packetTranscoderFactory) + IPacketTranscoderFactory* packetTranscoderFactory, + IMemoryUsageTrackerPtr memoryUsageTracker) { std::vector<IBusServerPtr> servers; @@ -475,14 +488,16 @@ IBusServerPtr CreateBusServer( servers.push_back( New<TTcpBusServerProxy<TRemoteTcpBusServer>>( config, - packetTranscoderFactory)); + packetTranscoderFactory, + memoryUsageTracker)); } #ifdef _linux_ // Abstract unix sockets are supported only on Linux. servers.push_back( New<TTcpBusServerProxy<TLocalTcpBusServer>>( config, - packetTranscoderFactory)); + packetTranscoderFactory, + memoryUsageTracker)); #endif return New<TCompositeBusServer>(std::move(servers)); diff --git a/yt/yt/core/bus/tcp/server.h b/yt/yt/core/bus/tcp/server.h index 7dcb351e10..e50ce5320b 100644 --- a/yt/yt/core/bus/tcp/server.h +++ b/yt/yt/core/bus/tcp/server.h @@ -10,7 +10,8 @@ namespace NYT::NBus { IBusServerPtr CreateBusServer( TBusServerConfigPtr config, - IPacketTranscoderFactory* packetTranscoderFactory = GetYTPacketTranscoderFactory()); + IPacketTranscoderFactory* packetTranscoderFactory = GetYTPacketTranscoderFactory(), + IMemoryUsageTrackerPtr memoryUsageTracker = nullptr); //////////////////////////////////////////////////////////////////////////////// |