diff options
author | kvk1920 <kvk1920@yandex-team.com> | 2023-09-14 22:32:54 +0300 |
---|---|---|
committer | kvk1920 <kvk1920@yandex-team.com> | 2023-09-14 22:50:46 +0300 |
commit | 09cfb2c95783f076ce68a8995125d73fbffcf0d9 (patch) | |
tree | d578ac7caecf6ed3b9ae3c54bb8f6cd1f3969dc4 | |
parent | 179f98b85dbb00b946e8bc3d40dde90e5b9ba8df (diff) | |
download | ydb-09cfb2c95783f076ce68a8995125d73fbffcf0d9.tar.gz |
Per-request complexity limits
26 files changed, 312 insertions, 222 deletions
diff --git a/yt/yt/client/api/cypress_client.h b/yt/yt/client/api/cypress_client.h index 52f1d3b953..128403106f 100644 --- a/yt/yt/client/api/cypress_client.h +++ b/yt/yt/client/api/cypress_client.h @@ -3,17 +3,24 @@ #include "client_common.h" #include <yt/yt/core/ytree/attribute_filter.h> +#include <yt/yt/core/ytree/request_complexity_limits.h> namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// +struct TReadCypressReqeustOptions +{ + NYTree::TReadRequestComplexityOverrides ComplexityLimits; +}; + struct TGetNodeOptions : public TTimeoutOptions , public TTransactionalOptions , public TMasterReadOptions , public TSuppressableAccessTrackingOptions , public TPrerequisiteOptions + , public TReadCypressReqeustOptions { // NB(eshcherbin): Used in profiling Orchid. NYTree::IAttributeDictionaryPtr Options; @@ -58,6 +65,7 @@ struct TListNodeOptions , public TMasterReadOptions , public TSuppressableAccessTrackingOptions , public TPrerequisiteOptions + , public TReadCypressReqeustOptions { NYTree::TAttributeFilter Attributes; std::optional<i64> MaxSize; diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index 0432530ca2..b31ce8903f 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -229,6 +229,8 @@ TFuture<TYsonString> TClientBase::GetNode( req->set_max_size(*options.MaxSize); } + ToProto(req->mutable_complexity_limits(), options.ComplexityLimits); + ToProto(req->mutable_transactional_options(), options); ToProto(req->mutable_prerequisite_options(), options); ToProto(req->mutable_master_read_options(), options); @@ -265,6 +267,8 @@ TFuture<TYsonString> TClientBase::ListNode( req->set_max_size(*options.MaxSize); } + ToProto(req->mutable_complexity_limits(), options.ComplexityLimits); + ToProto(req->mutable_transactional_options(), options); ToProto(req->mutable_prerequisite_options(), options); ToProto(req->mutable_master_read_options(), options); diff --git a/yt/yt/client/driver/command.h b/yt/yt/client/driver/command.h index 669f688fe4..8f1bdbaac1 100644 --- a/yt/yt/client/driver/command.h +++ b/yt/yt/client/driver/command.h @@ -94,7 +94,6 @@ class TTypedCommandBase { protected: TOptions Options; - }; template <class TOptions, class = void> diff --git a/yt/yt/client/driver/cypress_commands.cpp b/yt/yt/client/driver/cypress_commands.cpp index 562c0fd486..6024244f2c 100644 --- a/yt/yt/client/driver/cypress_commands.cpp +++ b/yt/yt/client/driver/cypress_commands.cpp @@ -30,6 +30,12 @@ TGetCommand::TGetCommand() .Optional(); RegisterParameter("return_only_value", ShouldReturnOnlyValue) .Default(false); + RegisterParameter("node_count_limit", Options.ComplexityLimits.NodeCount) + .Optional() + .GreaterThanOrEqual(0); + RegisterParameter("result_size_limit", Options.ComplexityLimits.ResultSize) + .Optional() + .GreaterThanOrEqual(0); } void TGetCommand::DoExecute(ICommandContextPtr context) @@ -126,6 +132,12 @@ TListCommand::TListCommand() .Optional(); RegisterParameter("return_only_value", ShouldReturnOnlyValue) .Default(false); + RegisterParameter("node_count_limit", Options.ComplexityLimits.NodeCount) + .Optional() + .GreaterThanOrEqual(0); + RegisterParameter("result_size_limit", Options.ComplexityLimits.ResultSize) + .Optional() + .GreaterThanOrEqual(0); } void TListCommand::DoExecute(ICommandContextPtr context) diff --git a/yt/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt/core/CMakeLists.darwin-x86_64.txt index 6c00fde57f..9ee10c5a62 100644 --- a/yt/yt/core/CMakeLists.darwin-x86_64.txt +++ b/yt/yt/core/CMakeLists.darwin-x86_64.txt @@ -286,6 +286,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/node_detail.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/permission.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limiter.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limits.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/serialize.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/static_service_dispatcher.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/system_attribute_provider.cpp diff --git a/yt/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt/core/CMakeLists.linux-aarch64.txt index 61d8c0dcd9..ca7b9ac8eb 100644 --- a/yt/yt/core/CMakeLists.linux-aarch64.txt +++ b/yt/yt/core/CMakeLists.linux-aarch64.txt @@ -286,6 +286,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/node_detail.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/permission.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limiter.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limits.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/serialize.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/static_service_dispatcher.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/system_attribute_provider.cpp diff --git a/yt/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt/core/CMakeLists.linux-x86_64.txt index 87c825378b..d47c1788f9 100644 --- a/yt/yt/core/CMakeLists.linux-x86_64.txt +++ b/yt/yt/core/CMakeLists.linux-x86_64.txt @@ -287,6 +287,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/node_detail.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/permission.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limiter.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limits.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/serialize.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/static_service_dispatcher.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/system_attribute_provider.cpp diff --git a/yt/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt/core/CMakeLists.windows-x86_64.txt index 5fd2f45d9d..0094aec31c 100644 --- a/yt/yt/core/CMakeLists.windows-x86_64.txt +++ b/yt/yt/core/CMakeLists.windows-x86_64.txt @@ -285,6 +285,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/node_detail.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/permission.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limiter.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/request_complexity_limits.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/serialize.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/static_service_dispatcher.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/ytree/system_attribute_provider.cpp diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 130b09bc0f..db4201c01c 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -263,6 +263,7 @@ SRCS( ytree/node_detail.cpp ytree/permission.cpp ytree/request_complexity_limiter.cpp + ytree/request_complexity_limits.cpp ytree/serialize.cpp ytree/static_service_dispatcher.cpp ytree/system_attribute_provider.cpp diff --git a/yt/yt/core/yson/async_writer.cpp b/yt/yt/core/yson/async_writer.cpp index afc292ccf3..d5dd646145 100644 --- a/yt/yt/core/yson/async_writer.cpp +++ b/yt/yt/core/yson/async_writer.cpp @@ -8,7 +8,7 @@ namespace NYT::NYson { TAsyncYsonWriter::TAsyncYsonWriter(EYsonType type) : Type_(type) , SyncWriter_(&Stream_, type) - , FlushedSize_(std::make_shared<std::atomic<ui64>>(0)) + , FlushedSize_(std::make_shared<std::atomic<i64>>(0)) { } void TAsyncYsonWriter::OnStringScalar(TStringBuf value) @@ -99,7 +99,7 @@ void TAsyncYsonWriter::OnRaw(TFuture<TYsonString> asyncStr) }))); } -ui64 TAsyncYsonWriter::GetTotalWrittenSize() const +i64 TAsyncYsonWriter::GetTotalWrittenSize() const { return FlushedSize_->load(std::memory_order::relaxed) + SyncWriter_.GetTotalWrittenSize(); } diff --git a/yt/yt/core/yson/async_writer.h b/yt/yt/core/yson/async_writer.h index 2aad00eb19..7132bc1b88 100644 --- a/yt/yt/core/yson/async_writer.h +++ b/yt/yt/core/yson/async_writer.h @@ -43,7 +43,7 @@ public: TFuture<TYsonString> Finish(); const TAsyncSegments& GetSegments() const; - ui64 GetTotalWrittenSize() const; + i64 GetTotalWrittenSize() const; private: const EYsonType Type_; @@ -52,7 +52,7 @@ private: TBufferedBinaryYsonWriter SyncWriter_; TAsyncSegments AsyncSegments_; - std::shared_ptr<std::atomic<ui64>> FlushedSize_; + std::shared_ptr<std::atomic<i64>> FlushedSize_; void FlushCurrentSegment(); }; diff --git a/yt/yt/core/ytree/public.h b/yt/yt/core/ytree/public.h index d895d7ca13..a032267309 100644 --- a/yt/yt/core/ytree/public.h +++ b/yt/yt/core/ytree/public.h @@ -124,6 +124,7 @@ YT_DEFINE_ERROR_ENUM( //////////////////////////////////////////////////////////////////////////////// struct TReadRequestComplexity; +struct TReadRequestComplexityOverrides; DECLARE_REFCOUNTED_CLASS(TReadRequestComplexityLimiter) diff --git a/yt/yt/core/ytree/request_complexity_limiter.cpp b/yt/yt/core/ytree/request_complexity_limiter.cpp index c0e8724c03..5858aba7ee 100644 --- a/yt/yt/core/ytree/request_complexity_limiter.cpp +++ b/yt/yt/core/ytree/request_complexity_limiter.cpp @@ -6,120 +6,36 @@ using namespace NYson; //////////////////////////////////////////////////////////////////////////////// -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -void VerifyNonNegative(const TReadRequestComplexity& complexity) noexcept -{ - YT_VERIFY(complexity.NodeCount.value_or(0) >= 0); - YT_VERIFY(complexity.ResultSize.value_or(0) >= 0); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace - -TReadRequestComplexity::TReadRequestComplexity(std::optional<i64> nodeCount, std::optional<i64> resultSize) noexcept - : NodeCount(std::move(nodeCount)) - , ResultSize(std::move(resultSize)) -{ - VerifyNonNegative(*this); -} - -void TReadRequestComplexity::Sanitize(const TReadRequestComplexity& max) noexcept -{ - auto applyMax = [] (std::optional<i64> value, std::optional<i64> maxValue) { - return (value.has_value() && maxValue.has_value()) - ? std::optional(std::min(*value, *maxValue)) - : value; - }; - NodeCount = applyMax(NodeCount, max.NodeCount); - ResultSize = applyMax(ResultSize, max.ResultSize); -} - -//////////////////////////////////////////////////////////////////////////////// - -TReadRequestComplexityUsage::TReadRequestComplexityUsage() noexcept - : TReadRequestComplexity(0, 0) +TReadRequestComplexityLimiter::TReadRequestComplexityLimiter(TReadRequestComplexity limits) noexcept + : Limits_{limits} { } -TReadRequestComplexityUsage::TReadRequestComplexityUsage(const TReadRequestComplexity& usage) noexcept - : TReadRequestComplexity(usage) -{ - VerifyNonNegative(usage); -} - -TReadRequestComplexityUsage& TReadRequestComplexityUsage::operator+=(const TReadRequestComplexityUsage& that) noexcept -{ - auto add = [] (std::optional<i64> target, std::optional<i64> source) { - if (source.has_value()) { - target = target.value_or(0) + source.value(); - } - return target; - }; - - NodeCount = add(NodeCount, that.NodeCount); - ResultSize = add(ResultSize, that.ResultSize); - return *this; -} - -const TReadRequestComplexity& TReadRequestComplexityUsage::AsComplexity() const noexcept -{ - return static_cast<const TReadRequestComplexity&>(*this); -} - -//////////////////////////////////////////////////////////////////////////////// - -TReadRequestComplexityLimits::TReadRequestComplexityLimits( - const TReadRequestComplexity& limits) noexcept - : TReadRequestComplexity(limits) +void TReadRequestComplexityLimiter::Charge(TReadRequestComplexity usage) noexcept { - VerifyNonNegative(limits); + Usage_.NodeCount += usage.NodeCount; + Usage_.ResultSize += usage.ResultSize; } -TError TReadRequestComplexityLimits::CheckOverdraught( - const TReadRequestComplexityUsage& usage) const noexcept +TError TReadRequestComplexityLimiter::CheckOverdraught() const noexcept { TError error; - auto checkField = [&] (TStringBuf fieldName, std::optional<i64> limit, std::optional<i64> usage) { - if (limit.has_value() && usage.value_or(0) > *limit) { + auto doCheck = [&] (TStringBuf fieldName, i64 limit, i64 usage) { + if (limit < usage) { error.SetCode(NYT::EErrorCode::Generic); - error = error << TErrorAttribute(Format("%v_usage", fieldName), *usage); - error = error << TErrorAttribute(Format("%v_limit", fieldName), *limit); + error = error + << TErrorAttribute(Format("%v_usage", fieldName), usage) + << TErrorAttribute(Format("%v_limit", fieldName), limit); + error.SetMessage("Read request complexity limits exceeded"); } }; - checkField("node_count", NodeCount, usage.NodeCount); - checkField("result_size", ResultSize, usage.ResultSize); - - if (!error.IsOK()) { - error.SetMessage("Read complexity limit exceeded"); - } + doCheck("node_count", Limits_.NodeCount, Usage_.NodeCount); + doCheck("result_size", Limits_.ResultSize, Usage_.ResultSize); return error; } -//////////////////////////////////////////////////////////////////////////////// - -TReadRequestComplexityLimiter::TReadRequestComplexityLimiter() noexcept = default; - -void TReadRequestComplexityLimiter::Reconfigure(const TReadRequestComplexity& limits) noexcept -{ - Limits_ = TReadRequestComplexityLimits(limits); -} - -void TReadRequestComplexityLimiter::Charge(const TReadRequestComplexityUsage& usage) noexcept -{ - Usage_ += usage; -} - -TError TReadRequestComplexityLimiter::CheckOverdraught() const noexcept -{ - return Limits_.CheckOverdraught(Usage_); -} - void TReadRequestComplexityLimiter::ThrowIfOverdraught() const { if (auto error = CheckOverdraught(); !error.IsOK()) { @@ -127,11 +43,6 @@ void TReadRequestComplexityLimiter::ThrowIfOverdraught() const } } -TReadRequestComplexity TReadRequestComplexityLimiter::GetUsage() const noexcept -{ - return Usage_.AsComplexity(); -} - //////////////////////////////////////////////////////////////////////////////// TLimitedAsyncYsonWriter::TLimitedAsyncYsonWriter(TReadRequestComplexityLimiterPtr complexityLimiter) @@ -145,16 +56,14 @@ void DoOnSomething( void (TAsyncYsonWriter::*onSomething)(Args...), Args... values) { + auto writtenBefore = writer.GetTotalWrittenSize(); + (writer.*onSomething)(Args(values)...); if (auto limiter = weakLimiter.Lock()) { - auto writtenBefore = writer.GetTotalWrittenSize(); - (writer.*onSomething)(values...); - if (limiter) { - limiter->Charge(TReadRequestComplexityUsage({ - /*nodeCount*/ CountNodes ? 1 : 0, - /*resultSize*/ writer.GetTotalWrittenSize() - writtenBefore, - })); - limiter->ThrowIfOverdraught(); - } + limiter->Charge({ + .NodeCount = CountNodes ? 1 : 0, + .ResultSize = writer.GetTotalWrittenSize() - writtenBefore, + }); + limiter->ThrowIfOverdraught(); } } @@ -185,47 +94,47 @@ void TLimitedAsyncYsonWriter::OnBooleanScalar(bool value) void TLimitedAsyncYsonWriter::OnEntity() { - DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnEntity); + DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnEntity); } void TLimitedAsyncYsonWriter::OnBeginList() { - DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnBeginList); + DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnBeginList); } void TLimitedAsyncYsonWriter::OnListItem() { - DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnListItem); + DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnListItem); } void TLimitedAsyncYsonWriter::OnEndList() { - DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnEndList); + DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnEndList); } void TLimitedAsyncYsonWriter::OnBeginMap() { - DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnBeginMap); + DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnBeginMap); } void TLimitedAsyncYsonWriter::OnKeyedItem(TStringBuf value) { - DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnKeyedItem, value); + DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnKeyedItem, value); } void TLimitedAsyncYsonWriter::OnEndMap() { - DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnEndMap); + DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnEndMap); } void TLimitedAsyncYsonWriter::OnBeginAttributes() { - DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnBeginAttributes); + DoOnSomething<true>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnBeginAttributes); } void TLimitedAsyncYsonWriter::OnEndAttributes() { - DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_,&TAsyncYsonWriter::OnEndAttributes); + DoOnSomething<false>(ComplexityLimiter_, UnderlyingWriter_, &TAsyncYsonWriter::OnEndAttributes); } void TLimitedAsyncYsonWriter::OnRaw(TStringBuf yson, EYsonType type) @@ -235,22 +144,23 @@ void TLimitedAsyncYsonWriter::OnRaw(TStringBuf yson, EYsonType type) void TLimitedAsyncYsonWriter::OnRaw(TFuture<TYsonString> asyncStr) { - ComplexityLimiter_->ThrowIfOverdraught(); - asyncStr.Subscribe(BIND( - [weakLimiter = TWeakPtr(ComplexityLimiter_)] (const TErrorOr<TYsonString>& str) { - if (auto limiter = weakLimiter.Lock()) { + if (ComplexityLimiter_) { + ComplexityLimiter_->ThrowIfOverdraught(); + asyncStr.Subscribe(BIND( + [weakLimiter = TWeakPtr(ComplexityLimiter_)] (const TErrorOr<TYsonString>& str) { if (str.IsOK()) { - limiter->Charge(TReadRequestComplexityUsage({ - /*nodeCount*/ 1, - /*resultSize*/ str.Value().AsStringBuf().size() - })); + if (auto limiter = weakLimiter.Lock()) { + limiter->Charge({ + .NodeCount = 0, + .ResultSize = std::ssize(str.Value().AsStringBuf()) + }); + } } - } - })); + })); + } UnderlyingWriter_.OnRaw(std::move(asyncStr)); } - TFuture<TYsonString> TLimitedAsyncYsonWriter::Finish() { return UnderlyingWriter_.Finish(); diff --git a/yt/yt/core/ytree/request_complexity_limiter.h b/yt/yt/core/ytree/request_complexity_limiter.h index ca89049e6f..74e828b4bd 100644 --- a/yt/yt/core/ytree/request_complexity_limiter.h +++ b/yt/yt/core/ytree/request_complexity_limiter.h @@ -1,72 +1,33 @@ #pragma once +#include "request_complexity_limits.h" + #include <yt/yt/core/yson/async_writer.h> namespace NYT::NYTree { //////////////////////////////////////////////////////////////////////////////// -struct TReadRequestComplexity -{ - std::optional<i64> NodeCount; - std::optional<i64> ResultSize; - - TReadRequestComplexity() noexcept = default; - TReadRequestComplexity(std::optional<i64> nodeCount, std::optional<i64> resultSize) noexcept; - void Sanitize(const TReadRequestComplexity& max) noexcept; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TReadRequestComplexityUsage - : private TReadRequestComplexity -{ -public: - friend class TReadRequestComplexityLimits; - - TReadRequestComplexityUsage() noexcept; - explicit TReadRequestComplexityUsage(const TReadRequestComplexity& usage) noexcept; - - TReadRequestComplexityUsage& operator+=(const TReadRequestComplexityUsage& that) noexcept; - - const TReadRequestComplexity& AsComplexity() const noexcept; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TReadRequestComplexityLimits - : private TReadRequestComplexity -{ -public: - TReadRequestComplexityLimits() noexcept = default; - - // NB: Limits must be non-negative. - explicit TReadRequestComplexityLimits(const TReadRequestComplexity& limits) noexcept; - - TError CheckOverdraught(const TReadRequestComplexityUsage& usage) const noexcept; -}; - -//////////////////////////////////////////////////////////////////////////////// - class TReadRequestComplexityLimiter : public TRefCounted , public TNonCopyable { public: - TReadRequestComplexityLimiter() noexcept; - - void Reconfigure(const TReadRequestComplexity& limits) noexcept; + explicit TReadRequestComplexityLimiter(TReadRequestComplexity limits) noexcept; - void Charge(const TReadRequestComplexityUsage& usage) noexcept; + void Charge(TReadRequestComplexity usage) noexcept; TError CheckOverdraught() const noexcept; void ThrowIfOverdraught() const; - TReadRequestComplexity GetUsage() const noexcept; + struct TReadRequestComplexityUsage + { + std::atomic<i64> NodeCount; + }; + DEFINE_BYVAL_RO_PROPERTY(TReadRequestComplexity, Usage); private: - TReadRequestComplexityUsage Usage_; - TReadRequestComplexityLimits Limits_; + const TReadRequestComplexity Limits_; }; DEFINE_REFCOUNTED_TYPE(TReadRequestComplexityLimiter) diff --git a/yt/yt/core/ytree/request_complexity_limits.cpp b/yt/yt/core/ytree/request_complexity_limits.cpp new file mode 100644 index 0000000000..a21dfffc4b --- /dev/null +++ b/yt/yt/core/ytree/request_complexity_limits.cpp @@ -0,0 +1,64 @@ +#include "request_complexity_limits.h" + +#include <yt/yt/core/misc/error.h> +#include <yt/yt/core/misc/protobuf_helpers.h> + +#include <yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.pb.h> + +namespace NYT::NYTree { + +//////////////////////////////////////////////////////////////////////////////// + +void TReadRequestComplexityOverrides::ApplyTo(TReadRequestComplexity& complexity) const noexcept +{ + auto apply = [] (i64& target, std::optional<i64> override) { + target = override.value_or(target); + }; + + apply(complexity.NodeCount, NodeCount); + apply(complexity.ResultSize, ResultSize); +} + +void TReadRequestComplexityOverrides::Validate(TReadRequestComplexity max) const +{ + TError error; + + auto doCheck = [&] (TStringBuf fieldName, std::optional<i64> override, i64 max) { + if (override && *override > max) { + error.SetCode(NYT::EErrorCode::Generic); + error.SetMessage("Read request complexity limits too large"); + error = error << TErrorAttribute(TString(fieldName), *override); + } + }; + + doCheck("node_count", NodeCount, max.NodeCount); + doCheck("result_size", ResultSize, max.ResultSize); + + if (!error.IsOK()) { + THROW_ERROR error; + } +} + +void FromProto( + TReadRequestComplexityOverrides* original, + const NProto::TReadRequestComplexityLimits& serialized) +{ + original->NodeCount = YT_PROTO_OPTIONAL(serialized, node_count); + original->ResultSize = YT_PROTO_OPTIONAL(serialized, result_size); +} + +void ToProto( + NProto::TReadRequestComplexityLimits* serialized, + const TReadRequestComplexityOverrides& original) +{ + if (original.NodeCount) { + serialized->set_node_count(*original.NodeCount); + } + if (original.ResultSize) { + serialized->set_result_size(*original.ResultSize); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT:NYTree diff --git a/yt/yt/core/ytree/request_complexity_limits.h b/yt/yt/core/ytree/request_complexity_limits.h new file mode 100644 index 0000000000..6c9699b731 --- /dev/null +++ b/yt/yt/core/ytree/request_complexity_limits.h @@ -0,0 +1,44 @@ +#pragma once + +#include "public.h" + +namespace NYT::NYTree { + +//////////////////////////////////////////////////////////////////////////////// + +namespace NProto { + +class TReadRequestComplexityLimits; + +} // namespace NProto + +//////////////////////////////////////////////////////////////////////////////// + +struct TReadRequestComplexity +{ + i64 NodeCount = 0; + i64 ResultSize = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TReadRequestComplexityOverrides +{ + std::optional<i64> NodeCount; + std::optional<i64> ResultSize; + + void ApplyTo(TReadRequestComplexity& complexity) const noexcept; + void Validate(TReadRequestComplexity max) const; +}; + +void FromProto( + TReadRequestComplexityOverrides* original, + const NProto::TReadRequestComplexityLimits& serialized); + +void ToProto( + NProto::TReadRequestComplexityLimits* serialized, + const TReadRequestComplexityOverrides& original); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTree diff --git a/yt/yt/core/ytree/ypath_detail.cpp b/yt/yt/core/ytree/ypath_detail.cpp index 4476381942..03e8552ff0 100644 --- a/yt/yt/core/ytree/ypath_detail.cpp +++ b/yt/yt/core/ytree/ypath_detail.cpp @@ -44,6 +44,11 @@ void TYPathServiceContextWrapper::SetRequestHeader(std::unique_ptr<NRpc::NProto: UnderlyingContext_->SetRequestHeader(std::move(header)); } +void TYPathServiceContextWrapper::SetReadRequestComplexityLimiter(const TReadRequestComplexityLimiterPtr& limiter) +{ + UnderlyingContext_->SetReadRequestComplexityLimiter(limiter); +} + TReadRequestComplexityLimiterPtr TYPathServiceContextWrapper::GetReadRequestComplexityLimiter() { return UnderlyingContext_->GetReadRequestComplexityLimiter(); @@ -580,6 +585,34 @@ TFuture<TYsonString> TSupportsAttributes::DoGetAttribute( } } +namespace { + +void OnAttributeRead(auto* context, auto* response, const TErrorOr<TYsonString>& ysonOrError) +{ + if (!ysonOrError.IsOK()) { + context->Reply(ysonOrError); + return; + } + + auto yson = ysonOrError.Value().AsStringBuf(); + + if (auto limiter = context->GetReadRequestComplexityLimiter()) { + limiter->Charge({ + .NodeCount = 1, + .ResultSize = std::ssize(yson), + }); + if (auto error = limiter->CheckOverdraught(); !error.IsOK()) { + context->Reply(error); + return; + } + } + + response->set_value(TString(yson)); + context->Reply(); +} + +} // namespace + void TSupportsAttributes::GetAttribute( const TYPath& path, TReqGet* request, @@ -593,24 +626,7 @@ void TSupportsAttributes::GetAttribute( : TAttributeFilter(); DoGetAttribute(path, attributeFilter).Subscribe(BIND([=] (const TErrorOr<TYsonString>& ysonOrError) { - if (!ysonOrError.IsOK()) { - context->Reply(ysonOrError); - return; - } - - { - auto resultSize = ysonOrError.Value().AsStringBuf().Size(); - if (auto limiter = context->GetReadRequestComplexityLimiter()) { - limiter->Charge(TReadRequestComplexityUsage({/*nodeCount*/ 1, resultSize})); - if (auto error = limiter->CheckOverdraught(); !error.IsOK()) { - context->Reply(error); - return; - } - } - } - - response->set_value(ysonOrError.Value().ToString()); - context->Reply(); + OnAttributeRead(context.Get(), response, ysonOrError); })); } @@ -702,22 +718,7 @@ void TSupportsAttributes::ListAttribute( context->SetRequestInfo(); DoListAttribute(path).Subscribe(BIND([=] (const TErrorOr<TYsonString>& ysonOrError) { - if (ysonOrError.IsOK()) { - { - auto resultSize = ysonOrError.Value().AsStringBuf().Size(); - if (auto limiter = context->GetReadRequestComplexityLimiter()) { - limiter->Charge(TReadRequestComplexityUsage({/*nodeCount*/ 1, resultSize})); - if (auto error = limiter->CheckOverdraught(); !error.IsOK()) { - context->Reply(error); - return; - } - } - } - response->set_value(ysonOrError.Value().ToString()); - context->Reply(); - } else { - context->Reply(ysonOrError); - } + OnAttributeRead(context.Get(), response, ysonOrError); })); } @@ -1652,7 +1653,6 @@ public: template <class... TArgs> TYPathServiceContext(TArgs&&... args) : TServiceContextBase(std::forward<TArgs>(args)...) - , ReadComplexityLimiter_(New<TReadRequestComplexityLimiter>()) { } void SetRequestHeader(std::unique_ptr<NRpc::NProto::TRequestHeader> header) override @@ -1662,13 +1662,18 @@ public: CachedYPathExt_ = nullptr; } + void SetReadRequestComplexityLimiter(const TReadRequestComplexityLimiterPtr& limiter) final + { + ReadComplexityLimiter_ = limiter; + } + TReadRequestComplexityLimiterPtr GetReadRequestComplexityLimiter() final { return ReadComplexityLimiter_; } protected: - TReadRequestComplexityLimiterPtr ReadComplexityLimiter_; + TReadRequestComplexityLimiterPtr ReadComplexityLimiter_ = nullptr; std::optional<NProfiling::TWallTimer> Timer_; const NProto::TYPathHeaderExt* CachedYPathExt_ = nullptr; diff --git a/yt/yt/core/ytree/ypath_detail.h b/yt/yt/core/ytree/ypath_detail.h index 051347da16..2cb43648d6 100644 --- a/yt/yt/core/ytree/ypath_detail.h +++ b/yt/yt/core/ytree/ypath_detail.h @@ -30,6 +30,8 @@ struct IYPathServiceContext : public virtual NRpc::IServiceContext { virtual void SetRequestHeader(std::unique_ptr<NRpc::NProto::TRequestHeader> header) = 0; + + virtual void SetReadRequestComplexityLimiter(const TReadRequestComplexityLimiterPtr& limiter) = 0; virtual TReadRequestComplexityLimiterPtr GetReadRequestComplexityLimiter() = 0; }; @@ -45,6 +47,8 @@ public: explicit TYPathServiceContextWrapper(IYPathServiceContextPtr underlyingContext); void SetRequestHeader(std::unique_ptr<NRpc::NProto::TRequestHeader> header) override; + + void SetReadRequestComplexityLimiter(const TReadRequestComplexityLimiterPtr& limiter) override; TReadRequestComplexityLimiterPtr GetReadRequestComplexityLimiter() override; const IYPathServiceContextPtr& GetUnderlyingContext() const; diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 4892210e1a..430f00cea1 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -9,6 +9,7 @@ option go_package = "github.com/ydb-platform/ydb/yt/go/proto/client/api/rpc_prox import "yt_proto/yt/core/misc/proto/guid.proto"; import "yt_proto/yt/core/misc/proto/error.proto"; import "yt_proto/yt/core/ytree/proto/attributes.proto"; +import "yt_proto/yt/core/ytree/proto/request_complexity_limits.proto"; import "yt_proto/yt/client/hive/proto/timestamp_map.proto"; import "yt_proto/yt/client/chunk_client/proto/data_statistics.proto"; import "yt_proto/yt/client/chaos_client/proto/replication_card.proto"; @@ -1152,6 +1153,7 @@ message TReqGetNode optional NYT.NYTree.NProto.TAttributeFilter attributes = 4; optional int64 max_size = 3; + optional NYT.NYTree.NProto.TReadRequestComplexityLimits complexity_limits = 6; optional TTransactionalOptions transactional_options = 100; optional TPrerequisiteOptions prerequisite_options = 101; @@ -1179,6 +1181,7 @@ message TReqListNode optional NYT.NYTree.NProto.TAttributeFilter attributes = 4; optional int64 max_size = 3; + optional NYT.NYTree.NProto.TReadRequestComplexityLimits complexity_limits = 6; optional TTransactionalOptions transactional_options = 100; optional TPrerequisiteOptions prerequisite_options = 101; diff --git a/yt/yt_proto/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt_proto/yt/core/CMakeLists.darwin-x86_64.txt index f131b5c4e9..14da9a86fa 100644 --- a/yt/yt_proto/yt/core/CMakeLists.darwin-x86_64.txt +++ b/yt/yt_proto/yt/core/CMakeLists.darwin-x86_64.txt @@ -150,6 +150,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(yt_proto-yt-core) target_include_directories(yt_proto-yt-core PUBLIC @@ -172,6 +184,7 @@ target_proto_messages(yt_proto-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/rpc/proto/rpc.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/yson/proto/protobuf_interop.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/attributes.proto + ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/ypath.proto ) target_proto_addincls(yt_proto-yt-core diff --git a/yt/yt_proto/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt_proto/yt/core/CMakeLists.linux-aarch64.txt index 2907cf3959..8d49808dff 100644 --- a/yt/yt_proto/yt/core/CMakeLists.linux-aarch64.txt +++ b/yt/yt_proto/yt/core/CMakeLists.linux-aarch64.txt @@ -150,6 +150,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(yt_proto-yt-core) target_include_directories(yt_proto-yt-core PUBLIC @@ -173,6 +185,7 @@ target_proto_messages(yt_proto-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/rpc/proto/rpc.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/yson/proto/protobuf_interop.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/attributes.proto + ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/ypath.proto ) target_proto_addincls(yt_proto-yt-core diff --git a/yt/yt_proto/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt_proto/yt/core/CMakeLists.linux-x86_64.txt index 2907cf3959..8d49808dff 100644 --- a/yt/yt_proto/yt/core/CMakeLists.linux-x86_64.txt +++ b/yt/yt_proto/yt/core/CMakeLists.linux-x86_64.txt @@ -150,6 +150,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(yt_proto-yt-core) target_include_directories(yt_proto-yt-core PUBLIC @@ -173,6 +185,7 @@ target_proto_messages(yt_proto-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/rpc/proto/rpc.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/yson/proto/protobuf_interop.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/attributes.proto + ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/ypath.proto ) target_proto_addincls(yt_proto-yt-core diff --git a/yt/yt_proto/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt_proto/yt/core/CMakeLists.windows-x86_64.txt index f131b5c4e9..14da9a86fa 100644 --- a/yt/yt_proto/yt/core/CMakeLists.windows-x86_64.txt +++ b/yt/yt_proto/yt/core/CMakeLists.windows-x86_64.txt @@ -150,6 +150,18 @@ get_built_tool_path( contrib/tools/protoc/plugins/cpp_styleguide cpp_styleguide ) +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) add_library(yt_proto-yt-core) target_include_directories(yt_proto-yt-core PUBLIC @@ -172,6 +184,7 @@ target_proto_messages(yt_proto-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/rpc/proto/rpc.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/yson/proto/protobuf_interop.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/attributes.proto + ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.proto ${CMAKE_SOURCE_DIR}/yt/yt_proto/yt/core/ytree/proto/ypath.proto ) target_proto_addincls(yt_proto-yt-core diff --git a/yt/yt_proto/yt/core/ya.make b/yt/yt_proto/yt/core/ya.make index 01de274fe4..cc684cf8c9 100644 --- a/yt/yt_proto/yt/core/ya.make +++ b/yt/yt_proto/yt/core/ya.make @@ -20,6 +20,7 @@ SRCS( yson/proto/protobuf_interop.proto ytree/proto/attributes.proto + ytree/proto/request_complexity_limits.proto ytree/proto/ypath.proto ) diff --git a/yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.proto b/yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.proto new file mode 100644 index 0000000000..db37024a09 --- /dev/null +++ b/yt/yt_proto/yt/core/ytree/proto/request_complexity_limits.proto @@ -0,0 +1,13 @@ +package NYT.NYTree.NProto; + +option go_package = "github.com/ydb-platform/ydb/yt/go/proto/core/ytree"; + +//////////////////////////////////////////////////////////////////////////////// + +message TReadRequestComplexityLimits +{ + optional int64 node_count = 1; + optional int64 result_size = 2; +} + +//////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/core/ytree/proto/ypath.proto b/yt/yt_proto/yt/core/ytree/proto/ypath.proto index 20e1192ec7..983adc82e0 100644 --- a/yt/yt_proto/yt/core/ytree/proto/ypath.proto +++ b/yt/yt_proto/yt/core/ytree/proto/ypath.proto @@ -2,6 +2,7 @@ package NYT.NYTree.NProto; import "yt_proto/yt/core/rpc/proto/rpc.proto"; import "yt_proto/yt/core/ytree/proto/attributes.proto"; +import "yt_proto/yt/core/ytree/proto/request_complexity_limits.proto"; option go_package = "github.com/ydb-platform/ydb/yt/go/proto/core/ytree"; @@ -30,6 +31,9 @@ message TYPathHeaderExt //! A copy of additional_paths in case of path rewrite. repeated string original_additional_paths = 5; + optional TReadRequestComplexityLimits read_complexity_limits = 7; + + // COMPAT(kvk1920): Unreserve `6` after 23.2. reserved 6; } |