aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-03-10 19:22:41 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-03-10 19:31:09 +0300
commit13a34e8a2fe1c3498a9a3e1d56202bb29eb5d17b (patch)
treef188fb0f394d6b20e68951e88cf555610dad8cf1
parente0439374e8770430b5a391cea94769059544e2a2 (diff)
downloadydb-13a34e8a2fe1c3498a9a3e1d56202bb29eb5d17b.tar.gz
Intermediate changes
-rw-r--r--contrib/python/hypothesis/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py66
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py21
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/stateful.py32
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py5
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py26
-rw-r--r--contrib/python/hypothesis/py3/hypothesis/version.py2
-rw-r--r--contrib/python/hypothesis/py3/ya.make2
-rw-r--r--contrib/python/setuptools/py3/.dist-info/METADATA5
-rw-r--r--contrib/python/setuptools/py3/setuptools/build_meta.py11
-rw-r--r--contrib/python/setuptools/py3/ya.make2
-rw-r--r--yt/cpp/mapreduce/interface/operation.h10
-rw-r--r--yt/yt/client/api/admin_client.h8
-rw-r--r--yt/yt/client/api/delegating_client.h4
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp5
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.h3
-rw-r--r--yt/yt/client/driver/admin_commands.cpp20
-rw-r--r--yt/yt/client/driver/admin_commands.h14
-rw-r--r--yt/yt/client/driver/driver.cpp1
-rw-r--r--yt/yt/client/federated/client.cpp1
-rw-r--r--yt/yt/client/hedging/hedging.cpp1
-rw-r--r--yt/yt/client/hive/public.h1
-rw-r--r--yt/yt/client/security_client/access_control.h1
-rw-r--r--yt/yt/client/security_client/public.h1
-rw-r--r--yt/yt/client/table_client/config.cpp20
-rw-r--r--yt/yt/client/table_client/config.h26
-rw-r--r--yt/yt/client/table_client/public.h2
-rw-r--r--yt/yt/client/unittests/mock/client.h4
-rw-r--r--yt/yt/core/concurrency/delayed_executor.cpp5
-rw-r--r--yt/yt/core/concurrency/fair_share_thread_pool.cpp9
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.cpp12
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.h11
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.cpp2
-rw-r--r--yt/yt/core/concurrency/notify_manager.cpp2
-rw-r--r--yt/yt/core/concurrency/notify_manager.h2
-rw-r--r--yt/yt/core/concurrency/private.h2
-rw-r--r--yt/yt/core/concurrency/quantized_executor.cpp41
-rw-r--r--yt/yt/core/concurrency/quantized_executor.h12
-rw-r--r--yt/yt/core/concurrency/scheduler_thread.cpp46
-rw-r--r--yt/yt/core/concurrency/scheduler_thread.h38
-rw-r--r--yt/yt/core/concurrency/single_queue_scheduler_thread.cpp24
-rw-r--r--yt/yt/core/concurrency/single_queue_scheduler_thread.h12
-rw-r--r--yt/yt/core/concurrency/suspendable_action_queue.cpp18
-rw-r--r--yt/yt/core/concurrency/suspendable_action_queue.h9
-rw-r--r--yt/yt/core/concurrency/system_invokers.cpp5
-rw-r--r--yt/yt/core/concurrency/thread_pool.cpp31
-rw-r--r--yt/yt/core/concurrency/thread_pool.h23
-rw-r--r--yt/yt/core/concurrency/thread_pool_detail.cpp5
-rw-r--r--yt/yt/core/concurrency/thread_pool_detail.h9
-rw-r--r--yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp2
-rw-r--r--yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp36
-rw-r--r--yt/yt/core/logging/log_manager.cpp5
-rw-r--r--yt/yt/core/misc/utf8_decoder.cpp2
-rw-r--r--yt/yt/core/rpc/grpc/dispatcher.cpp3
-rw-r--r--yt/yt/core/threading/thread.cpp16
-rw-r--r--yt/yt/core/threading/thread.h13
-rw-r--r--yt/yt/core/ytree/virtual.cpp2
-rw-r--r--yt/yt/core/ytree/virtual.h1
58 files changed, 418 insertions, 276 deletions
diff --git a/contrib/python/hypothesis/py3/.dist-info/METADATA b/contrib/python/hypothesis/py3/.dist-info/METADATA
index 171a09eebb..95fe6d3510 100644
--- a/contrib/python/hypothesis/py3/.dist-info/METADATA
+++ b/contrib/python/hypothesis/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: hypothesis
-Version: 6.98.10
+Version: 6.98.11
Summary: A library for property-based testing
Home-page: https://hypothesis.works
Author: David R. MacIver and Zac Hatfield-Dodds
diff --git a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py
index cdce1afacf..b84db4df85 100644
--- a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py
+++ b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py
@@ -62,11 +62,14 @@ from hypothesis.internal.floats import (
from hypothesis.internal.intervalsets import IntervalSet
if TYPE_CHECKING:
+ from typing import TypeAlias
+
from typing_extensions import dataclass_transform
from hypothesis.strategies import SearchStrategy
from hypothesis.strategies._internal.strategies import Ex
else:
+ TypeAlias = object
def dataclass_transform():
def wrapper(tp):
@@ -94,6 +97,41 @@ TargetObservations = Dict[Optional[str], Union[int, float]]
T = TypeVar("T")
+class IntegerKWargs(TypedDict):
+ min_value: Optional[int]
+ max_value: Optional[int]
+ weights: Optional[Sequence[float]]
+ shrink_towards: int
+
+
+class FloatKWargs(TypedDict):
+ min_value: float
+ max_value: float
+ allow_nan: bool
+ smallest_nonzero_magnitude: float
+
+
+class StringKWargs(TypedDict):
+ intervals: IntervalSet
+ min_size: int
+ max_size: Optional[int]
+
+
+class BytesKWargs(TypedDict):
+ size: int
+
+
+class BooleanKWargs(TypedDict):
+ p: float
+
+
+IRType: TypeAlias = Union[int, str, bool, float, bytes]
+IRKWargsType: TypeAlias = Union[
+ IntegerKWargs, FloatKWargs, StringKWargs, BytesKWargs, BooleanKWargs
+]
+IRTypeName: TypeAlias = Literal["integer", "string", "boolean", "float", "bytes"]
+
+
class ExtraInformation:
"""A class for holding shared state on a ``ConjectureData`` that should
be added to the final ``ConjectureResult``."""
@@ -798,34 +836,6 @@ global_test_counter = 0
MAX_DEPTH = 100
-class IntegerKWargs(TypedDict):
- min_value: Optional[int]
- max_value: Optional[int]
- weights: Optional[Sequence[float]]
- shrink_towards: int
-
-
-class FloatKWargs(TypedDict):
- min_value: float
- max_value: float
- allow_nan: bool
- smallest_nonzero_magnitude: float
-
-
-class StringKWargs(TypedDict):
- intervals: IntervalSet
- min_size: int
- max_size: Optional[int]
-
-
-class BytesKWargs(TypedDict):
- size: int
-
-
-class BooleanKWargs(TypedDict):
- p: float
-
-
class DataObserver:
"""Observer class for recording the behaviour of a
ConjectureData object, primarily used for tracking
diff --git a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py
index a9a6e5b196..c8e5d70aa7 100644
--- a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py
+++ b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py
@@ -10,7 +10,7 @@
import itertools
import math
-from typing import TYPE_CHECKING, List, Literal, Optional, Union
+from typing import List, Optional, Union
import attr
@@ -24,23 +24,14 @@ from hypothesis.internal.conjecture.data import (
DataObserver,
FloatKWargs,
IntegerKWargs,
+ IRKWargsType,
+ IRType,
+ IRTypeName,
Status,
StringKWargs,
)
from hypothesis.internal.floats import count_between_floats, float_to_int, int_to_float
-if TYPE_CHECKING:
- from typing import TypeAlias
-else:
- TypeAlias = object
-
-IRType: TypeAlias = Union[int, str, bool, float, bytes]
-IRKWargsType: TypeAlias = Union[
- IntegerKWargs, FloatKWargs, StringKWargs, BytesKWargs, BooleanKWargs
-]
-# this would be "IRTypeType", but that's just confusing.
-IRLiteralType: TypeAlias = Literal["integer", "string", "boolean", "float", "bytes"]
-
class PreviouslyUnseenBehaviour(HypothesisException):
pass
@@ -336,7 +327,7 @@ class TreeNode:
# have the same length. The values at index i belong to node i.
kwargs: List[IRKWargsType] = attr.ib(factory=list)
values: List[IRType] = attr.ib(factory=list)
- ir_types: List[IRLiteralType] = attr.ib(factory=list)
+ ir_types: List[IRTypeName] = attr.ib(factory=list)
# The indices of nodes which had forced values.
#
@@ -885,7 +876,7 @@ class TreeRecordingObserver(DataObserver):
def draw_value(
self,
- ir_type: IRLiteralType,
+ ir_type: IRTypeName,
value: IRType,
*,
was_forced: bool,
diff --git a/contrib/python/hypothesis/py3/hypothesis/stateful.py b/contrib/python/hypothesis/py3/hypothesis/stateful.py
index 2ab7ef13d5..60cd92721c 100644
--- a/contrib/python/hypothesis/py3/hypothesis/stateful.py
+++ b/contrib/python/hypothesis/py3/hypothesis/stateful.py
@@ -358,7 +358,6 @@ class RuleBasedStateMachine(metaclass=StateMachineMeta):
return cls._invariants_per_class[cls]
def _repr_step(self, rule, data, result):
- self.step_count = getattr(self, "step_count", 0) + 1
output_assignment = ""
if rule.targets:
if isinstance(result, MultipleResults):
@@ -431,7 +430,7 @@ class RuleBasedStateMachine(metaclass=StateMachineMeta):
return StateMachineTestCase
-@attr.s()
+@attr.s(repr=False)
class Rule:
targets = attr.ib()
function = attr.ib(repr=get_pretty_function_description)
@@ -451,6 +450,11 @@ class Rule:
self.arguments_strategies[k] = v
self.bundles = tuple(bundles)
+ def __repr__(self) -> str:
+ rep = get_pretty_function_description
+ bits = [f"{k}={rep(v)}" for k, v in attr.asdict(self).items() if v]
+ return f"{self.__class__.__name__}({', '.join(bits)})"
+
self_strategy = st.runner()
@@ -937,7 +941,8 @@ class RuleStrategy(SearchStrategy):
self.rules = list(machine.rules())
self.enabled_rules_strategy = st.shared(
- FeatureStrategy(), key=("enabled rules", machine)
+ FeatureStrategy(at_least_one_of={r.function.__name__ for r in self.rules}),
+ key=("enabled rules", machine),
)
# The order is a bit arbitrary. Primarily we're trying to group rules
@@ -965,17 +970,16 @@ class RuleStrategy(SearchStrategy):
feature_flags = data.draw(self.enabled_rules_strategy)
- # Note: The order of the filters here is actually quite important,
- # because checking is_enabled makes choices, so increases the size of
- # the choice sequence. This means that if we are in a case where many
- # rules are invalid we will make a lot more choices if we ask if they
- # are enabled before we ask if they are valid, so our test cases will
- # be artificially large.
- rule = data.draw(
- st.sampled_from(self.rules)
- .filter(self.is_valid)
- .filter(lambda r: feature_flags.is_enabled(r.function.__name__))
- )
+ def rule_is_enabled(r):
+ # Note: The order of the filters here is actually quite important,
+ # because checking is_enabled makes choices, so increases the size of
+ # the choice sequence. This means that if we are in a case where many
+ # rules are invalid we would make a lot more choices if we ask if they
+ # are enabled before we ask if they are valid, so our test cases would
+ # be artificially large.
+ return self.is_valid(r) and feature_flags.is_enabled(r.function.__name__)
+
+ rule = data.draw(st.sampled_from(self.rules).filter(rule_is_enabled))
arguments = {}
for k, strat in rule.arguments_strategies.items():
diff --git a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py
index 1f86f37a42..e8f8f21ba4 100644
--- a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py
+++ b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py
@@ -13,6 +13,7 @@ from typing import Any, Iterable, Tuple, overload
from hypothesis.errors import InvalidArgument
from hypothesis.internal.conjecture import utils as cu
+from hypothesis.internal.conjecture.engine import BUFFER_SIZE
from hypothesis.internal.conjecture.junkdrawer import LazySequenceCopy
from hypothesis.internal.conjecture.utils import combine_labels
from hypothesis.internal.filtering import get_integer_predicate_bounds
@@ -142,6 +143,10 @@ class ListStrategy(SearchStrategy):
self.min_size = min_size or 0
self.max_size = max_size if max_size is not None else float("inf")
assert 0 <= self.min_size <= self.max_size
+ if min_size > BUFFER_SIZE:
+ raise InvalidArgument(
+ f"min_size={min_size:_d} is larger than Hypothesis is designed to handle"
+ )
self.average_size = min(
max(self.min_size * 2, self.min_size + 5),
0.5 * (self.min_size + self.max_size),
diff --git a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py
index cf72b5c10b..98af8f087a 100644
--- a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py
+++ b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py
@@ -31,7 +31,7 @@ class FeatureFlags:
required disabled features.
"""
- def __init__(self, data=None, enabled=(), disabled=()):
+ def __init__(self, data=None, enabled=(), disabled=(), at_least_one_of=()):
self.__data = data
self.__is_disabled = {}
@@ -52,13 +52,18 @@ class FeatureFlags:
# features will be enabled. This is so that we shrink in the direction
# of more features being enabled.
if self.__data is not None:
- self.__p_disabled = data.draw_integer(0, 255) / 255.0
+ self.__p_disabled = data.draw_integer(0, 254) / 255
else:
# If data is None we're in example mode so all that matters is the
# enabled/disabled lists above. We set this up so that everything
# else is enabled by default.
self.__p_disabled = 0.0
+ # The naive approach can lead to disabling e.g. every single rule on a
+ # RuleBasedStateMachine, which aborts the test as unable to make progress.
+ # Track the set of possible names, and ensure that at least one is enabled.
+ self.__at_least_one_of = set(at_least_one_of)
+
def is_enabled(self, name):
"""Tests whether the feature named ``name`` should be enabled on this
test run."""
@@ -81,10 +86,19 @@ class FeatureFlags:
# of the test case where we originally decided, the next point at
# which we make this decision just makes the decision it previously
# made.
+ oneof = self.__at_least_one_of
is_disabled = self.__data.draw_boolean(
- self.__p_disabled, forced=self.__is_disabled.get(name)
+ self.__p_disabled,
+ forced=(
+ False
+ if len(oneof) == 1 and name in oneof
+ else self.__is_disabled.get(name)
+ ),
)
self.__is_disabled[name] = is_disabled
+ if name in oneof and not is_disabled:
+ oneof.clear()
+ oneof.discard(name)
data.stop_example()
return not is_disabled
@@ -100,5 +114,9 @@ class FeatureFlags:
class FeatureStrategy(SearchStrategy):
+ def __init__(self, at_least_one_of=()):
+ super().__init__()
+ self._at_least_one_of = frozenset(at_least_one_of)
+
def do_draw(self, data):
- return FeatureFlags(data)
+ return FeatureFlags(data, at_least_one_of=self._at_least_one_of)
diff --git a/contrib/python/hypothesis/py3/hypothesis/version.py b/contrib/python/hypothesis/py3/hypothesis/version.py
index ce617c47e9..da7f74708c 100644
--- a/contrib/python/hypothesis/py3/hypothesis/version.py
+++ b/contrib/python/hypothesis/py3/hypothesis/version.py
@@ -8,5 +8,5 @@
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
-__version_info__ = (6, 98, 10)
+__version_info__ = (6, 98, 11)
__version__ = ".".join(map(str, __version_info__))
diff --git a/contrib/python/hypothesis/py3/ya.make b/contrib/python/hypothesis/py3/ya.make
index 4c7de0c00e..c71ce1c809 100644
--- a/contrib/python/hypothesis/py3/ya.make
+++ b/contrib/python/hypothesis/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(6.98.10)
+VERSION(6.98.11)
LICENSE(MPL-2.0)
diff --git a/contrib/python/setuptools/py3/.dist-info/METADATA b/contrib/python/setuptools/py3/.dist-info/METADATA
index 6abcbef24b..237cc66f13 100644
--- a/contrib/python/setuptools/py3/.dist-info/METADATA
+++ b/contrib/python/setuptools/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: setuptools
-Version: 69.1.0
+Version: 69.1.1
Summary: Easily download, build, install, upgrade, and uninstall Python packages
Home-page: https://github.com/pypa/setuptools
Author: Python Packaging Authority
@@ -43,6 +43,7 @@ Requires-Dist: flake8-2020 ; extra == 'testing'
Requires-Dist: virtualenv >=13.0.0 ; extra == 'testing'
Requires-Dist: wheel ; extra == 'testing'
Requires-Dist: pip >=19.1 ; extra == 'testing'
+Requires-Dist: packaging >=23.2 ; extra == 'testing'
Requires-Dist: jaraco.envs >=2.2 ; extra == 'testing'
Requires-Dist: pytest-xdist ; extra == 'testing'
Requires-Dist: jaraco.path >=3.2.0 ; extra == 'testing'
@@ -63,7 +64,7 @@ Requires-Dist: jaraco.path >=3.2.0 ; extra == 'testing-integration'
Requires-Dist: jaraco.envs >=2.2 ; extra == 'testing-integration'
Requires-Dist: build[virtualenv] >=1.0.3 ; extra == 'testing-integration'
Requires-Dist: filelock >=3.4.0 ; extra == 'testing-integration'
-Requires-Dist: packaging >=23.1 ; extra == 'testing-integration'
+Requires-Dist: packaging >=23.2 ; extra == 'testing-integration'
Requires-Dist: pytest-cov ; (platform_python_implementation != "PyPy") and extra == 'testing'
Requires-Dist: pytest-mypy >=0.9.1 ; (platform_python_implementation != "PyPy") and extra == 'testing'
Requires-Dist: jaraco.develop >=7.21 ; (python_version >= "3.9" and sys_platform != "cygwin") and extra == 'testing'
diff --git a/contrib/python/setuptools/py3/setuptools/build_meta.py b/contrib/python/setuptools/py3/setuptools/build_meta.py
index 0a0abfdae0..2decd2d214 100644
--- a/contrib/python/setuptools/py3/setuptools/build_meta.py
+++ b/contrib/python/setuptools/py3/setuptools/build_meta.py
@@ -369,7 +369,12 @@ class _BuildMetaBackend(_ConfigSettingsTranslator):
return self._bubble_up_info_directory(metadata_directory, ".dist-info")
def _build_with_temp_dir(
- self, setup_command, result_extension, result_directory, config_settings
+ self,
+ setup_command,
+ result_extension,
+ result_directory,
+ config_settings,
+ arbitrary_args=(),
):
result_directory = os.path.abspath(result_directory)
@@ -384,6 +389,7 @@ class _BuildMetaBackend(_ConfigSettingsTranslator):
*setup_command,
"--dist-dir",
tmp_dist_dir,
+ *arbitrary_args,
]
with no_install_setup_requires():
self.run_setup()
@@ -402,10 +408,11 @@ class _BuildMetaBackend(_ConfigSettingsTranslator):
):
with suppress_known_deprecation():
return self._build_with_temp_dir(
- ['bdist_wheel', *self._arbitrary_args(config_settings)],
+ ['bdist_wheel'],
'.whl',
wheel_directory,
config_settings,
+ self._arbitrary_args(config_settings),
)
def build_sdist(self, sdist_directory, config_settings=None):
diff --git a/contrib/python/setuptools/py3/ya.make b/contrib/python/setuptools/py3/ya.make
index 564d268875..4210fba192 100644
--- a/contrib/python/setuptools/py3/ya.make
+++ b/contrib/python/setuptools/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(69.1.0)
+VERSION(69.1.1)
LICENSE(MIT)
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
index 69c873090d..09abf1f88a 100644
--- a/yt/cpp/mapreduce/interface/operation.h
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -1202,7 +1202,7 @@ struct TRawMapReduceOperationSpec
///
/// @brief Schema inference mode.
///
-/// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference
+/// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference
enum class ESchemaInferenceMode : int
{
FromInput /* "from_input" */,
@@ -1258,7 +1258,7 @@ struct TSortOperationSpec
///
/// @brief Inference mode for output table schema.
///
- /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference
+ /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference
FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
///
@@ -1332,7 +1332,7 @@ struct TMergeOperationSpec
///
/// @brief Inference mode for output table schema.
///
- /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference
+ /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference
FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
};
@@ -1358,7 +1358,7 @@ struct TEraseOperationSpec
///
/// @brief Inference mode for output table schema.
///
- /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference
+ /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference
FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
};
@@ -1392,7 +1392,7 @@ struct TRemoteCopyOperationSpec
///
/// @brief Inference mode for output table schema.
///
- /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference
+ /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference
FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode);
///
diff --git a/yt/yt/client/api/admin_client.h b/yt/yt/client/api/admin_client.h
index cf8d789c57..1780ad0ab7 100644
--- a/yt/yt/client/api/admin_client.h
+++ b/yt/yt/client/api/admin_client.h
@@ -28,6 +28,10 @@ struct TBuildMasterSnapshotsOptions
bool Retry = true;
};
+struct TGetMasterConsistentStateOptions
+ : public TTimeoutOptions
+{ };
+
struct TExitReadOnlyOptions
: public TTimeoutOptions
{ };
@@ -155,6 +159,7 @@ struct TResurrectChunkLocationsResult
};
using TCellIdToSnapshotIdMap = THashMap<NHydra::TCellId, int>;
+using TCellIdToSequenceNumberMap = THashMap<NHydra::TCellId, i64>;
struct TAddMaintenanceOptions
: public TTimeoutOptions
@@ -200,6 +205,9 @@ struct IAdminClient
virtual TFuture<TCellIdToSnapshotIdMap> BuildMasterSnapshots(
const TBuildMasterSnapshotsOptions& options = {}) = 0;
+ virtual TFuture<TCellIdToSequenceNumberMap> GetMasterConsistentState(
+ const TGetMasterConsistentStateOptions& options = {}) = 0;
+
virtual TFuture<void> ExitReadOnly(
NHydra::TCellId cellId,
const TExitReadOnlyOptions& options = {}) = 0;
diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h
index 97d46ce93a..e4d3341492 100644
--- a/yt/yt/client/api/delegating_client.h
+++ b/yt/yt/client/api/delegating_client.h
@@ -558,6 +558,10 @@ public:
const TBuildMasterSnapshotsOptions& options),
(options))
+ DELEGATE_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, (
+ const TGetMasterConsistentStateOptions& options),
+ (options))
+
DELEGATE_METHOD(TFuture<void>, ExitReadOnly, (
NHydra::TCellId cellId,
const TExitReadOnlyOptions& options),
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 99706535aa..a0d99186d2 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -1648,6 +1648,11 @@ TFuture<TCellIdToSnapshotIdMap> TClient::BuildMasterSnapshots(const TBuildMaster
ThrowUnimplemented("BuildMasterSnapshots");
}
+TFuture<TCellIdToSequenceNumberMap> TClient::GetMasterConsistentState(const TGetMasterConsistentStateOptions& /*options*/)
+{
+ ThrowUnimplemented("GetMasterConsistentState");
+}
+
TFuture<void> TClient::ExitReadOnly(
NHydra::TCellId cellId,
const TExitReadOnlyOptions& /*options*/)
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h
index 8dc59e06a0..b2854eff49 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.h
+++ b/yt/yt/client/api/rpc_proxy/client_impl.h
@@ -323,6 +323,9 @@ public:
TFuture<TCellIdToSnapshotIdMap> BuildMasterSnapshots(
const TBuildMasterSnapshotsOptions& options) override;
+ TFuture<TCellIdToSequenceNumberMap> GetMasterConsistentState(
+ const TGetMasterConsistentStateOptions& options) override;
+
TFuture<void> ExitReadOnly(
NHydra::TCellId cellId,
const TExitReadOnlyOptions& options) override;
diff --git a/yt/yt/client/driver/admin_commands.cpp b/yt/yt/client/driver/admin_commands.cpp
index c1af19fc13..4586547f11 100644
--- a/yt/yt/client/driver/admin_commands.cpp
+++ b/yt/yt/client/driver/admin_commands.cpp
@@ -101,6 +101,26 @@ void TBuildMasterSnapshotsCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
+void TGetMasterConsistentStateCommand::Register(TRegistrar /*registrar*/)
+{ }
+
+void TGetMasterConsistentStateCommand::DoExecute(ICommandContextPtr context)
+{
+ auto cellIdToSequenceNumber = WaitFor(context->GetClient()->GetMasterConsistentState(Options))
+ .ValueOrThrow();
+
+ context->ProduceOutputValue(BuildYsonStringFluently()
+ .DoListFor(cellIdToSequenceNumber, [=] (TFluentList fluent, const auto& pair) {
+ fluent
+ .Item().BeginMap()
+ .Item("cell_id").Value(pair.first)
+ .Item("sequence_number").Value(pair.second)
+ .EndMap();
+ }));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TExitReadOnlyCommand::Register(TRegistrar registrar)
{
registrar.Parameter("cell_id", &TThis::CellId_);
diff --git a/yt/yt/client/driver/admin_commands.h b/yt/yt/client/driver/admin_commands.h
index 0665eb3494..a61576abc2 100644
--- a/yt/yt/client/driver/admin_commands.h
+++ b/yt/yt/client/driver/admin_commands.h
@@ -34,6 +34,20 @@ private:
////////////////////////////////////////////////////////////////////////////////
+class TGetMasterConsistentStateCommand
+ : public TTypedCommand<NApi::TGetMasterConsistentStateOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TGetMasterConsistentStateCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TExitReadOnlyCommand
: public TTypedCommand<NApi::TExitReadOnlyOptions>
{
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index 94fd78bdf6..a57237f46b 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -316,6 +316,7 @@ public:
REGISTER_ALL(TBuildSnapshotCommand, "build_snapshot", Null, Structured, true, false);
REGISTER_ALL(TBuildMasterSnapshotsCommand, "build_master_snapshots", Null, Structured, true, false);
+ REGISTER_ALL(TGetMasterConsistentStateCommand, "get_master_consistent_state", Null, Structured, true, false);
REGISTER_ALL(TExitReadOnlyCommand, "exit_read_only", Null, Structured, true, false);
REGISTER_ALL(TMasterExitReadOnlyCommand, "master_exit_read_only", Null, Structured, true, false);
REGISTER_ALL(TDiscombobulateNonvotingPeersCommand, "discombobulate_nonvoting_peers", Null, Structured, true, false);
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index dad29ee6a5..1e43f174a0 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -386,6 +386,7 @@ public:
UNIMPLEMENTED_METHOD(TFuture<void>, CheckClusterLiveness, (const TCheckClusterLivenessOptions&));
UNIMPLEMENTED_METHOD(TFuture<int>, BuildSnapshot, (const TBuildSnapshotOptions&));
UNIMPLEMENTED_METHOD(TFuture<TCellIdToSnapshotIdMap>, BuildMasterSnapshots, (const TBuildMasterSnapshotsOptions&));
+ UNIMPLEMENTED_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, (const TGetMasterConsistentStateOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, ExitReadOnly, (NObjectClient::TCellId, const TExitReadOnlyOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, MasterExitReadOnly, (const TMasterExitReadOnlyOptions&));
UNIMPLEMENTED_METHOD(TFuture<void>, DiscombobulateNonvotingPeers, (NObjectClient::TCellId, const TDiscombobulateNonvotingPeersOptions&));
diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp
index 0be20d47af..6d0122b584 100644
--- a/yt/yt/client/hedging/hedging.cpp
+++ b/yt/yt/client/hedging/hedging.cpp
@@ -173,6 +173,7 @@ public:
UNSUPPORTED_METHOD(TFuture<void>, CheckClusterLiveness, (const TCheckClusterLivenessOptions&));
UNSUPPORTED_METHOD(TFuture<int>, BuildSnapshot, (const TBuildSnapshotOptions&));
UNSUPPORTED_METHOD(TFuture<TCellIdToSnapshotIdMap>, BuildMasterSnapshots, (const TBuildMasterSnapshotsOptions&));
+ UNSUPPORTED_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, (const TGetMasterConsistentStateOptions&));
UNSUPPORTED_METHOD(TFuture<void>, ExitReadOnly, (NObjectClient::TCellId, const TExitReadOnlyOptions&));
UNSUPPORTED_METHOD(TFuture<void>, MasterExitReadOnly, (const TMasterExitReadOnlyOptions&));
UNSUPPORTED_METHOD(TFuture<void>, DiscombobulateNonvotingPeers, (NObjectClient::TCellId, const TDiscombobulateNonvotingPeersOptions&));
diff --git a/yt/yt/client/hive/public.h b/yt/yt/client/hive/public.h
index 5e8c8284bc..5143ae3c79 100644
--- a/yt/yt/client/hive/public.h
+++ b/yt/yt/client/hive/public.h
@@ -34,6 +34,7 @@ DECLARE_REFCOUNTED_STRUCT(ITransactionParticipant)
YT_DEFINE_ERROR_ENUM(
((MailboxNotCreatedYet) (2200))
((ParticipantUnregistered) (2201))
+ ((EntryNotFound) (2202))
);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/security_client/access_control.h b/yt/yt/client/security_client/access_control.h
index 009492b189..4152b9f7bc 100644
--- a/yt/yt/client/security_client/access_control.h
+++ b/yt/yt/client/security_client/access_control.h
@@ -42,6 +42,7 @@ const THashMap<EAccessControlObject, TAccessControlObjectDescriptor> AccessContr
ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::ResurrectChunkLocations),
ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::BuildSnapshot),
ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::BuildMasterSnapshot),
+ ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::GetMasterConsistentState),
ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::ExitReadOnly),
ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::MasterExitReadOnly),
ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::DiscombobulateNonvotingPeers),
diff --git a/yt/yt/client/security_client/public.h b/yt/yt/client/security_client/public.h
index 9d6e84df55..43e86b1ef3 100644
--- a/yt/yt/client/security_client/public.h
+++ b/yt/yt/client/security_client/public.h
@@ -90,6 +90,7 @@ DEFINE_ENUM(EAccessControlObject,
(ResurrectChunkLocations)
(BuildSnapshot)
(BuildMasterSnapshot)
+ (GetMasterConsistentState)
(ExitReadOnly)
(MasterExitReadOnly)
(DiscombobulateNonvotingPeers)
diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp
index 863672915b..d30ec5c8d2 100644
--- a/yt/yt/client/table_client/config.cpp
+++ b/yt/yt/client/table_client/config.cpp
@@ -270,9 +270,6 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar)
registrar.Parameter("column_dictionary_size", &TThis::ColumnDictionarySize)
.GreaterThanOrEqual(NCompression::GetDictionaryCompressionCodec()->GetMinDictionarySize())
.Default(32_KB);
- registrar.Parameter("compression_level", &TThis::CompressionLevel)
- .InRange(1, NCompression::GetDictionaryCompressionCodec()->GetMaxCompressionLevel())
- .Default(NCompression::GetDictionaryCompressionCodec()->GetDefaultCompressionLevel());
registrar.Parameter("applied_policies", &TThis::AppliedPolicies)
.Default({
EDictionaryCompressionPolicy::LargeChunkFirst,
@@ -285,10 +282,7 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar)
.Default(12_MB);
registrar.Parameter("max_acceptable_compression_ratio", &TThis::MaxAcceptableCompressionRatio)
.Default(0.7)
- .GreaterThanOrEqual(0.);
- registrar.Parameter("max_decompression_blob_size", &TThis::MaxDecompressionBlobSize)
- .GreaterThan(0)
- .Default(64_MB);
+ .InRange(0, 1);
registrar.Postprocessor([] (TThis* config) {
if (config->DesiredSampleCount > config->MaxProcessedSampleCount) {
@@ -316,6 +310,18 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar)
///////////////////////////////////////////////////////////////////////////////
+void TDictionaryCompressionSessionConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("compression_level", &TThis::CompressionLevel)
+ .InRange(1, NCompression::GetDictionaryCompressionCodec()->GetMaxCompressionLevel())
+ .Default(NCompression::GetDictionaryCompressionCodec()->GetDefaultCompressionLevel());
+ registrar.Parameter("max_decompression_blob_size", &TThis::MaxDecompressionBlobSize)
+ .GreaterThan(0)
+ .Default(64_MB);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
void TBatchHunkReaderConfig::Register(TRegistrar registrar)
{
registrar.Parameter("max_hunk_count_per_read", &TThis::MaxHunkCountPerRead)
diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h
index 332285c7b5..4f6ebfbad1 100644
--- a/yt/yt/client/table_client/config.h
+++ b/yt/yt/client/table_client/config.h
@@ -259,15 +259,36 @@ public:
//! Upper limit on acceptable compression ratio. No chunk compression is performed if this limit is exceeded.
double MaxAcceptableCompressionRatio;
+ REGISTER_YSON_STRUCT(TDictionaryCompressionConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TDictionaryCompressionConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDictionaryCompressionSessionConfig
+ : public virtual NYTree::TYsonStruct
+{
+public:
+ // Compression session options.
+
+ //! Level of compression algorithm.
+ //! Applied to digested compression dictionary upon its construction.
+ int CompressionLevel;
+
+ // Decompression session options.
+
//! Upper limit on content size of a batch that can be decompressed within a single iteration.
i64 MaxDecompressionBlobSize;
- REGISTER_YSON_STRUCT(TDictionaryCompressionConfig);
+ REGISTER_YSON_STRUCT(TDictionaryCompressionSessionConfig);
static void Register(TRegistrar registrar);
};
-DEFINE_REFCOUNTED_TYPE(TDictionaryCompressionConfig)
+DEFINE_REFCOUNTED_TYPE(TDictionaryCompressionSessionConfig)
////////////////////////////////////////////////////////////////////////////////
@@ -292,6 +313,7 @@ class TTableReaderConfig
, public virtual TChunkReaderConfig
, public TBatchHunkReaderConfig
, public NChunkClient::TChunkFragmentReaderConfig
+ , public TDictionaryCompressionSessionConfig
{
public:
bool SuppressAccessTracking;
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index 195ce2e684..ff8ce94195 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -352,6 +352,8 @@ DECLARE_REFCOUNTED_CLASS(TDictionaryCompressionConfig)
DECLARE_REFCOUNTED_CLASS(TBatchHunkReaderConfig)
+DECLARE_REFCOUNTED_CLASS(TDictionaryCompressionSessionConfig)
+
DECLARE_REFCOUNTED_CLASS(TTableReaderConfig)
DECLARE_REFCOUNTED_CLASS(TTableWriterConfig)
diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h
index 4d081f3620..0ab1250f20 100644
--- a/yt/yt/client/unittests/mock/client.h
+++ b/yt/yt/client/unittests/mock/client.h
@@ -238,6 +238,10 @@ public:
const TBuildMasterSnapshotsOptions& options),
(override));
+ MOCK_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, (
+ const TGetMasterConsistentStateOptions& options),
+ (override));
+
MOCK_METHOD(TFuture<void>, ExitReadOnly, (
NHydra::TCellId cellId,
const TExitReadOnlyOptions& options),
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp
index e7f040bccc..12606becc8 100644
--- a/yt/yt/core/concurrency/delayed_executor.cpp
+++ b/yt/yt/core/concurrency/delayed_executor.cpp
@@ -172,8 +172,9 @@ private:
TPollerThread()
: TThread(
"DelayedPoller",
- NThreading::EThreadPriority::Normal,
- /*shutdownPriority*/ 200)
+ NThreading::TThreadOptions{
+ .ShutdownPriority = 200,
+ })
{ }
void EnqueueSubmission(TDelayedExecutorEntryPtr entry)
diff --git a/yt/yt/core/concurrency/fair_share_thread_pool.cpp b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
index 68da649486..37be0d6a54 100644
--- a/yt/yt/core/concurrency/fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
@@ -471,8 +471,9 @@ public:
std::move(callbackEventCount),
threadGroupName,
threadName,
- threadPriority,
- /*shutdownPriority*/ 0)
+ NThreading::TThreadOptions{
+ .ThreadPriority = threadPriority,
+ })
, Queue_(std::move(queue))
, Index_(index)
{ }
@@ -561,14 +562,14 @@ private:
TThreadPoolBase::DoConfigure(threadCount);
}
- TSchedulerThreadBasePtr SpawnThread(int index) override
+ TSchedulerThreadPtr SpawnThread(int index) override
{
return New<TFairShareThread>(
Queue_,
CallbackEventCount_,
ThreadNamePrefix_,
MakeThreadName(index),
- ThreadPriority_,
+ NThreading::EThreadPriority::Normal,
index);
}
};
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
index 7fcb074e84..62c0e8ad68 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
@@ -845,14 +845,14 @@ private:
////////////////////////////////////////////////////////////////////////////////
TFiberSchedulerThread::TFiberSchedulerThread(
- const TString& threadGroupName,
- const TString& threadName,
- NThreading::EThreadPriority threadPriority,
- int shutdownPriority)
- : TThread(threadName, threadPriority, shutdownPriority)
- , ThreadGroupName_(threadGroupName)
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options)
+ : TThread(std::move(threadName), std::move(options))
+ , ThreadGroupName_(std::move(threadGroupName))
{ }
+
void TFiberSchedulerThread::ThreadMain()
{
// Hold this strongly.
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.h b/yt/yt/core/concurrency/fiber_scheduler_thread.h
index 76144fd791..b368a7910c 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.h
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.h
@@ -16,18 +16,17 @@ class TFiberSchedulerThread
{
public:
TFiberSchedulerThread(
- const TString& threadGroupName,
- const TString& threadName,
- NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal,
- int shutdownPriority = 0);
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options = {});
//! Empty callback signals about stopping.
virtual TClosure OnExecute() = 0;
private:
- void ThreadMain() override;
-
const TString ThreadGroupName_;
+
+ void ThreadMain() override;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
index f996602e61..fcfc64eec7 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
@@ -1307,7 +1307,7 @@ private:
TThreadPoolBase::DoConfigure(threadCount);
}
- TSchedulerThreadBasePtr SpawnThread(int index) override
+ TSchedulerThreadPtr SpawnThread(int index) override
{
return New<TFairShareThread>(
Queue_,
diff --git a/yt/yt/core/concurrency/notify_manager.cpp b/yt/yt/core/concurrency/notify_manager.cpp
index 743f06f0ac..596b3bc346 100644
--- a/yt/yt/core/concurrency/notify_manager.cpp
+++ b/yt/yt/core/concurrency/notify_manager.cpp
@@ -17,7 +17,7 @@ constexpr auto WaitTimeWarningThreshold = TDuration::Seconds(30);
TNotifyManager::TNotifyManager(
TIntrusivePtr<NThreading::TEventCount> eventCount,
const NProfiling::TTagSet& tagSet,
- const TDuration pollingPeriod)
+ TDuration pollingPeriod)
: EventCount_(std::move(eventCount))
, WakeupCounter_(NProfiling::TProfiler("/action_queue")
.WithTags(tagSet)
diff --git a/yt/yt/core/concurrency/notify_manager.h b/yt/yt/core/concurrency/notify_manager.h
index f62383e3f2..efb71029d3 100644
--- a/yt/yt/core/concurrency/notify_manager.h
+++ b/yt/yt/core/concurrency/notify_manager.h
@@ -20,7 +20,7 @@ public:
TNotifyManager(
TIntrusivePtr<NThreading::TEventCount> eventCount,
const NProfiling::TTagSet& counterTagSet,
- const TDuration pollingPeriod);
+ TDuration pollingPeriod);
TCpuInstant ResetMinEnqueuedAt();
diff --git a/yt/yt/core/concurrency/private.h b/yt/yt/core/concurrency/private.h
index fcbcefe605..5d300edee3 100644
--- a/yt/yt/core/concurrency/private.h
+++ b/yt/yt/core/concurrency/private.h
@@ -52,7 +52,7 @@ using TMpscSuspendableSingleQueueSchedulerThreadPtr = TIntrusivePtr<TMpscSuspend
DECLARE_REFCOUNTED_CLASS(TFiber)
-DECLARE_REFCOUNTED_CLASS(TSchedulerThreadBase)
+DECLARE_REFCOUNTED_CLASS(TSchedulerThread)
DECLARE_REFCOUNTED_CLASS(TFairShareInvokerQueue)
DECLARE_REFCOUNTED_CLASS(TFairShareQueueSchedulerThread)
diff --git a/yt/yt/core/concurrency/quantized_executor.cpp b/yt/yt/core/concurrency/quantized_executor.cpp
index 93893e2b14..4487a6a205 100644
--- a/yt/yt/core/concurrency/quantized_executor.cpp
+++ b/yt/yt/core/concurrency/quantized_executor.cpp
@@ -21,28 +21,18 @@ public:
TQuantizedExecutor(
TString name,
ICallbackProviderPtr callbackProvider,
- int workerCount)
+ const TQuantizedExecutorOptions& options)
: Name_(std::move(name))
+ , CallbackProvider_(std::move(callbackProvider))
+ , Options_(options)
, Logger(ConcurrencyLogger.WithTag("Executor: %v", Name_))
, ControlQueue_(New<TActionQueue>(Format("%vCtl", Name_)))
, ControlInvoker_(ControlQueue_->GetInvoker())
- , CallbackProvider_(std::move(callbackProvider))
- , DesiredWorkerCount_(workerCount)
+ , DesiredWorkerCount_(options.WorkerCount)
{
VERIFY_INVOKER_THREAD_AFFINITY(ControlInvoker_, ControlThread);
}
- void Initialize(TCallback<void()> workerInitializer) override
- {
- WorkerInitializer_ = std::move(workerInitializer);
-
- BIND(&TQuantizedExecutor::DoReconfigure, MakeStrong(this))
- .AsyncVia(ControlInvoker_)
- .Run()
- .Get()
- .ThrowOnError();
- }
-
TFuture<void> Run(TDuration timeout) override
{
VERIFY_THREAD_AFFINITY_ANY();
@@ -56,11 +46,13 @@ public:
{
VERIFY_THREAD_AFFINITY_ANY();
- DesiredWorkerCount_ = workerCount;
+ DesiredWorkerCount_.store(workerCount);
}
private:
const TString Name_;
+ const ICallbackProviderPtr CallbackProvider_;
+ const TQuantizedExecutorOptions Options_;
const TLogger Logger;
@@ -68,9 +60,6 @@ private:
const IInvokerPtr ControlInvoker_;
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, CallbackProviderLock_);
- ICallbackProviderPtr CallbackProvider_;
-
- TCallback<void()> WorkerInitializer_;
std::vector<ISuspendableActionQueuePtr> Workers_;
std::vector<IInvokerPtr> Invokers_;
@@ -106,21 +95,17 @@ private:
Workers_.reserve(desiredWorkerCount);
Invokers_.reserve(desiredWorkerCount);
for (int index = currentWorkerCount; index < desiredWorkerCount; ++index) {
- auto worker = CreateSuspendableActionQueue(Format("%v:%v", Name_, index));
+ auto worker = CreateSuspendableActionQueue(
+ /*threadName*/ Format("%v:%v", Name_, index),
+ {.ThreadInitializer = Options_.ThreadInitializer});
// NB: #GetInvoker initializes queue.
Invokers_.push_back(worker->GetInvoker());
- if (WorkerInitializer_) {
- BIND(WorkerInitializer_)
- .AsyncVia(Invokers_.back())
- .Run()
- .Get();
- }
worker->Suspend(/*immediately*/ true)
.Get()
.ThrowOnError();
- Workers_.emplace_back(std::move(worker));
+ Workers_.push_back(std::move(worker));
}
}
@@ -274,12 +259,12 @@ private:
IQuantizedExecutorPtr CreateQuantizedExecutor(
TString name,
ICallbackProviderPtr callbackProvider,
- int workerCount)
+ TQuantizedExecutorOptions options)
{
return New<TQuantizedExecutor>(
std::move(name),
std::move(callbackProvider),
- workerCount);
+ std::move(options));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/quantized_executor.h b/yt/yt/core/concurrency/quantized_executor.h
index bea5a4f64c..bc6c78bad1 100644
--- a/yt/yt/core/concurrency/quantized_executor.h
+++ b/yt/yt/core/concurrency/quantized_executor.h
@@ -24,8 +24,6 @@ DEFINE_REFCOUNTED_TYPE(ICallbackProvider)
struct IQuantizedExecutor
: public TRefCounted
{
- virtual void Initialize(TCallback<void()> workerInitializer = {}) = 0;
-
//! Starts new quantum of time, returns a future that becomes set
//! when quantum ends.
/*!
@@ -38,7 +36,7 @@ struct IQuantizedExecutor
*/
virtual TFuture<void> Run(TDuration timeout) = 0;
- //! Updates number of workers.
+ //! Updates the number of workers.
virtual void Reconfigure(int workerCount) = 0;
};
@@ -46,10 +44,16 @@ DEFINE_REFCOUNTED_TYPE(IQuantizedExecutor)
////////////////////////////////////////////////////////////////////////////////
+struct TQuantizedExecutorOptions
+{
+ int WorkerCount = 1;
+ std::function<void()> ThreadInitializer;
+};
+
IQuantizedExecutorPtr CreateQuantizedExecutor(
TString name,
ICallbackProviderPtr callbackProvider,
- int workerCount);
+ TQuantizedExecutorOptions options = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/scheduler_thread.cpp b/yt/yt/core/concurrency/scheduler_thread.cpp
index 7d89333427..b4a2999bee 100644
--- a/yt/yt/core/concurrency/scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/scheduler_thread.cpp
@@ -11,59 +11,59 @@ using namespace NProfiling;
////////////////////////////////////////////////////////////////////////////////
-TSchedulerThreadBase::~TSchedulerThreadBase()
-{
- Stop();
-}
-
-TSchedulerThreadBase::TSchedulerThreadBase(
+TSchedulerThread::TSchedulerThread(
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
- const TString& threadGroupName,
- const TString& threadName,
- NThreading::EThreadPriority threadPriority,
- int shutdownPriority)
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options)
: TFiberSchedulerThread(
- threadGroupName,
- threadName,
- threadPriority,
- shutdownPriority)
+ std::move(threadGroupName),
+ std::move(threadName),
+ std::move(options))
, CallbackEventCount_(std::move(callbackEventCount))
{ }
-void TSchedulerThreadBase::OnStart()
+TSchedulerThread::~TSchedulerThread()
+{
+ Stop();
+}
+
+void TSchedulerThread::OnStart()
{ }
-void TSchedulerThreadBase::OnStop()
+void TSchedulerThread::OnStop()
{ }
-void TSchedulerThreadBase::Stop(bool graceful)
+void TSchedulerThread::Stop(bool graceful)
{
GracefulStop_.store(graceful);
TThread::Stop();
}
-void TSchedulerThreadBase::Stop()
+void TSchedulerThread::Stop()
{
+ TFiberSchedulerThread::Stop();
TThread::Stop();
}
-void TSchedulerThreadBase::StartEpilogue()
+void TSchedulerThread::StartEpilogue()
{
+ TFiberSchedulerThread::StartEpilogue();
OnStart();
}
-void TSchedulerThreadBase::StopPrologue()
+void TSchedulerThread::StopPrologue()
{
+ TFiberSchedulerThread::StopPrologue();
CallbackEventCount_->NotifyAll();
}
-void TSchedulerThreadBase::StopEpilogue()
+void TSchedulerThread::StopEpilogue()
{
+ TFiberSchedulerThread::StopEpilogue();
OnStop();
}
-////////////////////////////////////////////////////////////////////////////////
-
TClosure TSchedulerThread::OnExecute()
{
EndExecute();
diff --git a/yt/yt/core/concurrency/scheduler_thread.h b/yt/yt/core/concurrency/scheduler_thread.h
index 7cf4ee151b..c4953c9bd1 100644
--- a/yt/yt/core/concurrency/scheduler_thread.h
+++ b/yt/yt/core/concurrency/scheduler_thread.h
@@ -6,45 +6,29 @@ namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
-class TSchedulerThreadBase
+class TSchedulerThread
: public TFiberSchedulerThread
{
public:
- ~TSchedulerThreadBase();
-
void Stop(bool graceful);
void Stop();
protected:
const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_;
+
std::atomic<bool> GracefulStop_ = false;
- TSchedulerThreadBase(
+ TSchedulerThread(
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
- const TString& threadGroupName,
- const TString& threadName,
- NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal,
- int shutdownPriority = 0);
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options = {});
+
+ ~TSchedulerThread();
virtual void OnStart();
virtual void OnStop();
-private:
- void StartEpilogue() override;
- void StopPrologue() override;
- void StopEpilogue() override;
-};
-
-DEFINE_REFCOUNTED_TYPE(TSchedulerThreadBase)
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TSchedulerThread
- : public TSchedulerThreadBase
-{
-protected:
- using TSchedulerThreadBase::TSchedulerThreadBase;
-
TClosure OnExecute() override;
virtual TClosure BeginExecute() = 0;
@@ -57,8 +41,14 @@ private:
void MaybeRunMaintenance(TCpuInstant now);
void RunMaintenance();
+
+ void StartEpilogue() override;
+ void StopPrologue() override;
+ void StopEpilogue() override;
};
+DEFINE_REFCOUNTED_TYPE(TSchedulerThread)
+
////////////////////////////////////////////////////////////////////////////////
} //namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
index 0b66712d25..df1a18964b 100644
--- a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
@@ -9,16 +9,14 @@ template <class TQueueImpl>
TSingleQueueSchedulerThread<TQueueImpl>::TSingleQueueSchedulerThread(
TInvokerQueuePtr<TQueueImpl> queue,
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
- const TString& threadGroupName,
- const TString& threadName,
- NThreading::EThreadPriority threadPriority,
- int shutdownPriority)
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options)
: TSchedulerThread(
std::move(callbackEventCount),
- threadGroupName,
- threadName,
- threadPriority,
- shutdownPriority)
+ std::move(threadGroupName),
+ std::move(threadName),
+ std::move(options))
, Queue_(std::move(queue))
, Token_(Queue_->MakeConsumerToken())
{ }
@@ -52,12 +50,14 @@ template <class TQueueImpl>
TSuspendableSingleQueueSchedulerThread<TQueueImpl>::TSuspendableSingleQueueSchedulerThread(
TInvokerQueuePtr<TQueueImpl> queue,
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
- const TString& threadGroupName,
- const TString& threadName)
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options)
: TSchedulerThread(
std::move(callbackEventCount),
- threadGroupName,
- threadName)
+ std::move(threadGroupName),
+ std::move(threadName),
+ std::move(options))
, Queue_(std::move(queue))
, Token_(Queue_->MakeConsumerToken())
{ }
diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.h b/yt/yt/core/concurrency/single_queue_scheduler_thread.h
index 8ffe2232cc..2adffcde9d 100644
--- a/yt/yt/core/concurrency/single_queue_scheduler_thread.h
+++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.h
@@ -18,10 +18,9 @@ public:
TSingleQueueSchedulerThread(
TInvokerQueuePtr<TQueueImpl> queue,
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
- const TString& threadGroupName,
- const TString& threadName,
- NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal,
- int shutdownPriority = 0);
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options = {});
protected:
const TInvokerQueuePtr<TQueueImpl> Queue_;
@@ -45,8 +44,9 @@ public:
TSuspendableSingleQueueSchedulerThread(
TInvokerQueuePtr<TQueueImpl> queue,
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
- const TString& threadGroupName,
- const TString& threadName);
+ TString threadGroupName,
+ TString threadName,
+ NThreading::TThreadOptions options);
TFuture<void> Suspend(bool immediately);
diff --git a/yt/yt/core/concurrency/suspendable_action_queue.cpp b/yt/yt/core/concurrency/suspendable_action_queue.cpp
index 85b1c9aacf..c5e6bed21f 100644
--- a/yt/yt/core/concurrency/suspendable_action_queue.cpp
+++ b/yt/yt/core/concurrency/suspendable_action_queue.cpp
@@ -14,7 +14,9 @@ class TSuspendableActionQueue
: public ISuspendableActionQueue
{
public:
- explicit TSuspendableActionQueue(const TString& threadName)
+ TSuspendableActionQueue(
+ TString threadName,
+ TSuspendableActionQueueOptions options)
: Queue_(New<TMpscInvokerQueue>(
CallbackEventCount_,
GetThreadTags(threadName)))
@@ -23,7 +25,10 @@ public:
Queue_,
CallbackEventCount_,
threadName,
- threadName))
+ threadName,
+ NThreading::TThreadOptions{
+ .ThreadInitializer = options.ThreadInitializer,
+ }))
, ShutdownCookie_(RegisterShutdownCallback(
Format("SuspendableActionQueue(%v)", threadName),
BIND_NO_PROPAGATE(&TSuspendableActionQueue::Shutdown, MakeWeak(this), /*graceful*/ false),
@@ -71,6 +76,7 @@ public:
}
private:
+ const TSuspendableActionQueueOptions Options_;
const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_ = New<NThreading::TEventCount>();
const TMpscInvokerQueuePtr Queue_;
const IInvokerPtr Invoker_;
@@ -96,9 +102,13 @@ private:
////////////////////////////////////////////////////////////////////////////////
-ISuspendableActionQueuePtr CreateSuspendableActionQueue(const TString& threadName)
+ISuspendableActionQueuePtr CreateSuspendableActionQueue(
+ TString threadName,
+ TSuspendableActionQueueOptions options)
{
- return New<TSuspendableActionQueue>(threadName);
+ return New<TSuspendableActionQueue>(
+ std::move(threadName),
+ std::move(options));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/suspendable_action_queue.h b/yt/yt/core/concurrency/suspendable_action_queue.h
index 0909aa0b49..4c8f4173fe 100644
--- a/yt/yt/core/concurrency/suspendable_action_queue.h
+++ b/yt/yt/core/concurrency/suspendable_action_queue.h
@@ -29,7 +29,14 @@ DEFINE_REFCOUNTED_TYPE(ISuspendableActionQueue)
////////////////////////////////////////////////////////////////////////////////
-ISuspendableActionQueuePtr CreateSuspendableActionQueue(const TString& threadName);
+struct TSuspendableActionQueueOptions
+{
+ std::function<void()> ThreadInitializer;
+};
+
+ISuspendableActionQueuePtr CreateSuspendableActionQueue(
+ TString threadName,
+ TSuspendableActionQueueOptions options = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/system_invokers.cpp b/yt/yt/core/concurrency/system_invokers.cpp
index c3c389cdd4..f336b4d503 100644
--- a/yt/yt/core/concurrency/system_invokers.cpp
+++ b/yt/yt/core/concurrency/system_invokers.cpp
@@ -27,8 +27,9 @@ public:
CallbackEventCount_,
threadName,
threadName,
- NThreading::EThreadPriority::Normal,
- /*shutdownPriority*/ shutdownPriority - 1))
+ NThreading::TThreadOptions{
+ .ShutdownPriority = shutdownPriority - 1,
+ }))
, ShutdownCookie_(RegisterShutdownCallback(
Format("SystemInvokerThread:%v", threadName),
BIND_NO_PROPAGATE(&TSystemInvokerThread::Shutdown, this),
diff --git a/yt/yt/core/concurrency/thread_pool.cpp b/yt/yt/core/concurrency/thread_pool.cpp
index b2975b4baa..aa56320bfd 100644
--- a/yt/yt/core/concurrency/thread_pool.cpp
+++ b/yt/yt/core/concurrency/thread_pool.cpp
@@ -28,7 +28,7 @@ public:
TInvokerQueueAdapter(
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
const TTagSet& counterTagSet,
- const TDuration pollingPeriod)
+ TDuration pollingPeriod)
: TMpmcInvokerQueue(callbackEventCount, counterTagSet)
, TNotifyManager(callbackEventCount, counterTagSet, pollingPeriod)
{ }
@@ -97,18 +97,22 @@ public:
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
const TString& threadGroupName,
const TString& threadName,
- NThreading::EThreadPriority threadPriority)
+ const TThreadPoolOptions& options)
: TSchedulerThread(
callbackEventCount,
threadGroupName,
threadName,
- threadPriority,
- /*shutdownPriority*/ 0)
+ NThreading::TThreadOptions{
+ .ThreadPriority = options.ThreadPriority,
+ .ThreadInitializer = options.ThreadInitializer,
+ })
, Queue_(std::move(queue))
+ , Options_(options)
{ }
protected:
const TIntrusivePtr<TInvokerQueueAdapter> Queue_;
+ const TThreadPoolOptions Options_;
TEnqueuedAction CurrentAction_;
@@ -141,13 +145,13 @@ public:
TThreadPool(
int threadCount,
const TString& threadNamePrefix,
- NThreading::EThreadPriority threadPriority,
- const TDuration pollingPeriod)
- : TThreadPoolBase(threadNamePrefix, threadPriority)
+ const TThreadPoolOptions& options)
+ : TThreadPoolBase(threadNamePrefix)
+ , Options_(options)
, Queue_(New<TInvokerQueueAdapter>(
CallbackEventCount_,
GetThreadTags(ThreadNamePrefix_),
- pollingPeriod))
+ options.PollingPeriod))
, Invoker_(Queue_)
{
Configure(threadCount);
@@ -180,6 +184,7 @@ public:
}
private:
+ const TThreadPoolOptions Options_;
const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_ = New<NThreading::TEventCount>();
const TIntrusivePtr<TInvokerQueueAdapter> Queue_;
const IInvokerPtr Invoker_;
@@ -198,14 +203,14 @@ private:
});
}
- TSchedulerThreadBasePtr SpawnThread(int index) override
+ TSchedulerThreadPtr SpawnThread(int index) override
{
return New<TThreadPoolThread>(
Queue_,
CallbackEventCount_,
ThreadNamePrefix_,
MakeThreadName(index),
- ThreadPriority_);
+ Options_);
}
};
@@ -214,14 +219,12 @@ private:
IThreadPoolPtr CreateThreadPool(
int threadCount,
const TString& threadNamePrefix,
- NThreading::EThreadPriority threadPriority,
- TDuration pollingPeriod)
+ const TThreadPoolOptions& options)
{
return New<TThreadPool>(
threadCount,
threadNamePrefix,
- threadPriority,
- pollingPeriod);
+ options);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/thread_pool.h b/yt/yt/core/concurrency/thread_pool.h
index 2792f9a876..9c1bbfd782 100644
--- a/yt/yt/core/concurrency/thread_pool.h
+++ b/yt/yt/core/concurrency/thread_pool.h
@@ -13,23 +13,38 @@ namespace NYT::NConcurrency {
struct IThreadPool
: public virtual TRefCounted
{
+ //! Terminates all the threads.
virtual void Shutdown() = 0;
- //! Returns current thread count, it can differ from value set by Configure()
- //! because it clamped between 1 and maximum thread count.
+ //! Returns the current thread count.
+ /*!
+ * This can differ from value set by #Configure
+ * because it clamped between 1 and the maximum thread count.
+ */
virtual int GetThreadCount() = 0;
+
+ //! Enables changing thread pool configuration at runtime.
virtual void Configure(int threadCount) = 0;
+ //! Returns the invoker for enqueuing callbacks into the thread pool.
virtual const IInvokerPtr& GetInvoker() = 0;
};
DEFINE_REFCOUNTED_TYPE(IThreadPool)
+////////////////////////////////////////////////////////////////////////////////
+
+struct TThreadPoolOptions
+{
+ NThreading::EThreadPriority ThreadPriority = NThreading::EThreadPriority::Normal;
+ TDuration PollingPeriod = TDuration::MilliSeconds(10);
+ std::function<void()> ThreadInitializer;
+};
+
IThreadPoolPtr CreateThreadPool(
int threadCount,
const TString& threadNamePrefix,
- NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal,
- TDuration pollingPeriod = TDuration::MilliSeconds(10));
+ const TThreadPoolOptions& options = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/thread_pool_detail.cpp b/yt/yt/core/concurrency/thread_pool_detail.cpp
index 90b56511b6..9e2b0ee1b2 100644
--- a/yt/yt/core/concurrency/thread_pool_detail.cpp
+++ b/yt/yt/core/concurrency/thread_pool_detail.cpp
@@ -13,11 +13,8 @@ static const auto& Logger = ConcurrencyLogger;
////////////////////////////////////////////////////////////////////////////////
-TThreadPoolBase::TThreadPoolBase(
- TString threadNamePrefix,
- NThreading::EThreadPriority threadPriority)
+TThreadPoolBase::TThreadPoolBase(TString threadNamePrefix)
: ThreadNamePrefix_(std::move(threadNamePrefix))
- , ThreadPriority_(threadPriority)
, ShutdownCookie_(RegisterShutdownCallback(
Format("ThreadPool(%v)", ThreadNamePrefix_),
BIND_NO_PROPAGATE(&TThreadPoolBase::Shutdown, MakeWeak(this)),
diff --git a/yt/yt/core/concurrency/thread_pool_detail.h b/yt/yt/core/concurrency/thread_pool_detail.h
index 412208a9bc..14a306d116 100644
--- a/yt/yt/core/concurrency/thread_pool_detail.h
+++ b/yt/yt/core/concurrency/thread_pool_detail.h
@@ -16,9 +16,7 @@ class TThreadPoolBase
public:
static constexpr int MaxThreadCount = 64;
- explicit TThreadPoolBase(
- TString threadNamePrefix,
- NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal);
+ explicit TThreadPoolBase(TString threadNamePrefix);
void Configure(int threadCount);
void Shutdown();
@@ -28,7 +26,6 @@ public:
protected:
const TString ThreadNamePrefix_;
- const NThreading::EThreadPriority ThreadPriority_;
const TShutdownCookie ShutdownCookie_;
@@ -37,7 +34,7 @@ protected:
std::atomic<bool> ShutdownFlag_ = false;
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
- std::vector<TSchedulerThreadBasePtr> Threads_;
+ std::vector<TSchedulerThreadPtr> Threads_;
void Resize();
@@ -48,7 +45,7 @@ protected:
virtual TClosure MakeFinalizerCallback();
virtual void DoConfigure(int threadCount);
- virtual TSchedulerThreadBasePtr SpawnThread(int index) = 0;
+ virtual TSchedulerThreadPtr SpawnThread(int index) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
index d554844235..4711db5fb3 100644
--- a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
@@ -711,7 +711,7 @@ private:
TThreadPoolBase::DoConfigure(threadCount);
}
- TSchedulerThreadBasePtr SpawnThread(int index) override
+ TSchedulerThreadPtr SpawnThread(int index) override
{
return New<TFairShareThread>(
Queue_,
diff --git a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
index 3f7c7df097..08dd20cc5c 100644
--- a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp
@@ -48,37 +48,36 @@ class TInitializingCallbackProvider
: public ICallbackProvider
{
public:
- TCallback<void()> GetInitializer()
+ std::function<void()> GetInitializer()
{
- return BIND([this, this_ = MakeStrong(this)] {
- Initialized_ = true;
- });
+ return [this, this_ = MakeStrong(this)] {
+ Initialized_.store(true);
+ };
}
TCallback<void()> ExtractCallback() override
{
- if (Finished_) {
+ if (IsFinished()) {
return {};
- } else {
- return BIND([this, this_ = MakeStrong(this)] {
- Finished_ = true;
- });
}
+ return BIND([this, this_ = MakeStrong(this)] {
+ Finished_.store(true);
+ });
}
bool IsInitialized() const
{
- return Initialized_;
+ return Initialized_.load();
}
bool IsFinished() const
{
- return Finished_;
+ return Finished_.load();
}
private:
- bool Initialized_ = false;
- bool Finished_ = false;
+ std::atomic<bool> Initialized_ = false;
+ std::atomic<bool> Finished_ = false;
};
class TLongCallbackProvider
@@ -131,15 +130,13 @@ protected:
void InitSimple(int workerCount, i64 iterationCount)
{
SimpleCallbackProvider_ = New<TSimpleCallbackProvider>(iterationCount);
- Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, workerCount);
- Executor_->Initialize();
+ Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, {.WorkerCount = workerCount});
}
void InitLong(int workerCount, i64 iterationCount)
{
LongCallbackProvider_ = New<TLongCallbackProvider>(iterationCount);
- Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, workerCount);
- Executor_->Initialize();
+ Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, {.WorkerCount = workerCount});
}
};
@@ -226,13 +223,10 @@ TEST_F(TQuantizedExecutorTest, Reconfigure)
TEST_F(TQuantizedExecutorTest, WorkerInitializer)
{
auto callbackProvider = New<TInitializingCallbackProvider>();
- EXPECT_FALSE(callbackProvider->IsInitialized());
EXPECT_FALSE(callbackProvider->IsFinished());
- Executor_ = CreateQuantizedExecutor("test", callbackProvider, /*workerCount*/ 1);
- Executor_->Initialize(callbackProvider->GetInitializer());
+ Executor_ = CreateQuantizedExecutor("test", callbackProvider, {.ThreadInitializer = callbackProvider->GetInitializer()});
- EXPECT_TRUE(callbackProvider->IsInitialized());
EXPECT_FALSE(callbackProvider->IsFinished());
WaitFor(Executor_->Run(TDuration::MilliSeconds(300)))
diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp
index 1481e4053d..00bfe4cb1b 100644
--- a/yt/yt/core/logging/log_manager.cpp
+++ b/yt/yt/core/logging/log_manager.cpp
@@ -717,8 +717,9 @@ private:
owner->EventCount_,
"Logging",
"Logging",
- /*threadPriority*/ NThreading::EThreadPriority::Normal,
- /*shutdownPriority*/ 200)
+ NThreading::TThreadOptions{
+ .ShutdownPriority = 200,
+ })
, Owner_(owner)
{ }
diff --git a/yt/yt/core/misc/utf8_decoder.cpp b/yt/yt/core/misc/utf8_decoder.cpp
index 6acb70b69b..0767a3d901 100644
--- a/yt/yt/core/misc/utf8_decoder.cpp
+++ b/yt/yt/core/misc/utf8_decoder.cpp
@@ -66,7 +66,7 @@ TStringBuf TUtf8Transcoder::Decode(TStringBuf str)
i += 1;
} else {
THROW_ERROR_EXCEPTION("Unicode symbols with codes greater than 255 are not supported. "
- "Please refer to https://wiki.yandex-team.ru/yt/userdoc/formats/#json and "
+ "Please refer to https://ytsaurus.tech/docs/en/user-guide/storage/formats#json and "
"consider using encode_utf8=false in format options");
}
}
diff --git a/yt/yt/core/rpc/grpc/dispatcher.cpp b/yt/yt/core/rpc/grpc/dispatcher.cpp
index 8c1b959312..09ce5386dc 100644
--- a/yt/yt/core/rpc/grpc/dispatcher.cpp
+++ b/yt/yt/core/rpc/grpc/dispatcher.cpp
@@ -88,8 +88,7 @@ private:
TDispatcherThread(TGrpcLibraryLockPtr libraryLock, int index)
: TThread(
Format("Grpc/%v", index),
- NThreading::EThreadPriority::Normal,
- GrpcDispatcherThreadShutdownPriority)
+ {.ShutdownPriority = GrpcDispatcherThreadShutdownPriority})
, LibraryLock_(std::move(libraryLock))
, GuardedCompletionQueue_(TGrpcCompletionQueuePtr(grpc_completion_queue_create_for_next(nullptr)))
{
diff --git a/yt/yt/core/threading/thread.cpp b/yt/yt/core/threading/thread.cpp
index 8748862650..7bd7049493 100644
--- a/yt/yt/core/threading/thread.cpp
+++ b/yt/yt/core/threading/thread.cpp
@@ -29,11 +29,9 @@ static const auto& Logger = ThreadingLogger;
TThread::TThread(
TString threadName,
- EThreadPriority threadPriority,
- int shutdownPriority)
+ TThreadOptions options)
: ThreadName_(std::move(threadName))
- , ThreadPriority_(threadPriority)
- , ShutdownPriority_(shutdownPriority)
+ , Options_(std::move(options))
, UniqueThreadId_(++UniqueThreadIdGenerator)
, UnderlyingThread_(&StaticThreadMainTrampoline, this)
{ }
@@ -69,7 +67,7 @@ bool TThread::StartSlow()
ShutdownCookie_ = RegisterShutdownCallback(
Format("Thread(%v)", ThreadName_),
BIND_NO_PROPAGATE(&TThread::Stop, MakeWeak(this)),
- ShutdownPriority_);
+ Options_.ShutdownPriority);
if (!ShutdownCookie_) {
Stopping_ = true;
return false;
@@ -237,6 +235,10 @@ void TThread::ThreadMainTrampoline()
YT_THREAD_LOCAL(TExitInterceptor) Interceptor;
+ if (Options_.ThreadInitializer) {
+ Options_.ThreadInitializer();
+ }
+
ThreadMain();
GetTlsRef(Interceptor).Disarm();
@@ -261,7 +263,7 @@ void TThread::SetThreadPriority()
YT_VERIFY(ThreadId_ != InvalidThreadId);
#ifdef _linux_
- if (ThreadPriority_ == EThreadPriority::RealTime) {
+ if (Options_.ThreadPriority == EThreadPriority::RealTime) {
struct sched_param param{
.sched_priority = 1
};
@@ -275,7 +277,7 @@ void TThread::SetThreadPriority()
}
}
#else
- Y_UNUSED(ThreadPriority_);
+ Y_UNUSED(Options_);
Y_UNUSED(Logger);
#endif
}
diff --git a/yt/yt/core/threading/thread.h b/yt/yt/core/threading/thread.h
index 2599f18db2..ac9c9885ec 100644
--- a/yt/yt/core/threading/thread.h
+++ b/yt/yt/core/threading/thread.h
@@ -13,6 +13,13 @@ namespace NYT::NThreading {
////////////////////////////////////////////////////////////////////////////////
+struct TThreadOptions
+{
+ NThreading::EThreadPriority ThreadPriority = NThreading::EThreadPriority::Normal;
+ std::function<void()> ThreadInitializer;
+ int ShutdownPriority = 0;
+};
+
//! A shutdown-aware thread wrapper.
class TThread
: public virtual TRefCounted
@@ -20,8 +27,7 @@ class TThread
public:
explicit TThread(
TString threadName,
- EThreadPriority threadPriority = EThreadPriority::Normal,
- int shutdownPriority = 0);
+ TThreadOptions options = {});
~TThread();
//! Ensures the thread is started.
@@ -55,8 +61,7 @@ protected:
private:
const TString ThreadName_;
- const EThreadPriority ThreadPriority_;
- const int ShutdownPriority_;
+ const TThreadOptions Options_;
const TThreadId UniqueThreadId_;
diff --git a/yt/yt/core/ytree/virtual.cpp b/yt/yt/core/ytree/virtual.cpp
index 9a1c475cfe..4cff5fa7bb 100644
--- a/yt/yt/core/ytree/virtual.cpp
+++ b/yt/yt/core/ytree/virtual.cpp
@@ -66,7 +66,7 @@ void ExecuteBatchRead(
auto batchFuture = BIND([writeItems, batchWriter, batchIndexRange = batchIndexRanges[index]] {
writeItems(batchIndexRange, batchWriter);
})
- .AsyncVia(NRpc::TDispatcher::Get()->GetHeavyInvoker())
+ .AsyncVia(offloadParams->OffloadInvoker)
.Run();
batchFutures.push_back(std::move(batchFuture));
}
diff --git a/yt/yt/core/ytree/virtual.h b/yt/yt/core/ytree/virtual.h
index 74a88f0dc5..0ed73bc9ad 100644
--- a/yt/yt/core/ytree/virtual.h
+++ b/yt/yt/core/ytree/virtual.h
@@ -12,6 +12,7 @@ namespace NYT::NYTree {
struct TVirtualCompositeNodeReadOffloadParams
{
+ IInvokerPtr OffloadInvoker;
NConcurrency::EWaitForStrategy WaitForStrategy = NConcurrency::EWaitForStrategy::WaitFor;
i64 BatchSize = 10'000;
};