diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-10 19:22:41 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-10 19:31:09 +0300 |
commit | 13a34e8a2fe1c3498a9a3e1d56202bb29eb5d17b (patch) | |
tree | f188fb0f394d6b20e68951e88cf555610dad8cf1 | |
parent | e0439374e8770430b5a391cea94769059544e2a2 (diff) | |
download | ydb-13a34e8a2fe1c3498a9a3e1d56202bb29eb5d17b.tar.gz |
Intermediate changes
58 files changed, 418 insertions, 276 deletions
diff --git a/contrib/python/hypothesis/py3/.dist-info/METADATA b/contrib/python/hypothesis/py3/.dist-info/METADATA index 171a09eebb..95fe6d3510 100644 --- a/contrib/python/hypothesis/py3/.dist-info/METADATA +++ b/contrib/python/hypothesis/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: hypothesis -Version: 6.98.10 +Version: 6.98.11 Summary: A library for property-based testing Home-page: https://hypothesis.works Author: David R. MacIver and Zac Hatfield-Dodds diff --git a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py index cdce1afacf..b84db4df85 100644 --- a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py +++ b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/data.py @@ -62,11 +62,14 @@ from hypothesis.internal.floats import ( from hypothesis.internal.intervalsets import IntervalSet if TYPE_CHECKING: + from typing import TypeAlias + from typing_extensions import dataclass_transform from hypothesis.strategies import SearchStrategy from hypothesis.strategies._internal.strategies import Ex else: + TypeAlias = object def dataclass_transform(): def wrapper(tp): @@ -94,6 +97,41 @@ TargetObservations = Dict[Optional[str], Union[int, float]] T = TypeVar("T") +class IntegerKWargs(TypedDict): + min_value: Optional[int] + max_value: Optional[int] + weights: Optional[Sequence[float]] + shrink_towards: int + + +class FloatKWargs(TypedDict): + min_value: float + max_value: float + allow_nan: bool + smallest_nonzero_magnitude: float + + +class StringKWargs(TypedDict): + intervals: IntervalSet + min_size: int + max_size: Optional[int] + + +class BytesKWargs(TypedDict): + size: int + + +class BooleanKWargs(TypedDict): + p: float + + +IRType: TypeAlias = Union[int, str, bool, float, bytes] +IRKWargsType: TypeAlias = Union[ + IntegerKWargs, FloatKWargs, StringKWargs, BytesKWargs, BooleanKWargs +] +IRTypeName: TypeAlias = Literal["integer", "string", "boolean", "float", "bytes"] + + class ExtraInformation: """A class for holding shared state on a ``ConjectureData`` that should be added to the final ``ConjectureResult``.""" @@ -798,34 +836,6 @@ global_test_counter = 0 MAX_DEPTH = 100 -class IntegerKWargs(TypedDict): - min_value: Optional[int] - max_value: Optional[int] - weights: Optional[Sequence[float]] - shrink_towards: int - - -class FloatKWargs(TypedDict): - min_value: float - max_value: float - allow_nan: bool - smallest_nonzero_magnitude: float - - -class StringKWargs(TypedDict): - intervals: IntervalSet - min_size: int - max_size: Optional[int] - - -class BytesKWargs(TypedDict): - size: int - - -class BooleanKWargs(TypedDict): - p: float - - class DataObserver: """Observer class for recording the behaviour of a ConjectureData object, primarily used for tracking diff --git a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py index a9a6e5b196..c8e5d70aa7 100644 --- a/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py +++ b/contrib/python/hypothesis/py3/hypothesis/internal/conjecture/datatree.py @@ -10,7 +10,7 @@ import itertools import math -from typing import TYPE_CHECKING, List, Literal, Optional, Union +from typing import List, Optional, Union import attr @@ -24,23 +24,14 @@ from hypothesis.internal.conjecture.data import ( DataObserver, FloatKWargs, IntegerKWargs, + IRKWargsType, + IRType, + IRTypeName, Status, StringKWargs, ) from hypothesis.internal.floats import count_between_floats, float_to_int, int_to_float -if TYPE_CHECKING: - from typing import TypeAlias -else: - TypeAlias = object - -IRType: TypeAlias = Union[int, str, bool, float, bytes] -IRKWargsType: TypeAlias = Union[ - IntegerKWargs, FloatKWargs, StringKWargs, BytesKWargs, BooleanKWargs -] -# this would be "IRTypeType", but that's just confusing. -IRLiteralType: TypeAlias = Literal["integer", "string", "boolean", "float", "bytes"] - class PreviouslyUnseenBehaviour(HypothesisException): pass @@ -336,7 +327,7 @@ class TreeNode: # have the same length. The values at index i belong to node i. kwargs: List[IRKWargsType] = attr.ib(factory=list) values: List[IRType] = attr.ib(factory=list) - ir_types: List[IRLiteralType] = attr.ib(factory=list) + ir_types: List[IRTypeName] = attr.ib(factory=list) # The indices of nodes which had forced values. # @@ -885,7 +876,7 @@ class TreeRecordingObserver(DataObserver): def draw_value( self, - ir_type: IRLiteralType, + ir_type: IRTypeName, value: IRType, *, was_forced: bool, diff --git a/contrib/python/hypothesis/py3/hypothesis/stateful.py b/contrib/python/hypothesis/py3/hypothesis/stateful.py index 2ab7ef13d5..60cd92721c 100644 --- a/contrib/python/hypothesis/py3/hypothesis/stateful.py +++ b/contrib/python/hypothesis/py3/hypothesis/stateful.py @@ -358,7 +358,6 @@ class RuleBasedStateMachine(metaclass=StateMachineMeta): return cls._invariants_per_class[cls] def _repr_step(self, rule, data, result): - self.step_count = getattr(self, "step_count", 0) + 1 output_assignment = "" if rule.targets: if isinstance(result, MultipleResults): @@ -431,7 +430,7 @@ class RuleBasedStateMachine(metaclass=StateMachineMeta): return StateMachineTestCase -@attr.s() +@attr.s(repr=False) class Rule: targets = attr.ib() function = attr.ib(repr=get_pretty_function_description) @@ -451,6 +450,11 @@ class Rule: self.arguments_strategies[k] = v self.bundles = tuple(bundles) + def __repr__(self) -> str: + rep = get_pretty_function_description + bits = [f"{k}={rep(v)}" for k, v in attr.asdict(self).items() if v] + return f"{self.__class__.__name__}({', '.join(bits)})" + self_strategy = st.runner() @@ -937,7 +941,8 @@ class RuleStrategy(SearchStrategy): self.rules = list(machine.rules()) self.enabled_rules_strategy = st.shared( - FeatureStrategy(), key=("enabled rules", machine) + FeatureStrategy(at_least_one_of={r.function.__name__ for r in self.rules}), + key=("enabled rules", machine), ) # The order is a bit arbitrary. Primarily we're trying to group rules @@ -965,17 +970,16 @@ class RuleStrategy(SearchStrategy): feature_flags = data.draw(self.enabled_rules_strategy) - # Note: The order of the filters here is actually quite important, - # because checking is_enabled makes choices, so increases the size of - # the choice sequence. This means that if we are in a case where many - # rules are invalid we will make a lot more choices if we ask if they - # are enabled before we ask if they are valid, so our test cases will - # be artificially large. - rule = data.draw( - st.sampled_from(self.rules) - .filter(self.is_valid) - .filter(lambda r: feature_flags.is_enabled(r.function.__name__)) - ) + def rule_is_enabled(r): + # Note: The order of the filters here is actually quite important, + # because checking is_enabled makes choices, so increases the size of + # the choice sequence. This means that if we are in a case where many + # rules are invalid we would make a lot more choices if we ask if they + # are enabled before we ask if they are valid, so our test cases would + # be artificially large. + return self.is_valid(r) and feature_flags.is_enabled(r.function.__name__) + + rule = data.draw(st.sampled_from(self.rules).filter(rule_is_enabled)) arguments = {} for k, strat in rule.arguments_strategies.items(): diff --git a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py index 1f86f37a42..e8f8f21ba4 100644 --- a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py +++ b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/collections.py @@ -13,6 +13,7 @@ from typing import Any, Iterable, Tuple, overload from hypothesis.errors import InvalidArgument from hypothesis.internal.conjecture import utils as cu +from hypothesis.internal.conjecture.engine import BUFFER_SIZE from hypothesis.internal.conjecture.junkdrawer import LazySequenceCopy from hypothesis.internal.conjecture.utils import combine_labels from hypothesis.internal.filtering import get_integer_predicate_bounds @@ -142,6 +143,10 @@ class ListStrategy(SearchStrategy): self.min_size = min_size or 0 self.max_size = max_size if max_size is not None else float("inf") assert 0 <= self.min_size <= self.max_size + if min_size > BUFFER_SIZE: + raise InvalidArgument( + f"min_size={min_size:_d} is larger than Hypothesis is designed to handle" + ) self.average_size = min( max(self.min_size * 2, self.min_size + 5), 0.5 * (self.min_size + self.max_size), diff --git a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py index cf72b5c10b..98af8f087a 100644 --- a/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py +++ b/contrib/python/hypothesis/py3/hypothesis/strategies/_internal/featureflags.py @@ -31,7 +31,7 @@ class FeatureFlags: required disabled features. """ - def __init__(self, data=None, enabled=(), disabled=()): + def __init__(self, data=None, enabled=(), disabled=(), at_least_one_of=()): self.__data = data self.__is_disabled = {} @@ -52,13 +52,18 @@ class FeatureFlags: # features will be enabled. This is so that we shrink in the direction # of more features being enabled. if self.__data is not None: - self.__p_disabled = data.draw_integer(0, 255) / 255.0 + self.__p_disabled = data.draw_integer(0, 254) / 255 else: # If data is None we're in example mode so all that matters is the # enabled/disabled lists above. We set this up so that everything # else is enabled by default. self.__p_disabled = 0.0 + # The naive approach can lead to disabling e.g. every single rule on a + # RuleBasedStateMachine, which aborts the test as unable to make progress. + # Track the set of possible names, and ensure that at least one is enabled. + self.__at_least_one_of = set(at_least_one_of) + def is_enabled(self, name): """Tests whether the feature named ``name`` should be enabled on this test run.""" @@ -81,10 +86,19 @@ class FeatureFlags: # of the test case where we originally decided, the next point at # which we make this decision just makes the decision it previously # made. + oneof = self.__at_least_one_of is_disabled = self.__data.draw_boolean( - self.__p_disabled, forced=self.__is_disabled.get(name) + self.__p_disabled, + forced=( + False + if len(oneof) == 1 and name in oneof + else self.__is_disabled.get(name) + ), ) self.__is_disabled[name] = is_disabled + if name in oneof and not is_disabled: + oneof.clear() + oneof.discard(name) data.stop_example() return not is_disabled @@ -100,5 +114,9 @@ class FeatureFlags: class FeatureStrategy(SearchStrategy): + def __init__(self, at_least_one_of=()): + super().__init__() + self._at_least_one_of = frozenset(at_least_one_of) + def do_draw(self, data): - return FeatureFlags(data) + return FeatureFlags(data, at_least_one_of=self._at_least_one_of) diff --git a/contrib/python/hypothesis/py3/hypothesis/version.py b/contrib/python/hypothesis/py3/hypothesis/version.py index ce617c47e9..da7f74708c 100644 --- a/contrib/python/hypothesis/py3/hypothesis/version.py +++ b/contrib/python/hypothesis/py3/hypothesis/version.py @@ -8,5 +8,5 @@ # v. 2.0. If a copy of the MPL was not distributed with this file, You can # obtain one at https://mozilla.org/MPL/2.0/. -__version_info__ = (6, 98, 10) +__version_info__ = (6, 98, 11) __version__ = ".".join(map(str, __version_info__)) diff --git a/contrib/python/hypothesis/py3/ya.make b/contrib/python/hypothesis/py3/ya.make index 4c7de0c00e..c71ce1c809 100644 --- a/contrib/python/hypothesis/py3/ya.make +++ b/contrib/python/hypothesis/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(6.98.10) +VERSION(6.98.11) LICENSE(MPL-2.0) diff --git a/contrib/python/setuptools/py3/.dist-info/METADATA b/contrib/python/setuptools/py3/.dist-info/METADATA index 6abcbef24b..237cc66f13 100644 --- a/contrib/python/setuptools/py3/.dist-info/METADATA +++ b/contrib/python/setuptools/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: setuptools -Version: 69.1.0 +Version: 69.1.1 Summary: Easily download, build, install, upgrade, and uninstall Python packages Home-page: https://github.com/pypa/setuptools Author: Python Packaging Authority @@ -43,6 +43,7 @@ Requires-Dist: flake8-2020 ; extra == 'testing' Requires-Dist: virtualenv >=13.0.0 ; extra == 'testing' Requires-Dist: wheel ; extra == 'testing' Requires-Dist: pip >=19.1 ; extra == 'testing' +Requires-Dist: packaging >=23.2 ; extra == 'testing' Requires-Dist: jaraco.envs >=2.2 ; extra == 'testing' Requires-Dist: pytest-xdist ; extra == 'testing' Requires-Dist: jaraco.path >=3.2.0 ; extra == 'testing' @@ -63,7 +64,7 @@ Requires-Dist: jaraco.path >=3.2.0 ; extra == 'testing-integration' Requires-Dist: jaraco.envs >=2.2 ; extra == 'testing-integration' Requires-Dist: build[virtualenv] >=1.0.3 ; extra == 'testing-integration' Requires-Dist: filelock >=3.4.0 ; extra == 'testing-integration' -Requires-Dist: packaging >=23.1 ; extra == 'testing-integration' +Requires-Dist: packaging >=23.2 ; extra == 'testing-integration' Requires-Dist: pytest-cov ; (platform_python_implementation != "PyPy") and extra == 'testing' Requires-Dist: pytest-mypy >=0.9.1 ; (platform_python_implementation != "PyPy") and extra == 'testing' Requires-Dist: jaraco.develop >=7.21 ; (python_version >= "3.9" and sys_platform != "cygwin") and extra == 'testing' diff --git a/contrib/python/setuptools/py3/setuptools/build_meta.py b/contrib/python/setuptools/py3/setuptools/build_meta.py index 0a0abfdae0..2decd2d214 100644 --- a/contrib/python/setuptools/py3/setuptools/build_meta.py +++ b/contrib/python/setuptools/py3/setuptools/build_meta.py @@ -369,7 +369,12 @@ class _BuildMetaBackend(_ConfigSettingsTranslator): return self._bubble_up_info_directory(metadata_directory, ".dist-info") def _build_with_temp_dir( - self, setup_command, result_extension, result_directory, config_settings + self, + setup_command, + result_extension, + result_directory, + config_settings, + arbitrary_args=(), ): result_directory = os.path.abspath(result_directory) @@ -384,6 +389,7 @@ class _BuildMetaBackend(_ConfigSettingsTranslator): *setup_command, "--dist-dir", tmp_dist_dir, + *arbitrary_args, ] with no_install_setup_requires(): self.run_setup() @@ -402,10 +408,11 @@ class _BuildMetaBackend(_ConfigSettingsTranslator): ): with suppress_known_deprecation(): return self._build_with_temp_dir( - ['bdist_wheel', *self._arbitrary_args(config_settings)], + ['bdist_wheel'], '.whl', wheel_directory, config_settings, + self._arbitrary_args(config_settings), ) def build_sdist(self, sdist_directory, config_settings=None): diff --git a/contrib/python/setuptools/py3/ya.make b/contrib/python/setuptools/py3/ya.make index 564d268875..4210fba192 100644 --- a/contrib/python/setuptools/py3/ya.make +++ b/contrib/python/setuptools/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(69.1.0) +VERSION(69.1.1) LICENSE(MIT) diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h index 69c873090d..09abf1f88a 100644 --- a/yt/cpp/mapreduce/interface/operation.h +++ b/yt/cpp/mapreduce/interface/operation.h @@ -1202,7 +1202,7 @@ struct TRawMapReduceOperationSpec /// /// @brief Schema inference mode. /// -/// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference +/// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference enum class ESchemaInferenceMode : int { FromInput /* "from_input" */, @@ -1258,7 +1258,7 @@ struct TSortOperationSpec /// /// @brief Inference mode for output table schema. /// - /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference + /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); /// @@ -1332,7 +1332,7 @@ struct TMergeOperationSpec /// /// @brief Inference mode for output table schema. /// - /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference + /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); }; @@ -1358,7 +1358,7 @@ struct TEraseOperationSpec /// /// @brief Inference mode for output table schema. /// - /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference + /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); }; @@ -1392,7 +1392,7 @@ struct TRemoteCopyOperationSpec /// /// @brief Inference mode for output table schema. /// - /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static_schema.html#schema_inference + /// @see https://ytsaurus.tech/docs/en/user-guide/storage/static-schema.html#schema_inference FLUENT_FIELD_OPTION(ESchemaInferenceMode, SchemaInferenceMode); /// diff --git a/yt/yt/client/api/admin_client.h b/yt/yt/client/api/admin_client.h index cf8d789c57..1780ad0ab7 100644 --- a/yt/yt/client/api/admin_client.h +++ b/yt/yt/client/api/admin_client.h @@ -28,6 +28,10 @@ struct TBuildMasterSnapshotsOptions bool Retry = true; }; +struct TGetMasterConsistentStateOptions + : public TTimeoutOptions +{ }; + struct TExitReadOnlyOptions : public TTimeoutOptions { }; @@ -155,6 +159,7 @@ struct TResurrectChunkLocationsResult }; using TCellIdToSnapshotIdMap = THashMap<NHydra::TCellId, int>; +using TCellIdToSequenceNumberMap = THashMap<NHydra::TCellId, i64>; struct TAddMaintenanceOptions : public TTimeoutOptions @@ -200,6 +205,9 @@ struct IAdminClient virtual TFuture<TCellIdToSnapshotIdMap> BuildMasterSnapshots( const TBuildMasterSnapshotsOptions& options = {}) = 0; + virtual TFuture<TCellIdToSequenceNumberMap> GetMasterConsistentState( + const TGetMasterConsistentStateOptions& options = {}) = 0; + virtual TFuture<void> ExitReadOnly( NHydra::TCellId cellId, const TExitReadOnlyOptions& options = {}) = 0; diff --git a/yt/yt/client/api/delegating_client.h b/yt/yt/client/api/delegating_client.h index 97d46ce93a..e4d3341492 100644 --- a/yt/yt/client/api/delegating_client.h +++ b/yt/yt/client/api/delegating_client.h @@ -558,6 +558,10 @@ public: const TBuildMasterSnapshotsOptions& options), (options)) + DELEGATE_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, ( + const TGetMasterConsistentStateOptions& options), + (options)) + DELEGATE_METHOD(TFuture<void>, ExitReadOnly, ( NHydra::TCellId cellId, const TExitReadOnlyOptions& options), diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 99706535aa..a0d99186d2 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -1648,6 +1648,11 @@ TFuture<TCellIdToSnapshotIdMap> TClient::BuildMasterSnapshots(const TBuildMaster ThrowUnimplemented("BuildMasterSnapshots"); } +TFuture<TCellIdToSequenceNumberMap> TClient::GetMasterConsistentState(const TGetMasterConsistentStateOptions& /*options*/) +{ + ThrowUnimplemented("GetMasterConsistentState"); +} + TFuture<void> TClient::ExitReadOnly( NHydra::TCellId cellId, const TExitReadOnlyOptions& /*options*/) diff --git a/yt/yt/client/api/rpc_proxy/client_impl.h b/yt/yt/client/api/rpc_proxy/client_impl.h index 8dc59e06a0..b2854eff49 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.h +++ b/yt/yt/client/api/rpc_proxy/client_impl.h @@ -323,6 +323,9 @@ public: TFuture<TCellIdToSnapshotIdMap> BuildMasterSnapshots( const TBuildMasterSnapshotsOptions& options) override; + TFuture<TCellIdToSequenceNumberMap> GetMasterConsistentState( + const TGetMasterConsistentStateOptions& options) override; + TFuture<void> ExitReadOnly( NHydra::TCellId cellId, const TExitReadOnlyOptions& options) override; diff --git a/yt/yt/client/driver/admin_commands.cpp b/yt/yt/client/driver/admin_commands.cpp index c1af19fc13..4586547f11 100644 --- a/yt/yt/client/driver/admin_commands.cpp +++ b/yt/yt/client/driver/admin_commands.cpp @@ -101,6 +101,26 @@ void TBuildMasterSnapshotsCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// +void TGetMasterConsistentStateCommand::Register(TRegistrar /*registrar*/) +{ } + +void TGetMasterConsistentStateCommand::DoExecute(ICommandContextPtr context) +{ + auto cellIdToSequenceNumber = WaitFor(context->GetClient()->GetMasterConsistentState(Options)) + .ValueOrThrow(); + + context->ProduceOutputValue(BuildYsonStringFluently() + .DoListFor(cellIdToSequenceNumber, [=] (TFluentList fluent, const auto& pair) { + fluent + .Item().BeginMap() + .Item("cell_id").Value(pair.first) + .Item("sequence_number").Value(pair.second) + .EndMap(); + })); +} + +//////////////////////////////////////////////////////////////////////////////// + void TExitReadOnlyCommand::Register(TRegistrar registrar) { registrar.Parameter("cell_id", &TThis::CellId_); diff --git a/yt/yt/client/driver/admin_commands.h b/yt/yt/client/driver/admin_commands.h index 0665eb3494..a61576abc2 100644 --- a/yt/yt/client/driver/admin_commands.h +++ b/yt/yt/client/driver/admin_commands.h @@ -34,6 +34,20 @@ private: //////////////////////////////////////////////////////////////////////////////// +class TGetMasterConsistentStateCommand + : public TTypedCommand<NApi::TGetMasterConsistentStateOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TGetMasterConsistentStateCommand); + + static void Register(TRegistrar registrar); + +private: + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TExitReadOnlyCommand : public TTypedCommand<NApi::TExitReadOnlyOptions> { diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index 94fd78bdf6..a57237f46b 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -316,6 +316,7 @@ public: REGISTER_ALL(TBuildSnapshotCommand, "build_snapshot", Null, Structured, true, false); REGISTER_ALL(TBuildMasterSnapshotsCommand, "build_master_snapshots", Null, Structured, true, false); + REGISTER_ALL(TGetMasterConsistentStateCommand, "get_master_consistent_state", Null, Structured, true, false); REGISTER_ALL(TExitReadOnlyCommand, "exit_read_only", Null, Structured, true, false); REGISTER_ALL(TMasterExitReadOnlyCommand, "master_exit_read_only", Null, Structured, true, false); REGISTER_ALL(TDiscombobulateNonvotingPeersCommand, "discombobulate_nonvoting_peers", Null, Structured, true, false); diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index dad29ee6a5..1e43f174a0 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -386,6 +386,7 @@ public: UNIMPLEMENTED_METHOD(TFuture<void>, CheckClusterLiveness, (const TCheckClusterLivenessOptions&)); UNIMPLEMENTED_METHOD(TFuture<int>, BuildSnapshot, (const TBuildSnapshotOptions&)); UNIMPLEMENTED_METHOD(TFuture<TCellIdToSnapshotIdMap>, BuildMasterSnapshots, (const TBuildMasterSnapshotsOptions&)); + UNIMPLEMENTED_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, (const TGetMasterConsistentStateOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, ExitReadOnly, (NObjectClient::TCellId, const TExitReadOnlyOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, MasterExitReadOnly, (const TMasterExitReadOnlyOptions&)); UNIMPLEMENTED_METHOD(TFuture<void>, DiscombobulateNonvotingPeers, (NObjectClient::TCellId, const TDiscombobulateNonvotingPeersOptions&)); diff --git a/yt/yt/client/hedging/hedging.cpp b/yt/yt/client/hedging/hedging.cpp index 0be20d47af..6d0122b584 100644 --- a/yt/yt/client/hedging/hedging.cpp +++ b/yt/yt/client/hedging/hedging.cpp @@ -173,6 +173,7 @@ public: UNSUPPORTED_METHOD(TFuture<void>, CheckClusterLiveness, (const TCheckClusterLivenessOptions&)); UNSUPPORTED_METHOD(TFuture<int>, BuildSnapshot, (const TBuildSnapshotOptions&)); UNSUPPORTED_METHOD(TFuture<TCellIdToSnapshotIdMap>, BuildMasterSnapshots, (const TBuildMasterSnapshotsOptions&)); + UNSUPPORTED_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, (const TGetMasterConsistentStateOptions&)); UNSUPPORTED_METHOD(TFuture<void>, ExitReadOnly, (NObjectClient::TCellId, const TExitReadOnlyOptions&)); UNSUPPORTED_METHOD(TFuture<void>, MasterExitReadOnly, (const TMasterExitReadOnlyOptions&)); UNSUPPORTED_METHOD(TFuture<void>, DiscombobulateNonvotingPeers, (NObjectClient::TCellId, const TDiscombobulateNonvotingPeersOptions&)); diff --git a/yt/yt/client/hive/public.h b/yt/yt/client/hive/public.h index 5e8c8284bc..5143ae3c79 100644 --- a/yt/yt/client/hive/public.h +++ b/yt/yt/client/hive/public.h @@ -34,6 +34,7 @@ DECLARE_REFCOUNTED_STRUCT(ITransactionParticipant) YT_DEFINE_ERROR_ENUM( ((MailboxNotCreatedYet) (2200)) ((ParticipantUnregistered) (2201)) + ((EntryNotFound) (2202)) ); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/security_client/access_control.h b/yt/yt/client/security_client/access_control.h index 009492b189..4152b9f7bc 100644 --- a/yt/yt/client/security_client/access_control.h +++ b/yt/yt/client/security_client/access_control.h @@ -42,6 +42,7 @@ const THashMap<EAccessControlObject, TAccessControlObjectDescriptor> AccessContr ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::ResurrectChunkLocations), ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::BuildSnapshot), ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::BuildMasterSnapshot), + ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::GetMasterConsistentState), ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::ExitReadOnly), ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::MasterExitReadOnly), ACCESS_CONTROL_ENTRY(EAccessControlObjectNamespace::AdminCommands, EAccessControlObject::DiscombobulateNonvotingPeers), diff --git a/yt/yt/client/security_client/public.h b/yt/yt/client/security_client/public.h index 9d6e84df55..43e86b1ef3 100644 --- a/yt/yt/client/security_client/public.h +++ b/yt/yt/client/security_client/public.h @@ -90,6 +90,7 @@ DEFINE_ENUM(EAccessControlObject, (ResurrectChunkLocations) (BuildSnapshot) (BuildMasterSnapshot) + (GetMasterConsistentState) (ExitReadOnly) (MasterExitReadOnly) (DiscombobulateNonvotingPeers) diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp index 863672915b..d30ec5c8d2 100644 --- a/yt/yt/client/table_client/config.cpp +++ b/yt/yt/client/table_client/config.cpp @@ -270,9 +270,6 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar) registrar.Parameter("column_dictionary_size", &TThis::ColumnDictionarySize) .GreaterThanOrEqual(NCompression::GetDictionaryCompressionCodec()->GetMinDictionarySize()) .Default(32_KB); - registrar.Parameter("compression_level", &TThis::CompressionLevel) - .InRange(1, NCompression::GetDictionaryCompressionCodec()->GetMaxCompressionLevel()) - .Default(NCompression::GetDictionaryCompressionCodec()->GetDefaultCompressionLevel()); registrar.Parameter("applied_policies", &TThis::AppliedPolicies) .Default({ EDictionaryCompressionPolicy::LargeChunkFirst, @@ -285,10 +282,7 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar) .Default(12_MB); registrar.Parameter("max_acceptable_compression_ratio", &TThis::MaxAcceptableCompressionRatio) .Default(0.7) - .GreaterThanOrEqual(0.); - registrar.Parameter("max_decompression_blob_size", &TThis::MaxDecompressionBlobSize) - .GreaterThan(0) - .Default(64_MB); + .InRange(0, 1); registrar.Postprocessor([] (TThis* config) { if (config->DesiredSampleCount > config->MaxProcessedSampleCount) { @@ -316,6 +310,18 @@ void TDictionaryCompressionConfig::Register(TRegistrar registrar) /////////////////////////////////////////////////////////////////////////////// +void TDictionaryCompressionSessionConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("compression_level", &TThis::CompressionLevel) + .InRange(1, NCompression::GetDictionaryCompressionCodec()->GetMaxCompressionLevel()) + .Default(NCompression::GetDictionaryCompressionCodec()->GetDefaultCompressionLevel()); + registrar.Parameter("max_decompression_blob_size", &TThis::MaxDecompressionBlobSize) + .GreaterThan(0) + .Default(64_MB); +} + +/////////////////////////////////////////////////////////////////////////////// + void TBatchHunkReaderConfig::Register(TRegistrar registrar) { registrar.Parameter("max_hunk_count_per_read", &TThis::MaxHunkCountPerRead) diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h index 332285c7b5..4f6ebfbad1 100644 --- a/yt/yt/client/table_client/config.h +++ b/yt/yt/client/table_client/config.h @@ -259,15 +259,36 @@ public: //! Upper limit on acceptable compression ratio. No chunk compression is performed if this limit is exceeded. double MaxAcceptableCompressionRatio; + REGISTER_YSON_STRUCT(TDictionaryCompressionConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TDictionaryCompressionConfig) + +//////////////////////////////////////////////////////////////////////////////// + +class TDictionaryCompressionSessionConfig + : public virtual NYTree::TYsonStruct +{ +public: + // Compression session options. + + //! Level of compression algorithm. + //! Applied to digested compression dictionary upon its construction. + int CompressionLevel; + + // Decompression session options. + //! Upper limit on content size of a batch that can be decompressed within a single iteration. i64 MaxDecompressionBlobSize; - REGISTER_YSON_STRUCT(TDictionaryCompressionConfig); + REGISTER_YSON_STRUCT(TDictionaryCompressionSessionConfig); static void Register(TRegistrar registrar); }; -DEFINE_REFCOUNTED_TYPE(TDictionaryCompressionConfig) +DEFINE_REFCOUNTED_TYPE(TDictionaryCompressionSessionConfig) //////////////////////////////////////////////////////////////////////////////// @@ -292,6 +313,7 @@ class TTableReaderConfig , public virtual TChunkReaderConfig , public TBatchHunkReaderConfig , public NChunkClient::TChunkFragmentReaderConfig + , public TDictionaryCompressionSessionConfig { public: bool SuppressAccessTracking; diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index 195ce2e684..ff8ce94195 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -352,6 +352,8 @@ DECLARE_REFCOUNTED_CLASS(TDictionaryCompressionConfig) DECLARE_REFCOUNTED_CLASS(TBatchHunkReaderConfig) +DECLARE_REFCOUNTED_CLASS(TDictionaryCompressionSessionConfig) + DECLARE_REFCOUNTED_CLASS(TTableReaderConfig) DECLARE_REFCOUNTED_CLASS(TTableWriterConfig) diff --git a/yt/yt/client/unittests/mock/client.h b/yt/yt/client/unittests/mock/client.h index 4d081f3620..0ab1250f20 100644 --- a/yt/yt/client/unittests/mock/client.h +++ b/yt/yt/client/unittests/mock/client.h @@ -238,6 +238,10 @@ public: const TBuildMasterSnapshotsOptions& options), (override)); + MOCK_METHOD(TFuture<TCellIdToSequenceNumberMap>, GetMasterConsistentState, ( + const TGetMasterConsistentStateOptions& options), + (override)); + MOCK_METHOD(TFuture<void>, ExitReadOnly, ( NHydra::TCellId cellId, const TExitReadOnlyOptions& options), diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp index e7f040bccc..12606becc8 100644 --- a/yt/yt/core/concurrency/delayed_executor.cpp +++ b/yt/yt/core/concurrency/delayed_executor.cpp @@ -172,8 +172,9 @@ private: TPollerThread() : TThread( "DelayedPoller", - NThreading::EThreadPriority::Normal, - /*shutdownPriority*/ 200) + NThreading::TThreadOptions{ + .ShutdownPriority = 200, + }) { } void EnqueueSubmission(TDelayedExecutorEntryPtr entry) diff --git a/yt/yt/core/concurrency/fair_share_thread_pool.cpp b/yt/yt/core/concurrency/fair_share_thread_pool.cpp index 68da649486..37be0d6a54 100644 --- a/yt/yt/core/concurrency/fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_thread_pool.cpp @@ -471,8 +471,9 @@ public: std::move(callbackEventCount), threadGroupName, threadName, - threadPriority, - /*shutdownPriority*/ 0) + NThreading::TThreadOptions{ + .ThreadPriority = threadPriority, + }) , Queue_(std::move(queue)) , Index_(index) { } @@ -561,14 +562,14 @@ private: TThreadPoolBase::DoConfigure(threadCount); } - TSchedulerThreadBasePtr SpawnThread(int index) override + TSchedulerThreadPtr SpawnThread(int index) override { return New<TFairShareThread>( Queue_, CallbackEventCount_, ThreadNamePrefix_, MakeThreadName(index), - ThreadPriority_, + NThreading::EThreadPriority::Normal, index); } }; diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 7fcb074e84..62c0e8ad68 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -845,14 +845,14 @@ private: //////////////////////////////////////////////////////////////////////////////// TFiberSchedulerThread::TFiberSchedulerThread( - const TString& threadGroupName, - const TString& threadName, - NThreading::EThreadPriority threadPriority, - int shutdownPriority) - : TThread(threadName, threadPriority, shutdownPriority) - , ThreadGroupName_(threadGroupName) + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options) + : TThread(std::move(threadName), std::move(options)) + , ThreadGroupName_(std::move(threadGroupName)) { } + void TFiberSchedulerThread::ThreadMain() { // Hold this strongly. diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.h b/yt/yt/core/concurrency/fiber_scheduler_thread.h index 76144fd791..b368a7910c 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.h +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.h @@ -16,18 +16,17 @@ class TFiberSchedulerThread { public: TFiberSchedulerThread( - const TString& threadGroupName, - const TString& threadName, - NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal, - int shutdownPriority = 0); + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options = {}); //! Empty callback signals about stopping. virtual TClosure OnExecute() = 0; private: - void ThreadMain() override; - const TString ThreadGroupName_; + + void ThreadMain() override; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp index f996602e61..fcfc64eec7 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp @@ -1307,7 +1307,7 @@ private: TThreadPoolBase::DoConfigure(threadCount); } - TSchedulerThreadBasePtr SpawnThread(int index) override + TSchedulerThreadPtr SpawnThread(int index) override { return New<TFairShareThread>( Queue_, diff --git a/yt/yt/core/concurrency/notify_manager.cpp b/yt/yt/core/concurrency/notify_manager.cpp index 743f06f0ac..596b3bc346 100644 --- a/yt/yt/core/concurrency/notify_manager.cpp +++ b/yt/yt/core/concurrency/notify_manager.cpp @@ -17,7 +17,7 @@ constexpr auto WaitTimeWarningThreshold = TDuration::Seconds(30); TNotifyManager::TNotifyManager( TIntrusivePtr<NThreading::TEventCount> eventCount, const NProfiling::TTagSet& tagSet, - const TDuration pollingPeriod) + TDuration pollingPeriod) : EventCount_(std::move(eventCount)) , WakeupCounter_(NProfiling::TProfiler("/action_queue") .WithTags(tagSet) diff --git a/yt/yt/core/concurrency/notify_manager.h b/yt/yt/core/concurrency/notify_manager.h index f62383e3f2..efb71029d3 100644 --- a/yt/yt/core/concurrency/notify_manager.h +++ b/yt/yt/core/concurrency/notify_manager.h @@ -20,7 +20,7 @@ public: TNotifyManager( TIntrusivePtr<NThreading::TEventCount> eventCount, const NProfiling::TTagSet& counterTagSet, - const TDuration pollingPeriod); + TDuration pollingPeriod); TCpuInstant ResetMinEnqueuedAt(); diff --git a/yt/yt/core/concurrency/private.h b/yt/yt/core/concurrency/private.h index fcbcefe605..5d300edee3 100644 --- a/yt/yt/core/concurrency/private.h +++ b/yt/yt/core/concurrency/private.h @@ -52,7 +52,7 @@ using TMpscSuspendableSingleQueueSchedulerThreadPtr = TIntrusivePtr<TMpscSuspend DECLARE_REFCOUNTED_CLASS(TFiber) -DECLARE_REFCOUNTED_CLASS(TSchedulerThreadBase) +DECLARE_REFCOUNTED_CLASS(TSchedulerThread) DECLARE_REFCOUNTED_CLASS(TFairShareInvokerQueue) DECLARE_REFCOUNTED_CLASS(TFairShareQueueSchedulerThread) diff --git a/yt/yt/core/concurrency/quantized_executor.cpp b/yt/yt/core/concurrency/quantized_executor.cpp index 93893e2b14..4487a6a205 100644 --- a/yt/yt/core/concurrency/quantized_executor.cpp +++ b/yt/yt/core/concurrency/quantized_executor.cpp @@ -21,28 +21,18 @@ public: TQuantizedExecutor( TString name, ICallbackProviderPtr callbackProvider, - int workerCount) + const TQuantizedExecutorOptions& options) : Name_(std::move(name)) + , CallbackProvider_(std::move(callbackProvider)) + , Options_(options) , Logger(ConcurrencyLogger.WithTag("Executor: %v", Name_)) , ControlQueue_(New<TActionQueue>(Format("%vCtl", Name_))) , ControlInvoker_(ControlQueue_->GetInvoker()) - , CallbackProvider_(std::move(callbackProvider)) - , DesiredWorkerCount_(workerCount) + , DesiredWorkerCount_(options.WorkerCount) { VERIFY_INVOKER_THREAD_AFFINITY(ControlInvoker_, ControlThread); } - void Initialize(TCallback<void()> workerInitializer) override - { - WorkerInitializer_ = std::move(workerInitializer); - - BIND(&TQuantizedExecutor::DoReconfigure, MakeStrong(this)) - .AsyncVia(ControlInvoker_) - .Run() - .Get() - .ThrowOnError(); - } - TFuture<void> Run(TDuration timeout) override { VERIFY_THREAD_AFFINITY_ANY(); @@ -56,11 +46,13 @@ public: { VERIFY_THREAD_AFFINITY_ANY(); - DesiredWorkerCount_ = workerCount; + DesiredWorkerCount_.store(workerCount); } private: const TString Name_; + const ICallbackProviderPtr CallbackProvider_; + const TQuantizedExecutorOptions Options_; const TLogger Logger; @@ -68,9 +60,6 @@ private: const IInvokerPtr ControlInvoker_; YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, CallbackProviderLock_); - ICallbackProviderPtr CallbackProvider_; - - TCallback<void()> WorkerInitializer_; std::vector<ISuspendableActionQueuePtr> Workers_; std::vector<IInvokerPtr> Invokers_; @@ -106,21 +95,17 @@ private: Workers_.reserve(desiredWorkerCount); Invokers_.reserve(desiredWorkerCount); for (int index = currentWorkerCount; index < desiredWorkerCount; ++index) { - auto worker = CreateSuspendableActionQueue(Format("%v:%v", Name_, index)); + auto worker = CreateSuspendableActionQueue( + /*threadName*/ Format("%v:%v", Name_, index), + {.ThreadInitializer = Options_.ThreadInitializer}); // NB: #GetInvoker initializes queue. Invokers_.push_back(worker->GetInvoker()); - if (WorkerInitializer_) { - BIND(WorkerInitializer_) - .AsyncVia(Invokers_.back()) - .Run() - .Get(); - } worker->Suspend(/*immediately*/ true) .Get() .ThrowOnError(); - Workers_.emplace_back(std::move(worker)); + Workers_.push_back(std::move(worker)); } } @@ -274,12 +259,12 @@ private: IQuantizedExecutorPtr CreateQuantizedExecutor( TString name, ICallbackProviderPtr callbackProvider, - int workerCount) + TQuantizedExecutorOptions options) { return New<TQuantizedExecutor>( std::move(name), std::move(callbackProvider), - workerCount); + std::move(options)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/quantized_executor.h b/yt/yt/core/concurrency/quantized_executor.h index bea5a4f64c..bc6c78bad1 100644 --- a/yt/yt/core/concurrency/quantized_executor.h +++ b/yt/yt/core/concurrency/quantized_executor.h @@ -24,8 +24,6 @@ DEFINE_REFCOUNTED_TYPE(ICallbackProvider) struct IQuantizedExecutor : public TRefCounted { - virtual void Initialize(TCallback<void()> workerInitializer = {}) = 0; - //! Starts new quantum of time, returns a future that becomes set //! when quantum ends. /*! @@ -38,7 +36,7 @@ struct IQuantizedExecutor */ virtual TFuture<void> Run(TDuration timeout) = 0; - //! Updates number of workers. + //! Updates the number of workers. virtual void Reconfigure(int workerCount) = 0; }; @@ -46,10 +44,16 @@ DEFINE_REFCOUNTED_TYPE(IQuantizedExecutor) //////////////////////////////////////////////////////////////////////////////// +struct TQuantizedExecutorOptions +{ + int WorkerCount = 1; + std::function<void()> ThreadInitializer; +}; + IQuantizedExecutorPtr CreateQuantizedExecutor( TString name, ICallbackProviderPtr callbackProvider, - int workerCount); + TQuantizedExecutorOptions options = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/scheduler_thread.cpp b/yt/yt/core/concurrency/scheduler_thread.cpp index 7d89333427..b4a2999bee 100644 --- a/yt/yt/core/concurrency/scheduler_thread.cpp +++ b/yt/yt/core/concurrency/scheduler_thread.cpp @@ -11,59 +11,59 @@ using namespace NProfiling; //////////////////////////////////////////////////////////////////////////////// -TSchedulerThreadBase::~TSchedulerThreadBase() -{ - Stop(); -} - -TSchedulerThreadBase::TSchedulerThreadBase( +TSchedulerThread::TSchedulerThread( TIntrusivePtr<NThreading::TEventCount> callbackEventCount, - const TString& threadGroupName, - const TString& threadName, - NThreading::EThreadPriority threadPriority, - int shutdownPriority) + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options) : TFiberSchedulerThread( - threadGroupName, - threadName, - threadPriority, - shutdownPriority) + std::move(threadGroupName), + std::move(threadName), + std::move(options)) , CallbackEventCount_(std::move(callbackEventCount)) { } -void TSchedulerThreadBase::OnStart() +TSchedulerThread::~TSchedulerThread() +{ + Stop(); +} + +void TSchedulerThread::OnStart() { } -void TSchedulerThreadBase::OnStop() +void TSchedulerThread::OnStop() { } -void TSchedulerThreadBase::Stop(bool graceful) +void TSchedulerThread::Stop(bool graceful) { GracefulStop_.store(graceful); TThread::Stop(); } -void TSchedulerThreadBase::Stop() +void TSchedulerThread::Stop() { + TFiberSchedulerThread::Stop(); TThread::Stop(); } -void TSchedulerThreadBase::StartEpilogue() +void TSchedulerThread::StartEpilogue() { + TFiberSchedulerThread::StartEpilogue(); OnStart(); } -void TSchedulerThreadBase::StopPrologue() +void TSchedulerThread::StopPrologue() { + TFiberSchedulerThread::StopPrologue(); CallbackEventCount_->NotifyAll(); } -void TSchedulerThreadBase::StopEpilogue() +void TSchedulerThread::StopEpilogue() { + TFiberSchedulerThread::StopEpilogue(); OnStop(); } -//////////////////////////////////////////////////////////////////////////////// - TClosure TSchedulerThread::OnExecute() { EndExecute(); diff --git a/yt/yt/core/concurrency/scheduler_thread.h b/yt/yt/core/concurrency/scheduler_thread.h index 7cf4ee151b..c4953c9bd1 100644 --- a/yt/yt/core/concurrency/scheduler_thread.h +++ b/yt/yt/core/concurrency/scheduler_thread.h @@ -6,45 +6,29 @@ namespace NYT::NConcurrency { //////////////////////////////////////////////////////////////////////////////// -class TSchedulerThreadBase +class TSchedulerThread : public TFiberSchedulerThread { public: - ~TSchedulerThreadBase(); - void Stop(bool graceful); void Stop(); protected: const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_; + std::atomic<bool> GracefulStop_ = false; - TSchedulerThreadBase( + TSchedulerThread( TIntrusivePtr<NThreading::TEventCount> callbackEventCount, - const TString& threadGroupName, - const TString& threadName, - NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal, - int shutdownPriority = 0); + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options = {}); + + ~TSchedulerThread(); virtual void OnStart(); virtual void OnStop(); -private: - void StartEpilogue() override; - void StopPrologue() override; - void StopEpilogue() override; -}; - -DEFINE_REFCOUNTED_TYPE(TSchedulerThreadBase) - -//////////////////////////////////////////////////////////////////////////////// - -class TSchedulerThread - : public TSchedulerThreadBase -{ -protected: - using TSchedulerThreadBase::TSchedulerThreadBase; - TClosure OnExecute() override; virtual TClosure BeginExecute() = 0; @@ -57,8 +41,14 @@ private: void MaybeRunMaintenance(TCpuInstant now); void RunMaintenance(); + + void StartEpilogue() override; + void StopPrologue() override; + void StopEpilogue() override; }; +DEFINE_REFCOUNTED_TYPE(TSchedulerThread) + //////////////////////////////////////////////////////////////////////////////// } //namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp index 0b66712d25..df1a18964b 100644 --- a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp @@ -9,16 +9,14 @@ template <class TQueueImpl> TSingleQueueSchedulerThread<TQueueImpl>::TSingleQueueSchedulerThread( TInvokerQueuePtr<TQueueImpl> queue, TIntrusivePtr<NThreading::TEventCount> callbackEventCount, - const TString& threadGroupName, - const TString& threadName, - NThreading::EThreadPriority threadPriority, - int shutdownPriority) + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options) : TSchedulerThread( std::move(callbackEventCount), - threadGroupName, - threadName, - threadPriority, - shutdownPriority) + std::move(threadGroupName), + std::move(threadName), + std::move(options)) , Queue_(std::move(queue)) , Token_(Queue_->MakeConsumerToken()) { } @@ -52,12 +50,14 @@ template <class TQueueImpl> TSuspendableSingleQueueSchedulerThread<TQueueImpl>::TSuspendableSingleQueueSchedulerThread( TInvokerQueuePtr<TQueueImpl> queue, TIntrusivePtr<NThreading::TEventCount> callbackEventCount, - const TString& threadGroupName, - const TString& threadName) + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options) : TSchedulerThread( std::move(callbackEventCount), - threadGroupName, - threadName) + std::move(threadGroupName), + std::move(threadName), + std::move(options)) , Queue_(std::move(queue)) , Token_(Queue_->MakeConsumerToken()) { } diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.h b/yt/yt/core/concurrency/single_queue_scheduler_thread.h index 8ffe2232cc..2adffcde9d 100644 --- a/yt/yt/core/concurrency/single_queue_scheduler_thread.h +++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.h @@ -18,10 +18,9 @@ public: TSingleQueueSchedulerThread( TInvokerQueuePtr<TQueueImpl> queue, TIntrusivePtr<NThreading::TEventCount> callbackEventCount, - const TString& threadGroupName, - const TString& threadName, - NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal, - int shutdownPriority = 0); + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options = {}); protected: const TInvokerQueuePtr<TQueueImpl> Queue_; @@ -45,8 +44,9 @@ public: TSuspendableSingleQueueSchedulerThread( TInvokerQueuePtr<TQueueImpl> queue, TIntrusivePtr<NThreading::TEventCount> callbackEventCount, - const TString& threadGroupName, - const TString& threadName); + TString threadGroupName, + TString threadName, + NThreading::TThreadOptions options); TFuture<void> Suspend(bool immediately); diff --git a/yt/yt/core/concurrency/suspendable_action_queue.cpp b/yt/yt/core/concurrency/suspendable_action_queue.cpp index 85b1c9aacf..c5e6bed21f 100644 --- a/yt/yt/core/concurrency/suspendable_action_queue.cpp +++ b/yt/yt/core/concurrency/suspendable_action_queue.cpp @@ -14,7 +14,9 @@ class TSuspendableActionQueue : public ISuspendableActionQueue { public: - explicit TSuspendableActionQueue(const TString& threadName) + TSuspendableActionQueue( + TString threadName, + TSuspendableActionQueueOptions options) : Queue_(New<TMpscInvokerQueue>( CallbackEventCount_, GetThreadTags(threadName))) @@ -23,7 +25,10 @@ public: Queue_, CallbackEventCount_, threadName, - threadName)) + threadName, + NThreading::TThreadOptions{ + .ThreadInitializer = options.ThreadInitializer, + })) , ShutdownCookie_(RegisterShutdownCallback( Format("SuspendableActionQueue(%v)", threadName), BIND_NO_PROPAGATE(&TSuspendableActionQueue::Shutdown, MakeWeak(this), /*graceful*/ false), @@ -71,6 +76,7 @@ public: } private: + const TSuspendableActionQueueOptions Options_; const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_ = New<NThreading::TEventCount>(); const TMpscInvokerQueuePtr Queue_; const IInvokerPtr Invoker_; @@ -96,9 +102,13 @@ private: //////////////////////////////////////////////////////////////////////////////// -ISuspendableActionQueuePtr CreateSuspendableActionQueue(const TString& threadName) +ISuspendableActionQueuePtr CreateSuspendableActionQueue( + TString threadName, + TSuspendableActionQueueOptions options) { - return New<TSuspendableActionQueue>(threadName); + return New<TSuspendableActionQueue>( + std::move(threadName), + std::move(options)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/suspendable_action_queue.h b/yt/yt/core/concurrency/suspendable_action_queue.h index 0909aa0b49..4c8f4173fe 100644 --- a/yt/yt/core/concurrency/suspendable_action_queue.h +++ b/yt/yt/core/concurrency/suspendable_action_queue.h @@ -29,7 +29,14 @@ DEFINE_REFCOUNTED_TYPE(ISuspendableActionQueue) //////////////////////////////////////////////////////////////////////////////// -ISuspendableActionQueuePtr CreateSuspendableActionQueue(const TString& threadName); +struct TSuspendableActionQueueOptions +{ + std::function<void()> ThreadInitializer; +}; + +ISuspendableActionQueuePtr CreateSuspendableActionQueue( + TString threadName, + TSuspendableActionQueueOptions options = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/system_invokers.cpp b/yt/yt/core/concurrency/system_invokers.cpp index c3c389cdd4..f336b4d503 100644 --- a/yt/yt/core/concurrency/system_invokers.cpp +++ b/yt/yt/core/concurrency/system_invokers.cpp @@ -27,8 +27,9 @@ public: CallbackEventCount_, threadName, threadName, - NThreading::EThreadPriority::Normal, - /*shutdownPriority*/ shutdownPriority - 1)) + NThreading::TThreadOptions{ + .ShutdownPriority = shutdownPriority - 1, + })) , ShutdownCookie_(RegisterShutdownCallback( Format("SystemInvokerThread:%v", threadName), BIND_NO_PROPAGATE(&TSystemInvokerThread::Shutdown, this), diff --git a/yt/yt/core/concurrency/thread_pool.cpp b/yt/yt/core/concurrency/thread_pool.cpp index b2975b4baa..aa56320bfd 100644 --- a/yt/yt/core/concurrency/thread_pool.cpp +++ b/yt/yt/core/concurrency/thread_pool.cpp @@ -28,7 +28,7 @@ public: TInvokerQueueAdapter( TIntrusivePtr<NThreading::TEventCount> callbackEventCount, const TTagSet& counterTagSet, - const TDuration pollingPeriod) + TDuration pollingPeriod) : TMpmcInvokerQueue(callbackEventCount, counterTagSet) , TNotifyManager(callbackEventCount, counterTagSet, pollingPeriod) { } @@ -97,18 +97,22 @@ public: TIntrusivePtr<NThreading::TEventCount> callbackEventCount, const TString& threadGroupName, const TString& threadName, - NThreading::EThreadPriority threadPriority) + const TThreadPoolOptions& options) : TSchedulerThread( callbackEventCount, threadGroupName, threadName, - threadPriority, - /*shutdownPriority*/ 0) + NThreading::TThreadOptions{ + .ThreadPriority = options.ThreadPriority, + .ThreadInitializer = options.ThreadInitializer, + }) , Queue_(std::move(queue)) + , Options_(options) { } protected: const TIntrusivePtr<TInvokerQueueAdapter> Queue_; + const TThreadPoolOptions Options_; TEnqueuedAction CurrentAction_; @@ -141,13 +145,13 @@ public: TThreadPool( int threadCount, const TString& threadNamePrefix, - NThreading::EThreadPriority threadPriority, - const TDuration pollingPeriod) - : TThreadPoolBase(threadNamePrefix, threadPriority) + const TThreadPoolOptions& options) + : TThreadPoolBase(threadNamePrefix) + , Options_(options) , Queue_(New<TInvokerQueueAdapter>( CallbackEventCount_, GetThreadTags(ThreadNamePrefix_), - pollingPeriod)) + options.PollingPeriod)) , Invoker_(Queue_) { Configure(threadCount); @@ -180,6 +184,7 @@ public: } private: + const TThreadPoolOptions Options_; const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_ = New<NThreading::TEventCount>(); const TIntrusivePtr<TInvokerQueueAdapter> Queue_; const IInvokerPtr Invoker_; @@ -198,14 +203,14 @@ private: }); } - TSchedulerThreadBasePtr SpawnThread(int index) override + TSchedulerThreadPtr SpawnThread(int index) override { return New<TThreadPoolThread>( Queue_, CallbackEventCount_, ThreadNamePrefix_, MakeThreadName(index), - ThreadPriority_); + Options_); } }; @@ -214,14 +219,12 @@ private: IThreadPoolPtr CreateThreadPool( int threadCount, const TString& threadNamePrefix, - NThreading::EThreadPriority threadPriority, - TDuration pollingPeriod) + const TThreadPoolOptions& options) { return New<TThreadPool>( threadCount, threadNamePrefix, - threadPriority, - pollingPeriod); + options); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/thread_pool.h b/yt/yt/core/concurrency/thread_pool.h index 2792f9a876..9c1bbfd782 100644 --- a/yt/yt/core/concurrency/thread_pool.h +++ b/yt/yt/core/concurrency/thread_pool.h @@ -13,23 +13,38 @@ namespace NYT::NConcurrency { struct IThreadPool : public virtual TRefCounted { + //! Terminates all the threads. virtual void Shutdown() = 0; - //! Returns current thread count, it can differ from value set by Configure() - //! because it clamped between 1 and maximum thread count. + //! Returns the current thread count. + /*! + * This can differ from value set by #Configure + * because it clamped between 1 and the maximum thread count. + */ virtual int GetThreadCount() = 0; + + //! Enables changing thread pool configuration at runtime. virtual void Configure(int threadCount) = 0; + //! Returns the invoker for enqueuing callbacks into the thread pool. virtual const IInvokerPtr& GetInvoker() = 0; }; DEFINE_REFCOUNTED_TYPE(IThreadPool) +//////////////////////////////////////////////////////////////////////////////// + +struct TThreadPoolOptions +{ + NThreading::EThreadPriority ThreadPriority = NThreading::EThreadPriority::Normal; + TDuration PollingPeriod = TDuration::MilliSeconds(10); + std::function<void()> ThreadInitializer; +}; + IThreadPoolPtr CreateThreadPool( int threadCount, const TString& threadNamePrefix, - NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal, - TDuration pollingPeriod = TDuration::MilliSeconds(10)); + const TThreadPoolOptions& options = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/thread_pool_detail.cpp b/yt/yt/core/concurrency/thread_pool_detail.cpp index 90b56511b6..9e2b0ee1b2 100644 --- a/yt/yt/core/concurrency/thread_pool_detail.cpp +++ b/yt/yt/core/concurrency/thread_pool_detail.cpp @@ -13,11 +13,8 @@ static const auto& Logger = ConcurrencyLogger; //////////////////////////////////////////////////////////////////////////////// -TThreadPoolBase::TThreadPoolBase( - TString threadNamePrefix, - NThreading::EThreadPriority threadPriority) +TThreadPoolBase::TThreadPoolBase(TString threadNamePrefix) : ThreadNamePrefix_(std::move(threadNamePrefix)) - , ThreadPriority_(threadPriority) , ShutdownCookie_(RegisterShutdownCallback( Format("ThreadPool(%v)", ThreadNamePrefix_), BIND_NO_PROPAGATE(&TThreadPoolBase::Shutdown, MakeWeak(this)), diff --git a/yt/yt/core/concurrency/thread_pool_detail.h b/yt/yt/core/concurrency/thread_pool_detail.h index 412208a9bc..14a306d116 100644 --- a/yt/yt/core/concurrency/thread_pool_detail.h +++ b/yt/yt/core/concurrency/thread_pool_detail.h @@ -16,9 +16,7 @@ class TThreadPoolBase public: static constexpr int MaxThreadCount = 64; - explicit TThreadPoolBase( - TString threadNamePrefix, - NThreading::EThreadPriority threadPriority = NThreading::EThreadPriority::Normal); + explicit TThreadPoolBase(TString threadNamePrefix); void Configure(int threadCount); void Shutdown(); @@ -28,7 +26,6 @@ public: protected: const TString ThreadNamePrefix_; - const NThreading::EThreadPriority ThreadPriority_; const TShutdownCookie ShutdownCookie_; @@ -37,7 +34,7 @@ protected: std::atomic<bool> ShutdownFlag_ = false; YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); - std::vector<TSchedulerThreadBasePtr> Threads_; + std::vector<TSchedulerThreadPtr> Threads_; void Resize(); @@ -48,7 +45,7 @@ protected: virtual TClosure MakeFinalizerCallback(); virtual void DoConfigure(int threadCount); - virtual TSchedulerThreadBasePtr SpawnThread(int index) = 0; + virtual TSchedulerThreadPtr SpawnThread(int index) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp index d554844235..4711db5fb3 100644 --- a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp @@ -711,7 +711,7 @@ private: TThreadPoolBase::DoConfigure(threadCount); } - TSchedulerThreadBasePtr SpawnThread(int index) override + TSchedulerThreadPtr SpawnThread(int index) override { return New<TFairShareThread>( Queue_, diff --git a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp index 3f7c7df097..08dd20cc5c 100644 --- a/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp +++ b/yt/yt/core/concurrency/unittests/quantized_executor_ut.cpp @@ -48,37 +48,36 @@ class TInitializingCallbackProvider : public ICallbackProvider { public: - TCallback<void()> GetInitializer() + std::function<void()> GetInitializer() { - return BIND([this, this_ = MakeStrong(this)] { - Initialized_ = true; - }); + return [this, this_ = MakeStrong(this)] { + Initialized_.store(true); + }; } TCallback<void()> ExtractCallback() override { - if (Finished_) { + if (IsFinished()) { return {}; - } else { - return BIND([this, this_ = MakeStrong(this)] { - Finished_ = true; - }); } + return BIND([this, this_ = MakeStrong(this)] { + Finished_.store(true); + }); } bool IsInitialized() const { - return Initialized_; + return Initialized_.load(); } bool IsFinished() const { - return Finished_; + return Finished_.load(); } private: - bool Initialized_ = false; - bool Finished_ = false; + std::atomic<bool> Initialized_ = false; + std::atomic<bool> Finished_ = false; }; class TLongCallbackProvider @@ -131,15 +130,13 @@ protected: void InitSimple(int workerCount, i64 iterationCount) { SimpleCallbackProvider_ = New<TSimpleCallbackProvider>(iterationCount); - Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, workerCount); - Executor_->Initialize(); + Executor_ = CreateQuantizedExecutor("test", SimpleCallbackProvider_, {.WorkerCount = workerCount}); } void InitLong(int workerCount, i64 iterationCount) { LongCallbackProvider_ = New<TLongCallbackProvider>(iterationCount); - Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, workerCount); - Executor_->Initialize(); + Executor_ = CreateQuantizedExecutor("test", LongCallbackProvider_, {.WorkerCount = workerCount}); } }; @@ -226,13 +223,10 @@ TEST_F(TQuantizedExecutorTest, Reconfigure) TEST_F(TQuantizedExecutorTest, WorkerInitializer) { auto callbackProvider = New<TInitializingCallbackProvider>(); - EXPECT_FALSE(callbackProvider->IsInitialized()); EXPECT_FALSE(callbackProvider->IsFinished()); - Executor_ = CreateQuantizedExecutor("test", callbackProvider, /*workerCount*/ 1); - Executor_->Initialize(callbackProvider->GetInitializer()); + Executor_ = CreateQuantizedExecutor("test", callbackProvider, {.ThreadInitializer = callbackProvider->GetInitializer()}); - EXPECT_TRUE(callbackProvider->IsInitialized()); EXPECT_FALSE(callbackProvider->IsFinished()); WaitFor(Executor_->Run(TDuration::MilliSeconds(300))) diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index 1481e4053d..00bfe4cb1b 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -717,8 +717,9 @@ private: owner->EventCount_, "Logging", "Logging", - /*threadPriority*/ NThreading::EThreadPriority::Normal, - /*shutdownPriority*/ 200) + NThreading::TThreadOptions{ + .ShutdownPriority = 200, + }) , Owner_(owner) { } diff --git a/yt/yt/core/misc/utf8_decoder.cpp b/yt/yt/core/misc/utf8_decoder.cpp index 6acb70b69b..0767a3d901 100644 --- a/yt/yt/core/misc/utf8_decoder.cpp +++ b/yt/yt/core/misc/utf8_decoder.cpp @@ -66,7 +66,7 @@ TStringBuf TUtf8Transcoder::Decode(TStringBuf str) i += 1; } else { THROW_ERROR_EXCEPTION("Unicode symbols with codes greater than 255 are not supported. " - "Please refer to https://wiki.yandex-team.ru/yt/userdoc/formats/#json and " + "Please refer to https://ytsaurus.tech/docs/en/user-guide/storage/formats#json and " "consider using encode_utf8=false in format options"); } } diff --git a/yt/yt/core/rpc/grpc/dispatcher.cpp b/yt/yt/core/rpc/grpc/dispatcher.cpp index 8c1b959312..09ce5386dc 100644 --- a/yt/yt/core/rpc/grpc/dispatcher.cpp +++ b/yt/yt/core/rpc/grpc/dispatcher.cpp @@ -88,8 +88,7 @@ private: TDispatcherThread(TGrpcLibraryLockPtr libraryLock, int index) : TThread( Format("Grpc/%v", index), - NThreading::EThreadPriority::Normal, - GrpcDispatcherThreadShutdownPriority) + {.ShutdownPriority = GrpcDispatcherThreadShutdownPriority}) , LibraryLock_(std::move(libraryLock)) , GuardedCompletionQueue_(TGrpcCompletionQueuePtr(grpc_completion_queue_create_for_next(nullptr))) { diff --git a/yt/yt/core/threading/thread.cpp b/yt/yt/core/threading/thread.cpp index 8748862650..7bd7049493 100644 --- a/yt/yt/core/threading/thread.cpp +++ b/yt/yt/core/threading/thread.cpp @@ -29,11 +29,9 @@ static const auto& Logger = ThreadingLogger; TThread::TThread( TString threadName, - EThreadPriority threadPriority, - int shutdownPriority) + TThreadOptions options) : ThreadName_(std::move(threadName)) - , ThreadPriority_(threadPriority) - , ShutdownPriority_(shutdownPriority) + , Options_(std::move(options)) , UniqueThreadId_(++UniqueThreadIdGenerator) , UnderlyingThread_(&StaticThreadMainTrampoline, this) { } @@ -69,7 +67,7 @@ bool TThread::StartSlow() ShutdownCookie_ = RegisterShutdownCallback( Format("Thread(%v)", ThreadName_), BIND_NO_PROPAGATE(&TThread::Stop, MakeWeak(this)), - ShutdownPriority_); + Options_.ShutdownPriority); if (!ShutdownCookie_) { Stopping_ = true; return false; @@ -237,6 +235,10 @@ void TThread::ThreadMainTrampoline() YT_THREAD_LOCAL(TExitInterceptor) Interceptor; + if (Options_.ThreadInitializer) { + Options_.ThreadInitializer(); + } + ThreadMain(); GetTlsRef(Interceptor).Disarm(); @@ -261,7 +263,7 @@ void TThread::SetThreadPriority() YT_VERIFY(ThreadId_ != InvalidThreadId); #ifdef _linux_ - if (ThreadPriority_ == EThreadPriority::RealTime) { + if (Options_.ThreadPriority == EThreadPriority::RealTime) { struct sched_param param{ .sched_priority = 1 }; @@ -275,7 +277,7 @@ void TThread::SetThreadPriority() } } #else - Y_UNUSED(ThreadPriority_); + Y_UNUSED(Options_); Y_UNUSED(Logger); #endif } diff --git a/yt/yt/core/threading/thread.h b/yt/yt/core/threading/thread.h index 2599f18db2..ac9c9885ec 100644 --- a/yt/yt/core/threading/thread.h +++ b/yt/yt/core/threading/thread.h @@ -13,6 +13,13 @@ namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// +struct TThreadOptions +{ + NThreading::EThreadPriority ThreadPriority = NThreading::EThreadPriority::Normal; + std::function<void()> ThreadInitializer; + int ShutdownPriority = 0; +}; + //! A shutdown-aware thread wrapper. class TThread : public virtual TRefCounted @@ -20,8 +27,7 @@ class TThread public: explicit TThread( TString threadName, - EThreadPriority threadPriority = EThreadPriority::Normal, - int shutdownPriority = 0); + TThreadOptions options = {}); ~TThread(); //! Ensures the thread is started. @@ -55,8 +61,7 @@ protected: private: const TString ThreadName_; - const EThreadPriority ThreadPriority_; - const int ShutdownPriority_; + const TThreadOptions Options_; const TThreadId UniqueThreadId_; diff --git a/yt/yt/core/ytree/virtual.cpp b/yt/yt/core/ytree/virtual.cpp index 9a1c475cfe..4cff5fa7bb 100644 --- a/yt/yt/core/ytree/virtual.cpp +++ b/yt/yt/core/ytree/virtual.cpp @@ -66,7 +66,7 @@ void ExecuteBatchRead( auto batchFuture = BIND([writeItems, batchWriter, batchIndexRange = batchIndexRanges[index]] { writeItems(batchIndexRange, batchWriter); }) - .AsyncVia(NRpc::TDispatcher::Get()->GetHeavyInvoker()) + .AsyncVia(offloadParams->OffloadInvoker) .Run(); batchFutures.push_back(std::move(batchFuture)); } diff --git a/yt/yt/core/ytree/virtual.h b/yt/yt/core/ytree/virtual.h index 74a88f0dc5..0ed73bc9ad 100644 --- a/yt/yt/core/ytree/virtual.h +++ b/yt/yt/core/ytree/virtual.h @@ -12,6 +12,7 @@ namespace NYT::NYTree { struct TVirtualCompositeNodeReadOffloadParams { + IInvokerPtr OffloadInvoker; NConcurrency::EWaitForStrategy WaitForStrategy = NConcurrency::EWaitForStrategy::WaitFor; i64 BatchSize = 10'000; }; |