diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-03-22 00:51:33 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-03-22 00:51:33 +0000 |
commit | 25754ddf6bf0d9f39deb2b793a946176b6d7c9fb (patch) | |
tree | b81cf85ea8fbef3290618d909971b9ad02dd9e46 | |
parent | c18aa245b684fef9b14c697b38e6c6695e0733f3 (diff) | |
parent | 87f8036d8027790ed03ac34feb2b5f3e141f948c (diff) | |
download | ydb-25754ddf6bf0d9f39deb2b793a946176b6d7c9fb.tar.gz |
Merge branch 'rightlib' into merge-libs-250322-0050
70 files changed, 717 insertions, 320 deletions
diff --git a/build/conf/python.conf b/build/conf/python.conf index 2717a11f9a..7d5867fb97 100644 --- a/build/conf/python.conf +++ b/build/conf/python.conf @@ -306,14 +306,14 @@ macro STYLE_PYTHON(CONFIG_TYPE="") { } # tag:python-specific tag:test -### @usage: STYLE_RUFF([CONFIG_TYPE config_type]) +### @usage: STYLE_RUFF([CONFIG_TYPE config_type] [CHECK_FORMAT]) ### -### Check python3 sources for style issues using ruff. +### Check python3 sources for style issues using ruff. `CHECK_FORMAT` enables `ruff format` check. RUFF_PROJECT_TO_CONFIG_MAP=build/config/tests/ruff/ruff_config_paths.json -macro STYLE_RUFF(CONFIG_TYPE="") { +macro STYLE_RUFF(CONFIG_TYPE="", CHECK_FORMAT?"yes":"no") { .ALLOWED_IN_LINTERS_MAKE=yes SET_APPEND(_MAKEFILE_INCLUDE_LIKE_DEPS ${ARCADIA_ROOT}/${RUFF_PROJECT_TO_CONFIG_MAP}) - _ADD_PY_LINTER_CHECK(NAME ruff LINTER tools/ruff_linter/bin/ruff_linter GLOBAL_RESOURCES build/external_resources/ruff FILE_PROCESSING_TIME $RUFF_FILE_PROCESSING_TIME CONFIGS $PYTHON_LINTERS_DEFAULT_CONFIGS PROJECT_TO_CONFIG_MAP $RUFF_PROJECT_TO_CONFIG_MAP CONFIG_TYPE $CONFIG_TYPE) + _ADD_PY_LINTER_CHECK(NAME ruff LINTER tools/ruff_linter/bin/ruff_linter GLOBAL_RESOURCES build/external_resources/ruff FILE_PROCESSING_TIME $RUFF_FILE_PROCESSING_TIME CONFIGS $PYTHON_LINTERS_DEFAULT_CONFIGS PROJECT_TO_CONFIG_MAP $RUFF_PROJECT_TO_CONFIG_MAP CONFIG_TYPE $CONFIG_TYPE EXTRA_PARAMS check_format=${CHECK_FORMAT}) } # tag:python-specific tag:test diff --git a/build/mapping.conf.json b/build/mapping.conf.json index f4277985dd..15898974bf 100644 --- a/build/mapping.conf.json +++ b/build/mapping.conf.json @@ -522,6 +522,7 @@ "8249001226": "{registry_endpoint}/8249001226", "8273785013": "{registry_endpoint}/8273785013", "8307046461": "{registry_endpoint}/8307046461", + "8317487990": "{registry_endpoint}/8317487990", "5486731632": "{registry_endpoint}/5486731632", "5514350352": "{registry_endpoint}/5514350352", "5514360398": "{registry_endpoint}/5514360398", @@ -1818,6 +1819,7 @@ "8249001226": "devtools/ya/test/programs/test_tool/bin/test_tool for linux", "8273785013": "devtools/ya/test/programs/test_tool/bin/test_tool for linux", "8307046461": "devtools/ya/test/programs/test_tool/bin/test_tool for linux", + "8317487990": "devtools/ya/test/programs/test_tool/bin/test_tool for linux", "5486731632": "devtools/ya/test/programs/test_tool/bin3/test_tool3 for linux", "5514350352": "devtools/ya/test/programs/test_tool/bin3/test_tool3 for linux", "5514360398": "devtools/ya/test/programs/test_tool/bin3/test_tool3 for linux", diff --git a/build/platform/test_tool/host.ya.make.inc b/build/platform/test_tool/host.ya.make.inc index 5af9746a41..e1d2a80596 100644 --- a/build/platform/test_tool/host.ya.make.inc +++ b/build/platform/test_tool/host.ya.make.inc @@ -1,12 +1,12 @@ IF (HOST_OS_DARWIN AND HOST_ARCH_X86_64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307070450) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317513158) ELSEIF (HOST_OS_DARWIN AND HOST_ARCH_ARM64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307068606) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317511126) ELSEIF (HOST_OS_LINUX AND HOST_ARCH_X86_64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307073076) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317517786) ELSEIF (HOST_OS_LINUX AND HOST_ARCH_AARCH64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307067350) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317509303) ELSEIF (HOST_OS_WINDOWS AND HOST_ARCH_X86_64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307071994) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317515729) ENDIF() diff --git a/build/platform/test_tool/host_os.ya.make.inc b/build/platform/test_tool/host_os.ya.make.inc index 3b960b3a0d..d0b597fd49 100644 --- a/build/platform/test_tool/host_os.ya.make.inc +++ b/build/platform/test_tool/host_os.ya.make.inc @@ -1,12 +1,12 @@ IF (HOST_OS_DARWIN AND HOST_ARCH_X86_64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307042769) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317485510) ELSEIF (HOST_OS_DARWIN AND HOST_ARCH_ARM64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307040830) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317484083) ELSEIF (HOST_OS_LINUX AND HOST_ARCH_X86_64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307046461) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317487990) ELSEIF (HOST_OS_LINUX AND HOST_ARCH_AARCH64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307039261) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317482891) ELSEIF (HOST_OS_WINDOWS AND HOST_ARCH_X86_64) - DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307044717) + DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317486711) ENDIF() diff --git a/build/sanitize-blacklist.txt b/build/sanitize-blacklist.txt index 711002774b..bd559495dc 100644 --- a/build/sanitize-blacklist.txt +++ b/build/sanitize-blacklist.txt @@ -8,4 +8,4 @@ src:*contrib/python/matplotlib/py2/src/* src:*contrib/python/matplotlib/py3/src/* # DTCC-1909: issues use-of-uninitialized-value on msan on Clang16 # uninitialized variable size in function add_metadata_from_side_data -src:*contrib/libs/ffmpeg-3/libavcodec/utils.c +src:*contrib/deprecated/ffmpeg-3/libavcodec/utils.c diff --git a/build/sysincl/misc.yml b/build/sysincl/misc.yml index a27b6e3de7..a1f2c6b8a1 100644 --- a/build/sysincl/misc.yml +++ b/build/sysincl/misc.yml @@ -97,12 +97,12 @@ - atomic.h: contrib/restricted/openal-soft/common/atomic.h - threads.h: contrib/restricted/openal-soft/common/threads.h -- source_filter: "^contrib/libs/ffmpeg-3/" +- source_filter: "^contrib/deprecated/ffmpeg-3/" includes: - fontconfig/fontconfig.h - fribidi.h - - stdatomic.h: contrib/libs/ffmpeg-3/compat/atomics/win32/stdatomic.h - - atomic.h: contrib/libs/ffmpeg-3/libavutil/atomic.h + - stdatomic.h: contrib/deprecated/ffmpeg-3/compat/atomics/win32/stdatomic.h + - atomic.h: contrib/deprecated/ffmpeg-3/libavutil/atomic.h - source_filter: "^contrib/libs/ffmpeg-3.4.1/" includes: diff --git a/build/ymake.core.conf b/build/ymake.core.conf index 939263b153..53c47f8a50 100644 --- a/build/ymake.core.conf +++ b/build/ymake.core.conf @@ -4856,7 +4856,7 @@ _P_PK=${hide;kv:"p PK"} TOUCH_PACKAGE_MF=$GENERATE_MF && $TOUCH_PACKAGE $_P_PK && $ADD_VCS_INFO_FILE_CMD # Note: we don't use touch.py in the command below to avoid introduction of undesired input -UNION_CMD=$YMAKE_PYTHON -c open(\'$TARGET\',\'w\').close() ${hide;kv:"p UN"} ${hide;kv:"package UNION"} ${hide;kv:"pc light-cyan"} $UNION_OUTS $VCS_INFO_DISABLE_CACHE__NO_UID__ +UNION_CMD=$YMAKE_PYTHON3 -c open(\'$TARGET\',\'w\').close() ${hide;kv:"p UN"} ${hide;kv:"package UNION"} ${hide;kv:"pc light-cyan"} $UNION_OUTS $VCS_INFO_DISABLE_CACHE__NO_UID__ UNION_CMD_MF=$UNION_CMD && $GENERATE_MF macro _EXPAND_INS_OUTS(FILES{input}[]) { diff --git a/contrib/python/argcomplete/py3/.dist-info/METADATA b/contrib/python/argcomplete/py3/.dist-info/METADATA index fe74af8ae5..bf74fb4961 100644 --- a/contrib/python/argcomplete/py3/.dist-info/METADATA +++ b/contrib/python/argcomplete/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.4 Name: argcomplete -Version: 3.5.3 +Version: 3.6.0 Summary: Bash tab completion for argparse Project-URL: Homepage, https://github.com/kislyuk/argcomplete Project-URL: Documentation, https://kislyuk.github.io/argcomplete @@ -9,6 +9,8 @@ Project-URL: Issue Tracker, https://github.com/kislyuk/argcomplete/issues Project-URL: Change Log, https://github.com/kislyuk/argcomplete/blob/develop/Changes.rst Author: Andrey Kislyuk Author-email: kislyuk@gmail.com +Maintainer: Andrey Kislyuk +Maintainer-email: kislyuk@gmail.com License: Apache Software License License-File: LICENSE.rst License-File: NOTICE @@ -20,12 +22,12 @@ Classifier: Operating System :: MacOS :: MacOS X Classifier: Operating System :: POSIX Classifier: Programming Language :: Python Classifier: Programming Language :: Python :: 3 -Classifier: Programming Language :: Python :: 3.7 Classifier: Programming Language :: Python :: 3.8 Classifier: Programming Language :: Python :: 3.9 Classifier: Programming Language :: Python :: 3.10 Classifier: Programming Language :: Python :: 3.11 Classifier: Programming Language :: Python :: 3.12 +Classifier: Programming Language :: Python :: 3.13 Classifier: Programming Language :: Python :: Implementation :: CPython Classifier: Programming Language :: Python :: Implementation :: PyPy Classifier: Topic :: Software Development @@ -296,7 +298,7 @@ work for zsh as well. Python Support -------------- -Argcomplete requires Python 3.7+. +Argcomplete requires Python 3.9+. Support for other shells ------------------------ diff --git a/contrib/python/argcomplete/py3/README.rst b/contrib/python/argcomplete/py3/README.rst index 13f0fa7ae3..c897ad4191 100644 --- a/contrib/python/argcomplete/py3/README.rst +++ b/contrib/python/argcomplete/py3/README.rst @@ -253,7 +253,7 @@ work for zsh as well. Python Support -------------- -Argcomplete requires Python 3.7+. +Argcomplete requires Python 3.9+. Support for other shells ------------------------ diff --git a/contrib/python/argcomplete/py3/argcomplete/_check_module.py b/contrib/python/argcomplete/py3/argcomplete/_check_module.py index 7fd6a5caa4..03726529fc 100644 --- a/contrib/python/argcomplete/py3/argcomplete/_check_module.py +++ b/contrib/python/argcomplete/py3/argcomplete/_check_module.py @@ -10,30 +10,7 @@ Intended to be invoked by argcomplete's global completion function. import os import sys import tokenize - -try: - from importlib.util import find_spec # type:ignore -except ImportError: - import typing as t - from collections import namedtuple - from imp import find_module # type:ignore - - ModuleSpec = namedtuple("ModuleSpec", ["origin", "has_location", "submodule_search_locations"]) - - def find_spec( # type:ignore - name: str, - package: t.Optional[str] = None, - ) -> t.Optional[ModuleSpec]: - """Minimal implementation as required by `find`.""" - try: - f, path, _ = find_module(name) - except ImportError: - return None - has_location = path is not None - if f is None: - return ModuleSpec(None, has_location, [path]) - f.close() - return ModuleSpec(path, has_location, None) +from importlib.util import find_spec class ArgcompleteMarkerNotFound(RuntimeError): @@ -42,7 +19,12 @@ class ArgcompleteMarkerNotFound(RuntimeError): def find(name, return_package=False): names = name.split(".") - spec = find_spec(names[0]) + # Look for the first importlib ModuleSpec that has `origin` set, indicating it's not a namespace package. + for package_name_boundary in range(len(names)): + spec = find_spec(".".join(names[: package_name_boundary + 1])) + if spec is not None and spec.origin is not None: + break + if spec is None: raise ArgcompleteMarkerNotFound('no module named "{}"'.format(names[0])) if not spec.has_location: @@ -53,7 +35,7 @@ def find(name, return_package=False): return spec.origin if len(spec.submodule_search_locations) != 1: raise ArgcompleteMarkerNotFound("expecting one search location") - path = os.path.join(spec.submodule_search_locations[0], *names[1:]) + path = os.path.join(spec.submodule_search_locations[0], *names[package_name_boundary + 1 :]) if os.path.isdir(path): filename = "__main__.py" if return_package: diff --git a/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete b/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete index 81c9d41f80..8a91272dea 100644 --- a/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete +++ b/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete @@ -124,6 +124,12 @@ __python_argcomplete_which() { _python_argcomplete_global() { if [[ -n "${ZSH_VERSION-}" ]]; then + if [[ "${_matcher_num-}" -gt 1 ]]; then + # Return early if the completer is called multiple times in the same completion run. + # Currently the only known occurrence of this is in zsh when a matcher-list zstyle is declared. + # When this happens, _matcher_num is incremented past 1. + return + fi # Store result of a regex match in the # BASH_REMATCH variable rather than MATCH setopt local_options BASH_REMATCH diff --git a/contrib/python/argcomplete/py3/argcomplete/completers.py b/contrib/python/argcomplete/py3/argcomplete/completers.py index 610349f652..4c01e69518 100644 --- a/contrib/python/argcomplete/py3/argcomplete/completers.py +++ b/contrib/python/argcomplete/py3/argcomplete/completers.py @@ -63,13 +63,22 @@ class FilesCompleter(BaseCompleter): # that was fixed in bash 5.3 but affects older versions. Environment variables are not treated # correctly in older versions and calling bind makes them available. For details, see # https://savannah.gnu.org/support/index.php?111125 - files = _call(["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL) + files = _call( + ["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL + ) completion += [f + "/" for f in files] for x in self.allowednames: - completion += _call(["bash", "-c", "bind; compgen -A file -X '!*.{0}' -- '{p}'".format(x, p=prefix)], stderr=subprocess.DEVNULL) + completion += _call( + ["bash", "-c", "bind; compgen -A file -X '!*.{0}' -- '{p}'".format(x, p=prefix)], + stderr=subprocess.DEVNULL, + ) else: - completion += _call(["bash", "-c", "bind; compgen -A file -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL) - anticomp = _call(["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL) + completion += _call( + ["bash", "-c", "bind; compgen -A file -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL + ) + anticomp = _call( + ["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL + ) completion = list(set(completion) - set(anticomp)) if self.directories: diff --git a/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py b/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py index 0666845e3a..f6ecb1f417 100644 --- a/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py +++ b/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py @@ -75,7 +75,7 @@ class IntrospectiveArgumentParser(ArgumentParser): except for the lines that contain the string "Added by argcomplete". ''' - def _parse_known_args(self, arg_strings, namespace, intermixed=False): + def _parse_known_args(self, arg_strings, namespace, intermixed=False, **kwargs): _num_consumed_args.clear() # Added by argcomplete self._argcomplete_namespace = namespace self.active_actions: List[Action] = [] # Added by argcomplete diff --git a/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py b/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py index 613feb2597..ecd785b80b 100644 --- a/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py +++ b/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py @@ -36,7 +36,7 @@ class shlex: else: self.eof = '' self.commenters = '#' - self.wordchars = 'abcdfeghijklmnopqrstuvwxyz' 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_' + self.wordchars = 'abcdfeghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_' # Modified by argcomplete: 2/3 compatibility # if self.posix: # self.wordchars += ('ßàáâãäåæçèéêëìíîïðñòóôõöøùúûüýþÿ' diff --git a/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py b/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py index d914c222fe..0eb744c9e9 100644 --- a/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py +++ b/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py @@ -33,7 +33,7 @@ def main(): lines = head.split("\n", 12) for line in lines: if line.startswith("# EASY-INSTALL-SCRIPT"): - import pkg_resources + import pkg_resources # type: ignore re_match = re.match("# EASY-INSTALL-SCRIPT: '(.+)','(.+)'", line) assert re_match is not None @@ -48,7 +48,7 @@ def main(): dist, group, name = re_match.groups() import pkgutil - import pkg_resources + import pkg_resources # type: ignore entry_point_info = pkg_resources.get_distribution(dist).get_entry_info(group, name) assert entry_point_info is not None @@ -71,7 +71,7 @@ def main(): module = re_match.groups()[0] import pkgutil - import pkg_resources + import pkg_resources # type: ignore with open(pkgutil.get_loader(module).get_filename()) as mod_fh: # type: ignore if "PYTHON_ARGCOMPLETE_OK" in mod_fh.read(1024): diff --git a/contrib/python/argcomplete/py3/argcomplete/shell_integration.py b/contrib/python/argcomplete/py3/argcomplete/shell_integration.py index f0b9d7db5b..37b5603b11 100644 --- a/contrib/python/argcomplete/py3/argcomplete/shell_integration.py +++ b/contrib/python/argcomplete/py3/argcomplete/shell_integration.py @@ -34,6 +34,12 @@ _python_argcomplete%(function_suffix)s() { local IFS=$'\013' local script="%(argcomplete_script)s" if [[ -n "${ZSH_VERSION-}" ]]; then + if [[ "${_matcher_num-}" -gt 1 ]]; then + # Return early if the completer is called multiple times in the same completion run. + # Currently the only known occurrence of this is in zsh when a matcher-list zstyle is declared. + # When this happens, _matcher_num is incremented past 1. + return + fi local completions completions=($(IFS="$IFS" \ COMP_LINE="$BUFFER" \ diff --git a/contrib/python/argcomplete/py3/ya.make b/contrib/python/argcomplete/py3/ya.make index dfa97494a9..74c5629658 100644 --- a/contrib/python/argcomplete/py3/ya.make +++ b/contrib/python/argcomplete/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.5.3) +VERSION(3.6.0) LICENSE(Apache-2.0) diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp index 3dad055af1..641408973d 100644 --- a/library/cpp/neh/tcp2.cpp +++ b/library/cpp/neh/tcp2.cpp @@ -332,35 +332,6 @@ namespace { char MemPool_[MemPoolSize_ + MemPoolReserve_]; }; - //protector for limit usage tcp connection output (and used data) only from one thread at same time - class TOutputLock { - public: - TOutputLock() noexcept - : Lock_(0) - { - } - - bool TryAquire() noexcept { - do { - if (AtomicTryLock(&Lock_)) { - return true; - } - } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable - return false; - } - - void Release() noexcept { - AtomicUnlock(&Lock_); - } - - bool IsFree() const noexcept { - return !AtomicGet(Lock_); - } - - private: - TAtomic Lock_; - }; - class TClient { class TRequest; class TConnection; @@ -718,13 +689,13 @@ namespace { } void ProcessOutputReqsQueue() { - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(false); } } void ProcessOutputCancelsQueue() { - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true)); return; } @@ -763,7 +734,7 @@ namespace { PrepareSocket(AS_.Native()); AtomicSet(State_, Connected); AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2)); - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(true); return; } @@ -857,7 +828,7 @@ namespace { DBGOUT("TClient::SendMessages(exit2)"); return; } - } while (OutputLock_.TryAquire()); + } while (OutputLock_.TryAcquire()); DBGOUT("TClient::SendMessages(exit1)"); } @@ -1058,7 +1029,8 @@ namespace { TTcp2Message Msg_; //output - TOutputLock OutputLock_; + + TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads TAtomic NeedCheckReqsQueue_; TLockFreeQueue<TRequest*> Reqs_; TAtomic NeedCheckCancelsQueue_; @@ -1412,11 +1384,11 @@ namespace { void ProcessOutputQueue() { AtomicSet(NeedCheckOutputQueue_, 1); - if (OutputLock_.TryAquire()) { + if (OutputLock_.TryAcquire()) { SendMessages(false); return; } - DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree()); + DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked()); } //must be called only after success aquiring output @@ -1444,10 +1416,10 @@ namespace { OutputLock_.Release(); if (!AtomicGet(NeedCheckOutputQueue_)) { - DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree()); + DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked()); return; } - } while (OutputLock_.TryAquire()); + } while (OutputLock_.TryAcquire()); DBGOUT("Server::SendMessages(exit1)"); } catch (...) { OnError(); @@ -1518,7 +1490,7 @@ namespace { TLockFreeQueue<TRequestId> FinReqs_; //output - TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads + TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads TAtomic NeedCheckOutputQueue_; NNeh::TAutoLockFreeQueue<TOutputData> OutputData_; TOutputBuffers OutputBuffers_; diff --git a/library/cpp/netliba/v6/ib_low.cpp b/library/cpp/netliba/v6/ib_low.cpp index 99d77d593f..fca97353d2 100644 --- a/library/cpp/netliba/v6/ib_low.cpp +++ b/library/cpp/netliba/v6/ib_low.cpp @@ -33,7 +33,7 @@ namespace NNetliba { TIntrusivePtr<TIBContext> ctx; TIntrusivePtr<TIBPort> resPort; - int numDevices; + int numDevices{0}; ibv_device** deviceList = ibv_get_device_list(&numDevices); //for (int i = 0; i < numDevices; ++i) { // ibv_device *dev = deviceList[i]; @@ -39,33 +39,33 @@ REGISTRY_ENDPOINT = os.environ.get("YA_REGISTRY_ENDPOINT", "https://devtools-reg PLATFORM_MAP = { "data": { "win32": { - "md5": "873a2f0b74705af5bc1bf41217ce49ce", + "md5": "729e5a92e5e9253001266369de2108af", "urls": [ - f"{REGISTRY_ENDPOINT}/8307066792" + f"{REGISTRY_ENDPOINT}/8317503781" ] }, "darwin": { - "md5": "31492187efa40748dd75653a87c40cc1", + "md5": "f5e0afaf1189c93b9ef3896f3531882b", "urls": [ - f"{REGISTRY_ENDPOINT}/8307065239" + f"{REGISTRY_ENDPOINT}/8317502378" ] }, "darwin-arm64": { - "md5": "07ac8ac75b798223cd71f5c31b10110b", + "md5": "0717c848e11e10b24436d51076b5fe55", "urls": [ - f"{REGISTRY_ENDPOINT}/8307063799" + f"{REGISTRY_ENDPOINT}/8317500655" ] }, "linux-aarch64": { - "md5": "0315db8cf713e6b77de53c738bcb3ea7", + "md5": "a8716b4389a8527540e10d45c3c10325", "urls": [ - f"{REGISTRY_ENDPOINT}/8307062757" + f"{REGISTRY_ENDPOINT}/8317498738" ] }, "linux": { - "md5": "1287df93b85d867c3fb93e7f9f972b94", + "md5": "5fc8c836af0182fa3d93e89bc3fe907c", "urls": [ - f"{REGISTRY_ENDPOINT}/8307068169" + f"{REGISTRY_ENDPOINT}/8317505228" ] } } diff --git a/yql/essentials/parser/pg_wrapper/arena_ctx.cpp b/yql/essentials/parser/pg_wrapper/arena_ctx.cpp index e91cd6ad7a..6439aaa18a 100644 --- a/yql/essentials/parser/pg_wrapper/arena_ctx.cpp +++ b/yql/essentials/parser/pg_wrapper/arena_ctx.cpp @@ -30,9 +30,9 @@ extern MemoryContext ArenaGetChunkContext(void *pointer); extern Size ArenaGetChunkSpace(void *pointer); extern bool ArenaIsEmpty(MemoryContext context); extern void ArenaStats(MemoryContext context, - MemoryStatsPrintFunc printfunc, void *passthru, - MemoryContextCounters *totals, - bool print_to_stderr); + MemoryStatsPrintFunc printfunc, void *passthru, + MemoryContextCounters *totals, + bool print_to_stderr); #ifdef MEMORY_CONTEXT_CHECKING extern void ArenaCheck(MemoryContext context); #endif @@ -137,4 +137,9 @@ void TArenaMemoryContext::Release() { Prev = nullptr; } +namespace NCommon { +std::shared_ptr<void> CreateMemoryArenaContext() { + return std::make_shared<TArenaMemoryContext>(); +} +} } diff --git a/yql/essentials/parser/pg_wrapper/arrow.cpp b/yql/essentials/parser/pg_wrapper/arrow.cpp index ee52e76aac..93b27ccdf9 100644 --- a/yql/essentials/parser/pg_wrapper/arrow.cpp +++ b/yql/essentials/parser/pg_wrapper/arrow.cpp @@ -436,7 +436,6 @@ NUdf::TBlockItem BlockItemFromDatum(Datum datum, const NPg::TTypeDesc& desc, std } NUdf::TBlockItem PgBlockItemFromNativeBinary(const TStringBuf binary, ui32 pgTypeId, std::vector<char>& tmp) { - NKikimr::NMiniKQL::TPAllocScope call; StringInfoData stringInfo; stringInfo.data = (char*)binary.Data(); stringInfo.len = binary.Size(); diff --git a/yql/essentials/parser/pg_wrapper/interface/codec.h b/yql/essentials/parser/pg_wrapper/interface/codec.h index c2aa3c4af0..a3a634be83 100644 --- a/yql/essentials/parser/pg_wrapper/interface/codec.h +++ b/yql/essentials/parser/pg_wrapper/interface/codec.h @@ -50,5 +50,7 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe extern "C" void ReadSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, NKikimr::NUdf::TUnboxedValue& value, TInputBuf& buf); extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, TOutputBuf& buf); +std::shared_ptr<void> CreateMemoryArenaContext(); + } // namespace NCommon } // namespace NYql diff --git a/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp b/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp index ddf7f42ca7..02cafe876b 100644 --- a/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp +++ b/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp @@ -548,6 +548,12 @@ TString GetPostgresServerVersionStr() { namespace NYql { +namespace NCommon { + std::shared_ptr<void> CreateMemoryArenaContext() { + throw yexception() << "PG types are not supported"; + } +} + ui64 HexEncode(const char *src, size_t len, char *dst) { Y_UNUSED(src); Y_UNUSED(len); @@ -556,7 +562,6 @@ ui64 HexEncode(const char *src, size_t len, char *dst) { throw yexception() << "HexEncode in pg_dummy does nothing"; } - std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& /* builder */, const NKikimr::NMiniKQL::TPgType* /* targetType */) { throw yexception() << "PG types are not supported"; } diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index 2bd4d8c416..d393e50a5d 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -200,91 +200,6 @@ private: //////////////////////////////////////////////////////////////////////////////// -class TThreadPerTransactionPinger - : public ITransactionPinger -{ -public: - ~TThreadPerTransactionPinger() override - { - if (Running_) { - RemoveTransaction(*PingableTx_); - } - } - - ITransactionPingerPtr GetChildTxPinger() override - { - return MakeIntrusive<TThreadPerTransactionPinger>(); - } - - void RegisterTransaction(const TPingableTransaction& pingableTx) override - { - YT_VERIFY(!Running_); - YT_VERIFY(PingableTx_ == nullptr); - - PingableTx_ = &pingableTx; - Running_ = true; - - PingerThread_ = std::make_unique<TThread>( - TThread::TParams{Pinger, this}.SetName("pingable_tx")); - PingerThread_->Start(); - } - - bool HasTransaction(const TPingableTransaction& pingableTx) override - { - return PingableTx_ == &pingableTx && Running_; - } - - void RemoveTransaction(const TPingableTransaction& pingableTx) override - { - YT_VERIFY(HasTransaction(pingableTx)); - - Running_ = false; - if (PingerThread_) { - PingerThread_->Join(); - } - } - -private: - static void* Pinger(void* opaque) - { - static_cast<TThreadPerTransactionPinger*>(opaque)->Pinger(); - return nullptr; - } - - void Pinger() - { - auto [minPingInterval, maxPingInterval] = PingableTx_->GetPingInterval(); - while (Running_) { - TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>(); - try { - PingableTx_->Ping(); - } catch (const std::exception& e) { - if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) { - if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) { - break; - } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) { - waitTime = TDuration::MilliSeconds(0); - } - } - // Else do nothing, going to retry this error. - } - - TInstant t = Now(); - while (Running_ && Now() - t < waitTime) { - NDetail::TWaitProxy::Get()->Sleep(TDuration::MilliSeconds(100)); - } - } - } - -private: - const TPingableTransaction* PingableTx_ = nullptr; - - std::atomic<bool> Running_ = false; - std::unique_ptr<TThread> PingerThread_; -}; - -//////////////////////////////////////////////////////////////////////////////// - ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config) { YT_LOG_DEBUG("Using async transaction pinger"); diff --git a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp index 403a7facdd..50e7a65150 100644 --- a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp +++ b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp @@ -15,6 +15,7 @@ #include <yql/essentials/minikql/mkql_stats_registry.h> #include <yql/essentials/minikql/mkql_node.h> #include <yql/essentials/minikql/mkql_type_builder.h> +#include <yql/essentials/parser/pg_wrapper/interface/codec.h> #include <yt/yt/core/concurrency/thread_pool.h> #include <yt/yt/core/threading/thread.h> @@ -309,25 +310,26 @@ public: } arrow::Status OnRecordBatchDecoded(std::shared_ptr<arrow::RecordBatch> batch) override { - NKikimr::NMiniKQL::TScopedAlloc scope(__LOCATION__); - TThrowingBindTerminator t; - - YQL_ENSURE(batch); - MKQL_ADD_STAT(JobStats_, BlockCount, 1); std::vector<arrow::Datum> result; - YQL_ENSURE((size_t)batch->num_columns() == ColumnConverters_.size()); - result.resize(ColumnConverters_.size()); - size_t matchedColumns = 0; - for (size_t i = 0; i < ColumnConverters_.size(); ++i) { - auto columnIdxIt = ColumnOrderMapping.find(batch->schema()->field_names()[i]); - if (ColumnOrderMapping.end() == columnIdxIt) { - continue; + { + auto ctx = NCommon::CreateMemoryArenaContext(); + + YQL_ENSURE(batch); + MKQL_ADD_STAT(JobStats_, BlockCount, 1); + YQL_ENSURE((size_t)batch->num_columns() == ColumnConverters_.size()); + result.resize(ColumnConverters_.size()); + size_t matchedColumns = 0; + for (size_t i = 0; i < ColumnConverters_.size(); ++i) { + auto columnIdxIt = ColumnOrderMapping.find(batch->schema()->field_names()[i]); + if (ColumnOrderMapping.end() == columnIdxIt) { + continue; + } + ++matchedColumns; + auto columnIdx = columnIdxIt->second; + result[columnIdx] = std::move(ColumnConverters_[columnIdx]->Convert(batch->column(i)->data())); } - ++matchedColumns; - auto columnIdx = columnIdxIt->second; - result[columnIdx] = std::move(ColumnConverters_[columnIdx]->Convert(batch->column(i)->data())); + Y_ENSURE(matchedColumns == ColumnOrderMapping.size()); } - Y_ENSURE(matchedColumns == ColumnOrderMapping.size()); Consumer_->HandleResult(std::make_shared<TResultBatch>(batch->num_rows(), std::move(result))); return arrow::Status::OK(); } diff --git a/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc b/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc index 601177fb3f..332980a4c0 100644 --- a/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc +++ b/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc @@ -15,6 +15,7 @@ PEERDIR( yql/essentials/public/udf contrib/libs/apache/arrow contrib/libs/flatbuffers + yql/essentials/parser/pg_wrapper/interface ) ADDINCL( diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index be8ce859a7..9e285ea166 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -412,28 +412,28 @@ TString GenerateInputQuery(const TRichYPath& path, const TString& whereExpressio return result; } -void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery, bool useSystemColumns) { +void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery, const TYtSettings::TConstPtr& config, bool useSystemColumns) { spec["input_query"] = inputQuery; spec["input_query_filter_options"]["enable_chunk_filter"] = true; - spec["input_query_filter_options"]["enable_row_filter"] = true; + spec["input_query_filter_options"]["enable_row_filter"] = config->PruneQLFilterLambda.Get().GetOrElse(DEFAULT_PRUNE_QL_FILTER_LAMBDA); if (useSystemColumns) { spec["input_query_options"]["use_system_columns"] = true; } } -void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& whereExpression) { +void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& whereExpression, const TYtSettings::TConstPtr& config) { // YQL-19382 if (whereExpression) { YQL_ENSURE(paths.size() == 1, "YtQLFilter: multiple inputs are not supported"); auto& path = paths[0]; const TString inputQuery = GenerateInputQuery(path, whereExpression, /*useSystemColumns*/ false); path.Columns_.Clear(); - SetInputQuerySpec(spec, inputQuery, /*useSystemColumns*/ false); + SetInputQuerySpec(spec, inputQuery, config, /*useSystemColumns*/ false); } } template <typename T> -void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& whereExpression, bool useSystemColumns) { +void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& whereExpression, const TYtSettings::TConstPtr& config, bool useSystemColumns) { // YQL-19382 if (whereExpression) { const auto& inputs = specWithPaths.GetInputs(); @@ -444,7 +444,7 @@ void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& path.Columns_.Clear(); specWithPaths.SetInput(0, path); } - SetInputQuerySpec(spec, inputQuery, useSystemColumns); + SetInputQuerySpec(spec, inputQuery, config, useSystemColumns); } } @@ -3675,7 +3675,7 @@ private: spec["schema_inference_mode"] = "from_output"; // YTADMINREQ-17692 } - PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQueryExpr); + PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQueryExpr, execCtx->Options_.Config()); return execCtx->RunOperation([entry, mergeOpSpec = std::move(mergeOpSpec), spec = std::move(spec)](){ return entry->Tx->Merge(mergeOpSpec, TOperationOptions().StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec)); @@ -3864,7 +3864,7 @@ private: spec["job_count"] = static_cast<i64>(*jobCount); } - PrepareInputQueryForMap(spec, mapOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff); + PrepareInputQueryForMap(spec, mapOpSpec, inputQueryExpr, execCtx->Options_.Config(), /*useSystemColumns*/ useSkiff); TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); @@ -4366,7 +4366,7 @@ private: spec["mapper"]["output_streams"] = intermediateStreams; } - PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff); + PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, execCtx->Options_.Config(), /*useSystemColumns*/ useSkiff); TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); @@ -4514,7 +4514,7 @@ private: spec["reducer"]["enable_input_table_index"] = true; } - PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff); + PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, execCtx->Options_.Config(), /*useSystemColumns*/ useSkiff); TOperationOptions opOpts; FillOperationOptions(opOpts, execCtx, entry); diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp index 73cacf723d..bd24c6a473 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp @@ -161,23 +161,24 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::ExtractQLFilters(TExprB } YQL_ENSURE(qlCompatiblePredicate); - TExprNode::TPtr prunedPredicate; - if (otherParts.empty()) { - prunedPredicate = MakeBool<true>(predicate->Pos(), ctx); - } else if (otherParts.size() == 1) { - prunedPredicate = otherParts.front(); - } else { - prunedPredicate = ctx.NewCallable(predicate->Pos(), "And", std::move(otherParts)); - } - YQL_ENSURE(prunedPredicate); - const auto typeNode = ExpandType(rowArg->Pos(), *rowArg->GetTypeAnn(), ctx); const auto lambdaNode = ctx.NewLambda(qlCompatiblePredicate->Pos(), ctx.NewArguments(qlCompatiblePredicate->Pos(), {newRowArg}), std::move(qlCompatiblePredicate)); const auto qlFilter = ctx.NewCallable(flatMap.Cast().Pos(), "YtQLFilter", {typeNode, lambdaNode}); auto newOpMap = ctx.ChangeChild(opMap.Ref(), TYtMap::idx_Settings, NYql::AddSetting(opMap.Settings().Ref(), EYtSettingType::QLFilter, qlFilter, ctx)); + const bool pruneLambda = State_->Configuration->PruneQLFilterLambda.Get().GetOrElse(DEFAULT_PRUNE_QL_FILTER_LAMBDA); if (pruneLambda) { + TExprNode::TPtr prunedPredicate; + if (otherParts.empty()) { + prunedPredicate = MakeBool<true>(predicate->Pos(), ctx); + } else if (otherParts.size() == 1) { + prunedPredicate = otherParts.front(); + } else { + prunedPredicate = ctx.NewCallable(predicate->Pos(), "And", std::move(otherParts)); + } + YQL_ENSURE(prunedPredicate); + const auto newFlatMap = Build<TCoFlatMapBase>(ctx, flatMap.Cast().Pos()) .InitFrom(flatMap.Cast()) .Lambda() diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.cfg new file mode 100644 index 0000000000..d0ce4581d7 --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.cfg @@ -0,0 +1 @@ +in Input integer.txt diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.sql b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.sql new file mode 100644 index 0000000000..293c48c64a --- /dev/null +++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.sql @@ -0,0 +1,6 @@ +pragma yt.UseQLFilter; +pragma yt.PruneQLFilterLambda='false'; + +select a +from plato.Input +where a > 5; diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index fc166e08de..ca2e945220 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -395,6 +395,11 @@ public: const TPartitionTablesOptions& options), (paths, options)) + DELEGATE_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, ( + const TTablePartitionCookiePtr& descriptor, + const TReadTablePartitionOptions& options), + (descriptor, options)) + // Journals DELEGATE_METHOD(TFuture<void>, TruncateJournal, ( const NYPath::TYPath& path, diff --git a/yt/yt/client/api/private.cpp b/yt/yt/client/api/private.cpp new file mode 100644 index 0000000000..6379e75128 --- /dev/null +++ b/yt/yt/client/api/private.cpp @@ -0,0 +1,23 @@ +#include "private.h" + +#include "table_partition_reader.h" + +#include <yt/yt/client/table_client/schema.h> + +namespace NYT::NApi::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +std::vector<NTableClient::TTableSchemaPtr> GetTableSchemas(const ITablePartitionReaderPtr& partitionReader) +{ + return partitionReader->GetTableSchemas(); +} + +std::vector<NTableClient::TColumnNameFilter> GetColumnFilters(const ITablePartitionReaderPtr& partitionReader) +{ + return partitionReader->GetColumnFilters(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi::NDetail diff --git a/yt/yt/client/api/private.h b/yt/yt/client/api/private.h index 4a4e6567fe..2d2d9323bd 100644 --- a/yt/yt/client/api/private.h +++ b/yt/yt/client/api/private.h @@ -12,5 +12,17 @@ YT_DEFINE_GLOBAL(const NLogging::TLogger, ApiLogger, "Api"); //////////////////////////////////////////////////////////////////////////////// -} // namespace NYT::NApi +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + + +// Functions below are for internal usage only. No backward compatibility is guaranteed. +std::vector<NTableClient::TTableSchemaPtr> GetTableSchemas(const ITablePartitionReaderPtr& partitionReader); +std::vector<NTableClient::TColumnNameFilter> GetColumnFilters(const ITablePartitionReaderPtr& partitionReader); +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h index 92a8204fc4..36dbf9d637 100644 --- a/yt/yt/client/api/public.h +++ b/yt/yt/client/api/public.h @@ -146,6 +146,8 @@ DECLARE_REFCOUNTED_STRUCT(IRowBatchWriter) DECLARE_REFCOUNTED_STRUCT(ITableReader) DECLARE_REFCOUNTED_STRUCT(ITableWriter) +DECLARE_REFCOUNTED_CLASS(ITablePartitionReader) + DECLARE_REFCOUNTED_STRUCT(ITableFragmentWriter); DECLARE_REFCOUNTED_STRUCT(IFileReader) diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h index febae10ac9..57a46d9704 100644 --- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h +++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h @@ -149,6 +149,8 @@ public: .SetStreamingEnabled(true)); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetColumnarStatistics); DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PartitionTables); + DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ReadTablePartition, + .SetStreamingEnabled(true)); // File caching DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetFileFromCache); diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index c871e20e09..a5d3dc4249 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -11,6 +11,7 @@ #include "transaction.h" #include <yt/yt/client/api/helpers.h> +#include <yt/yt/client/api/table_partition_reader.h> #include <yt/yt/client/chaos_client/replication_card_serialization.h> @@ -1738,6 +1739,7 @@ TFuture<NApi::TMultiTablePartitions> TClient::PartitionTables( req->set_adjust_data_weight_per_partition(options.AdjustDataWeightPerPartition); req->set_enable_key_guarantee(options.EnableKeyGuarantee); + req->set_enable_cookies(options.EnableCookies); ToProto(req->mutable_transactional_options(), options); @@ -1746,6 +1748,34 @@ TFuture<NApi::TMultiTablePartitions> TClient::PartitionTables( })); } +TFuture<ITablePartitionReaderPtr> TClient::CreateTablePartitionReader( + const TTablePartitionCookiePtr& cookie, + const TReadTablePartitionOptions& /*options*/) +{ + YT_VERIFY(cookie); + + auto proxy = CreateApiServiceProxy(); + auto req = proxy.ReadTablePartition(); + InitStreamingRequest(*req); + + NProto::ToProto(req->mutable_cookie(), cookie); + + return NRpc::CreateRpcClientInputStream(std::move(req)) + .ApplyUnique(BIND([] (IAsyncZeroCopyInputStreamPtr&& inputStream) -> TFuture<ITablePartitionReaderPtr>{ + return inputStream->Read().Apply(BIND([=] (const TSharedRef& metaRef) { + // Actually we don't have any metadata in first version but we can have it in future. Just parse empty proto. + NApi::NRpcProxy::NProto::TRspReadTablePartitionMeta meta; + if (!TryDeserializeProto(&meta, metaRef)) { + THROW_ERROR_EXCEPTION("Failed to deserialize partition table reader meta information"); + } + + auto rowBatchReader = New<TRowBatchReader>(std::move(inputStream), /*isStreamWithStatistics*/ false); + + return NApi::CreateTablePartitionReader(rowBatchReader, /*schemas*/ {}, /*columnFilters=*/ {}); + })); + })); +} + TFuture<void> TClient::TruncateJournal( const TYPath& path, i64 rowCount, diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index db95372369..151a239862 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -347,6 +347,10 @@ public: const std::vector<NYPath::TRichYPath>& paths, const NApi::TPartitionTablesOptions& options) override; + TFuture<ITablePartitionReaderPtr> CreateTablePartitionReader( + const TTablePartitionCookiePtr& tablePartitionDescriptor, + const TReadTablePartitionOptions& options) override; + TFuture<void> TruncateJournal( const NYPath::TYPath& path, i64 rowCount, diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp index 4ac7a42726..a6275f5d9a 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp @@ -1,10 +1,7 @@ #include "connection_impl.h" -#include "discovery_service_proxy.h" #include "connection_impl.h" #include "client_impl.h" #include "config.h" -#include "helpers.h" -#include "private.h" #include <yt/yt/core/net/local_address.h> #include <yt/yt/core/net/address.h> diff --git a/yt/yt/client/api/rpc_proxy/discovery_service_proxy.h b/yt/yt/client/api/rpc_proxy/discovery_service_proxy.h deleted file mode 100644 index 8a524be756..0000000000 --- a/yt/yt/client/api/rpc_proxy/discovery_service_proxy.h +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include "public.h" - -#include "protocol_version.h" - -#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/discovery_service.pb.h> - -#include <yt/yt/core/rpc/client.h> - -namespace NYT::NApi::NRpcProxy { - -//////////////////////////////////////////////////////////////////////////////// - -class TDiscoveryServiceProxy - : public NRpc::TProxyBase -{ -public: - DEFINE_RPC_PROXY(TDiscoveryServiceProxy, DiscoveryService, - .SetProtocolVersion(NRpc::TProtocolVersion{0, 0})); - - DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, DiscoverProxies, - .SetMultiplexingBand(NRpc::EMultiplexingBand::Control)); -}; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NApi::NRpcProxy diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 67b0e789de..1a316ed374 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -1111,6 +1111,18 @@ void ToProto( aggregateStatistics->set_chunk_count(multiTablePartition.AggregateStatistics.ChunkCount); aggregateStatistics->set_data_weight(multiTablePartition.AggregateStatistics.DataWeight); aggregateStatistics->set_row_count(multiTablePartition.AggregateStatistics.RowCount); + + if (multiTablePartition.Cookie) { + ToProto(protoMultiTablePartition->mutable_cookie(), multiTablePartition.Cookie); + } +} + +void ToProto( + TProtobufString* protoCookie, + const TTablePartitionCookiePtr& cookie) +{ + auto cookieBytes = ConvertToYsonString(cookie); + *protoCookie = cookieBytes.ToString(); } void FromProto( @@ -1127,6 +1139,10 @@ void FromProto( multiTablePartition->AggregateStatistics.DataWeight = aggregateStatistics.data_weight(); multiTablePartition->AggregateStatistics.RowCount = aggregateStatistics.row_count(); } + + if (protoMultiTablePartition.has_cookie()) { + FromProto(&multiTablePartition->Cookie, protoMultiTablePartition.cookie()); + } } void FromProto( @@ -1138,6 +1154,13 @@ void FromProto( protoRspPartitionTables.partitions()); } +void FromProto( + TTablePartitionCookiePtr* cookie, + const TProtobufString& protoCookie) +{ + *cookie = ConvertTo<TTablePartitionCookiePtr>(TYsonStringBuf(protoCookie)); +} + void ToProto( NProto::TRowBatchReadOptions* proto, const NQueueClient::TQueueRowBatchReadOptions& result) diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h index 9e7f432e62..8bfa68b54f 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.h +++ b/yt/yt/client/api/rpc_proxy/helpers.h @@ -196,6 +196,10 @@ void ToProto( NProto::TMultiTablePartition* protoMultiTablePartition, const NApi::TMultiTablePartition& multiTablePartition); +void ToProto( + TProtobufString* protoCookie, + const TTablePartitionCookiePtr& cookie); + void FromProto( NApi::TMultiTablePartition* multiTablePartition, const NProto::TMultiTablePartition& protoMultiTablePartition); @@ -204,6 +208,10 @@ void FromProto( NApi::TMultiTablePartitions* multiTablePartitions, const NProto::TRspPartitionTables& protoRspPartitionTables); +void FromProto( + TTablePartitionCookiePtr* cookie, + const TProtobufString& protoCookie); + void ToProto( NProto::TRowBatchReadOptions* proto, const NQueueClient::TQueueRowBatchReadOptions& result); diff --git a/yt/yt/client/api/table_client.cpp b/yt/yt/client/api/table_client.cpp index 77259821b5..8e86a5f924 100644 --- a/yt/yt/client/api/table_client.cpp +++ b/yt/yt/client/api/table_client.cpp @@ -1,5 +1,7 @@ #include "table_client.h" +#include <yt/yt/client/signature/signature.h> + #include <yt/yt/client/ypath/rich.h> #include <yt/yt/core/ytree/fluent.h> @@ -32,6 +34,10 @@ void Serialize(const TMultiTablePartition& partition, NYson::IYsonConsumer* cons BuildYsonFluently(consumer) .BeginMap() .Item("table_ranges").Value(partition.TableRanges) + .DoIf(static_cast<bool>(partition.Cookie), [&] (TFluentMap fluent) { + auto ysonString = NYson::ConvertToYsonString(partition.Cookie); + fluent.Item("cookie").Value(ysonString.AsStringBuf()); + }) .Item("aggregate_statistics").Value(partition.AggregateStatistics) .EndMap(); } @@ -49,4 +55,3 @@ void Serialize(const TMultiTablePartitions& partitions, NYson::IYsonConsumer* co //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NApi - diff --git a/yt/yt/client/api/table_client.h b/yt/yt/client/api/table_client.h index 01b2b28d20..c6faa288fa 100644 --- a/yt/yt/client/api/table_client.h +++ b/yt/yt/client/api/table_client.h @@ -316,13 +316,25 @@ struct TPartitionTablesOptions std::optional<int> MaxPartitionCount; bool AdjustDataWeightPerPartition = true; bool EnableKeyGuarantee = false; + + //! Whether to return cookies that can be fed to CreateTablePartitionReader. + bool EnableCookies = false; }; +struct TReadTablePartitionOptions + : public TTableReaderOptions +{ }; + +YT_DEFINE_STRONG_TYPEDEF(TTablePartitionCookiePtr, NSignature::TSignaturePtr); + struct TMultiTablePartition { //! Table ranges are indexed by table index. std::vector<NYPath::TRichYPath> TableRanges; + //! Cookie that can be fed into CreateTablePartitionReader. + TTablePartitionCookiePtr Cookie; + //! Aggregate statistics of all the table ranges in the partition. NTableClient::TChunkStripeStatistics AggregateStatistics; }; @@ -473,7 +485,15 @@ struct ITableClient virtual TFuture<TMultiTablePartitions> PartitionTables( const std::vector<NYPath::TRichYPath>& paths, - const TPartitionTablesOptions& options) = 0; + const TPartitionTablesOptions& options = {}) = 0; + + /** + * Read table partition that was previously received from PartitionTables method. + * This method is cheaper than ReadTable since such reading doesn't make request to master in typical case. + */ + virtual TFuture<ITablePartitionReaderPtr> CreateTablePartitionReader( + const TTablePartitionCookiePtr& cookie, + const TReadTablePartitionOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/table_partition_reader.cpp b/yt/yt/client/api/table_partition_reader.cpp new file mode 100644 index 0000000000..a3bd4621f2 --- /dev/null +++ b/yt/yt/client/api/table_partition_reader.cpp @@ -0,0 +1,73 @@ +#include "table_partition_reader.h" + +#include <yt/yt/client/table_client/schema.h> + +namespace NYT::NApi { + +using namespace NTableClient; + +//////////////////////////////////////////////////////////////////////////////// + +class TTablePartitionReader + : public ITablePartitionReader +{ +public: + TTablePartitionReader( + IRowBatchReaderPtr rowBatchReader, + std::vector<TTableSchemaPtr> tableSchemas, + std::vector<TColumnNameFilter> columnFilters) + : RowBatchReader_(std::move(rowBatchReader)) + , TableSchemas_(std::move(tableSchemas)) + , ColumnFilters_(std::move(columnFilters)) + { + YT_VERIFY(TableSchemas_.size() == ColumnFilters_.size()); + } + + TFuture<void> GetReadyEvent() const override + { + return RowBatchReader_->GetReadyEvent(); + } + + IUnversionedRowBatchPtr Read(const TRowBatchReadOptions& options) override + { + return RowBatchReader_->Read(options); + } + + const TNameTablePtr& GetNameTable() const override + { + return RowBatchReader_->GetNameTable(); + } + + std::vector<TTableSchemaPtr> GetTableSchemas() const override + { + if (TableSchemas_.empty()) { + THROW_ERROR_EXCEPTION("Table schemas are unknown"); + } + return TableSchemas_; + } + + std::vector<TColumnNameFilter> GetColumnFilters() const override + { + if (ColumnFilters_.empty()) { + THROW_ERROR_EXCEPTION("Column filters are unknown"); + } + return ColumnFilters_; + } + +private: + const IRowBatchReaderPtr RowBatchReader_; + const std::vector<TTableSchemaPtr> TableSchemas_; + const std::vector<TColumnNameFilter> ColumnFilters_; +}; + +ITablePartitionReaderPtr CreateTablePartitionReader( + const IRowBatchReaderPtr& rowBatchReader, + const std::vector<TTableSchemaPtr>& tableSchemas, + const std::vector<TColumnNameFilter>& columnFilters) +{ + return New<TTablePartitionReader>(rowBatchReader, tableSchemas, columnFilters); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/table_partition_reader.h b/yt/yt/client/api/table_partition_reader.h new file mode 100644 index 0000000000..5e2e909bec --- /dev/null +++ b/yt/yt/client/api/table_partition_reader.h @@ -0,0 +1,33 @@ +#pragma once + +#include "public.h" +#include "private.h" + +#include "row_batch_reader.h" + +namespace NYT::NApi { + +//////////////////////////////////////////////////////////////////////////////// + +class ITablePartitionReader + : public virtual IRowBatchReader +{ +private: + // Functions below are for internal usage only. + virtual std::vector<NTableClient::TTableSchemaPtr> GetTableSchemas() const = 0; + virtual std::vector<NTableClient::TColumnNameFilter> GetColumnFilters() const = 0; + + friend std::vector<NTableClient::TTableSchemaPtr> NDetail::GetTableSchemas(const ITablePartitionReaderPtr&); + friend std::vector<NTableClient::TColumnNameFilter> NDetail::GetColumnFilters(const ITablePartitionReaderPtr&); +}; + +DEFINE_REFCOUNTED_TYPE(ITablePartitionReader) + +ITablePartitionReaderPtr CreateTablePartitionReader( + const IRowBatchReaderPtr& rowBatchReader, + const std::vector<NTableClient::TTableSchemaPtr>& tableSchemas, + const std::vector<NTableClient::TColumnNameFilter>& columnFilters); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index f699cbd74f..11c96e1335 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -192,6 +192,8 @@ public: REGISTER_ALL(TPartitionTablesCommand, "partition_tables", Null, Structured, false, false); + REGISTER (TReadTablePartitionCommand, "read_table_partition", Null, Tabular, false, true , ApiVersion4); + REGISTER (TInsertRowsCommand, "insert_rows", Tabular, Null, true, true , ApiVersion3); REGISTER (TLockRowsCommand, "lock_rows", Tabular, Null, true, true , ApiVersion3); REGISTER (TDeleteRowsCommand, "delete_rows", Tabular, Null, true, true , ApiVersion3); diff --git a/yt/yt/client/driver/driver.h b/yt/yt/client/driver/driver.h index 8e8a25de41..b17edf8ba5 100644 --- a/yt/yt/client/driver/driver.h +++ b/yt/yt/client/driver/driver.h @@ -23,6 +23,8 @@ #include <yt/yt/core/ytree/public.h> +#include <library/cpp/yt/memory/memory_usage_tracker.h> + namespace NYT::NDriver { //////////////////////////////////////////////////////////////////////////////// @@ -78,6 +80,9 @@ struct TDriverRequest //! before first write to output stream. std::function<void()> ResponseParametersFinishedCallback; + //! Memory usage tracker. + IMemoryUsageTrackerPtr MemoryUsageTracker = GetNullMemoryUsageTracker(); + void Reset(); private: diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp index ce1155abfd..6a2d4eb19c 100644 --- a/yt/yt/client/driver/table_commands.cpp +++ b/yt/yt/client/driver/table_commands.cpp @@ -4,12 +4,17 @@ #include <yt/yt/client/api/rowset.h> #include <yt/yt/client/api/skynet.h> +#include <yt/yt/client/api/table_partition_reader.h> #include <yt/yt/client/chaos_client/replication_card_serialization.h> #include <yt/yt/client/formats/config.h> #include <yt/yt/client/formats/parser.h> +#include <yt/yt/client/signature/generator.h> +#include <yt/yt/client/signature/signature.h> +#include <yt/yt/client/signature/validator.h> + #include <yt/yt/client/table_client/adapters.h> #include <yt/yt/client/table_client/blob_reader.h> #include <yt/yt/client/table_client/columnar_statistics.h> @@ -33,6 +38,7 @@ namespace NYT::NDriver { using namespace NApi; +using namespace NApi::NDetail; using namespace NChaosClient; using namespace NChunkClient; using namespace NCodegen; @@ -230,6 +236,51 @@ void TReadBlobTableCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// +void TReadTablePartitionCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("cookie", &TThis::Cookie); +} + +void TReadTablePartitionCommand::DoExecute(ICommandContextPtr context) +{ + auto client = context->GetClient(); + + auto cookie = ConvertTo<TTablePartitionCookiePtr>(TYsonString(Cookie)); + + auto valid = WaitFor(context->GetDriver()->GetSignatureValidator()->Validate(cookie.Underlying())) + .ValueOrThrow(); + + if (!valid) { + THROW_ERROR_EXCEPTION("Signature validation failed"); + } + + auto reader = WaitFor(client->CreateTablePartitionReader(cookie)) + .ValueOrThrow(); + + auto format = context->GetOutputFormat(); + auto formatWriter = CreateStaticTableWriterForFormat( + /*format*/ format, + /*nameTable*/ reader->GetNameTable(), + /*tableSchemas*/ GetTableSchemas(reader), + /*columnFilters*/ GetColumnFilters(reader), + /*output*/ context->Request().OutputStream, + /*enableContextSaving*/ false, + /*controlAttributesConfig*/ New<TControlAttributesConfig>(), + /*keyColumnCount*/ 0); + + TRowBatchReadOptions options{ + .MaxRowsPerRead = context->GetConfig()->ReadBufferRowCount, + .Columnar = (format.GetType() == EFormatType::Arrow), + }; + + PipeReaderToWriterByBatches( + reader, + formatWriter, + options); +} + +//////////////////////////////////////////////////////////////////////////////// + void TLocateSkynetShareCommand::Register(TRegistrar registrar) { registrar.Parameter("path", &TThis::Path); @@ -298,7 +349,8 @@ void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context) TWritingValueConsumer valueConsumer( schemalessWriter, ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes()), - MaxRowBufferSize); + MaxRowBufferSize, + context->Request().MemoryUsageTracker); TTableOutput output(CreateParserForFormat( context->GetInputFormat(), @@ -454,6 +506,8 @@ void TPartitionTablesCommand::Register(TRegistrar registrar) .Default(false); registrar.Parameter("adjust_data_weight_per_partition", &TThis::AdjustDataWeightPerPartition) .Default(true); + registrar.Parameter("enable_cookies", &TThis::EnableCookies) + .Default(false); } void TPartitionTablesCommand::DoExecute(ICommandContextPtr context) @@ -467,10 +521,17 @@ void TPartitionTablesCommand::DoExecute(ICommandContextPtr context) Options.MaxPartitionCount = MaxPartitionCount; Options.EnableKeyGuarantee = EnableKeyGuarantee; Options.AdjustDataWeightPerPartition = AdjustDataWeightPerPartition; + Options.EnableCookies = EnableCookies; auto partitions = WaitFor(context->GetClient()->PartitionTables(Paths, Options)) .ValueOrThrow(); + for (auto& partition : partitions.Partitions) { + if (partition.Cookie) { + context->GetDriver()->GetSignatureGenerator()->Sign(partition.Cookie.Underlying()); + } + } + context->ProduceOutputValue(ConvertToYsonString(partitions)); } diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h index 26dacebf7e..eebdc492cd 100644 --- a/yt/yt/client/driver/table_commands.h +++ b/yt/yt/client/driver/table_commands.h @@ -58,6 +58,21 @@ private: //////////////////////////////////////////////////////////////////////////////// +class TReadTablePartitionCommand + : public TTypedCommand<NApi::TTableReaderOptions> +{ + REGISTER_YSON_STRUCT_LITE(TReadTablePartitionCommand); + + static void Register(TRegistrar registrar); + +private: + std::string Cookie; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TLocateSkynetShareCommand : public TTypedCommand<NApi::TLocateSkynetShareOptions> { @@ -141,6 +156,9 @@ private: //! the #partition_tables command will throw an exception. bool AdjustDataWeightPerPartition; + //! Return cookies that can be used with read_table_partition command + bool EnableCookies; + void DoExecute(ICommandContextPtr context) override; }; diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index 9e997a2728..ffce8ff06c 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -394,6 +394,7 @@ public: UNIMPLEMENTED_METHOD(TFuture<TSkynetSharePartsLocationsPtr>, LocateSkynetShare, (const NYPath::TRichYPath&, const TLocateSkynetShareOptions&)); UNIMPLEMENTED_METHOD(TFuture<std::vector<NTableClient::TColumnarStatistics>>, GetColumnarStatistics, (const std::vector<NYPath::TRichYPath>&, const TGetColumnarStatisticsOptions&)); UNIMPLEMENTED_METHOD(TFuture<TMultiTablePartitions>, PartitionTables, (const std::vector<NYPath::TRichYPath>&, const TPartitionTablesOptions&)); + UNIMPLEMENTED_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, (const TTablePartitionCookiePtr&, const TReadTablePartitionOptions&)); UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetTablePivotKeys, (const NYPath::TYPath&, const TGetTablePivotKeysOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, CreateTableBackup, (const TBackupManifestPtr&, const TCreateTableBackupOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, RestoreTableBackup, (const TBackupManifestPtr&, const TRestoreTableBackupOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index 09ca2fbc0e..b6f7e294da 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -1,14 +1,14 @@ #include "hedging.h" -#include "cache.h" #include "config.h" #include "counter.h" #include "rpc.h" -#include "private.h" #include <yt/yt/client/api/client.h> #include <yt/yt/client/api/queue_transaction.h> +#include <yt/yt/client/cache/cache.h> + #include <yt/yt/client/misc/method_helpers.h> #include <yt/yt/client/table_client/unversioned_row.h> @@ -141,6 +141,7 @@ public: UNSUPPORTED_METHOD(TFuture<TSkynetSharePartsLocationsPtr>, LocateSkynetShare, (const TRichYPath&, const TLocateSkynetShareOptions&)); UNSUPPORTED_METHOD(TFuture<std::vector<NTableClient::TColumnarStatistics>>, GetColumnarStatistics, (const std::vector<TRichYPath>&, const TGetColumnarStatisticsOptions&)); UNSUPPORTED_METHOD(TFuture<TMultiTablePartitions>, PartitionTables, (const std::vector<TRichYPath>&, const TPartitionTablesOptions&)); + UNSUPPORTED_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, (const TTablePartitionCookiePtr&, const TReadTablePartitionOptions&)); UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetTablePivotKeys, (const TYPath&, const TGetTablePivotKeysOptions&)); UNSUPPORTED_METHOD(TFuture<void>, CreateTableBackup, (const TBackupManifestPtr&, const TCreateTableBackupOptions&)); UNSUPPORTED_METHOD(TFuture<void>, RestoreTableBackup, (const TBackupManifestPtr&, const TRestoreTableBackupOptions&)); diff --git a/yt/yt/client/misc/workload.h b/yt/yt/client/misc/workload.h index 559402052b..63ce3a7cb3 100644 --- a/yt/yt/client/misc/workload.h +++ b/yt/yt/client/misc/workload.h @@ -16,6 +16,34 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +// This distribution of weights corresponds to the logic of the GetBasicPriority method. +static const TEnumIndexedArray<EWorkloadCategory, double> DefaultFairShareWorkloadCategoryWeights = { + {EWorkloadCategory::Idle, 0}, + + {EWorkloadCategory::SystemReplication, 1}, + {EWorkloadCategory::SystemMerge, 1}, + {EWorkloadCategory::SystemReincarnation, 1}, + {EWorkloadCategory::SystemTabletCompaction, 1}, + {EWorkloadCategory::SystemTabletPartitioning, 1}, + {EWorkloadCategory::SystemTabletPreload, 1}, + {EWorkloadCategory::SystemTabletReplication, 1}, + {EWorkloadCategory::SystemTabletStoreFlush, 1}, + {EWorkloadCategory::SystemArtifactCacheDownload, 1}, + {EWorkloadCategory::UserBatch, 1}, + + {EWorkloadCategory::SystemRepair, 2}, + {EWorkloadCategory::SystemTabletSnapshot, 2}, + + {EWorkloadCategory::UserInteractive, 3}, + {EWorkloadCategory::UserDynamicStoreRead, 3}, + {EWorkloadCategory::SystemTabletRecovery, 3}, + + {EWorkloadCategory::UserRealtime, 4}, + {EWorkloadCategory::SystemTabletLogging, 4}, +}; + +//////////////////////////////////////////////////////////////////////////////// + struct TWorkloadDescriptor { TWorkloadDescriptor( diff --git a/yt/yt/client/signature/generator.cpp b/yt/yt/client/signature/generator.cpp index 71c12d6ae2..33a431a5ab 100644 --- a/yt/yt/client/signature/generator.cpp +++ b/yt/yt/client/signature/generator.cpp @@ -34,6 +34,12 @@ ISignatureGeneratorPtr CreateDummySignatureGenerator() return New<TDummySignatureGenerator>(); } +const ISignatureGeneratorPtr& GetDummySignatureGenerator() +{ + static ISignatureGeneratorPtr signatureGenerator = CreateDummySignatureGenerator(); + return signatureGenerator; +} + //////////////////////////////////////////////////////////////////////////////// struct TAlwaysThrowingSignatureGenerator diff --git a/yt/yt/client/signature/generator.h b/yt/yt/client/signature/generator.h index 6574128736..771288af7f 100644 --- a/yt/yt/client/signature/generator.h +++ b/yt/yt/client/signature/generator.h @@ -23,6 +23,7 @@ DEFINE_REFCOUNTED_TYPE(ISignatureGenerator) //////////////////////////////////////////////////////////////////////////////// ISignatureGeneratorPtr CreateDummySignatureGenerator(); +const ISignatureGeneratorPtr& GetDummySignatureGenerator(); ISignatureGeneratorPtr CreateAlwaysThrowingSignatureGenerator(); diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index ac51658026..200ce9f0aa 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -275,6 +275,8 @@ using TKeyColumnTypes = TCompactVector<EValueType, 16>; class TColumnFilter; +using TColumnNameFilter = std::optional<std::vector<std::string>>; + struct TUnversionedValue; using TUnversionedValueRange = TRange<TUnversionedValue>; using TMutableUnversionedValueRange = TMutableRange<TUnversionedValue>; diff --git a/yt/yt/client/table_client/row_buffer.cpp b/yt/yt/client/table_client/row_buffer.cpp index bb16d61c1b..4df6c62a9d 100644 --- a/yt/yt/client/table_client/row_buffer.cpp +++ b/yt/yt/client/table_client/row_buffer.cpp @@ -12,12 +12,15 @@ TRowBuffer::TRowBuffer( TRefCountedTypeCookie tagCookie, IMemoryChunkProviderPtr chunkProvider, size_t startChunkSize, - IMemoryUsageTrackerPtr tracker) + IMemoryUsageTrackerPtr tracker, + bool allowMemoryOvercommit) : MemoryTracker_(std::move(tracker)) + , AllowMemoryOvercommit_(allowMemoryOvercommit) , Pool_( tagCookie, std::move(chunkProvider), startChunkSize) + , MemoryGuard_(TMemoryUsageTrackerGuard::Build(MemoryTracker_)) { } TChunkedMemoryPool* TRowBuffer::GetPool() @@ -28,7 +31,7 @@ TChunkedMemoryPool* TRowBuffer::GetPool() TMutableUnversionedRow TRowBuffer::AllocateUnversioned(int valueCount) { auto result = TMutableUnversionedRow::Allocate(&Pool_, valueCount); - ValidateNoOverflow(); + UpdateMemoryUsage(); return result; } @@ -44,7 +47,7 @@ TMutableVersionedRow TRowBuffer::AllocateVersioned( valueCount, writeTimestampCount, deleteTimestampCount); - ValidateNoOverflow(); + UpdateMemoryUsage(); return result; } @@ -56,7 +59,7 @@ void TRowBuffer::CaptureValue(TUnversionedValue* value) value->Data.String = dst; } - ValidateNoOverflow(); + UpdateMemoryUsage(); } TVersionedValue TRowBuffer::CaptureValue(const TVersionedValue& value) @@ -107,7 +110,7 @@ TMutableUnversionedRow TRowBuffer::CaptureRow(TUnversionedValueRange values, boo } } - ValidateNoOverflow(); + UpdateMemoryUsage(); return capturedRow; } @@ -175,7 +178,7 @@ TMutableUnversionedRow TRowBuffer::CaptureAndPermuteRow( capturedRow[valueCount++] = *addend; } - ValidateNoOverflow(); + UpdateMemoryUsage(); return capturedRow; } @@ -201,7 +204,7 @@ TMutableVersionedRow TRowBuffer::CaptureRow(TVersionedRow row, bool captureValue CaptureValues(capturedRow); } - ValidateNoOverflow(); + UpdateMemoryUsage(); return capturedRow; } @@ -310,7 +313,7 @@ TMutableVersionedRow TRowBuffer::CaptureAndPermuteRow( } } - ValidateNoOverflow(); + UpdateMemoryUsage(); return capturedRow; } @@ -318,7 +321,7 @@ TMutableVersionedRow TRowBuffer::CaptureAndPermuteRow( void TRowBuffer::Absorb(TRowBuffer&& other) { Pool_.Absorb(std::move(other.Pool_)); - ValidateNoOverflow(); + UpdateMemoryUsage(); } i64 TRowBuffer::GetSize() const @@ -333,17 +336,17 @@ i64 TRowBuffer::GetCapacity() const void TRowBuffer::Clear() { - MemoryGuard_.reset(); + MemoryGuard_.Release(); Pool_.Clear(); } void TRowBuffer::Purge() { - MemoryGuard_.reset(); + MemoryGuard_.Release(); Pool_.Purge(); } -void TRowBuffer::ValidateNoOverflow() +void TRowBuffer::UpdateMemoryUsage() { if (!MemoryTracker_) { return; @@ -351,11 +354,10 @@ void TRowBuffer::ValidateNoOverflow() auto capacity = Pool_.GetCapacity(); - if (!MemoryGuard_) { - MemoryGuard_ = TMemoryUsageTrackerGuard::TryAcquire(MemoryTracker_, capacity) - .ValueOrThrow(); + if (AllowMemoryOvercommit_) { + MemoryGuard_.SetSize(capacity); } else { - MemoryGuard_->TrySetSize(capacity) + MemoryGuard_.TrySetSize(capacity) .ThrowOnError(); } } diff --git a/yt/yt/client/table_client/row_buffer.h b/yt/yt/client/table_client/row_buffer.h index 2f7033827e..1896f84579 100644 --- a/yt/yt/client/table_client/row_buffer.h +++ b/yt/yt/client/table_client/row_buffer.h @@ -26,14 +26,17 @@ public: TRefCountedTypeCookie tagCookie, IMemoryChunkProviderPtr chunkProvider, size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize, - IMemoryUsageTrackerPtr tracker = nullptr); + IMemoryUsageTrackerPtr tracker = nullptr, + bool allowMemoryOvercommit = false); template <class TTag = TDefaultRowBufferPoolTag> explicit TRowBuffer( TTag /*tag*/ = TDefaultRowBufferPoolTag(), size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize, - IMemoryUsageTrackerPtr tracker = nullptr) + IMemoryUsageTrackerPtr tracker = nullptr, + bool allowMemoryOvercommit = false) : MemoryTracker_(std::move(tracker)) + , AllowMemoryOvercommit_(allowMemoryOvercommit) , Pool_( TTag(), startChunkSize) @@ -45,8 +48,10 @@ public: TRowBuffer( TTag /*tag*/, IMemoryChunkProviderPtr chunkProvider, - IMemoryUsageTrackerPtr tracker = nullptr) + IMemoryUsageTrackerPtr tracker = nullptr, + bool allowMemoryOvercommit = false) : MemoryTracker_(std::move(tracker)) + , AllowMemoryOvercommit_(allowMemoryOvercommit) , Pool_( GetRefCountedTypeCookie<TTag>(), std::move(chunkProvider)) @@ -112,11 +117,12 @@ public: private: const IMemoryUsageTrackerPtr MemoryTracker_; + const bool AllowMemoryOvercommit_; TChunkedMemoryPool Pool_; - std::optional<TMemoryUsageTrackerGuard> MemoryGuard_; + TMemoryUsageTrackerGuard MemoryGuard_; - void ValidateNoOverflow(); + void UpdateMemoryUsage(); }; DEFINE_REFCOUNTED_TYPE(TRowBuffer) diff --git a/yt/yt/client/table_client/value_consumer.cpp b/yt/yt/client/table_client/value_consumer.cpp index 4afb23343d..41db811312 100644 --- a/yt/yt/client/table_client/value_consumer.cpp +++ b/yt/yt/client/table_client/value_consumer.cpp @@ -316,11 +316,16 @@ struct TWritingValueConsumerBufferTag TWritingValueConsumer::TWritingValueConsumer( IUnversionedWriterPtr writer, TTypeConversionConfigPtr typeConversionConfig, - i64 maxRowBufferSize) + i64 maxRowBufferSize, + IMemoryUsageTrackerPtr tracker) : TValueConsumerBase(writer->GetSchema(), std::move(typeConversionConfig)) , Writer_(std::move(writer)) , MaxRowBufferSize_(maxRowBufferSize) - , RowBuffer_(New<TRowBuffer>(TWritingValueConsumerBufferTag())) + , RowBuffer_(New<TRowBuffer>( + TWritingValueConsumerBufferTag(), + TChunkedMemoryPool::DefaultStartChunkSize, + std::move(tracker), + /*allowMemoryOvercommit*/ true)) { YT_VERIFY(Writer_); InitializeIdToTypeMapping(); diff --git a/yt/yt/client/table_client/value_consumer.h b/yt/yt/client/table_client/value_consumer.h index 08ed4e9506..c32b5859f9 100644 --- a/yt/yt/client/table_client/value_consumer.h +++ b/yt/yt/client/table_client/value_consumer.h @@ -129,7 +129,8 @@ public: explicit TWritingValueConsumer( IUnversionedWriterPtr writer, TTypeConversionConfigPtr typeConversionConfig = New<TTypeConversionConfig>(), - i64 maxRowBufferSize = 1_MB); + i64 maxRowBufferSize = 1_MB, + IMemoryUsageTrackerPtr tracker = GetNullMemoryUsageTracker()); TFuture<void> Flush() override; const TNameTablePtr& GetNameTable() const override; diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 95f8a29b35..254f08dd90 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -516,6 +516,11 @@ public: const TPartitionTablesOptions& options), (override)); + MOCK_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, ( + const TTablePartitionCookiePtr& partition, + const TReadTablePartitionOptions& options), + (override)); + MOCK_METHOD(TFuture<void>, TruncateJournal, ( const NYPath::TYPath& path, i64 rowCount, diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index c660a2bd97..ee5c8da6ee 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -33,6 +33,8 @@ SRCS( api/sticky_transaction_pool.cpp api/options.cpp api/shuffle_client.cpp + api/table_partition_reader.cpp + api/private.cpp api/rpc_proxy/address_helpers.cpp api/rpc_proxy/public.cpp diff --git a/yt/yt/core/logging/structured_log-inl.h b/yt/yt/core/logging/structured_log-inl.h index b4f9c438b8..0368f4214b 100644 --- a/yt/yt/core/logging/structured_log-inl.h +++ b/yt/yt/core/logging/structured_log-inl.h @@ -12,16 +12,40 @@ namespace NYT::NLogging { //////////////////////////////////////////////////////////////////////////////// +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +consteval TStructuredLogKey::TStructuredLogKey(const char* key) + : Key(key) +{ + if (std::string_view(key) == "message") { + // Entering this branch causes a compilation failure that contains this + // string in the error message. + throw "Key 'message' is reserved and can not be used"; + } +} + +template <typename T> +std::tuple<TStructuredLogKey, T> MakeTuple(TStructuredLogKey k, T&& t) +{ + return {k, std::forward<T>(t)}; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail + template <typename... Values> void LogStructuredEvent( const TLogger& logger, ELogLevel level, std::string_view message, - std::tuple<const char*, Values>&&... tags) + std::tuple<NDetail::TStructuredLogKey, Values>&&... tags) { auto event = LogStructuredEventFluently(logger, level).Item("message").Value(message); - ((event = event.Item(std::get<0>(tags)).Value(std::forward<Values>(std::get<1>(tags)))), ...); + ((event = event.Item(std::get<0>(tags).Key).Value(std::forward<Values>(std::get<1>(tags)))), ...); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/logging/structured_log.h b/yt/yt/core/logging/structured_log.h index d6e4fe476b..6aaa696f5c 100644 --- a/yt/yt/core/logging/structured_log.h +++ b/yt/yt/core/logging/structured_log.h @@ -20,19 +20,37 @@ namespace NYT::NLogging { //////////////////////////////////////////////////////////////////////////////// +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +// Helper type to validate keys at compile-time. +struct TStructuredLogKey +{ + std::string_view Key; + consteval TStructuredLogKey(const char* key); +}; + +template <typename T> +std::tuple<TStructuredLogKey, T> MakeTuple(TStructuredLogKey k, T&& t); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail + template <typename... Values> void LogStructuredEvent( const TLogger& logger, ELogLevel level, std::string_view message, - std::tuple<const char*, Values>&&... tags); + std::tuple<NDetail::TStructuredLogKey, Values>&&... tags); //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NLogging -#define YT_SLOG_TUPLE(x) std::make_tuple x, -#define YT_SLOG_TUPLE_LAST(x) std::make_tuple x +#define YT_SLOG_TUPLE(x) NYT::NLogging::NDetail::MakeTuple x, +#define YT_SLOG_TUPLE_LAST(x) NYT::NLogging::NDetail::MakeTuple x // YT_SLOG_EVENT enforces that primary structured log messages are known at // compile-time, because structured errors (and logs in general) are usually diff --git a/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h b/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h index db9b51223a..8a6b69eabe 100644 --- a/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h +++ b/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h @@ -20,7 +20,7 @@ TFairShareHierarchyLevel<TTag>::TFairShareHierarchyLevel(TTag tag, double weight : Tag_(std::move(tag)) , Weight_(weight) { - YT_VERIFY(weight > 0.0); + YT_VERIFY(weight >= 0.0); } template <typename TTag> @@ -35,6 +35,19 @@ double TFairShareHierarchyLevel<TTag>::GetWeight() const return Weight_; } +template <typename TTag> +std::vector<TFairShareHierarchyLevel<TTag>> CreateHierarchyLevels(const std::vector<std::pair<TTag, double>>& tags) +{ + std::vector<TFairShareHierarchyLevel<TTag>> levels; + levels.reserve(tags.size()); + + for (const auto& tag : tags) { + levels.emplace_back(tag.first, tag.second); + } + + return levels; +} + //////////////////////////////////////////////////////////////////////////////// template <typename TTag> diff --git a/yt/yt/core/misc/fair_share_hierarchical_queue.h b/yt/yt/core/misc/fair_share_hierarchical_queue.h index 7149e11b55..9cc4c430a2 100644 --- a/yt/yt/core/misc/fair_share_hierarchical_queue.h +++ b/yt/yt/core/misc/fair_share_hierarchical_queue.h @@ -69,6 +69,9 @@ private: const double Weight_; }; +template <typename TTag> +std::vector<TFairShareHierarchyLevel<TTag>> CreateHierarchyLevels(const std::vector<std::pair<TTag, double>>& tags); + //////////////////////////////////////////////////////////////////////////////// struct TFairShareLogKey diff --git a/yt/yt/core/misc/serialize.cpp b/yt/yt/core/misc/serialize.cpp index 6bc396ddfc..b14db02bac 100644 --- a/yt/yt/core/misc/serialize.cpp +++ b/yt/yt/core/misc/serialize.cpp @@ -300,4 +300,3 @@ TEntityStreamLoadContext::TEntityStreamLoadContext( //////////////////////////////////////////////////////////////////////////////// } // namespace NYT - diff --git a/yt/yt/library/named_value/named_value.cpp b/yt/yt/library/named_value/named_value.cpp index a283e28849..b041bcd772 100644 --- a/yt/yt/library/named_value/named_value.cpp +++ b/yt/yt/library/named_value/named_value.cpp @@ -1,12 +1,16 @@ #include "named_value.h" +#include <yt/yt/core/ytree/convert.h> + +#include <util/string/escape.h> + namespace NYT::NNamedValue { using namespace NTableClient; //////////////////////////////////////////////////////////////////////////////// -NTableClient::TUnversionedOwningRow MakeRow( +TUnversionedOwningRow MakeRow( const TNameTablePtr& nameTable, const std::initializer_list<TNamedValue>& values) { @@ -17,7 +21,7 @@ NTableClient::TUnversionedOwningRow MakeRow( return builder.FinishRow(); } -NTableClient::TUnversionedOwningRow MakeRow( +TUnversionedOwningRow MakeRow( const TNameTablePtr& nameTable, const std::vector<TNamedValue>& values) { @@ -28,9 +32,23 @@ NTableClient::TUnversionedOwningRow MakeRow( return builder.FinishRow(); } +std::vector<TNamedValue> MakeNamedValueList( + const TNameTablePtr& nameTable, + TUnversionedRow row) +{ + std::vector<TNamedValue> result; + result.reserve(row.GetCount()); + + for (const auto& value : row) { + auto namedValue = TNamedValue(TString(nameTable->GetNameOrThrow(value.Id)), TNamedValue::ExtractValue(value)); + result.push_back(std::move(namedValue)); + } + return result; +} + //////////////////////////////////////////////////////////////////////////////// -NTableClient::TUnversionedValue TNamedValue::ToUnversionedValue(const NTableClient::TNameTablePtr& nameTable) const +TUnversionedValue TNamedValue::ToUnversionedValue(const TNameTablePtr& nameTable) const { const int valueId = nameTable->GetIdOrRegisterName(Name_); return std::visit([valueId] (const auto& value) -> TUnversionedValue { @@ -56,7 +74,7 @@ NTableClient::TUnversionedValue TNamedValue::ToUnversionedValue(const NTableClie }, Value_); } -TNamedValue::TValue TNamedValue::ExtractValue(const NTableClient::TUnversionedValue& value) +TNamedValue::TValue TNamedValue::ExtractValue(const TUnversionedValue& value) { auto getString = [] (const TUnversionedValue& value) { return value.AsString(); @@ -86,8 +104,8 @@ TNamedValue::TValue TNamedValue::ExtractValue(const NTableClient::TUnversionedVa YT_ABORT(); } -TNamedValue::TValue TNamedValue::ToValue(NTableClient::EValueType valueType, TStringBuf value) { - using namespace NTableClient; +TNamedValue::TValue TNamedValue::ToValue(EValueType valueType, TStringBuf value) +{ if (valueType == EValueType::String) { return TString(value); } else if (valueType == EValueType::Any) { @@ -99,16 +117,34 @@ TNamedValue::TValue TNamedValue::ToValue(NTableClient::EValueType valueType, TSt } } -//////////////////////////////////////////////////////////////////////////////// - -bool operator ==(const TNamedValue::TAny& lhs, const TNamedValue::TAny& rhs) +void FormatValue(TStringBuilderBase* builder, const TNamedValue& value, TStringBuf /*spec*/) { - return lhs.Value == rhs.Value; + using namespace NYson; + + builder->AppendFormat("%Qv=", value.Name_); + auto text = std::visit([] (const auto& value) -> TString { + using T = std::decay_t<decltype(value)>; + if constexpr (std::is_same_v<T, TNamedValue::TAny>) { + auto result = TString("<type=any>"); + result += value.Value; + return ConvertToYsonString(TYsonString(result), EYsonFormat::Text).ToString(); + } else if constexpr (std::is_same_v<T, TNamedValue::TComposite>) { + auto result = TString("<type=composite>"); + result += ConvertToYsonString(TYsonString(value.Value)).ToString(); + return ConvertToYsonString(TYsonString(result), EYsonFormat::Text).ToString(); + } else if constexpr (std::is_same_v<T, std::nullptr_t>) { + return "#"; + } else { + return ConvertToYsonString(value, EYsonFormat::Text).ToString(); + } + }, value.Value_); + + builder->AppendString(ConvertToYsonString(text, EYsonFormat::Text).AsStringBuf()); } -bool operator ==(const TNamedValue::TComposite& lhs, const TNamedValue::TComposite& rhs) +void PrintTo(const TNamedValue& value, std::ostream* os) { - return lhs.Value == rhs.Value; + *os << Format("%v", value); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/named_value/named_value.h b/yt/yt/library/named_value/named_value.h index 67592fe9f3..c00081219a 100644 --- a/yt/yt/library/named_value/named_value.h +++ b/yt/yt/library/named_value/named_value.h @@ -32,6 +32,9 @@ NTableClient::TUnversionedOwningRow MakeRow( const NTableClient::TNameTablePtr& nameTable, const std::vector<TNamedValue>& values); + +std::vector<TNamedValue> MakeNamedValueList(const NTableClient::TNameTablePtr& nameTable, NTableClient::TUnversionedRow row); + //////////////////////////////////////////////////////////////////////////////// //! Slow but convenient analogue of TUnversionedValue @@ -41,11 +44,13 @@ public: struct TAny { TString Value; + friend bool operator ==(const TNamedValue::TAny& lhs, const TNamedValue::TAny& rhs) = default; }; struct TComposite { TString Value; + friend bool operator ==(const TNamedValue::TComposite& lhs, const TNamedValue::TComposite& rhs) = default; }; using TValue = std::variant<std::nullptr_t, i64, ui64, double, bool, TString, TAny, TComposite>; @@ -90,12 +95,11 @@ private: private: TString Name_; TValue Value_; -}; -//////////////////////////////////////////////////////////////////////////////// - -bool operator ==(const TNamedValue::TAny& lhs, const TNamedValue::TAny& rhs); -bool operator ==(const TNamedValue::TComposite& lhs, const TNamedValue::TComposite& rhs); + friend bool operator ==(const TNamedValue& lhs, const TNamedValue& rhs) = default; + friend void PrintTo(const TNamedValue& value, std::ostream* os); + friend void FormatValue(TStringBuilderBase* builder, const TNamedValue& value, TStringBuf spec); +}; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index ba3879c8c5..75c4ffe241 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -2150,6 +2150,8 @@ message TReqPartitionTables optional bool enable_key_guarantee = 8; optional bool adjust_data_weight_per_partition = 9; + optional bool enable_cookies = 10; + optional TTransactionalOptions transactional_options = 100; } @@ -2165,6 +2167,8 @@ message TMultiTablePartition } optional TStatistics aggregate_statistics = 2; + + optional bytes cookie = 3; } message TRspPartitionTables @@ -2174,6 +2178,31 @@ message TRspPartitionTables //////////////////////////////////////////////////////////////////////////////// +message TReqReadTablePartition +{ + optional bytes cookie = 1; + + optional bool unordered = 2 [default = false]; + optional bool omit_inaccessible_columns = 3 [default = false]; + optional bool enable_table_index = 5 [default = false]; + optional bool enable_row_index = 6 [default = false]; + optional bool enable_range_index = 7 [default = false]; + optional bytes config = 4; // YSON-serialized TTableReaderConfig + optional ERowsetFormat desired_rowset_format = 8 [default = RF_YT_WIRE]; + optional ERowsetFormat arrow_fallback_rowset_format = 10 [default = RF_YT_WIRE]; + optional bytes format = 9; // YSON-serialized TFormat +} + +message TRspReadTablePartition +{ +} + +message TRspReadTablePartitionMeta +{ +} + +//////////////////////////////////////////////////////////////////////////////// + message TReqBalanceTabletCells { required string bundle = 1; |