diff options
author | shakurov <shakurov@yandex-team.com> | 2024-12-02 14:09:52 +0300 |
---|---|---|
committer | shakurov <shakurov@yandex-team.com> | 2024-12-02 14:26:39 +0300 |
commit | 399c0870221aa4217fe09d54fd6700fd41ae2687 (patch) | |
tree | 0ce18f972dde43135ad8c0fce890451bf50e1dee | |
parent | d84c9f41468183a47aae88e345178263836e9faa (diff) | |
download | ydb-399c0870221aa4217fe09d54fd6700fd41ae2687.tar.gz |
Get rid of WaitFor calls in {C,De}ompressAttachments. Rewrite usage points accordingly.
commit_hash:323aececd0cb4c041b3690980e53ef519a8c0fa1
-rw-r--r-- | yt/yt/core/rpc/bus/channel.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/client.cpp | 61 | ||||
-rw-r--r-- | yt/yt/core/rpc/client.h | 10 | ||||
-rw-r--r-- | yt/yt/core/rpc/helpers.cpp | 28 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 21 | ||||
-rw-r--r-- | yt/yt/core/ytree/ypath_client.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/ytree/ypath_client.h | 2 |
7 files changed, 95 insertions, 34 deletions
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index 322d1347fc..50ac07d5cb 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -718,7 +718,7 @@ private: header.clear_timeout(); } - if (options.RequestHeavy) { + if (options.RequestHeavy || request->IsAttachmentCompressionEnabled()) { BIND(&IClientRequest::Serialize, request) .AsyncVia(TDispatcher::Get()->GetHeavyInvoker()) .Run() diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp index 35b7ee60f7..0c7ee040ae 100644 --- a/yt/yt/core/rpc/client.cpp +++ b/yt/yt/core/rpc/client.cpp @@ -154,6 +154,17 @@ bool TClientRequest::IsStreamingEnabled() const return StreamingEnabled_; } +bool TClientRequest::IsAttachmentCompressionEnabled() const +{ + auto attachmentCodecId = GetEffectiveAttachmentCompressionCodec(); + return attachmentCodecId != NCompression::ECodec::None; +} + +NCompression::ECodec TClientRequest::GetEffectiveAttachmentCompressionCodec() const +{ + return EnableLegacyRpcCodecs_ ? NCompression::ECodec::None : RequestCodec_; +} + const TStreamingParameters& TClientRequest::ClientAttachmentsStreamingParameters() const { return ClientAttachmentsStreamingParameters_; @@ -589,7 +600,7 @@ const IInvokerPtr& TClientResponse::GetInvoker() : TDispatcher::Get()->GetLightInvoker(); } -void TClientResponse::Deserialize(TSharedRefArray responseMessage) +TFuture<void> TClientResponse::Deserialize(TSharedRefArray responseMessage) noexcept { YT_ASSERT(responseMessage); YT_ASSERT(!ResponseMessage_); @@ -597,12 +608,16 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage) ResponseMessage_ = std::move(responseMessage); if (ResponseMessage_.Size() < 2) { - THROW_ERROR_EXCEPTION(NRpc::EErrorCode::ProtocolError, "Too few response message parts: %v < 2", - ResponseMessage_.Size()); + return MakeFuture(TError( + NRpc::EErrorCode::ProtocolError, + "Too few response message parts: %v < 2", + ResponseMessage_.Size())); } if (!TryParseResponseHeader(ResponseMessage_, &Header_)) { - THROW_ERROR_EXCEPTION(NRpc::EErrorCode::ProtocolError, "Error deserializing response header"); + return MakeFuture(TError( + NRpc::EErrorCode::ProtocolError, + "Error deserializing response header")); } // COMPAT(danilalexeev): legacy RPC codecs @@ -616,7 +631,9 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage) } if (!TryDeserializeBody(ResponseMessage_[1], bodyCodecId)) { - THROW_ERROR_EXCEPTION(NRpc::EErrorCode::ProtocolError, "Error deserializing response body"); + return MakeFuture(TError( + NRpc::EErrorCode::ProtocolError, + "Error deserializing response body")); } auto compressedAttachments = TRange(ResponseMessage_.Begin() + 2, ResponseMessage_.End()); @@ -624,12 +641,16 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage) if (attachmentCodecId == NCompression::ECodec::None) { Attachments_ = compressedAttachments.ToVector(); + return VoidFuture; } else { - Attachments_ = DecompressAttachments(compressedAttachments, attachmentCodecId); - } - - for (auto& attachment : Attachments_) { - attachment = TrackMemory(memoryUsageTracker, attachment); + return AsyncDecompressAttachments(compressedAttachments, attachmentCodecId) + .ApplyUnique(BIND([this, this_ = MakeStrong(this)] (std::vector<TSharedRef>&& decompressedAttachments) { + Attachments_ = std::move(decompressedAttachments); + auto memoryUsageTracker = ClientContext_->GetMemoryUsageTracker(); + for (auto& attachment : Attachments_) { + attachment = TrackMemory(memoryUsageTracker, attachment); + } + })); } } @@ -657,18 +678,16 @@ void TClientResponse::DoHandleResponse(TSharedRefArray message, const std::strin Address_ = address; - try { - Deserialize(std::move(message)); - Finish({}); - } catch (const std::exception& ex) { - Finish(ex); - } + Deserialize(std::move(message)) + .Subscribe(BIND([timer, this, this_ = MakeStrong(this)] (const TError& error) { + Finish(error); - if (!ClientContext_->GetResponseHeavy() && timer.GetElapsedTime() > LightInvokerDurationWarningThreshold) { - YT_LOG_DEBUG("Handling light response took too long (RequestId: %v, Duration: %v)", - ClientContext_->GetRequestId(), - timer.GetElapsedTime()); - } + if (!ClientContext_->GetResponseHeavy() && timer.GetElapsedTime() > LightInvokerDurationWarningThreshold) { + YT_LOG_DEBUG("Handling light response took too long (RequestId: %v, Duration: %v)", + ClientContext_->GetRequestId(), + timer.GetElapsedTime()); + } + })); } void TClientResponse::HandleStreamingPayload(const TStreamingPayload& payload) diff --git a/yt/yt/core/rpc/client.h b/yt/yt/core/rpc/client.h index b87559933a..0dbbe3c67e 100644 --- a/yt/yt/core/rpc/client.h +++ b/yt/yt/core/rpc/client.h @@ -39,11 +39,15 @@ namespace NYT::NRpc { struct IClientRequest : public virtual TRefCounted { + //! Potentially heavy if IsAttachmentCompressionEnabled() returns true. + //! Callers should consider offloading calls to dedicated thread(s). virtual TSharedRefArray Serialize() = 0; virtual const NProto::TRequestHeader& Header() const = 0; virtual NProto::TRequestHeader& Header() = 0; + virtual bool IsAttachmentCompressionEnabled() const = 0; + virtual bool IsStreamingEnabled() const = 0; virtual const TStreamingParameters& ClientAttachmentsStreamingParameters() const = 0; @@ -153,6 +157,8 @@ public: NProto::TRequestHeader& Header() override; const NProto::TRequestHeader& Header() const override; + bool IsAttachmentCompressionEnabled() const override; + bool IsStreamingEnabled() const override; const TStreamingParameters& ClientAttachmentsStreamingParameters() const override; @@ -255,6 +261,8 @@ private: void PrepareHeader(); TSharedRefArray GetHeaderlessMessage() const; + + NCompression::ECodec GetEffectiveAttachmentCompressionCodec() const; }; DEFINE_REFCOUNTED_TYPE(TClientRequest) @@ -372,7 +380,7 @@ private: void DoHandleError(TError error); void DoHandleResponse(TSharedRefArray message, const std::string& address); - void Deserialize(TSharedRefArray responseMessage); + TFuture<void> Deserialize(TSharedRefArray responseMessage) noexcept; }; DEFINE_REFCOUNTED_TYPE(TClientResponse) diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp index ca7b6221e5..e335ca9e1a 100644 --- a/yt/yt/core/rpc/helpers.cpp +++ b/yt/yt/core/rpc/helpers.cpp @@ -599,8 +599,18 @@ std::vector<TSharedRef> CompressAttachments( if (codecId == NCompression::ECodec::None) { return attachments.ToVector(); } - return NConcurrency::WaitFor(AsyncCompressAttachments(attachments, codecId)) - .ValueOrThrow(); + + auto* codec = NCompression::GetCodec(codecId); + std::vector<TSharedRef> result; + result.reserve(std::ssize(attachments)); + std::transform( + attachments.begin(), + attachments.end(), + std::back_inserter(result), + [=] (const TSharedRef& attachment) { + return codec->Compress(attachment); + }); + return result; } std::vector<TSharedRef> DecompressAttachments( @@ -610,8 +620,18 @@ std::vector<TSharedRef> DecompressAttachments( if (codecId == NCompression::ECodec::None) { return attachments.ToVector(); } - return NConcurrency::WaitFor(AsyncDecompressAttachments(attachments, codecId)) - .ValueOrThrow(); + + auto* codec = NCompression::GetCodec(codecId); + std::vector<TSharedRef> result; + result.reserve(std::ssize(attachments)); + std::transform( + attachments.begin(), + attachments.end(), + std::back_inserter(result), + [=] (const TSharedRef& compressedAttachment) { + return codec->Decompress(compressedAttachment); + }); + return result; } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index 7c43182860..6a5850972c 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -328,7 +328,7 @@ protected: struct TSerializedResponse { TSharedRef Body; - std::vector<TSharedRef> Attachments; + TFuture<std::vector<TSharedRef>> AttachmentsFuture; }; TSerializedResponse SerializeResponse() @@ -371,11 +371,11 @@ protected: } } - auto responseAttachments = CompressAttachments(Response_->Attachments(), attachmentCodecId); + auto responseAttachmentsFuture = AsyncCompressAttachments(Response_->Attachments(), attachmentCodecId); return TSerializedResponse{ .Body = std::move(serializedBody), - .Attachments = std::move(responseAttachments), + .AttachmentsFuture = std::move(responseAttachmentsFuture), }; } @@ -392,11 +392,18 @@ protected: return; } - underlyingContext->SetResponseBody(std::move(response.Body)); - underlyingContext->ResponseAttachments() = std::move(response.Attachments); + response.AttachmentsFuture.SubscribeUnique( + BIND([this, this_ = MakeStrong(this), responseBody = std::move(response.Body)] (TErrorOr<std::vector<TSharedRef>>&& compressedAttachments) { + const auto& underlyingContext = this->GetUnderlyingContext(); + if (compressedAttachments.IsOK()) { + underlyingContext->SetResponseBody(std::move(responseBody)); + underlyingContext->ResponseAttachments() = std::move(compressedAttachments.Value()); + } + underlyingContext->Reply(TError(std::move(compressedAttachments))); + })); + } else { + underlyingContext->Reply(error); } - - underlyingContext->Reply(error); } }; diff --git a/yt/yt/core/ytree/ypath_client.cpp b/yt/yt/core/ytree/ypath_client.cpp index 5cb9d8055e..a2996d6028 100644 --- a/yt/yt/core/ytree/ypath_client.cpp +++ b/yt/yt/core/ytree/ypath_client.cpp @@ -151,6 +151,11 @@ NRpc::NProto::TRequestHeader& TYPathRequest::Header() return Header_; } +bool TYPathRequest::IsAttachmentCompressionEnabled() const +{ + return false; +} + bool TYPathRequest::IsStreamingEnabled() const { return false; diff --git a/yt/yt/core/ytree/ypath_client.h b/yt/yt/core/ytree/ypath_client.h index 57f860cf70..7efb19cf3b 100644 --- a/yt/yt/core/ytree/ypath_client.h +++ b/yt/yt/core/ytree/ypath_client.h @@ -58,6 +58,8 @@ public: const NRpc::NProto::TRequestHeader& Header() const override; NRpc::NProto::TRequestHeader& Header() override; + bool IsAttachmentCompressionEnabled() const override; + bool IsStreamingEnabled() const override; const NRpc::TStreamingParameters& ClientAttachmentsStreamingParameters() const override; |