aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-22 00:51:33 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-22 00:51:33 +0000
commit25754ddf6bf0d9f39deb2b793a946176b6d7c9fb (patch)
treeb81cf85ea8fbef3290618d909971b9ad02dd9e46
parentc18aa245b684fef9b14c697b38e6c6695e0733f3 (diff)
parent87f8036d8027790ed03ac34feb2b5f3e141f948c (diff)
downloadydb-25754ddf6bf0d9f39deb2b793a946176b6d7c9fb.tar.gz
Merge branch 'rightlib' into merge-libs-250322-0050
-rw-r--r--build/conf/python.conf8
-rw-r--r--build/mapping.conf.json2
-rw-r--r--build/platform/test_tool/host.ya.make.inc10
-rw-r--r--build/platform/test_tool/host_os.ya.make.inc10
-rw-r--r--build/sanitize-blacklist.txt2
-rw-r--r--build/sysincl/misc.yml6
-rw-r--r--build/ymake.core.conf2
-rw-r--r--contrib/python/argcomplete/py3/.dist-info/METADATA8
-rw-r--r--contrib/python/argcomplete/py3/README.rst2
-rw-r--r--contrib/python/argcomplete/py3/argcomplete/_check_module.py34
-rw-r--r--contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete6
-rw-r--r--contrib/python/argcomplete/py3/argcomplete/completers.py17
-rw-r--r--contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py2
-rw-r--r--contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py2
-rw-r--r--contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py6
-rw-r--r--contrib/python/argcomplete/py3/argcomplete/shell_integration.py6
-rw-r--r--contrib/python/argcomplete/py3/ya.make2
-rw-r--r--library/cpp/neh/tcp2.cpp50
-rw-r--r--library/cpp/netliba/v6/ib_low.cpp2
-rwxr-xr-xya20
-rw-r--r--yql/essentials/parser/pg_wrapper/arena_ctx.cpp11
-rw-r--r--yql/essentials/parser/pg_wrapper/arrow.cpp1
-rw-r--r--yql/essentials/parser/pg_wrapper/interface/codec.h2
-rw-r--r--yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp7
-rw-r--r--yt/cpp/mapreduce/client/transaction_pinger.cpp85
-rw-r--r--yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp34
-rw-r--r--yt/yql/providers/yt/comp_nodes/dq/ya.make.inc1
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp20
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp21
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.cfg1
-rw-r--r--yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.sql6
-rw-r--r--yt/yt/client/api/delegating_client.h5
-rw-r--r--yt/yt/client/api/private.cpp23
-rw-r--r--yt/yt/client/api/private.h14
-rw-r--r--yt/yt/client/api/public.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/api_service_proxy.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp30
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h4
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.cpp3
-rw-r--r--yt/yt/client/api/rpc_proxy/discovery_service_proxy.h28
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp23
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.h8
-rw-r--r--yt/yt/client/api/table_client.cpp7
-rw-r--r--yt/yt/client/api/table_client.h22
-rw-r--r--yt/yt/client/api/table_partition_reader.cpp73
-rw-r--r--yt/yt/client/api/table_partition_reader.h33
-rw-r--r--yt/yt/client/driver/driver.cpp2
-rw-r--r--yt/yt/client/driver/driver.h5
-rw-r--r--yt/yt/client/driver/table_commands.cpp63
-rw-r--r--yt/yt/client/driver/table_commands.h18
-rw-r--r--yt/yt/client/federated/client.cpp1
-rw-r--r--yt/yt/client/hedging/hedging.cpp5
-rw-r--r--yt/yt/client/misc/workload.h28
-rw-r--r--yt/yt/client/signature/generator.cpp6
-rw-r--r--yt/yt/client/signature/generator.h1
-rw-r--r--yt/yt/client/table_client/public.h2
-rw-r--r--yt/yt/client/table_client/row_buffer.cpp34
-rw-r--r--yt/yt/client/table_client/row_buffer.h16
-rw-r--r--yt/yt/client/table_client/value_consumer.cpp9
-rw-r--r--yt/yt/client/table_client/value_consumer.h3
-rw-r--r--yt/yt/client/unittests/mock/client.h5
-rw-r--r--yt/yt/client/ya.make2
-rw-r--r--yt/yt/core/logging/structured_log-inl.h28
-rw-r--r--yt/yt/core/logging/structured_log.h24
-rw-r--r--yt/yt/core/misc/fair_share_hierarchical_queue-inl.h15
-rw-r--r--yt/yt/core/misc/fair_share_hierarchical_queue.h3
-rw-r--r--yt/yt/core/misc/serialize.cpp1
-rw-r--r--yt/yt/library/named_value/named_value.cpp60
-rw-r--r--yt/yt/library/named_value/named_value.h14
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto29
70 files changed, 717 insertions, 320 deletions
diff --git a/build/conf/python.conf b/build/conf/python.conf
index 2717a11f9a..7d5867fb97 100644
--- a/build/conf/python.conf
+++ b/build/conf/python.conf
@@ -306,14 +306,14 @@ macro STYLE_PYTHON(CONFIG_TYPE="") {
}
# tag:python-specific tag:test
-### @usage: STYLE_RUFF([CONFIG_TYPE config_type])
+### @usage: STYLE_RUFF([CONFIG_TYPE config_type] [CHECK_FORMAT])
###
-### Check python3 sources for style issues using ruff.
+### Check python3 sources for style issues using ruff. `CHECK_FORMAT` enables `ruff format` check.
RUFF_PROJECT_TO_CONFIG_MAP=build/config/tests/ruff/ruff_config_paths.json
-macro STYLE_RUFF(CONFIG_TYPE="") {
+macro STYLE_RUFF(CONFIG_TYPE="", CHECK_FORMAT?"yes":"no") {
.ALLOWED_IN_LINTERS_MAKE=yes
SET_APPEND(_MAKEFILE_INCLUDE_LIKE_DEPS ${ARCADIA_ROOT}/${RUFF_PROJECT_TO_CONFIG_MAP})
- _ADD_PY_LINTER_CHECK(NAME ruff LINTER tools/ruff_linter/bin/ruff_linter GLOBAL_RESOURCES build/external_resources/ruff FILE_PROCESSING_TIME $RUFF_FILE_PROCESSING_TIME CONFIGS $PYTHON_LINTERS_DEFAULT_CONFIGS PROJECT_TO_CONFIG_MAP $RUFF_PROJECT_TO_CONFIG_MAP CONFIG_TYPE $CONFIG_TYPE)
+ _ADD_PY_LINTER_CHECK(NAME ruff LINTER tools/ruff_linter/bin/ruff_linter GLOBAL_RESOURCES build/external_resources/ruff FILE_PROCESSING_TIME $RUFF_FILE_PROCESSING_TIME CONFIGS $PYTHON_LINTERS_DEFAULT_CONFIGS PROJECT_TO_CONFIG_MAP $RUFF_PROJECT_TO_CONFIG_MAP CONFIG_TYPE $CONFIG_TYPE EXTRA_PARAMS check_format=${CHECK_FORMAT})
}
# tag:python-specific tag:test
diff --git a/build/mapping.conf.json b/build/mapping.conf.json
index f4277985dd..15898974bf 100644
--- a/build/mapping.conf.json
+++ b/build/mapping.conf.json
@@ -522,6 +522,7 @@
"8249001226": "{registry_endpoint}/8249001226",
"8273785013": "{registry_endpoint}/8273785013",
"8307046461": "{registry_endpoint}/8307046461",
+ "8317487990": "{registry_endpoint}/8317487990",
"5486731632": "{registry_endpoint}/5486731632",
"5514350352": "{registry_endpoint}/5514350352",
"5514360398": "{registry_endpoint}/5514360398",
@@ -1818,6 +1819,7 @@
"8249001226": "devtools/ya/test/programs/test_tool/bin/test_tool for linux",
"8273785013": "devtools/ya/test/programs/test_tool/bin/test_tool for linux",
"8307046461": "devtools/ya/test/programs/test_tool/bin/test_tool for linux",
+ "8317487990": "devtools/ya/test/programs/test_tool/bin/test_tool for linux",
"5486731632": "devtools/ya/test/programs/test_tool/bin3/test_tool3 for linux",
"5514350352": "devtools/ya/test/programs/test_tool/bin3/test_tool3 for linux",
"5514360398": "devtools/ya/test/programs/test_tool/bin3/test_tool3 for linux",
diff --git a/build/platform/test_tool/host.ya.make.inc b/build/platform/test_tool/host.ya.make.inc
index 5af9746a41..e1d2a80596 100644
--- a/build/platform/test_tool/host.ya.make.inc
+++ b/build/platform/test_tool/host.ya.make.inc
@@ -1,12 +1,12 @@
IF (HOST_OS_DARWIN AND HOST_ARCH_X86_64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307070450)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317513158)
ELSEIF (HOST_OS_DARWIN AND HOST_ARCH_ARM64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307068606)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317511126)
ELSEIF (HOST_OS_LINUX AND HOST_ARCH_X86_64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307073076)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317517786)
ELSEIF (HOST_OS_LINUX AND HOST_ARCH_AARCH64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307067350)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317509303)
ELSEIF (HOST_OS_WINDOWS AND HOST_ARCH_X86_64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307071994)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317515729)
ENDIF()
diff --git a/build/platform/test_tool/host_os.ya.make.inc b/build/platform/test_tool/host_os.ya.make.inc
index 3b960b3a0d..d0b597fd49 100644
--- a/build/platform/test_tool/host_os.ya.make.inc
+++ b/build/platform/test_tool/host_os.ya.make.inc
@@ -1,12 +1,12 @@
IF (HOST_OS_DARWIN AND HOST_ARCH_X86_64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307042769)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317485510)
ELSEIF (HOST_OS_DARWIN AND HOST_ARCH_ARM64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307040830)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317484083)
ELSEIF (HOST_OS_LINUX AND HOST_ARCH_X86_64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307046461)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317487990)
ELSEIF (HOST_OS_LINUX AND HOST_ARCH_AARCH64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307039261)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317482891)
ELSEIF (HOST_OS_WINDOWS AND HOST_ARCH_X86_64)
- DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8307044717)
+ DECLARE_EXTERNAL_RESOURCE(TEST_TOOL_HOST sbr:8317486711)
ENDIF()
diff --git a/build/sanitize-blacklist.txt b/build/sanitize-blacklist.txt
index 711002774b..bd559495dc 100644
--- a/build/sanitize-blacklist.txt
+++ b/build/sanitize-blacklist.txt
@@ -8,4 +8,4 @@ src:*contrib/python/matplotlib/py2/src/*
src:*contrib/python/matplotlib/py3/src/*
# DTCC-1909: issues use-of-uninitialized-value on msan on Clang16
# uninitialized variable size in function add_metadata_from_side_data
-src:*contrib/libs/ffmpeg-3/libavcodec/utils.c
+src:*contrib/deprecated/ffmpeg-3/libavcodec/utils.c
diff --git a/build/sysincl/misc.yml b/build/sysincl/misc.yml
index a27b6e3de7..a1f2c6b8a1 100644
--- a/build/sysincl/misc.yml
+++ b/build/sysincl/misc.yml
@@ -97,12 +97,12 @@
- atomic.h: contrib/restricted/openal-soft/common/atomic.h
- threads.h: contrib/restricted/openal-soft/common/threads.h
-- source_filter: "^contrib/libs/ffmpeg-3/"
+- source_filter: "^contrib/deprecated/ffmpeg-3/"
includes:
- fontconfig/fontconfig.h
- fribidi.h
- - stdatomic.h: contrib/libs/ffmpeg-3/compat/atomics/win32/stdatomic.h
- - atomic.h: contrib/libs/ffmpeg-3/libavutil/atomic.h
+ - stdatomic.h: contrib/deprecated/ffmpeg-3/compat/atomics/win32/stdatomic.h
+ - atomic.h: contrib/deprecated/ffmpeg-3/libavutil/atomic.h
- source_filter: "^contrib/libs/ffmpeg-3.4.1/"
includes:
diff --git a/build/ymake.core.conf b/build/ymake.core.conf
index 939263b153..53c47f8a50 100644
--- a/build/ymake.core.conf
+++ b/build/ymake.core.conf
@@ -4856,7 +4856,7 @@ _P_PK=${hide;kv:"p PK"}
TOUCH_PACKAGE_MF=$GENERATE_MF && $TOUCH_PACKAGE $_P_PK && $ADD_VCS_INFO_FILE_CMD
# Note: we don't use touch.py in the command below to avoid introduction of undesired input
-UNION_CMD=$YMAKE_PYTHON -c open(\'$TARGET\',\'w\').close() ${hide;kv:"p UN"} ${hide;kv:"package UNION"} ${hide;kv:"pc light-cyan"} $UNION_OUTS $VCS_INFO_DISABLE_CACHE__NO_UID__
+UNION_CMD=$YMAKE_PYTHON3 -c open(\'$TARGET\',\'w\').close() ${hide;kv:"p UN"} ${hide;kv:"package UNION"} ${hide;kv:"pc light-cyan"} $UNION_OUTS $VCS_INFO_DISABLE_CACHE__NO_UID__
UNION_CMD_MF=$UNION_CMD && $GENERATE_MF
macro _EXPAND_INS_OUTS(FILES{input}[]) {
diff --git a/contrib/python/argcomplete/py3/.dist-info/METADATA b/contrib/python/argcomplete/py3/.dist-info/METADATA
index fe74af8ae5..bf74fb4961 100644
--- a/contrib/python/argcomplete/py3/.dist-info/METADATA
+++ b/contrib/python/argcomplete/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.4
Name: argcomplete
-Version: 3.5.3
+Version: 3.6.0
Summary: Bash tab completion for argparse
Project-URL: Homepage, https://github.com/kislyuk/argcomplete
Project-URL: Documentation, https://kislyuk.github.io/argcomplete
@@ -9,6 +9,8 @@ Project-URL: Issue Tracker, https://github.com/kislyuk/argcomplete/issues
Project-URL: Change Log, https://github.com/kislyuk/argcomplete/blob/develop/Changes.rst
Author: Andrey Kislyuk
Author-email: kislyuk@gmail.com
+Maintainer: Andrey Kislyuk
+Maintainer-email: kislyuk@gmail.com
License: Apache Software License
License-File: LICENSE.rst
License-File: NOTICE
@@ -20,12 +22,12 @@ Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: POSIX
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
-Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
+Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Software Development
@@ -296,7 +298,7 @@ work for zsh as well.
Python Support
--------------
-Argcomplete requires Python 3.7+.
+Argcomplete requires Python 3.9+.
Support for other shells
------------------------
diff --git a/contrib/python/argcomplete/py3/README.rst b/contrib/python/argcomplete/py3/README.rst
index 13f0fa7ae3..c897ad4191 100644
--- a/contrib/python/argcomplete/py3/README.rst
+++ b/contrib/python/argcomplete/py3/README.rst
@@ -253,7 +253,7 @@ work for zsh as well.
Python Support
--------------
-Argcomplete requires Python 3.7+.
+Argcomplete requires Python 3.9+.
Support for other shells
------------------------
diff --git a/contrib/python/argcomplete/py3/argcomplete/_check_module.py b/contrib/python/argcomplete/py3/argcomplete/_check_module.py
index 7fd6a5caa4..03726529fc 100644
--- a/contrib/python/argcomplete/py3/argcomplete/_check_module.py
+++ b/contrib/python/argcomplete/py3/argcomplete/_check_module.py
@@ -10,30 +10,7 @@ Intended to be invoked by argcomplete's global completion function.
import os
import sys
import tokenize
-
-try:
- from importlib.util import find_spec # type:ignore
-except ImportError:
- import typing as t
- from collections import namedtuple
- from imp import find_module # type:ignore
-
- ModuleSpec = namedtuple("ModuleSpec", ["origin", "has_location", "submodule_search_locations"])
-
- def find_spec( # type:ignore
- name: str,
- package: t.Optional[str] = None,
- ) -> t.Optional[ModuleSpec]:
- """Minimal implementation as required by `find`."""
- try:
- f, path, _ = find_module(name)
- except ImportError:
- return None
- has_location = path is not None
- if f is None:
- return ModuleSpec(None, has_location, [path])
- f.close()
- return ModuleSpec(path, has_location, None)
+from importlib.util import find_spec
class ArgcompleteMarkerNotFound(RuntimeError):
@@ -42,7 +19,12 @@ class ArgcompleteMarkerNotFound(RuntimeError):
def find(name, return_package=False):
names = name.split(".")
- spec = find_spec(names[0])
+ # Look for the first importlib ModuleSpec that has `origin` set, indicating it's not a namespace package.
+ for package_name_boundary in range(len(names)):
+ spec = find_spec(".".join(names[: package_name_boundary + 1]))
+ if spec is not None and spec.origin is not None:
+ break
+
if spec is None:
raise ArgcompleteMarkerNotFound('no module named "{}"'.format(names[0]))
if not spec.has_location:
@@ -53,7 +35,7 @@ def find(name, return_package=False):
return spec.origin
if len(spec.submodule_search_locations) != 1:
raise ArgcompleteMarkerNotFound("expecting one search location")
- path = os.path.join(spec.submodule_search_locations[0], *names[1:])
+ path = os.path.join(spec.submodule_search_locations[0], *names[package_name_boundary + 1 :])
if os.path.isdir(path):
filename = "__main__.py"
if return_package:
diff --git a/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete b/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete
index 81c9d41f80..8a91272dea 100644
--- a/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete
+++ b/contrib/python/argcomplete/py3/argcomplete/bash_completion.d/_python-argcomplete
@@ -124,6 +124,12 @@ __python_argcomplete_which() {
_python_argcomplete_global() {
if [[ -n "${ZSH_VERSION-}" ]]; then
+ if [[ "${_matcher_num-}" -gt 1 ]]; then
+ # Return early if the completer is called multiple times in the same completion run.
+ # Currently the only known occurrence of this is in zsh when a matcher-list zstyle is declared.
+ # When this happens, _matcher_num is incremented past 1.
+ return
+ fi
# Store result of a regex match in the
# BASH_REMATCH variable rather than MATCH
setopt local_options BASH_REMATCH
diff --git a/contrib/python/argcomplete/py3/argcomplete/completers.py b/contrib/python/argcomplete/py3/argcomplete/completers.py
index 610349f652..4c01e69518 100644
--- a/contrib/python/argcomplete/py3/argcomplete/completers.py
+++ b/contrib/python/argcomplete/py3/argcomplete/completers.py
@@ -63,13 +63,22 @@ class FilesCompleter(BaseCompleter):
# that was fixed in bash 5.3 but affects older versions. Environment variables are not treated
# correctly in older versions and calling bind makes them available. For details, see
# https://savannah.gnu.org/support/index.php?111125
- files = _call(["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL)
+ files = _call(
+ ["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL
+ )
completion += [f + "/" for f in files]
for x in self.allowednames:
- completion += _call(["bash", "-c", "bind; compgen -A file -X '!*.{0}' -- '{p}'".format(x, p=prefix)], stderr=subprocess.DEVNULL)
+ completion += _call(
+ ["bash", "-c", "bind; compgen -A file -X '!*.{0}' -- '{p}'".format(x, p=prefix)],
+ stderr=subprocess.DEVNULL,
+ )
else:
- completion += _call(["bash", "-c", "bind; compgen -A file -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL)
- anticomp = _call(["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL)
+ completion += _call(
+ ["bash", "-c", "bind; compgen -A file -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL
+ )
+ anticomp = _call(
+ ["bash", "-c", "bind; compgen -A directory -- '{p}'".format(p=prefix)], stderr=subprocess.DEVNULL
+ )
completion = list(set(completion) - set(anticomp))
if self.directories:
diff --git a/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py b/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py
index 0666845e3a..f6ecb1f417 100644
--- a/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py
+++ b/contrib/python/argcomplete/py3/argcomplete/packages/_argparse.py
@@ -75,7 +75,7 @@ class IntrospectiveArgumentParser(ArgumentParser):
except for the lines that contain the string "Added by argcomplete".
'''
- def _parse_known_args(self, arg_strings, namespace, intermixed=False):
+ def _parse_known_args(self, arg_strings, namespace, intermixed=False, **kwargs):
_num_consumed_args.clear() # Added by argcomplete
self._argcomplete_namespace = namespace
self.active_actions: List[Action] = [] # Added by argcomplete
diff --git a/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py b/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py
index 613feb2597..ecd785b80b 100644
--- a/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py
+++ b/contrib/python/argcomplete/py3/argcomplete/packages/_shlex.py
@@ -36,7 +36,7 @@ class shlex:
else:
self.eof = ''
self.commenters = '#'
- self.wordchars = 'abcdfeghijklmnopqrstuvwxyz' 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_'
+ self.wordchars = 'abcdfeghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_'
# Modified by argcomplete: 2/3 compatibility
# if self.posix:
# self.wordchars += ('ßàáâãäåæçèéêëìíîïðñòóôõöøùúûüýþÿ'
diff --git a/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py b/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py
index d914c222fe..0eb744c9e9 100644
--- a/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py
+++ b/contrib/python/argcomplete/py3/argcomplete/scripts/python_argcomplete_check_easy_install_script.py
@@ -33,7 +33,7 @@ def main():
lines = head.split("\n", 12)
for line in lines:
if line.startswith("# EASY-INSTALL-SCRIPT"):
- import pkg_resources
+ import pkg_resources # type: ignore
re_match = re.match("# EASY-INSTALL-SCRIPT: '(.+)','(.+)'", line)
assert re_match is not None
@@ -48,7 +48,7 @@ def main():
dist, group, name = re_match.groups()
import pkgutil
- import pkg_resources
+ import pkg_resources # type: ignore
entry_point_info = pkg_resources.get_distribution(dist).get_entry_info(group, name)
assert entry_point_info is not None
@@ -71,7 +71,7 @@ def main():
module = re_match.groups()[0]
import pkgutil
- import pkg_resources
+ import pkg_resources # type: ignore
with open(pkgutil.get_loader(module).get_filename()) as mod_fh: # type: ignore
if "PYTHON_ARGCOMPLETE_OK" in mod_fh.read(1024):
diff --git a/contrib/python/argcomplete/py3/argcomplete/shell_integration.py b/contrib/python/argcomplete/py3/argcomplete/shell_integration.py
index f0b9d7db5b..37b5603b11 100644
--- a/contrib/python/argcomplete/py3/argcomplete/shell_integration.py
+++ b/contrib/python/argcomplete/py3/argcomplete/shell_integration.py
@@ -34,6 +34,12 @@ _python_argcomplete%(function_suffix)s() {
local IFS=$'\013'
local script="%(argcomplete_script)s"
if [[ -n "${ZSH_VERSION-}" ]]; then
+ if [[ "${_matcher_num-}" -gt 1 ]]; then
+ # Return early if the completer is called multiple times in the same completion run.
+ # Currently the only known occurrence of this is in zsh when a matcher-list zstyle is declared.
+ # When this happens, _matcher_num is incremented past 1.
+ return
+ fi
local completions
completions=($(IFS="$IFS" \
COMP_LINE="$BUFFER" \
diff --git a/contrib/python/argcomplete/py3/ya.make b/contrib/python/argcomplete/py3/ya.make
index dfa97494a9..74c5629658 100644
--- a/contrib/python/argcomplete/py3/ya.make
+++ b/contrib/python/argcomplete/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.5.3)
+VERSION(3.6.0)
LICENSE(Apache-2.0)
diff --git a/library/cpp/neh/tcp2.cpp b/library/cpp/neh/tcp2.cpp
index 3dad055af1..641408973d 100644
--- a/library/cpp/neh/tcp2.cpp
+++ b/library/cpp/neh/tcp2.cpp
@@ -332,35 +332,6 @@ namespace {
char MemPool_[MemPoolSize_ + MemPoolReserve_];
};
- //protector for limit usage tcp connection output (and used data) only from one thread at same time
- class TOutputLock {
- public:
- TOutputLock() noexcept
- : Lock_(0)
- {
- }
-
- bool TryAquire() noexcept {
- do {
- if (AtomicTryLock(&Lock_)) {
- return true;
- }
- } while (!AtomicGet(Lock_)); //without magic loop atomic lock some unreliable
- return false;
- }
-
- void Release() noexcept {
- AtomicUnlock(&Lock_);
- }
-
- bool IsFree() const noexcept {
- return !AtomicGet(Lock_);
- }
-
- private:
- TAtomic Lock_;
- };
-
class TClient {
class TRequest;
class TConnection;
@@ -718,13 +689,13 @@ namespace {
}
void ProcessOutputReqsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
}
}
void ProcessOutputCancelsQueue() {
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
AS_.GetIOService().Post(std::bind(&TConnection::SendMessages, TConnectionRef(this), true));
return;
}
@@ -763,7 +734,7 @@ namespace {
PrepareSocket(AS_.Native());
AtomicSet(State_, Connected);
AS_.AsyncPollRead(std::bind(&TConnection::OnCanRead, TConnectionRef(this), _1, _2));
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(true);
return;
}
@@ -857,7 +828,7 @@ namespace {
DBGOUT("TClient::SendMessages(exit2)");
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("TClient::SendMessages(exit1)");
}
@@ -1058,7 +1029,8 @@ namespace {
TTcp2Message Msg_;
//output
- TOutputLock OutputLock_;
+
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckReqsQueue_;
TLockFreeQueue<TRequest*> Reqs_;
TAtomic NeedCheckCancelsQueue_;
@@ -1412,11 +1384,11 @@ namespace {
void ProcessOutputQueue() {
AtomicSet(NeedCheckOutputQueue_, 1);
- if (OutputLock_.TryAquire()) {
+ if (OutputLock_.TryAcquire()) {
SendMessages(false);
return;
}
- DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)OutputLock_.IsFree());
+ DBGOUT("ProcessOutputQueue: !AquireOutputOwnership: " << (int)!OutputLock_.IsLocked());
}
//must be called only after success aquiring output
@@ -1444,10 +1416,10 @@ namespace {
OutputLock_.Release();
if (!AtomicGet(NeedCheckOutputQueue_)) {
- DBGOUT("Server::SendMessages(exit2): " << (int)OutputLock_.IsFree());
+ DBGOUT("Server::SendMessages(exit2): " << (int)!OutputLock_.IsLocked());
return;
}
- } while (OutputLock_.TryAquire());
+ } while (OutputLock_.TryAcquire());
DBGOUT("Server::SendMessages(exit1)");
} catch (...) {
OnError();
@@ -1518,7 +1490,7 @@ namespace {
TLockFreeQueue<TRequestId> FinReqs_;
//output
- TOutputLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
+ TSpinLock OutputLock_; //protect socket/buffers from simultaneous access from few threads
TAtomic NeedCheckOutputQueue_;
NNeh::TAutoLockFreeQueue<TOutputData> OutputData_;
TOutputBuffers OutputBuffers_;
diff --git a/library/cpp/netliba/v6/ib_low.cpp b/library/cpp/netliba/v6/ib_low.cpp
index 99d77d593f..fca97353d2 100644
--- a/library/cpp/netliba/v6/ib_low.cpp
+++ b/library/cpp/netliba/v6/ib_low.cpp
@@ -33,7 +33,7 @@ namespace NNetliba {
TIntrusivePtr<TIBContext> ctx;
TIntrusivePtr<TIBPort> resPort;
- int numDevices;
+ int numDevices{0};
ibv_device** deviceList = ibv_get_device_list(&numDevices);
//for (int i = 0; i < numDevices; ++i) {
// ibv_device *dev = deviceList[i];
diff --git a/ya b/ya
index fac4860902..6b1f523fbe 100755
--- a/ya
+++ b/ya
@@ -39,33 +39,33 @@ REGISTRY_ENDPOINT = os.environ.get("YA_REGISTRY_ENDPOINT", "https://devtools-reg
PLATFORM_MAP = {
"data": {
"win32": {
- "md5": "873a2f0b74705af5bc1bf41217ce49ce",
+ "md5": "729e5a92e5e9253001266369de2108af",
"urls": [
- f"{REGISTRY_ENDPOINT}/8307066792"
+ f"{REGISTRY_ENDPOINT}/8317503781"
]
},
"darwin": {
- "md5": "31492187efa40748dd75653a87c40cc1",
+ "md5": "f5e0afaf1189c93b9ef3896f3531882b",
"urls": [
- f"{REGISTRY_ENDPOINT}/8307065239"
+ f"{REGISTRY_ENDPOINT}/8317502378"
]
},
"darwin-arm64": {
- "md5": "07ac8ac75b798223cd71f5c31b10110b",
+ "md5": "0717c848e11e10b24436d51076b5fe55",
"urls": [
- f"{REGISTRY_ENDPOINT}/8307063799"
+ f"{REGISTRY_ENDPOINT}/8317500655"
]
},
"linux-aarch64": {
- "md5": "0315db8cf713e6b77de53c738bcb3ea7",
+ "md5": "a8716b4389a8527540e10d45c3c10325",
"urls": [
- f"{REGISTRY_ENDPOINT}/8307062757"
+ f"{REGISTRY_ENDPOINT}/8317498738"
]
},
"linux": {
- "md5": "1287df93b85d867c3fb93e7f9f972b94",
+ "md5": "5fc8c836af0182fa3d93e89bc3fe907c",
"urls": [
- f"{REGISTRY_ENDPOINT}/8307068169"
+ f"{REGISTRY_ENDPOINT}/8317505228"
]
}
}
diff --git a/yql/essentials/parser/pg_wrapper/arena_ctx.cpp b/yql/essentials/parser/pg_wrapper/arena_ctx.cpp
index e91cd6ad7a..6439aaa18a 100644
--- a/yql/essentials/parser/pg_wrapper/arena_ctx.cpp
+++ b/yql/essentials/parser/pg_wrapper/arena_ctx.cpp
@@ -30,9 +30,9 @@ extern MemoryContext ArenaGetChunkContext(void *pointer);
extern Size ArenaGetChunkSpace(void *pointer);
extern bool ArenaIsEmpty(MemoryContext context);
extern void ArenaStats(MemoryContext context,
- MemoryStatsPrintFunc printfunc, void *passthru,
- MemoryContextCounters *totals,
- bool print_to_stderr);
+ MemoryStatsPrintFunc printfunc, void *passthru,
+ MemoryContextCounters *totals,
+ bool print_to_stderr);
#ifdef MEMORY_CONTEXT_CHECKING
extern void ArenaCheck(MemoryContext context);
#endif
@@ -137,4 +137,9 @@ void TArenaMemoryContext::Release() {
Prev = nullptr;
}
+namespace NCommon {
+std::shared_ptr<void> CreateMemoryArenaContext() {
+ return std::make_shared<TArenaMemoryContext>();
+}
+}
}
diff --git a/yql/essentials/parser/pg_wrapper/arrow.cpp b/yql/essentials/parser/pg_wrapper/arrow.cpp
index ee52e76aac..93b27ccdf9 100644
--- a/yql/essentials/parser/pg_wrapper/arrow.cpp
+++ b/yql/essentials/parser/pg_wrapper/arrow.cpp
@@ -436,7 +436,6 @@ NUdf::TBlockItem BlockItemFromDatum(Datum datum, const NPg::TTypeDesc& desc, std
}
NUdf::TBlockItem PgBlockItemFromNativeBinary(const TStringBuf binary, ui32 pgTypeId, std::vector<char>& tmp) {
- NKikimr::NMiniKQL::TPAllocScope call;
StringInfoData stringInfo;
stringInfo.data = (char*)binary.Data();
stringInfo.len = binary.Size();
diff --git a/yql/essentials/parser/pg_wrapper/interface/codec.h b/yql/essentials/parser/pg_wrapper/interface/codec.h
index c2aa3c4af0..a3a634be83 100644
--- a/yql/essentials/parser/pg_wrapper/interface/codec.h
+++ b/yql/essentials/parser/pg_wrapper/interface/codec.h
@@ -50,5 +50,7 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe
extern "C" void ReadSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, NKikimr::NUdf::TUnboxedValue& value, TInputBuf& buf);
extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, TOutputBuf& buf);
+std::shared_ptr<void> CreateMemoryArenaContext();
+
} // namespace NCommon
} // namespace NYql
diff --git a/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp b/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp
index ddf7f42ca7..02cafe876b 100644
--- a/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/yql/essentials/sql/pg_dummy/pg_sql_dummy.cpp
@@ -548,6 +548,12 @@ TString GetPostgresServerVersionStr() {
namespace NYql {
+namespace NCommon {
+ std::shared_ptr<void> CreateMemoryArenaContext() {
+ throw yexception() << "PG types are not supported";
+ }
+}
+
ui64 HexEncode(const char *src, size_t len, char *dst) {
Y_UNUSED(src);
Y_UNUSED(len);
@@ -556,7 +562,6 @@ ui64 HexEncode(const char *src, size_t len, char *dst) {
throw yexception() << "HexEncode in pg_dummy does nothing";
}
-
std::unique_ptr<IYtColumnConverter> BuildPgTopLevelColumnReader(std::unique_ptr<NKikimr::NUdf::IArrayBuilder>&& /* builder */, const NKikimr::NMiniKQL::TPgType* /* targetType */) {
throw yexception() << "PG types are not supported";
}
diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp
index 2bd4d8c416..d393e50a5d 100644
--- a/yt/cpp/mapreduce/client/transaction_pinger.cpp
+++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp
@@ -200,91 +200,6 @@ private:
////////////////////////////////////////////////////////////////////////////////
-class TThreadPerTransactionPinger
- : public ITransactionPinger
-{
-public:
- ~TThreadPerTransactionPinger() override
- {
- if (Running_) {
- RemoveTransaction(*PingableTx_);
- }
- }
-
- ITransactionPingerPtr GetChildTxPinger() override
- {
- return MakeIntrusive<TThreadPerTransactionPinger>();
- }
-
- void RegisterTransaction(const TPingableTransaction& pingableTx) override
- {
- YT_VERIFY(!Running_);
- YT_VERIFY(PingableTx_ == nullptr);
-
- PingableTx_ = &pingableTx;
- Running_ = true;
-
- PingerThread_ = std::make_unique<TThread>(
- TThread::TParams{Pinger, this}.SetName("pingable_tx"));
- PingerThread_->Start();
- }
-
- bool HasTransaction(const TPingableTransaction& pingableTx) override
- {
- return PingableTx_ == &pingableTx && Running_;
- }
-
- void RemoveTransaction(const TPingableTransaction& pingableTx) override
- {
- YT_VERIFY(HasTransaction(pingableTx));
-
- Running_ = false;
- if (PingerThread_) {
- PingerThread_->Join();
- }
- }
-
-private:
- static void* Pinger(void* opaque)
- {
- static_cast<TThreadPerTransactionPinger*>(opaque)->Pinger();
- return nullptr;
- }
-
- void Pinger()
- {
- auto [minPingInterval, maxPingInterval] = PingableTx_->GetPingInterval();
- while (Running_) {
- TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>();
- try {
- PingableTx_->Ping();
- } catch (const std::exception& e) {
- if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
- if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
- break;
- } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
- waitTime = TDuration::MilliSeconds(0);
- }
- }
- // Else do nothing, going to retry this error.
- }
-
- TInstant t = Now();
- while (Running_ && Now() - t < waitTime) {
- NDetail::TWaitProxy::Get()->Sleep(TDuration::MilliSeconds(100));
- }
- }
- }
-
-private:
- const TPingableTransaction* PingableTx_ = nullptr;
-
- std::atomic<bool> Running_ = false;
- std::unique_ptr<TThread> PingerThread_;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config)
{
YT_LOG_DEBUG("Using async transaction pinger");
diff --git a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
index 403a7facdd..50e7a65150 100644
--- a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
+++ b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
@@ -15,6 +15,7 @@
#include <yql/essentials/minikql/mkql_stats_registry.h>
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/parser/pg_wrapper/interface/codec.h>
#include <yt/yt/core/concurrency/thread_pool.h>
#include <yt/yt/core/threading/thread.h>
@@ -309,25 +310,26 @@ public:
}
arrow::Status OnRecordBatchDecoded(std::shared_ptr<arrow::RecordBatch> batch) override {
- NKikimr::NMiniKQL::TScopedAlloc scope(__LOCATION__);
- TThrowingBindTerminator t;
-
- YQL_ENSURE(batch);
- MKQL_ADD_STAT(JobStats_, BlockCount, 1);
std::vector<arrow::Datum> result;
- YQL_ENSURE((size_t)batch->num_columns() == ColumnConverters_.size());
- result.resize(ColumnConverters_.size());
- size_t matchedColumns = 0;
- for (size_t i = 0; i < ColumnConverters_.size(); ++i) {
- auto columnIdxIt = ColumnOrderMapping.find(batch->schema()->field_names()[i]);
- if (ColumnOrderMapping.end() == columnIdxIt) {
- continue;
+ {
+ auto ctx = NCommon::CreateMemoryArenaContext();
+
+ YQL_ENSURE(batch);
+ MKQL_ADD_STAT(JobStats_, BlockCount, 1);
+ YQL_ENSURE((size_t)batch->num_columns() == ColumnConverters_.size());
+ result.resize(ColumnConverters_.size());
+ size_t matchedColumns = 0;
+ for (size_t i = 0; i < ColumnConverters_.size(); ++i) {
+ auto columnIdxIt = ColumnOrderMapping.find(batch->schema()->field_names()[i]);
+ if (ColumnOrderMapping.end() == columnIdxIt) {
+ continue;
+ }
+ ++matchedColumns;
+ auto columnIdx = columnIdxIt->second;
+ result[columnIdx] = std::move(ColumnConverters_[columnIdx]->Convert(batch->column(i)->data()));
}
- ++matchedColumns;
- auto columnIdx = columnIdxIt->second;
- result[columnIdx] = std::move(ColumnConverters_[columnIdx]->Convert(batch->column(i)->data()));
+ Y_ENSURE(matchedColumns == ColumnOrderMapping.size());
}
- Y_ENSURE(matchedColumns == ColumnOrderMapping.size());
Consumer_->HandleResult(std::make_shared<TResultBatch>(batch->num_rows(), std::move(result)));
return arrow::Status::OK();
}
diff --git a/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc b/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc
index 601177fb3f..332980a4c0 100644
--- a/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc
+++ b/yt/yql/providers/yt/comp_nodes/dq/ya.make.inc
@@ -15,6 +15,7 @@ PEERDIR(
yql/essentials/public/udf
contrib/libs/apache/arrow
contrib/libs/flatbuffers
+ yql/essentials/parser/pg_wrapper/interface
)
ADDINCL(
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index be8ce859a7..9e285ea166 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -412,28 +412,28 @@ TString GenerateInputQuery(const TRichYPath& path, const TString& whereExpressio
return result;
}
-void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery, bool useSystemColumns) {
+void SetInputQuerySpec(NYT::TNode& spec, const TString& inputQuery, const TYtSettings::TConstPtr& config, bool useSystemColumns) {
spec["input_query"] = inputQuery;
spec["input_query_filter_options"]["enable_chunk_filter"] = true;
- spec["input_query_filter_options"]["enable_row_filter"] = true;
+ spec["input_query_filter_options"]["enable_row_filter"] = config->PruneQLFilterLambda.Get().GetOrElse(DEFAULT_PRUNE_QL_FILTER_LAMBDA);
if (useSystemColumns) {
spec["input_query_options"]["use_system_columns"] = true;
}
}
-void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& whereExpression) {
+void PrepareInputQueryForMerge(NYT::TNode& spec, TVector<TRichYPath>& paths, const TString& whereExpression, const TYtSettings::TConstPtr& config) {
// YQL-19382
if (whereExpression) {
YQL_ENSURE(paths.size() == 1, "YtQLFilter: multiple inputs are not supported");
auto& path = paths[0];
const TString inputQuery = GenerateInputQuery(path, whereExpression, /*useSystemColumns*/ false);
path.Columns_.Clear();
- SetInputQuerySpec(spec, inputQuery, /*useSystemColumns*/ false);
+ SetInputQuerySpec(spec, inputQuery, config, /*useSystemColumns*/ false);
}
}
template <typename T>
-void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& whereExpression, bool useSystemColumns) {
+void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString& whereExpression, const TYtSettings::TConstPtr& config, bool useSystemColumns) {
// YQL-19382
if (whereExpression) {
const auto& inputs = specWithPaths.GetInputs();
@@ -444,7 +444,7 @@ void PrepareInputQueryForMap(NYT::TNode& spec, T& specWithPaths, const TString&
path.Columns_.Clear();
specWithPaths.SetInput(0, path);
}
- SetInputQuerySpec(spec, inputQuery, useSystemColumns);
+ SetInputQuerySpec(spec, inputQuery, config, useSystemColumns);
}
}
@@ -3675,7 +3675,7 @@ private:
spec["schema_inference_mode"] = "from_output"; // YTADMINREQ-17692
}
- PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQueryExpr);
+ PrepareInputQueryForMerge(spec, mergeOpSpec.Inputs_, inputQueryExpr, execCtx->Options_.Config());
return execCtx->RunOperation([entry, mergeOpSpec = std::move(mergeOpSpec), spec = std::move(spec)](){
return entry->Tx->Merge(mergeOpSpec, TOperationOptions().StartOperationMode(TOperationOptions::EStartOperationMode::AsyncPrepare).Spec(spec));
@@ -3864,7 +3864,7 @@ private:
spec["job_count"] = static_cast<i64>(*jobCount);
}
- PrepareInputQueryForMap(spec, mapOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff);
+ PrepareInputQueryForMap(spec, mapOpSpec, inputQueryExpr, execCtx->Options_.Config(), /*useSystemColumns*/ useSkiff);
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
@@ -4366,7 +4366,7 @@ private:
spec["mapper"]["output_streams"] = intermediateStreams;
}
- PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff);
+ PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, execCtx->Options_.Config(), /*useSystemColumns*/ useSkiff);
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
@@ -4514,7 +4514,7 @@ private:
spec["reducer"]["enable_input_table_index"] = true;
}
- PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, /*useSystemColumns*/ useSkiff);
+ PrepareInputQueryForMap(spec, mapReduceOpSpec, inputQueryExpr, execCtx->Options_.Config(), /*useSystemColumns*/ useSkiff);
TOperationOptions opOpts;
FillOperationOptions(opOpts, execCtx, entry);
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp
index 73cacf723d..bd24c6a473 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_ytql.cpp
@@ -161,23 +161,24 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::ExtractQLFilters(TExprB
}
YQL_ENSURE(qlCompatiblePredicate);
- TExprNode::TPtr prunedPredicate;
- if (otherParts.empty()) {
- prunedPredicate = MakeBool<true>(predicate->Pos(), ctx);
- } else if (otherParts.size() == 1) {
- prunedPredicate = otherParts.front();
- } else {
- prunedPredicate = ctx.NewCallable(predicate->Pos(), "And", std::move(otherParts));
- }
- YQL_ENSURE(prunedPredicate);
-
const auto typeNode = ExpandType(rowArg->Pos(), *rowArg->GetTypeAnn(), ctx);
const auto lambdaNode = ctx.NewLambda(qlCompatiblePredicate->Pos(), ctx.NewArguments(qlCompatiblePredicate->Pos(), {newRowArg}), std::move(qlCompatiblePredicate));
const auto qlFilter = ctx.NewCallable(flatMap.Cast().Pos(), "YtQLFilter", {typeNode, lambdaNode});
auto newOpMap = ctx.ChangeChild(opMap.Ref(), TYtMap::idx_Settings, NYql::AddSetting(opMap.Settings().Ref(), EYtSettingType::QLFilter, qlFilter, ctx));
+
const bool pruneLambda = State_->Configuration->PruneQLFilterLambda.Get().GetOrElse(DEFAULT_PRUNE_QL_FILTER_LAMBDA);
if (pruneLambda) {
+ TExprNode::TPtr prunedPredicate;
+ if (otherParts.empty()) {
+ prunedPredicate = MakeBool<true>(predicate->Pos(), ctx);
+ } else if (otherParts.size() == 1) {
+ prunedPredicate = otherParts.front();
+ } else {
+ prunedPredicate = ctx.NewCallable(predicate->Pos(), "And", std::move(otherParts));
+ }
+ YQL_ENSURE(prunedPredicate);
+
const auto newFlatMap = Build<TCoFlatMapBase>(ctx, flatMap.Cast().Pos())
.InitFrom(flatMap.Cast())
.Lambda()
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.cfg b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.cfg
new file mode 100644
index 0000000000..d0ce4581d7
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.cfg
@@ -0,0 +1 @@
+in Input integer.txt
diff --git a/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.sql b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.sql
new file mode 100644
index 0000000000..293c48c64a
--- /dev/null
+++ b/yt/yql/tests/sql/suites/ql_filter/integer_single_disable_prune.sql
@@ -0,0 +1,6 @@
+pragma yt.UseQLFilter;
+pragma yt.PruneQLFilterLambda='false';
+
+select a
+from plato.Input
+where a > 5;
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index fc166e08de..ca2e945220 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -395,6 +395,11 @@ public:
const TPartitionTablesOptions& options),
(paths, options))
+ DELEGATE_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, (
+ const TTablePartitionCookiePtr& descriptor,
+ const TReadTablePartitionOptions& options),
+ (descriptor, options))
+
// Journals
DELEGATE_METHOD(TFuture<void>, TruncateJournal, (
const NYPath::TYPath& path,
diff --git a/yt/yt/client/api/private.cpp b/yt/yt/client/api/private.cpp
new file mode 100644
index 0000000000..6379e75128
--- /dev/null
+++ b/yt/yt/client/api/private.cpp
@@ -0,0 +1,23 @@
+#include "private.h"
+
+#include "table_partition_reader.h"
+
+#include <yt/yt/client/table_client/schema.h>
+
+namespace NYT::NApi::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::vector<NTableClient::TTableSchemaPtr> GetTableSchemas(const ITablePartitionReaderPtr& partitionReader)
+{
+ return partitionReader->GetTableSchemas();
+}
+
+std::vector<NTableClient::TColumnNameFilter> GetColumnFilters(const ITablePartitionReaderPtr& partitionReader)
+{
+ return partitionReader->GetColumnFilters();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi::NDetail
diff --git a/yt/yt/client/api/private.h b/yt/yt/client/api/private.h
index 4a4e6567fe..2d2d9323bd 100644
--- a/yt/yt/client/api/private.h
+++ b/yt/yt/client/api/private.h
@@ -12,5 +12,17 @@ YT_DEFINE_GLOBAL(const NLogging::TLogger, ApiLogger, "Api");
////////////////////////////////////////////////////////////////////////////////
-} // namespace NYT::NApi
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+
+// Functions below are for internal usage only. No backward compatibility is guaranteed.
+std::vector<NTableClient::TTableSchemaPtr> GetTableSchemas(const ITablePartitionReaderPtr& partitionReader);
+std::vector<NTableClient::TColumnNameFilter> GetColumnFilters(const ITablePartitionReaderPtr& partitionReader);
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/public.h b/yt/yt/client/api/public.h
index 92a8204fc4..36dbf9d637 100644
--- a/yt/yt/client/api/public.h
+++ b/yt/yt/client/api/public.h
@@ -146,6 +146,8 @@ DECLARE_REFCOUNTED_STRUCT(IRowBatchWriter)
DECLARE_REFCOUNTED_STRUCT(ITableReader)
DECLARE_REFCOUNTED_STRUCT(ITableWriter)
+DECLARE_REFCOUNTED_CLASS(ITablePartitionReader)
+
DECLARE_REFCOUNTED_STRUCT(ITableFragmentWriter);
DECLARE_REFCOUNTED_STRUCT(IFileReader)
diff --git a/yt/yt/client/api/rpc_proxy/api_service_proxy.h b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
index febae10ac9..57a46d9704 100644
--- a/yt/yt/client/api/rpc_proxy/api_service_proxy.h
+++ b/yt/yt/client/api/rpc_proxy/api_service_proxy.h
@@ -149,6 +149,8 @@ public:
.SetStreamingEnabled(true));
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetColumnarStatistics);
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PartitionTables);
+ DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ReadTablePartition,
+ .SetStreamingEnabled(true));
// File caching
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetFileFromCache);
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index c871e20e09..a5d3dc4249 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -11,6 +11,7 @@
#include "transaction.h"
#include <yt/yt/client/api/helpers.h>
+#include <yt/yt/client/api/table_partition_reader.h>
#include <yt/yt/client/chaos_client/replication_card_serialization.h>
@@ -1738,6 +1739,7 @@ TFuture<NApi::TMultiTablePartitions> TClient::PartitionTables(
req->set_adjust_data_weight_per_partition(options.AdjustDataWeightPerPartition);
req->set_enable_key_guarantee(options.EnableKeyGuarantee);
+ req->set_enable_cookies(options.EnableCookies);
ToProto(req->mutable_transactional_options(), options);
@@ -1746,6 +1748,34 @@ TFuture<NApi::TMultiTablePartitions> TClient::PartitionTables(
}));
}
+TFuture<ITablePartitionReaderPtr> TClient::CreateTablePartitionReader(
+ const TTablePartitionCookiePtr& cookie,
+ const TReadTablePartitionOptions& /*options*/)
+{
+ YT_VERIFY(cookie);
+
+ auto proxy = CreateApiServiceProxy();
+ auto req = proxy.ReadTablePartition();
+ InitStreamingRequest(*req);
+
+ NProto::ToProto(req->mutable_cookie(), cookie);
+
+ return NRpc::CreateRpcClientInputStream(std::move(req))
+ .ApplyUnique(BIND([] (IAsyncZeroCopyInputStreamPtr&& inputStream) -> TFuture<ITablePartitionReaderPtr>{
+ return inputStream->Read().Apply(BIND([=] (const TSharedRef& metaRef) {
+ // Actually we don't have any metadata in first version but we can have it in future. Just parse empty proto.
+ NApi::NRpcProxy::NProto::TRspReadTablePartitionMeta meta;
+ if (!TryDeserializeProto(&meta, metaRef)) {
+ THROW_ERROR_EXCEPTION("Failed to deserialize partition table reader meta information");
+ }
+
+ auto rowBatchReader = New<TRowBatchReader>(std::move(inputStream), /*isStreamWithStatistics*/ false);
+
+ return NApi::CreateTablePartitionReader(rowBatchReader, /*schemas*/ {}, /*columnFilters=*/ {});
+ }));
+ }));
+}
+
TFuture<void> TClient::TruncateJournal(
const TYPath& path,
i64 rowCount,
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index db95372369..151a239862 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -347,6 +347,10 @@ public:
const std::vector<NYPath::TRichYPath>& paths,
const NApi::TPartitionTablesOptions& options) override;
+ TFuture<ITablePartitionReaderPtr> CreateTablePartitionReader(
+ const TTablePartitionCookiePtr& tablePartitionDescriptor,
+ const TReadTablePartitionOptions& options) override;
+
TFuture<void> TruncateJournal(
const NYPath::TYPath& path,
i64 rowCount,
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
index 4ac7a42726..a6275f5d9a 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
@@ -1,10 +1,7 @@
#include "connection_impl.h"
-#include "discovery_service_proxy.h"
#include "connection_impl.h"
#include "client_impl.h"
#include "config.h"
-#include "helpers.h"
-#include "private.h"
#include <yt/yt/core/net/local_address.h>
#include <yt/yt/core/net/address.h>
diff --git a/yt/yt/client/api/rpc_proxy/discovery_service_proxy.h b/yt/yt/client/api/rpc_proxy/discovery_service_proxy.h
deleted file mode 100644
index 8a524be756..0000000000
--- a/yt/yt/client/api/rpc_proxy/discovery_service_proxy.h
+++ /dev/null
@@ -1,28 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include "protocol_version.h"
-
-#include <yt/yt_proto/yt/client/api/rpc_proxy/proto/discovery_service.pb.h>
-
-#include <yt/yt/core/rpc/client.h>
-
-namespace NYT::NApi::NRpcProxy {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TDiscoveryServiceProxy
- : public NRpc::TProxyBase
-{
-public:
- DEFINE_RPC_PROXY(TDiscoveryServiceProxy, DiscoveryService,
- .SetProtocolVersion(NRpc::TProtocolVersion{0, 0}));
-
- DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, DiscoverProxies,
- .SetMultiplexingBand(NRpc::EMultiplexingBand::Control));
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NApi::NRpcProxy
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index 67b0e789de..1a316ed374 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -1111,6 +1111,18 @@ void ToProto(
aggregateStatistics->set_chunk_count(multiTablePartition.AggregateStatistics.ChunkCount);
aggregateStatistics->set_data_weight(multiTablePartition.AggregateStatistics.DataWeight);
aggregateStatistics->set_row_count(multiTablePartition.AggregateStatistics.RowCount);
+
+ if (multiTablePartition.Cookie) {
+ ToProto(protoMultiTablePartition->mutable_cookie(), multiTablePartition.Cookie);
+ }
+}
+
+void ToProto(
+ TProtobufString* protoCookie,
+ const TTablePartitionCookiePtr& cookie)
+{
+ auto cookieBytes = ConvertToYsonString(cookie);
+ *protoCookie = cookieBytes.ToString();
}
void FromProto(
@@ -1127,6 +1139,10 @@ void FromProto(
multiTablePartition->AggregateStatistics.DataWeight = aggregateStatistics.data_weight();
multiTablePartition->AggregateStatistics.RowCount = aggregateStatistics.row_count();
}
+
+ if (protoMultiTablePartition.has_cookie()) {
+ FromProto(&multiTablePartition->Cookie, protoMultiTablePartition.cookie());
+ }
}
void FromProto(
@@ -1138,6 +1154,13 @@ void FromProto(
protoRspPartitionTables.partitions());
}
+void FromProto(
+ TTablePartitionCookiePtr* cookie,
+ const TProtobufString& protoCookie)
+{
+ *cookie = ConvertTo<TTablePartitionCookiePtr>(TYsonStringBuf(protoCookie));
+}
+
void ToProto(
NProto::TRowBatchReadOptions* proto,
const NQueueClient::TQueueRowBatchReadOptions& result)
diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h
index 9e7f432e62..8bfa68b54f 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.h
+++ b/yt/yt/client/api/rpc_proxy/helpers.h
@@ -196,6 +196,10 @@ void ToProto(
NProto::TMultiTablePartition* protoMultiTablePartition,
const NApi::TMultiTablePartition& multiTablePartition);
+void ToProto(
+ TProtobufString* protoCookie,
+ const TTablePartitionCookiePtr& cookie);
+
void FromProto(
NApi::TMultiTablePartition* multiTablePartition,
const NProto::TMultiTablePartition& protoMultiTablePartition);
@@ -204,6 +208,10 @@ void FromProto(
NApi::TMultiTablePartitions* multiTablePartitions,
const NProto::TRspPartitionTables& protoRspPartitionTables);
+void FromProto(
+ TTablePartitionCookiePtr* cookie,
+ const TProtobufString& protoCookie);
+
void ToProto(
NProto::TRowBatchReadOptions* proto,
const NQueueClient::TQueueRowBatchReadOptions& result);
diff --git a/yt/yt/client/api/table_client.cpp b/yt/yt/client/api/table_client.cpp
index 77259821b5..8e86a5f924 100644
--- a/yt/yt/client/api/table_client.cpp
+++ b/yt/yt/client/api/table_client.cpp
@@ -1,5 +1,7 @@
#include "table_client.h"
+#include <yt/yt/client/signature/signature.h>
+
#include <yt/yt/client/ypath/rich.h>
#include <yt/yt/core/ytree/fluent.h>
@@ -32,6 +34,10 @@ void Serialize(const TMultiTablePartition& partition, NYson::IYsonConsumer* cons
BuildYsonFluently(consumer)
.BeginMap()
.Item("table_ranges").Value(partition.TableRanges)
+ .DoIf(static_cast<bool>(partition.Cookie), [&] (TFluentMap fluent) {
+ auto ysonString = NYson::ConvertToYsonString(partition.Cookie);
+ fluent.Item("cookie").Value(ysonString.AsStringBuf());
+ })
.Item("aggregate_statistics").Value(partition.AggregateStatistics)
.EndMap();
}
@@ -49,4 +55,3 @@ void Serialize(const TMultiTablePartitions& partitions, NYson::IYsonConsumer* co
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NApi
-
diff --git a/yt/yt/client/api/table_client.h b/yt/yt/client/api/table_client.h
index 01b2b28d20..c6faa288fa 100644
--- a/yt/yt/client/api/table_client.h
+++ b/yt/yt/client/api/table_client.h
@@ -316,13 +316,25 @@ struct TPartitionTablesOptions
std::optional<int> MaxPartitionCount;
bool AdjustDataWeightPerPartition = true;
bool EnableKeyGuarantee = false;
+
+ //! Whether to return cookies that can be fed to CreateTablePartitionReader.
+ bool EnableCookies = false;
};
+struct TReadTablePartitionOptions
+ : public TTableReaderOptions
+{ };
+
+YT_DEFINE_STRONG_TYPEDEF(TTablePartitionCookiePtr, NSignature::TSignaturePtr);
+
struct TMultiTablePartition
{
//! Table ranges are indexed by table index.
std::vector<NYPath::TRichYPath> TableRanges;
+ //! Cookie that can be fed into CreateTablePartitionReader.
+ TTablePartitionCookiePtr Cookie;
+
//! Aggregate statistics of all the table ranges in the partition.
NTableClient::TChunkStripeStatistics AggregateStatistics;
};
@@ -473,7 +485,15 @@ struct ITableClient
virtual TFuture<TMultiTablePartitions> PartitionTables(
const std::vector<NYPath::TRichYPath>& paths,
- const TPartitionTablesOptions& options) = 0;
+ const TPartitionTablesOptions& options = {}) = 0;
+
+ /**
+ * Read table partition that was previously received from PartitionTables method.
+ * This method is cheaper than ReadTable since such reading doesn't make request to master in typical case.
+ */
+ virtual TFuture<ITablePartitionReaderPtr> CreateTablePartitionReader(
+ const TTablePartitionCookiePtr& cookie,
+ const TReadTablePartitionOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/table_partition_reader.cpp b/yt/yt/client/api/table_partition_reader.cpp
new file mode 100644
index 0000000000..a3bd4621f2
--- /dev/null
+++ b/yt/yt/client/api/table_partition_reader.cpp
@@ -0,0 +1,73 @@
+#include "table_partition_reader.h"
+
+#include <yt/yt/client/table_client/schema.h>
+
+namespace NYT::NApi {
+
+using namespace NTableClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TTablePartitionReader
+ : public ITablePartitionReader
+{
+public:
+ TTablePartitionReader(
+ IRowBatchReaderPtr rowBatchReader,
+ std::vector<TTableSchemaPtr> tableSchemas,
+ std::vector<TColumnNameFilter> columnFilters)
+ : RowBatchReader_(std::move(rowBatchReader))
+ , TableSchemas_(std::move(tableSchemas))
+ , ColumnFilters_(std::move(columnFilters))
+ {
+ YT_VERIFY(TableSchemas_.size() == ColumnFilters_.size());
+ }
+
+ TFuture<void> GetReadyEvent() const override
+ {
+ return RowBatchReader_->GetReadyEvent();
+ }
+
+ IUnversionedRowBatchPtr Read(const TRowBatchReadOptions& options) override
+ {
+ return RowBatchReader_->Read(options);
+ }
+
+ const TNameTablePtr& GetNameTable() const override
+ {
+ return RowBatchReader_->GetNameTable();
+ }
+
+ std::vector<TTableSchemaPtr> GetTableSchemas() const override
+ {
+ if (TableSchemas_.empty()) {
+ THROW_ERROR_EXCEPTION("Table schemas are unknown");
+ }
+ return TableSchemas_;
+ }
+
+ std::vector<TColumnNameFilter> GetColumnFilters() const override
+ {
+ if (ColumnFilters_.empty()) {
+ THROW_ERROR_EXCEPTION("Column filters are unknown");
+ }
+ return ColumnFilters_;
+ }
+
+private:
+ const IRowBatchReaderPtr RowBatchReader_;
+ const std::vector<TTableSchemaPtr> TableSchemas_;
+ const std::vector<TColumnNameFilter> ColumnFilters_;
+};
+
+ITablePartitionReaderPtr CreateTablePartitionReader(
+ const IRowBatchReaderPtr& rowBatchReader,
+ const std::vector<TTableSchemaPtr>& tableSchemas,
+ const std::vector<TColumnNameFilter>& columnFilters)
+{
+ return New<TTablePartitionReader>(rowBatchReader, tableSchemas, columnFilters);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/table_partition_reader.h b/yt/yt/client/api/table_partition_reader.h
new file mode 100644
index 0000000000..5e2e909bec
--- /dev/null
+++ b/yt/yt/client/api/table_partition_reader.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "public.h"
+#include "private.h"
+
+#include "row_batch_reader.h"
+
+namespace NYT::NApi {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class ITablePartitionReader
+ : public virtual IRowBatchReader
+{
+private:
+ // Functions below are for internal usage only.
+ virtual std::vector<NTableClient::TTableSchemaPtr> GetTableSchemas() const = 0;
+ virtual std::vector<NTableClient::TColumnNameFilter> GetColumnFilters() const = 0;
+
+ friend std::vector<NTableClient::TTableSchemaPtr> NDetail::GetTableSchemas(const ITablePartitionReaderPtr&);
+ friend std::vector<NTableClient::TColumnNameFilter> NDetail::GetColumnFilters(const ITablePartitionReaderPtr&);
+};
+
+DEFINE_REFCOUNTED_TYPE(ITablePartitionReader)
+
+ITablePartitionReaderPtr CreateTablePartitionReader(
+ const IRowBatchReaderPtr& rowBatchReader,
+ const std::vector<NTableClient::TTableSchemaPtr>& tableSchemas,
+ const std::vector<NTableClient::TColumnNameFilter>& columnFilters);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index f699cbd74f..11c96e1335 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -192,6 +192,8 @@ public:
REGISTER_ALL(TPartitionTablesCommand, "partition_tables", Null, Structured, false, false);
+ REGISTER (TReadTablePartitionCommand, "read_table_partition", Null, Tabular, false, true , ApiVersion4);
+
REGISTER (TInsertRowsCommand, "insert_rows", Tabular, Null, true, true , ApiVersion3);
REGISTER (TLockRowsCommand, "lock_rows", Tabular, Null, true, true , ApiVersion3);
REGISTER (TDeleteRowsCommand, "delete_rows", Tabular, Null, true, true , ApiVersion3);
diff --git a/yt/yt/client/driver/driver.h b/yt/yt/client/driver/driver.h
index 8e8a25de41..b17edf8ba5 100644
--- a/yt/yt/client/driver/driver.h
+++ b/yt/yt/client/driver/driver.h
@@ -23,6 +23,8 @@
#include <yt/yt/core/ytree/public.h>
+#include <library/cpp/yt/memory/memory_usage_tracker.h>
+
namespace NYT::NDriver {
////////////////////////////////////////////////////////////////////////////////
@@ -78,6 +80,9 @@ struct TDriverRequest
//! before first write to output stream.
std::function<void()> ResponseParametersFinishedCallback;
+ //! Memory usage tracker.
+ IMemoryUsageTrackerPtr MemoryUsageTracker = GetNullMemoryUsageTracker();
+
void Reset();
private:
diff --git a/yt/yt/client/driver/table_commands.cpp b/yt/yt/client/driver/table_commands.cpp
index ce1155abfd..6a2d4eb19c 100644
--- a/yt/yt/client/driver/table_commands.cpp
+++ b/yt/yt/client/driver/table_commands.cpp
@@ -4,12 +4,17 @@
#include <yt/yt/client/api/rowset.h>
#include <yt/yt/client/api/skynet.h>
+#include <yt/yt/client/api/table_partition_reader.h>
#include <yt/yt/client/chaos_client/replication_card_serialization.h>
#include <yt/yt/client/formats/config.h>
#include <yt/yt/client/formats/parser.h>
+#include <yt/yt/client/signature/generator.h>
+#include <yt/yt/client/signature/signature.h>
+#include <yt/yt/client/signature/validator.h>
+
#include <yt/yt/client/table_client/adapters.h>
#include <yt/yt/client/table_client/blob_reader.h>
#include <yt/yt/client/table_client/columnar_statistics.h>
@@ -33,6 +38,7 @@
namespace NYT::NDriver {
using namespace NApi;
+using namespace NApi::NDetail;
using namespace NChaosClient;
using namespace NChunkClient;
using namespace NCodegen;
@@ -230,6 +236,51 @@ void TReadBlobTableCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
+void TReadTablePartitionCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("cookie", &TThis::Cookie);
+}
+
+void TReadTablePartitionCommand::DoExecute(ICommandContextPtr context)
+{
+ auto client = context->GetClient();
+
+ auto cookie = ConvertTo<TTablePartitionCookiePtr>(TYsonString(Cookie));
+
+ auto valid = WaitFor(context->GetDriver()->GetSignatureValidator()->Validate(cookie.Underlying()))
+ .ValueOrThrow();
+
+ if (!valid) {
+ THROW_ERROR_EXCEPTION("Signature validation failed");
+ }
+
+ auto reader = WaitFor(client->CreateTablePartitionReader(cookie))
+ .ValueOrThrow();
+
+ auto format = context->GetOutputFormat();
+ auto formatWriter = CreateStaticTableWriterForFormat(
+ /*format*/ format,
+ /*nameTable*/ reader->GetNameTable(),
+ /*tableSchemas*/ GetTableSchemas(reader),
+ /*columnFilters*/ GetColumnFilters(reader),
+ /*output*/ context->Request().OutputStream,
+ /*enableContextSaving*/ false,
+ /*controlAttributesConfig*/ New<TControlAttributesConfig>(),
+ /*keyColumnCount*/ 0);
+
+ TRowBatchReadOptions options{
+ .MaxRowsPerRead = context->GetConfig()->ReadBufferRowCount,
+ .Columnar = (format.GetType() == EFormatType::Arrow),
+ };
+
+ PipeReaderToWriterByBatches(
+ reader,
+ formatWriter,
+ options);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TLocateSkynetShareCommand::Register(TRegistrar registrar)
{
registrar.Parameter("path", &TThis::Path);
@@ -298,7 +349,8 @@ void TWriteTableCommand::DoExecuteImpl(const ICommandContextPtr& context)
TWritingValueConsumer valueConsumer(
schemalessWriter,
ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes()),
- MaxRowBufferSize);
+ MaxRowBufferSize,
+ context->Request().MemoryUsageTracker);
TTableOutput output(CreateParserForFormat(
context->GetInputFormat(),
@@ -454,6 +506,8 @@ void TPartitionTablesCommand::Register(TRegistrar registrar)
.Default(false);
registrar.Parameter("adjust_data_weight_per_partition", &TThis::AdjustDataWeightPerPartition)
.Default(true);
+ registrar.Parameter("enable_cookies", &TThis::EnableCookies)
+ .Default(false);
}
void TPartitionTablesCommand::DoExecute(ICommandContextPtr context)
@@ -467,10 +521,17 @@ void TPartitionTablesCommand::DoExecute(ICommandContextPtr context)
Options.MaxPartitionCount = MaxPartitionCount;
Options.EnableKeyGuarantee = EnableKeyGuarantee;
Options.AdjustDataWeightPerPartition = AdjustDataWeightPerPartition;
+ Options.EnableCookies = EnableCookies;
auto partitions = WaitFor(context->GetClient()->PartitionTables(Paths, Options))
.ValueOrThrow();
+ for (auto& partition : partitions.Partitions) {
+ if (partition.Cookie) {
+ context->GetDriver()->GetSignatureGenerator()->Sign(partition.Cookie.Underlying());
+ }
+ }
+
context->ProduceOutputValue(ConvertToYsonString(partitions));
}
diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h
index 26dacebf7e..eebdc492cd 100644
--- a/yt/yt/client/driver/table_commands.h
+++ b/yt/yt/client/driver/table_commands.h
@@ -58,6 +58,21 @@ private:
////////////////////////////////////////////////////////////////////////////////
+class TReadTablePartitionCommand
+ : public TTypedCommand<NApi::TTableReaderOptions>
+{
+ REGISTER_YSON_STRUCT_LITE(TReadTablePartitionCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ std::string Cookie;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TLocateSkynetShareCommand
: public TTypedCommand<NApi::TLocateSkynetShareOptions>
{
@@ -141,6 +156,9 @@ private:
//! the #partition_tables command will throw an exception.
bool AdjustDataWeightPerPartition;
+ //! Return cookies that can be used with read_table_partition command
+ bool EnableCookies;
+
void DoExecute(ICommandContextPtr context) override;
};
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index 9e997a2728..ffce8ff06c 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -394,6 +394,7 @@ public:
UNIMPLEMENTED_METHOD(TFuture<TSkynetSharePartsLocationsPtr>, LocateSkynetShare, (const NYPath::TRichYPath&, const TLocateSkynetShareOptions&));
UNIMPLEMENTED_METHOD(TFuture<std::vector<NTableClient::TColumnarStatistics>>, GetColumnarStatistics, (const std::vector<NYPath::TRichYPath>&, const TGetColumnarStatisticsOptions&));
UNIMPLEMENTED_METHOD(TFuture<TMultiTablePartitions>, PartitionTables, (const std::vector<NYPath::TRichYPath>&, const TPartitionTablesOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, (const TTablePartitionCookiePtr&, const TReadTablePartitionOptions&));
UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetTablePivotKeys, (const NYPath::TYPath&, const TGetTablePivotKeysOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, CreateTableBackup, (const TBackupManifestPtr&, const TCreateTableBackupOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, RestoreTableBackup, (const TBackupManifestPtr&, const TRestoreTableBackupOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index 09ca2fbc0e..b6f7e294da 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -1,14 +1,14 @@
#include "hedging.h"
-#include "cache.h"
#include "config.h"
#include "counter.h"
#include "rpc.h"
-#include "private.h"
#include <yt/yt/client/api/client.h>
#include <yt/yt/client/api/queue_transaction.h>
+#include <yt/yt/client/cache/cache.h>
+
#include <yt/yt/client/misc/method_helpers.h>
#include <yt/yt/client/table_client/unversioned_row.h>
@@ -141,6 +141,7 @@ public:
UNSUPPORTED_METHOD(TFuture<TSkynetSharePartsLocationsPtr>, LocateSkynetShare, (const TRichYPath&, const TLocateSkynetShareOptions&));
UNSUPPORTED_METHOD(TFuture<std::vector<NTableClient::TColumnarStatistics>>, GetColumnarStatistics, (const std::vector<TRichYPath>&, const TGetColumnarStatisticsOptions&));
UNSUPPORTED_METHOD(TFuture<TMultiTablePartitions>, PartitionTables, (const std::vector<TRichYPath>&, const TPartitionTablesOptions&));
+ UNSUPPORTED_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, (const TTablePartitionCookiePtr&, const TReadTablePartitionOptions&));
UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetTablePivotKeys, (const TYPath&, const TGetTablePivotKeysOptions&));
UNSUPPORTED_METHOD(TFuture<void>, CreateTableBackup, (const TBackupManifestPtr&, const TCreateTableBackupOptions&));
UNSUPPORTED_METHOD(TFuture<void>, RestoreTableBackup, (const TBackupManifestPtr&, const TRestoreTableBackupOptions&));
diff --git a/yt/yt/client/misc/workload.h b/yt/yt/client/misc/workload.h
index 559402052b..63ce3a7cb3 100644
--- a/yt/yt/client/misc/workload.h
+++ b/yt/yt/client/misc/workload.h
@@ -16,6 +16,34 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+// This distribution of weights corresponds to the logic of the GetBasicPriority method.
+static const TEnumIndexedArray<EWorkloadCategory, double> DefaultFairShareWorkloadCategoryWeights = {
+ {EWorkloadCategory::Idle, 0},
+
+ {EWorkloadCategory::SystemReplication, 1},
+ {EWorkloadCategory::SystemMerge, 1},
+ {EWorkloadCategory::SystemReincarnation, 1},
+ {EWorkloadCategory::SystemTabletCompaction, 1},
+ {EWorkloadCategory::SystemTabletPartitioning, 1},
+ {EWorkloadCategory::SystemTabletPreload, 1},
+ {EWorkloadCategory::SystemTabletReplication, 1},
+ {EWorkloadCategory::SystemTabletStoreFlush, 1},
+ {EWorkloadCategory::SystemArtifactCacheDownload, 1},
+ {EWorkloadCategory::UserBatch, 1},
+
+ {EWorkloadCategory::SystemRepair, 2},
+ {EWorkloadCategory::SystemTabletSnapshot, 2},
+
+ {EWorkloadCategory::UserInteractive, 3},
+ {EWorkloadCategory::UserDynamicStoreRead, 3},
+ {EWorkloadCategory::SystemTabletRecovery, 3},
+
+ {EWorkloadCategory::UserRealtime, 4},
+ {EWorkloadCategory::SystemTabletLogging, 4},
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TWorkloadDescriptor
{
TWorkloadDescriptor(
diff --git a/yt/yt/client/signature/generator.cpp b/yt/yt/client/signature/generator.cpp
index 71c12d6ae2..33a431a5ab 100644
--- a/yt/yt/client/signature/generator.cpp
+++ b/yt/yt/client/signature/generator.cpp
@@ -34,6 +34,12 @@ ISignatureGeneratorPtr CreateDummySignatureGenerator()
return New<TDummySignatureGenerator>();
}
+const ISignatureGeneratorPtr& GetDummySignatureGenerator()
+{
+ static ISignatureGeneratorPtr signatureGenerator = CreateDummySignatureGenerator();
+ return signatureGenerator;
+}
+
////////////////////////////////////////////////////////////////////////////////
struct TAlwaysThrowingSignatureGenerator
diff --git a/yt/yt/client/signature/generator.h b/yt/yt/client/signature/generator.h
index 6574128736..771288af7f 100644
--- a/yt/yt/client/signature/generator.h
+++ b/yt/yt/client/signature/generator.h
@@ -23,6 +23,7 @@ DEFINE_REFCOUNTED_TYPE(ISignatureGenerator)
////////////////////////////////////////////////////////////////////////////////
ISignatureGeneratorPtr CreateDummySignatureGenerator();
+const ISignatureGeneratorPtr& GetDummySignatureGenerator();
ISignatureGeneratorPtr CreateAlwaysThrowingSignatureGenerator();
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index ac51658026..200ce9f0aa 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -275,6 +275,8 @@ using TKeyColumnTypes = TCompactVector<EValueType, 16>;
class TColumnFilter;
+using TColumnNameFilter = std::optional<std::vector<std::string>>;
+
struct TUnversionedValue;
using TUnversionedValueRange = TRange<TUnversionedValue>;
using TMutableUnversionedValueRange = TMutableRange<TUnversionedValue>;
diff --git a/yt/yt/client/table_client/row_buffer.cpp b/yt/yt/client/table_client/row_buffer.cpp
index bb16d61c1b..4df6c62a9d 100644
--- a/yt/yt/client/table_client/row_buffer.cpp
+++ b/yt/yt/client/table_client/row_buffer.cpp
@@ -12,12 +12,15 @@ TRowBuffer::TRowBuffer(
TRefCountedTypeCookie tagCookie,
IMemoryChunkProviderPtr chunkProvider,
size_t startChunkSize,
- IMemoryUsageTrackerPtr tracker)
+ IMemoryUsageTrackerPtr tracker,
+ bool allowMemoryOvercommit)
: MemoryTracker_(std::move(tracker))
+ , AllowMemoryOvercommit_(allowMemoryOvercommit)
, Pool_(
tagCookie,
std::move(chunkProvider),
startChunkSize)
+ , MemoryGuard_(TMemoryUsageTrackerGuard::Build(MemoryTracker_))
{ }
TChunkedMemoryPool* TRowBuffer::GetPool()
@@ -28,7 +31,7 @@ TChunkedMemoryPool* TRowBuffer::GetPool()
TMutableUnversionedRow TRowBuffer::AllocateUnversioned(int valueCount)
{
auto result = TMutableUnversionedRow::Allocate(&Pool_, valueCount);
- ValidateNoOverflow();
+ UpdateMemoryUsage();
return result;
}
@@ -44,7 +47,7 @@ TMutableVersionedRow TRowBuffer::AllocateVersioned(
valueCount,
writeTimestampCount,
deleteTimestampCount);
- ValidateNoOverflow();
+ UpdateMemoryUsage();
return result;
}
@@ -56,7 +59,7 @@ void TRowBuffer::CaptureValue(TUnversionedValue* value)
value->Data.String = dst;
}
- ValidateNoOverflow();
+ UpdateMemoryUsage();
}
TVersionedValue TRowBuffer::CaptureValue(const TVersionedValue& value)
@@ -107,7 +110,7 @@ TMutableUnversionedRow TRowBuffer::CaptureRow(TUnversionedValueRange values, boo
}
}
- ValidateNoOverflow();
+ UpdateMemoryUsage();
return capturedRow;
}
@@ -175,7 +178,7 @@ TMutableUnversionedRow TRowBuffer::CaptureAndPermuteRow(
capturedRow[valueCount++] = *addend;
}
- ValidateNoOverflow();
+ UpdateMemoryUsage();
return capturedRow;
}
@@ -201,7 +204,7 @@ TMutableVersionedRow TRowBuffer::CaptureRow(TVersionedRow row, bool captureValue
CaptureValues(capturedRow);
}
- ValidateNoOverflow();
+ UpdateMemoryUsage();
return capturedRow;
}
@@ -310,7 +313,7 @@ TMutableVersionedRow TRowBuffer::CaptureAndPermuteRow(
}
}
- ValidateNoOverflow();
+ UpdateMemoryUsage();
return capturedRow;
}
@@ -318,7 +321,7 @@ TMutableVersionedRow TRowBuffer::CaptureAndPermuteRow(
void TRowBuffer::Absorb(TRowBuffer&& other)
{
Pool_.Absorb(std::move(other.Pool_));
- ValidateNoOverflow();
+ UpdateMemoryUsage();
}
i64 TRowBuffer::GetSize() const
@@ -333,17 +336,17 @@ i64 TRowBuffer::GetCapacity() const
void TRowBuffer::Clear()
{
- MemoryGuard_.reset();
+ MemoryGuard_.Release();
Pool_.Clear();
}
void TRowBuffer::Purge()
{
- MemoryGuard_.reset();
+ MemoryGuard_.Release();
Pool_.Purge();
}
-void TRowBuffer::ValidateNoOverflow()
+void TRowBuffer::UpdateMemoryUsage()
{
if (!MemoryTracker_) {
return;
@@ -351,11 +354,10 @@ void TRowBuffer::ValidateNoOverflow()
auto capacity = Pool_.GetCapacity();
- if (!MemoryGuard_) {
- MemoryGuard_ = TMemoryUsageTrackerGuard::TryAcquire(MemoryTracker_, capacity)
- .ValueOrThrow();
+ if (AllowMemoryOvercommit_) {
+ MemoryGuard_.SetSize(capacity);
} else {
- MemoryGuard_->TrySetSize(capacity)
+ MemoryGuard_.TrySetSize(capacity)
.ThrowOnError();
}
}
diff --git a/yt/yt/client/table_client/row_buffer.h b/yt/yt/client/table_client/row_buffer.h
index 2f7033827e..1896f84579 100644
--- a/yt/yt/client/table_client/row_buffer.h
+++ b/yt/yt/client/table_client/row_buffer.h
@@ -26,14 +26,17 @@ public:
TRefCountedTypeCookie tagCookie,
IMemoryChunkProviderPtr chunkProvider,
size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize,
- IMemoryUsageTrackerPtr tracker = nullptr);
+ IMemoryUsageTrackerPtr tracker = nullptr,
+ bool allowMemoryOvercommit = false);
template <class TTag = TDefaultRowBufferPoolTag>
explicit TRowBuffer(
TTag /*tag*/ = TDefaultRowBufferPoolTag(),
size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize,
- IMemoryUsageTrackerPtr tracker = nullptr)
+ IMemoryUsageTrackerPtr tracker = nullptr,
+ bool allowMemoryOvercommit = false)
: MemoryTracker_(std::move(tracker))
+ , AllowMemoryOvercommit_(allowMemoryOvercommit)
, Pool_(
TTag(),
startChunkSize)
@@ -45,8 +48,10 @@ public:
TRowBuffer(
TTag /*tag*/,
IMemoryChunkProviderPtr chunkProvider,
- IMemoryUsageTrackerPtr tracker = nullptr)
+ IMemoryUsageTrackerPtr tracker = nullptr,
+ bool allowMemoryOvercommit = false)
: MemoryTracker_(std::move(tracker))
+ , AllowMemoryOvercommit_(allowMemoryOvercommit)
, Pool_(
GetRefCountedTypeCookie<TTag>(),
std::move(chunkProvider))
@@ -112,11 +117,12 @@ public:
private:
const IMemoryUsageTrackerPtr MemoryTracker_;
+ const bool AllowMemoryOvercommit_;
TChunkedMemoryPool Pool_;
- std::optional<TMemoryUsageTrackerGuard> MemoryGuard_;
+ TMemoryUsageTrackerGuard MemoryGuard_;
- void ValidateNoOverflow();
+ void UpdateMemoryUsage();
};
DEFINE_REFCOUNTED_TYPE(TRowBuffer)
diff --git a/yt/yt/client/table_client/value_consumer.cpp b/yt/yt/client/table_client/value_consumer.cpp
index 4afb23343d..41db811312 100644
--- a/yt/yt/client/table_client/value_consumer.cpp
+++ b/yt/yt/client/table_client/value_consumer.cpp
@@ -316,11 +316,16 @@ struct TWritingValueConsumerBufferTag
TWritingValueConsumer::TWritingValueConsumer(
IUnversionedWriterPtr writer,
TTypeConversionConfigPtr typeConversionConfig,
- i64 maxRowBufferSize)
+ i64 maxRowBufferSize,
+ IMemoryUsageTrackerPtr tracker)
: TValueConsumerBase(writer->GetSchema(), std::move(typeConversionConfig))
, Writer_(std::move(writer))
, MaxRowBufferSize_(maxRowBufferSize)
- , RowBuffer_(New<TRowBuffer>(TWritingValueConsumerBufferTag()))
+ , RowBuffer_(New<TRowBuffer>(
+ TWritingValueConsumerBufferTag(),
+ TChunkedMemoryPool::DefaultStartChunkSize,
+ std::move(tracker),
+ /*allowMemoryOvercommit*/ true))
{
YT_VERIFY(Writer_);
InitializeIdToTypeMapping();
diff --git a/yt/yt/client/table_client/value_consumer.h b/yt/yt/client/table_client/value_consumer.h
index 08ed4e9506..c32b5859f9 100644
--- a/yt/yt/client/table_client/value_consumer.h
+++ b/yt/yt/client/table_client/value_consumer.h
@@ -129,7 +129,8 @@ public:
explicit TWritingValueConsumer(
IUnversionedWriterPtr writer,
TTypeConversionConfigPtr typeConversionConfig = New<TTypeConversionConfig>(),
- i64 maxRowBufferSize = 1_MB);
+ i64 maxRowBufferSize = 1_MB,
+ IMemoryUsageTrackerPtr tracker = GetNullMemoryUsageTracker());
TFuture<void> Flush() override;
const TNameTablePtr& GetNameTable() const override;
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 95f8a29b35..254f08dd90 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -516,6 +516,11 @@ public:
const TPartitionTablesOptions& options),
(override));
+ MOCK_METHOD(TFuture<ITablePartitionReaderPtr>, CreateTablePartitionReader, (
+ const TTablePartitionCookiePtr& partition,
+ const TReadTablePartitionOptions& options),
+ (override));
+
MOCK_METHOD(TFuture<void>, TruncateJournal, (
const NYPath::TYPath& path,
i64 rowCount,
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index c660a2bd97..ee5c8da6ee 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -33,6 +33,8 @@ SRCS(
api/sticky_transaction_pool.cpp
api/options.cpp
api/shuffle_client.cpp
+ api/table_partition_reader.cpp
+ api/private.cpp
api/rpc_proxy/address_helpers.cpp
api/rpc_proxy/public.cpp
diff --git a/yt/yt/core/logging/structured_log-inl.h b/yt/yt/core/logging/structured_log-inl.h
index b4f9c438b8..0368f4214b 100644
--- a/yt/yt/core/logging/structured_log-inl.h
+++ b/yt/yt/core/logging/structured_log-inl.h
@@ -12,16 +12,40 @@ namespace NYT::NLogging {
////////////////////////////////////////////////////////////////////////////////
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+consteval TStructuredLogKey::TStructuredLogKey(const char* key)
+ : Key(key)
+{
+ if (std::string_view(key) == "message") {
+ // Entering this branch causes a compilation failure that contains this
+ // string in the error message.
+ throw "Key 'message' is reserved and can not be used";
+ }
+}
+
+template <typename T>
+std::tuple<TStructuredLogKey, T> MakeTuple(TStructuredLogKey k, T&& t)
+{
+ return {k, std::forward<T>(t)};
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+
template <typename... Values>
void LogStructuredEvent(
const TLogger& logger,
ELogLevel level,
std::string_view message,
- std::tuple<const char*, Values>&&... tags)
+ std::tuple<NDetail::TStructuredLogKey, Values>&&... tags)
{
auto event = LogStructuredEventFluently(logger, level).Item("message").Value(message);
- ((event = event.Item(std::get<0>(tags)).Value(std::forward<Values>(std::get<1>(tags)))), ...);
+ ((event = event.Item(std::get<0>(tags).Key).Value(std::forward<Values>(std::get<1>(tags)))), ...);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/logging/structured_log.h b/yt/yt/core/logging/structured_log.h
index d6e4fe476b..6aaa696f5c 100644
--- a/yt/yt/core/logging/structured_log.h
+++ b/yt/yt/core/logging/structured_log.h
@@ -20,19 +20,37 @@ namespace NYT::NLogging {
////////////////////////////////////////////////////////////////////////////////
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Helper type to validate keys at compile-time.
+struct TStructuredLogKey
+{
+ std::string_view Key;
+ consteval TStructuredLogKey(const char* key);
+};
+
+template <typename T>
+std::tuple<TStructuredLogKey, T> MakeTuple(TStructuredLogKey k, T&& t);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+
template <typename... Values>
void LogStructuredEvent(
const TLogger& logger,
ELogLevel level,
std::string_view message,
- std::tuple<const char*, Values>&&... tags);
+ std::tuple<NDetail::TStructuredLogKey, Values>&&... tags);
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NLogging
-#define YT_SLOG_TUPLE(x) std::make_tuple x,
-#define YT_SLOG_TUPLE_LAST(x) std::make_tuple x
+#define YT_SLOG_TUPLE(x) NYT::NLogging::NDetail::MakeTuple x,
+#define YT_SLOG_TUPLE_LAST(x) NYT::NLogging::NDetail::MakeTuple x
// YT_SLOG_EVENT enforces that primary structured log messages are known at
// compile-time, because structured errors (and logs in general) are usually
diff --git a/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h b/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h
index db9b51223a..8a6b69eabe 100644
--- a/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h
+++ b/yt/yt/core/misc/fair_share_hierarchical_queue-inl.h
@@ -20,7 +20,7 @@ TFairShareHierarchyLevel<TTag>::TFairShareHierarchyLevel(TTag tag, double weight
: Tag_(std::move(tag))
, Weight_(weight)
{
- YT_VERIFY(weight > 0.0);
+ YT_VERIFY(weight >= 0.0);
}
template <typename TTag>
@@ -35,6 +35,19 @@ double TFairShareHierarchyLevel<TTag>::GetWeight() const
return Weight_;
}
+template <typename TTag>
+std::vector<TFairShareHierarchyLevel<TTag>> CreateHierarchyLevels(const std::vector<std::pair<TTag, double>>& tags)
+{
+ std::vector<TFairShareHierarchyLevel<TTag>> levels;
+ levels.reserve(tags.size());
+
+ for (const auto& tag : tags) {
+ levels.emplace_back(tag.first, tag.second);
+ }
+
+ return levels;
+}
+
////////////////////////////////////////////////////////////////////////////////
template <typename TTag>
diff --git a/yt/yt/core/misc/fair_share_hierarchical_queue.h b/yt/yt/core/misc/fair_share_hierarchical_queue.h
index 7149e11b55..9cc4c430a2 100644
--- a/yt/yt/core/misc/fair_share_hierarchical_queue.h
+++ b/yt/yt/core/misc/fair_share_hierarchical_queue.h
@@ -69,6 +69,9 @@ private:
const double Weight_;
};
+template <typename TTag>
+std::vector<TFairShareHierarchyLevel<TTag>> CreateHierarchyLevels(const std::vector<std::pair<TTag, double>>& tags);
+
////////////////////////////////////////////////////////////////////////////////
struct TFairShareLogKey
diff --git a/yt/yt/core/misc/serialize.cpp b/yt/yt/core/misc/serialize.cpp
index 6bc396ddfc..b14db02bac 100644
--- a/yt/yt/core/misc/serialize.cpp
+++ b/yt/yt/core/misc/serialize.cpp
@@ -300,4 +300,3 @@ TEntityStreamLoadContext::TEntityStreamLoadContext(
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
-
diff --git a/yt/yt/library/named_value/named_value.cpp b/yt/yt/library/named_value/named_value.cpp
index a283e28849..b041bcd772 100644
--- a/yt/yt/library/named_value/named_value.cpp
+++ b/yt/yt/library/named_value/named_value.cpp
@@ -1,12 +1,16 @@
#include "named_value.h"
+#include <yt/yt/core/ytree/convert.h>
+
+#include <util/string/escape.h>
+
namespace NYT::NNamedValue {
using namespace NTableClient;
////////////////////////////////////////////////////////////////////////////////
-NTableClient::TUnversionedOwningRow MakeRow(
+TUnversionedOwningRow MakeRow(
const TNameTablePtr& nameTable,
const std::initializer_list<TNamedValue>& values)
{
@@ -17,7 +21,7 @@ NTableClient::TUnversionedOwningRow MakeRow(
return builder.FinishRow();
}
-NTableClient::TUnversionedOwningRow MakeRow(
+TUnversionedOwningRow MakeRow(
const TNameTablePtr& nameTable,
const std::vector<TNamedValue>& values)
{
@@ -28,9 +32,23 @@ NTableClient::TUnversionedOwningRow MakeRow(
return builder.FinishRow();
}
+std::vector<TNamedValue> MakeNamedValueList(
+ const TNameTablePtr& nameTable,
+ TUnversionedRow row)
+{
+ std::vector<TNamedValue> result;
+ result.reserve(row.GetCount());
+
+ for (const auto& value : row) {
+ auto namedValue = TNamedValue(TString(nameTable->GetNameOrThrow(value.Id)), TNamedValue::ExtractValue(value));
+ result.push_back(std::move(namedValue));
+ }
+ return result;
+}
+
////////////////////////////////////////////////////////////////////////////////
-NTableClient::TUnversionedValue TNamedValue::ToUnversionedValue(const NTableClient::TNameTablePtr& nameTable) const
+TUnversionedValue TNamedValue::ToUnversionedValue(const TNameTablePtr& nameTable) const
{
const int valueId = nameTable->GetIdOrRegisterName(Name_);
return std::visit([valueId] (const auto& value) -> TUnversionedValue {
@@ -56,7 +74,7 @@ NTableClient::TUnversionedValue TNamedValue::ToUnversionedValue(const NTableClie
}, Value_);
}
-TNamedValue::TValue TNamedValue::ExtractValue(const NTableClient::TUnversionedValue& value)
+TNamedValue::TValue TNamedValue::ExtractValue(const TUnversionedValue& value)
{
auto getString = [] (const TUnversionedValue& value) {
return value.AsString();
@@ -86,8 +104,8 @@ TNamedValue::TValue TNamedValue::ExtractValue(const NTableClient::TUnversionedVa
YT_ABORT();
}
-TNamedValue::TValue TNamedValue::ToValue(NTableClient::EValueType valueType, TStringBuf value) {
- using namespace NTableClient;
+TNamedValue::TValue TNamedValue::ToValue(EValueType valueType, TStringBuf value)
+{
if (valueType == EValueType::String) {
return TString(value);
} else if (valueType == EValueType::Any) {
@@ -99,16 +117,34 @@ TNamedValue::TValue TNamedValue::ToValue(NTableClient::EValueType valueType, TSt
}
}
-////////////////////////////////////////////////////////////////////////////////
-
-bool operator ==(const TNamedValue::TAny& lhs, const TNamedValue::TAny& rhs)
+void FormatValue(TStringBuilderBase* builder, const TNamedValue& value, TStringBuf /*spec*/)
{
- return lhs.Value == rhs.Value;
+ using namespace NYson;
+
+ builder->AppendFormat("%Qv=", value.Name_);
+ auto text = std::visit([] (const auto& value) -> TString {
+ using T = std::decay_t<decltype(value)>;
+ if constexpr (std::is_same_v<T, TNamedValue::TAny>) {
+ auto result = TString("<type=any>");
+ result += value.Value;
+ return ConvertToYsonString(TYsonString(result), EYsonFormat::Text).ToString();
+ } else if constexpr (std::is_same_v<T, TNamedValue::TComposite>) {
+ auto result = TString("<type=composite>");
+ result += ConvertToYsonString(TYsonString(value.Value)).ToString();
+ return ConvertToYsonString(TYsonString(result), EYsonFormat::Text).ToString();
+ } else if constexpr (std::is_same_v<T, std::nullptr_t>) {
+ return "#";
+ } else {
+ return ConvertToYsonString(value, EYsonFormat::Text).ToString();
+ }
+ }, value.Value_);
+
+ builder->AppendString(ConvertToYsonString(text, EYsonFormat::Text).AsStringBuf());
}
-bool operator ==(const TNamedValue::TComposite& lhs, const TNamedValue::TComposite& rhs)
+void PrintTo(const TNamedValue& value, std::ostream* os)
{
- return lhs.Value == rhs.Value;
+ *os << Format("%v", value);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/named_value/named_value.h b/yt/yt/library/named_value/named_value.h
index 67592fe9f3..c00081219a 100644
--- a/yt/yt/library/named_value/named_value.h
+++ b/yt/yt/library/named_value/named_value.h
@@ -32,6 +32,9 @@ NTableClient::TUnversionedOwningRow MakeRow(
const NTableClient::TNameTablePtr& nameTable,
const std::vector<TNamedValue>& values);
+
+std::vector<TNamedValue> MakeNamedValueList(const NTableClient::TNameTablePtr& nameTable, NTableClient::TUnversionedRow row);
+
////////////////////////////////////////////////////////////////////////////////
//! Slow but convenient analogue of TUnversionedValue
@@ -41,11 +44,13 @@ public:
struct TAny
{
TString Value;
+ friend bool operator ==(const TNamedValue::TAny& lhs, const TNamedValue::TAny& rhs) = default;
};
struct TComposite
{
TString Value;
+ friend bool operator ==(const TNamedValue::TComposite& lhs, const TNamedValue::TComposite& rhs) = default;
};
using TValue = std::variant<std::nullptr_t, i64, ui64, double, bool, TString, TAny, TComposite>;
@@ -90,12 +95,11 @@ private:
private:
TString Name_;
TValue Value_;
-};
-////////////////////////////////////////////////////////////////////////////////
-
-bool operator ==(const TNamedValue::TAny& lhs, const TNamedValue::TAny& rhs);
-bool operator ==(const TNamedValue::TComposite& lhs, const TNamedValue::TComposite& rhs);
+ friend bool operator ==(const TNamedValue& lhs, const TNamedValue& rhs) = default;
+ friend void PrintTo(const TNamedValue& value, std::ostream* os);
+ friend void FormatValue(TStringBuilderBase* builder, const TNamedValue& value, TStringBuf spec);
+};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index ba3879c8c5..75c4ffe241 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -2150,6 +2150,8 @@ message TReqPartitionTables
optional bool enable_key_guarantee = 8;
optional bool adjust_data_weight_per_partition = 9;
+ optional bool enable_cookies = 10;
+
optional TTransactionalOptions transactional_options = 100;
}
@@ -2165,6 +2167,8 @@ message TMultiTablePartition
}
optional TStatistics aggregate_statistics = 2;
+
+ optional bytes cookie = 3;
}
message TRspPartitionTables
@@ -2174,6 +2178,31 @@ message TRspPartitionTables
////////////////////////////////////////////////////////////////////////////////
+message TReqReadTablePartition
+{
+ optional bytes cookie = 1;
+
+ optional bool unordered = 2 [default = false];
+ optional bool omit_inaccessible_columns = 3 [default = false];
+ optional bool enable_table_index = 5 [default = false];
+ optional bool enable_row_index = 6 [default = false];
+ optional bool enable_range_index = 7 [default = false];
+ optional bytes config = 4; // YSON-serialized TTableReaderConfig
+ optional ERowsetFormat desired_rowset_format = 8 [default = RF_YT_WIRE];
+ optional ERowsetFormat arrow_fallback_rowset_format = 10 [default = RF_YT_WIRE];
+ optional bytes format = 9; // YSON-serialized TFormat
+}
+
+message TRspReadTablePartition
+{
+}
+
+message TRspReadTablePartitionMeta
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
message TReqBalanceTabletCells
{
required string bundle = 1;