diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-02-13 18:38:05 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-02-13 18:38:05 +0000 |
commit | 28180f60aec6dcb2b662b6417c90226553ebe2dc (patch) | |
tree | 9ca4d2b0ea989b075f60d2746159e891c1aa77f7 /yt | |
parent | 09744cf9fbdd1cd31f648b5fabc8a9ed09875e3b (diff) | |
parent | 36161988ade9e56ec69a44ba4ff084ede6e44ee7 (diff) | |
download | ydb-28180f60aec6dcb2b662b6417c90226553ebe2dc.tar.gz |
Merge pull request #14512 from ydb-platform/merge-libs-250213-0050
Diffstat (limited to 'yt')
25 files changed, 473 insertions, 31 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index c8cbe5b644..76a55b436f 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -879,6 +879,12 @@ void BuildCommonOperationPart( if (baseSpec.MaxFailedJobCount_.Defined()) { (*specNode)["max_failed_job_count"] = *baseSpec.MaxFailedJobCount_; } + if (baseSpec.Description_.Defined()) { + (*specNode)["description"] = *baseSpec.Description_; + } + if (baseSpec.Annotations_.Defined()) { + (*specNode)["annotations"] = *baseSpec.Annotations_; + } } template <typename TSpec> diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h index 218ead0572..c1beb5375d 100644 --- a/yt/cpp/mapreduce/interface/operation.h +++ b/yt/cpp/mapreduce/interface/operation.h @@ -542,6 +542,12 @@ struct TOperationSpecBase /// How many jobs can fail before operation is failed. FLUENT_FIELD_OPTION(ui64, MaxFailedJobCount); + + // Arbitrary structured information related to the operation. + FLUENT_FIELD_OPTION(TNode, Annotations); + + // Similar to Annotations, shown on the operation page. Recommends concise, human-readable entries to prevent clutter. + FLUENT_FIELD_OPTION(TNode, Description); }; /// diff --git a/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp b/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp index bc307c3178..7916821ca4 100644 --- a/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp +++ b/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp @@ -69,7 +69,11 @@ void TRecordsRange::Fill(const TExprNode& settingsNode) { if (settingName != TStringBuf("take") && settingName != TStringBuf("skip")) { continue; } - YQL_ENSURE(setting->Child(1)->IsCallable("Uint64")); + if (!setting->Child(1)->IsCallable("Uint64")) { + Offset.Clear(); + Limit.Clear(); + return; + } if (!UpdateRecordsRange(*this, settingName, NYql::FromString<ui64>(*setting->Child(1)->Child(0), NUdf::EDataSlot::Uint64))) { break; } diff --git a/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp b/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp index 98731da58f..ddd4781ed6 100644 --- a/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp +++ b/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp @@ -8,11 +8,16 @@ namespace NYql { TTableLimiter::TTableLimiter(const TRecordsRange& range) : Start(range.Offset.GetOrElse(0ULL)) - , End(range.Limit.Defined() ? Start + *range.Limit : Max()) , Current(0ULL) , TableStart(0ULL) , TableEnd(Max()) { + const auto limit = range.Limit.GetOrElse(Max()); + if (limit > Max<ui64>() - Start) { + End = Max(); + } else { + End = Start + limit; + } } bool TTableLimiter::NextTable(ui64 recordCount) { diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 0c8c9fdf6c..b3233e37f1 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -279,31 +279,41 @@ private: .Build() .Done(); - TExprNode::TPtr limit; - if (const auto& limitNode = NYql::GetSetting(sort.Settings().Ref(), EYtSettingType::Limit)) { - limit = GetLimitExpr(limitNode, ctx); - } - + TExprNode::TPtr work; auto [direct, selector] = GetOutputSortSettings(sort, ctx); - auto work = direct && selector ? - limit ? - Build<TCoTopSort>(ctx, sort.Pos()) + if (direct && selector) { + // Don't use runtime limit for TopSort - it may have max<ui64>() value, which cause TopSort to fail + TMaybe<ui64> limit = GetLimit(sort.Settings().Ref()); + work = limit + ? Build<TCoTopSort>(ctx, sort.Pos()) .Input(input) - .Count(std::move(limit)) + .Count<TCoUint64>() + .Literal() + .Value(ToString(*limit), TNodeFlags::Default) + .Build() + .Build() .SortDirections(std::move(direct)) .KeySelectorLambda(std::move(selector)) - .Done().Ptr(): - Build<TCoSort>(ctx, sort.Pos()) + .Done().Ptr() + : Build<TCoSort>(ctx, sort.Pos()) .Input(input) .SortDirections(std::move(direct)) .KeySelectorLambda(std::move(selector)) - .Done().Ptr(): - limit ? - Build<TCoTake>(ctx, sort.Pos()) + .Done().Ptr() + ; + } else { + TExprNode::TPtr limit; + if (const auto& limitNode = NYql::GetSetting(sort.Settings().Ref(), EYtSettingType::Limit)) { + limit = GetLimitExpr(limitNode, ctx); + } + + work = limit + ? Build<TCoTake>(ctx, sort.Pos()) .Input(input) .Count(std::move(limit)) - .Done().Ptr(): - input.Ptr(); + .Done().Ptr() + : input.Ptr(); + } auto settings = NYql::AddSetting(sort.Settings().Ref(), EYtSettingType::NoDq, {}, ctx); auto operation = ctx.ChangeChild(sort.Ref(), TYtTransientOpBase::idx_Settings, std::move(settings)); diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp index 97b9fdeda9..d8b1f20823 100644 --- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp @@ -905,7 +905,30 @@ TExprNode::TPtr GetLimitExpr(const TExprNode::TPtr& limitSetting, TExprContext& } if (skip) { - limitValues.push_back(ctx.NewCallable(child->Pos(), "+", { take, skip })); + auto uintMax = ctx.Builder(child->Pos()) + .Callable("Uint64") + .Atom(0, ToString(Max<ui64>()), TNodeFlags::Default) + .Seal() + .Build(); + limitValues.push_back( + ctx.Builder(child->Pos()) + .Callable("If") + .Callable(0, ">") + .Add(0, take) + .Callable(1, "-") + .Add(0, uintMax) + .Add(1, skip) + .Seal() + .Seal() + .Add(1, uintMax) + .Callable(2, "+") + .Add(0, take) + .Add(1, skip) + .Seal() + .Seal() + .Build() + ); + } else { limitValues.push_back(take); } diff --git a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp index 0137b343eb..cd0b5e2ade 100644 --- a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp @@ -73,7 +73,7 @@ TMaybeNode<TYtSection> MaterializeSectionIfRequired(TExprBase world, TYtSection .Paths() .Add(path) .Build() - .Settings(NYql::RemoveSetting(section.Settings().Ref(), EYtSettingType::Sample, ctx)) + .Settings(NYql::RemoveSettings(section.Settings().Ref(), EYtSettingType::Sample | EYtSettingType::SysColumns, ctx)) .Done(); } @@ -89,7 +89,7 @@ TMaybeNode<TYtSection> UpdateSectionWithRange(TExprBase world, TYtSection sectio TVector<TYtPath> skippedPaths; if (auto limiter = TTableLimiter(range)) { if (auto materialized = MaterializeSectionIfRequired(world, section, dataSink, outRowSpec, keepSortness, - {NYql::KeepOnlySettings(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip | EYtSettingType::SysColumns, ctx)}, state, ctx)) + {NYql::KeepOnlySettings(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip, ctx)}, state, ctx)) { if (!allowMaterialize || state->Types->EvaluationInProgress) { // Keep section as is diff --git a/yt/yql/tests/sql/suites/limit/dynamic_limit_offset_overflow.sql b/yt/yql/tests/sql/suites/limit/dynamic_limit_offset_overflow.sql new file mode 100644 index 0000000000..5452aceb1c --- /dev/null +++ b/yt/yql/tests/sql/suites/limit/dynamic_limit_offset_overflow.sql @@ -0,0 +1,14 @@ +-- YQL-19579 +-- Check that offset + limit don't overflow max uin64 +use plato; + +$limit = -1; +$offset = 2; +$limit = if($limit >= 0, cast($limit as uint64)); +$offset = if($offset >= 0, cast($offset as uint64)); + +$i = select distinct key from Input; + +select * from $i order by key +limit $limit offset $offset; + diff --git a/yt/yql/tests/sql/suites/select/sample_limit_recordindex.cfg b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.cfg new file mode 100644 index 0000000000..6c06cba116 --- /dev/null +++ b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.cfg @@ -0,0 +1,2 @@ +in Input input1100.txt + diff --git a/yt/yql/tests/sql/suites/select/sample_limit_recordindex.sql b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.sql new file mode 100644 index 0000000000..220fbfa060 --- /dev/null +++ b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.sql @@ -0,0 +1,12 @@ +/* custom check: len(yt_res_yson[0][b'Write'][0][b'Data']) <= 5 */ +USE plato; + +SELECT + key, + subkey, + TableRecordIndex() AS index +FROM + Input +SAMPLE 1.0 / 5 +LIMIT 5 +; diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp index e5ac8928f8..f81e97040d 100644 --- a/yt/yt/client/kafka/requests.cpp +++ b/yt/yt/client/kafka/requests.cpp @@ -601,8 +601,10 @@ void TRspFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const //////////////////////////////////////////////////////////////////////////////// -void TReqSaslHandshake::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/) +void TReqSaslHandshake::Deserialize(IKafkaProtocolReader* reader, int apiVersion) { + ApiVersion = apiVersion; + Mechanism = reader->ReadString(); } diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h index e8c4342354..04435ffcaf 100644 --- a/yt/yt/client/kafka/requests.h +++ b/yt/yt/client/kafka/requests.h @@ -120,6 +120,13 @@ struct TRecord //////////////////////////////////////////////////////////////////////////////// +struct TReqBase +{ + int ApiVersion = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + struct TReqApiVersions { static constexpr ERequestType RequestType = ERequestType::ApiVersions; @@ -504,6 +511,7 @@ struct TRspFetch //////////////////////////////////////////////////////////////////////////////// struct TReqSaslHandshake + : public TReqBase { static constexpr ERequestType RequestType = ERequestType::SaslHandshake; diff --git a/yt/yt/core/bus/client.h b/yt/yt/core/bus/client.h index fa12665fc2..1b9ca361f5 100644 --- a/yt/yt/core/bus/client.h +++ b/yt/yt/core/bus/client.h @@ -2,6 +2,8 @@ #include "public.h" +#include <yt/yt/core/bus/tcp/public.h> + #include <yt/yt/core/ytree/public.h> namespace NYT::NBus { @@ -28,6 +30,9 @@ struct IBusClient //! Typically used for constructing errors. virtual const NYTree::IAttributeDictionary& GetEndpointAttributes() const = 0; + //! Apply new dynamic config. + virtual void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) = 0; + //! Creates a new bus. /*! * The bus will point to the address supplied during construction. diff --git a/yt/yt/core/bus/server.h b/yt/yt/core/bus/server.h index dd7a35bc13..d217935c36 100644 --- a/yt/yt/core/bus/server.h +++ b/yt/yt/core/bus/server.h @@ -2,6 +2,8 @@ #include "public.h" +#include <yt/yt/core/bus/tcp/public.h> + #include <yt/yt/core/actions/future.h> namespace NYT::NBus { @@ -22,6 +24,12 @@ struct IBusServer */ virtual void Start(IMessageHandlerPtr handler) = 0; + //! Apply new dynamic config. + /* + * \param config New config. + */ + virtual void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) = 0; + //! Asynchronously stops the listener. /*! * After this call the instance is no longer usable. diff --git a/yt/yt/core/bus/tcp/client.cpp b/yt/yt/core/bus/tcp/client.cpp index 5dd0117bee..81db62b1ab 100644 --- a/yt/yt/core/bus/tcp/client.cpp +++ b/yt/yt/core/bus/tcp/client.cpp @@ -160,6 +160,11 @@ public: return *EndpointAttributes_; } + void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) override + { + DynamicConfig_.Store(config); + } + IBusPtr CreateBus(IMessageHandlerPtr handler, const TCreateBusOptions& options) override { YT_ASSERT_THREAD_AFFINITY_ANY(); @@ -181,7 +186,6 @@ public: .EndMap()); auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller(); - auto connection = New<TTcpConnection>( Config_, EConnectionType::Client, @@ -196,7 +200,8 @@ public: std::move(handler), std::move(poller), PacketTranscoderFactory_, - MemoryUsageTracker_); + MemoryUsageTracker_, + DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit); connection->Start(); return New<TTcpClientBusProxy>(std::move(connection)); @@ -204,6 +209,7 @@ public: private: const TBusClientConfigPtr Config_; + TAtomicIntrusivePtr<TBusClientDynamicConfig> DynamicConfig_{New<TBusClientDynamicConfig>()}; IPacketTranscoderFactory* const PacketTranscoderFactory_; const IMemoryUsageTrackerPtr MemoryUsageTracker_; const std::string EndpointDescription_; diff --git a/yt/yt/core/bus/tcp/config.cpp b/yt/yt/core/bus/tcp/config.cpp index f67c6fba6c..809a2d7413 100644 --- a/yt/yt/core/bus/tcp/config.cpp +++ b/yt/yt/core/bus/tcp/config.cpp @@ -126,6 +126,9 @@ TBusServerConfigPtr TBusServerConfig::CreateUds(const std::string& socketPath) return config; } +void TBusServerDynamicConfig::Register(TRegistrar /*registrar*/) +{ } + //////////////////////////////////////////////////////////////////////////////// void TBusConfig::Register(TRegistrar registrar) @@ -170,6 +173,14 @@ void TBusConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TBusDynamicConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("need_reject_connection_due_memory_overcommit", &TThis::NeedRejectConnectionDueMemoryOvercommit) + .Default(false); +} + +//////////////////////////////////////////////////////////////////////////////// + void TBusClientConfig::Register(TRegistrar registrar) { registrar.Parameter("address", &TThis::Address) @@ -198,6 +209,9 @@ TBusClientConfigPtr TBusClientConfig::CreateUds(const std::string& socketPath) return config; } +void TBusClientDynamicConfig::Register(TRegistrar /*registrar*/) +{ } + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NBus diff --git a/yt/yt/core/bus/tcp/config.h b/yt/yt/core/bus/tcp/config.h index 491612fabb..d0a5bde08d 100644 --- a/yt/yt/core/bus/tcp/config.h +++ b/yt/yt/core/bus/tcp/config.h @@ -130,6 +130,21 @@ DEFINE_REFCOUNTED_TYPE(TBusConfig) //////////////////////////////////////////////////////////////////////////////// +class TBusDynamicConfig + : public NYTree::TYsonStruct +{ +public: + bool NeedRejectConnectionDueMemoryOvercommit; + + REGISTER_YSON_STRUCT(TBusDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TBusDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + class TBusServerConfig : public TBusConfig { @@ -151,6 +166,19 @@ DEFINE_REFCOUNTED_TYPE(TBusServerConfig) //////////////////////////////////////////////////////////////////////////////// +class TBusServerDynamicConfig + : public TBusDynamicConfig +{ +public: + REGISTER_YSON_STRUCT(TBusServerDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TBusServerDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + class TBusClientConfig : public TBusConfig { @@ -170,5 +198,18 @@ DEFINE_REFCOUNTED_TYPE(TBusClientConfig) //////////////////////////////////////////////////////////////////////////////// +class TBusClientDynamicConfig + : public TBusDynamicConfig +{ +public: + REGISTER_YSON_STRUCT(TBusClientDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TBusClientDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NBus diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index a357e9199d..792cc3d0c4 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -114,7 +114,8 @@ TTcpConnection::TTcpConnection( IMessageHandlerPtr handler, IPollerPtr poller, IPacketTranscoderFactory* packetTranscoderFactory, - IMemoryUsageTrackerPtr memoryUsageTracker) + IMemoryUsageTrackerPtr memoryUsageTracker, + bool needRejectConnectionDueMemoryOvercommit) : Config_(std::move(config)) , ConnectionType_(connectionType) , Id_(id) @@ -139,6 +140,7 @@ TTcpConnection::TTcpConnection( , EncryptionMode_(Config_->EncryptionMode) , VerificationMode_(Config_->VerificationMode) , MemoryUsageTracker_(std::move(memoryUsageTracker)) + , NeedRejectConnectionDueMemoryOvercommit_(needRejectConnectionDueMemoryOvercommit) { } TTcpConnection::~TTcpConnection() @@ -596,9 +598,15 @@ void TTcpConnection::InitBuffers() ConnectionType_ == EConnectionType::Server ? GetRefCountedTypeCookie<TTcpServerConnectionWriteBufferTag>() : GetRefCountedTypeCookie<TTcpClientConnectionWriteBufferTag>()); - trackedBlob - .TryReserve(WriteBufferSize) - .ThrowOnError(); + + if (NeedRejectConnectionDueMemoryOvercommit_) { + trackedBlob + .TryReserve(WriteBufferSize) + .ThrowOnError(); + } else { + trackedBlob.Reserve(WriteBufferSize); + } + WriteBuffers_.push_back(std::move(trackedBlob)); } diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index bc520f331a..229d244828 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -88,7 +88,8 @@ public: IMessageHandlerPtr handler, NConcurrency::IPollerPtr poller, IPacketTranscoderFactory* packetTranscoderFactory, - IMemoryUsageTrackerPtr memoryUsageTracker); + IMemoryUsageTrackerPtr memoryUsageTracker, + bool needRejectConnectionDueMemoryOvercommit); ~TTcpConnection(); @@ -279,6 +280,7 @@ private: const EVerificationMode VerificationMode_; const IMemoryUsageTrackerPtr MemoryUsageTracker_; + const bool NeedRejectConnectionDueMemoryOvercommit_; NYTree::IAttributeDictionaryPtr PeerAttributes_; diff --git a/yt/yt/core/bus/tcp/public.h b/yt/yt/core/bus/tcp/public.h index 0c86109c1a..aa86f46c4b 100644 --- a/yt/yt/core/bus/tcp/public.h +++ b/yt/yt/core/bus/tcp/public.h @@ -14,9 +14,15 @@ using TBusNetworkCountersPtr = TIntrusivePtr<TBusNetworkCounters>; DECLARE_REFCOUNTED_CLASS(TMultiplexingBandConfig) DECLARE_REFCOUNTED_CLASS(TTcpDispatcherConfig) DECLARE_REFCOUNTED_CLASS(TTcpDispatcherDynamicConfig) + DECLARE_REFCOUNTED_CLASS(TBusConfig) +DECLARE_REFCOUNTED_CLASS(TBusDynamicConfig) + DECLARE_REFCOUNTED_CLASS(TBusServerConfig) +DECLARE_REFCOUNTED_CLASS(TBusServerDynamicConfig) + DECLARE_REFCOUNTED_CLASS(TBusClientConfig) +DECLARE_REFCOUNTED_CLASS(TBusClientDynamicConfig) struct IPacketTranscoderFactory; diff --git a/yt/yt/core/bus/tcp/server.cpp b/yt/yt/core/bus/tcp/server.cpp index a9fcf0c8a0..2802f2d5d6 100644 --- a/yt/yt/core/bus/tcp/server.cpp +++ b/yt/yt/core/bus/tcp/server.cpp @@ -78,6 +78,13 @@ public: YT_LOG_INFO("Bus server started"); } + void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) + { + YT_VERIFY(config); + + DynamicConfig_.Store(config); + } + TFuture<void> Stop() { YT_LOG_INFO("Stopping Bus server"); @@ -122,6 +129,7 @@ public: protected: const TBusServerConfigPtr Config_; + TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()}; const IPollerPtr Poller_; const IMessageHandlerPtr Handler_; IPacketTranscoderFactory* const PacketTranscoderFactory_; @@ -254,7 +262,6 @@ protected: .EndMap()); auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller(); - auto connection = New<TTcpConnection>( Config_, EConnectionType::Server, @@ -269,7 +276,8 @@ protected: Handler_, std::move(poller), PacketTranscoderFactory_, - MemoryUsageTracker_); + MemoryUsageTracker_, + DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit); { auto guard = WriterGuard(ConnectionsSpinLock_); @@ -435,6 +443,18 @@ public: Server_.Store(server); server->Start(); + server->OnDynamicConfigChanged(DynamicConfig_.Acquire()); + } + + void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final + { + YT_VERIFY(config); + + DynamicConfig_.Store(config); + + if (auto server = Server_.Acquire()) { + server->OnDynamicConfigChanged(config); + } } TFuture<void> Stop() final @@ -448,6 +468,7 @@ public: private: const TBusServerConfigPtr Config_; + TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()}; IPacketTranscoderFactory* const PacketTranscoderFactory_; const IMemoryUsageTrackerPtr MemoryUsageTracker_; @@ -521,6 +542,15 @@ public: } } + void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final + { + YT_VERIFY(config); + + for (const auto& server : Servers_) { + server->OnDynamicConfigChanged(config); + } + } + TFuture<void> Stop() final { if (Config_->EnableLocalBypass && Config_->Port) { diff --git a/yt/yt/core/concurrency/arcadia_interop-inl.h b/yt/yt/core/concurrency/arcadia_interop-inl.h new file mode 100644 index 0000000000..57dc19c66d --- /dev/null +++ b/yt/yt/core/concurrency/arcadia_interop-inl.h @@ -0,0 +1,68 @@ +#ifndef ARCADIA_INTEROP_INL_H_ +#error "Direct inclusion of this file is not allowed, include async_batcher.h" +// For the sake of sane code completion. +#include "arcadia_interop.h" +#endif +#undef ARCADIA_INTEROP_INL_H_ + +#include <yt/yt/core/actions/future.h> + +#include <library/cpp/threading/future/core/future.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +::NThreading::TFuture<T> ToArcadiaFuture(const TFuture<T>& future) +{ + auto promise = ::NThreading::NewPromise<T>(); + auto wrappedFuture = promise.GetFuture(); + + future + .Subscribe(BIND([promise = std::move(promise)] (const TErrorOr<T>& valueOrError) mutable { + try { + if constexpr (std::is_same_v<T, void>) { + valueOrError + .ThrowOnError(); + promise.TrySetValue(); + } else { + auto value = valueOrError + .ValueOrThrow(); + promise.TrySetValue(std::move(value)); + } + } catch (...) { + promise.TrySetException(std::current_exception()); + } + })); + + return wrappedFuture; +} + +template <class T> +TFuture<T> FromArcadiaFuture(const ::NThreading::TFuture<T>& future) +{ + auto promise = NewPromise<T>(); + auto wrappedFuture = promise.ToFuture(); + + future + .Subscribe([promise = std::move(promise)](::NThreading::TFuture<T> future) { + YT_ASSERT(future.HasValue() || future.HasException()); + try { + if constexpr (std::is_void_v<T>) { + future.TryRethrow(); + promise.TrySet(); + } else { + promise.TrySet(future.ExtractValueSync()); + } + } catch (const std::exception& e) { + promise.TrySet(NYT::TError(e)); + } + }); + + return wrappedFuture; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/arcadia_interop.h b/yt/yt/core/concurrency/arcadia_interop.h new file mode 100644 index 0000000000..d46ebb8c00 --- /dev/null +++ b/yt/yt/core/concurrency/arcadia_interop.h @@ -0,0 +1,23 @@ +#pragma once + +#include <yt/yt/core/actions/future.h> + +#include <library/cpp/threading/future/core/future.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +::NThreading::TFuture<T> ToArcadiaFuture(const TFuture<T>& future); + +template <class T> +TFuture<T> FromArcadiaFuture(const ::NThreading::TFuture<T>& future); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency + +#define ARCADIA_INTEROP_INL_H_ +#include "arcadia_interop-inl.h" +#undef ARCADIA_INTEROP_INL_H_ diff --git a/yt/yt/core/concurrency/unittests/arcadia_interop_ut.cpp b/yt/yt/core/concurrency/unittests/arcadia_interop_ut.cpp new file mode 100644 index 0000000000..a987ebbca2 --- /dev/null +++ b/yt/yt/core/concurrency/unittests/arcadia_interop_ut.cpp @@ -0,0 +1,137 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/concurrency/arcadia_interop.h> + +namespace NYT::NConcurrency { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TFutureInteropTest, FromArcadiaFutureWithValue1) +{ + auto future = FromArcadiaFuture(::NThreading::MakeFuture<int>(1)); + ASSERT_TRUE(future.IsSet()); + EXPECT_EQ(1, future.Get().ValueOrThrow()); +} + +TEST(TFutureInteropTest, FromArcadiaFutureWithValue2) +{ + ::testing::TProbeState state; + auto promise = ::NThreading::NewPromise<::testing::TProbe>(); + auto future = FromArcadiaFuture(promise.GetFuture()); + EXPECT_TRUE(!future.IsSet()); + promise.SetValue(::testing::TProbe(&state)); + ASSERT_TRUE(future.IsSet()); + EXPECT_TRUE(future.Get().ValueOrThrow().IsValid()); + EXPECT_THAT(state, ::testing::HasCopyMoveCounts(0, 3)); +} + +TEST(TFutureInteropTest, FromArcadiaFutureWithError1) +{ + auto promise = ::NThreading::NewPromise<int>(); + promise.SetException("error"); + auto future = FromArcadiaFuture(promise.GetFuture()); + ASSERT_TRUE(future.IsSet()); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "error"); +} + +TEST(TFutureInteropTest, FromArcadiaFutureWithError2) +{ + auto promise = ::NThreading::NewPromise<int>(); + auto future = FromArcadiaFuture(promise.GetFuture()); + EXPECT_TRUE(!future.IsSet()); + promise.SetException("error"); + ASSERT_TRUE(future.IsSet()); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "error"); +} + +TEST(TFutureInteropTest, FromArcadiaFutureVoid1) +{ + auto promise = ::NThreading::NewPromise<void>(); + auto future = FromArcadiaFuture(promise.GetFuture()); + EXPECT_TRUE(!future.IsSet()); + promise.SetValue(); + ASSERT_TRUE(future.IsSet()); + EXPECT_NO_THROW(future.Get().ThrowOnError()); +} + +TEST(TFutureInteropTest, FromArcadiaFutureVoid2) +{ + auto promise = ::NThreading::NewPromise<void>(); + auto future = FromArcadiaFuture(promise.GetFuture()); + EXPECT_TRUE(!future.IsSet()); + promise.SetException("error"); + ASSERT_TRUE(future.IsSet()); + ASSERT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "error"); +} + +TEST(TFutureInteropTest, FromArcadiaFutureCancel) +{ + auto promise = ::NThreading::NewPromise<void>(); + auto future = FromArcadiaFuture(promise.GetFuture()); + EXPECT_TRUE(!future.IsSet()); + future.Cancel(TError("canceled")); + promise.SetValue(); + ASSERT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "canceled"); +} + +TEST(TFutureInteropTest, ToArcadiaFutureWithValue) +{ + ::testing::TProbeState state; + auto promise = NewPromise<::testing::TProbe>(); + auto future = ToArcadiaFuture(promise.ToFuture()); + EXPECT_FALSE(future.HasValue()); + promise.Set(::testing::TProbe(&state)); + ASSERT_TRUE(future.HasValue()); + EXPECT_TRUE(future.GetValue().IsValid()); + EXPECT_THAT(state, ::testing::HasCopyMoveCounts(1, 2)); +} + +TEST(TFutureInteropTest, ToArcadiaFutureWithError1) +{ + ::testing::TProbeState state; + auto promise = NewPromise<::testing::TProbe>(); + auto future = ToArcadiaFuture(promise.ToFuture()); + EXPECT_FALSE(future.HasValue()); + promise.Set(TError("error")); + ASSERT_TRUE(future.HasException()); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.TryRethrow(), std::exception, "error"); + EXPECT_THAT(state, ::testing::HasCopyMoveCounts(0, 0)); +} + +TEST(TFutureInteropTest, ToArcadiaFutureCanceled) +{ + ::testing::TProbeState state; + auto promise = NewPromise<::testing::TProbe>(); + auto future = ToArcadiaFuture(promise.ToFuture()); + EXPECT_FALSE(future.HasValue()); + + promise.ToFuture().Cancel(TError("canceled")); + ASSERT_TRUE(future.HasException()); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.TryRethrow(), std::exception, "canceled"); + EXPECT_THAT(state, ::testing::HasCopyMoveCounts(0, 0)); +} + +TEST(TFutureInteropTest, ToArcadiaFutureVoid1) +{ + auto promise = NewPromise<void>(); + auto future = ToArcadiaFuture(promise.ToFuture()); + EXPECT_FALSE(future.HasValue()); + promise.Set(); + EXPECT_TRUE(future.HasValue()); +} + +TEST(TFutureInteropTest, ToArcadiaFutureVoid2) +{ + auto promise = NewPromise<void>(); + auto future = ToArcadiaFuture(promise.ToFuture()); + EXPECT_FALSE(future.HasValue()); + promise.Set(TError("error")); + EXPECT_TRUE(future.HasException()); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.TryRethrow(), std::exception, "error"); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT:::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make index 1d59b915ee..f78e63ebce 100644 --- a/yt/yt/core/concurrency/unittests/ya.make +++ b/yt/yt/core/concurrency/unittests/ya.make @@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) PROTO_NAMESPACE(yt) SRCS( + arcadia_interop_ut.cpp async_barrier_ut.cpp async_looper_ut.cpp async_rw_lock_ut.cpp @@ -47,6 +48,7 @@ PEERDIR( yt/yt/core/test_framework library/cpp/json/yson + library/cpp/threading/future ) REQUIREMENTS( |