aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshakurov <shakurov@yandex-team.com>2024-12-02 14:09:52 +0300
committershakurov <shakurov@yandex-team.com>2024-12-02 14:26:39 +0300
commit399c0870221aa4217fe09d54fd6700fd41ae2687 (patch)
tree0ce18f972dde43135ad8c0fce890451bf50e1dee
parentd84c9f41468183a47aae88e345178263836e9faa (diff)
downloadydb-399c0870221aa4217fe09d54fd6700fd41ae2687.tar.gz
Get rid of WaitFor calls in {C,De}ompressAttachments. Rewrite usage points accordingly.
commit_hash:323aececd0cb4c041b3690980e53ef519a8c0fa1
-rw-r--r--yt/yt/core/rpc/bus/channel.cpp2
-rw-r--r--yt/yt/core/rpc/client.cpp61
-rw-r--r--yt/yt/core/rpc/client.h10
-rw-r--r--yt/yt/core/rpc/helpers.cpp28
-rw-r--r--yt/yt/core/rpc/service_detail.h21
-rw-r--r--yt/yt/core/ytree/ypath_client.cpp5
-rw-r--r--yt/yt/core/ytree/ypath_client.h2
7 files changed, 95 insertions, 34 deletions
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp
index 322d1347fc..50ac07d5cb 100644
--- a/yt/yt/core/rpc/bus/channel.cpp
+++ b/yt/yt/core/rpc/bus/channel.cpp
@@ -718,7 +718,7 @@ private:
header.clear_timeout();
}
- if (options.RequestHeavy) {
+ if (options.RequestHeavy || request->IsAttachmentCompressionEnabled()) {
BIND(&IClientRequest::Serialize, request)
.AsyncVia(TDispatcher::Get()->GetHeavyInvoker())
.Run()
diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp
index 35b7ee60f7..0c7ee040ae 100644
--- a/yt/yt/core/rpc/client.cpp
+++ b/yt/yt/core/rpc/client.cpp
@@ -154,6 +154,17 @@ bool TClientRequest::IsStreamingEnabled() const
return StreamingEnabled_;
}
+bool TClientRequest::IsAttachmentCompressionEnabled() const
+{
+ auto attachmentCodecId = GetEffectiveAttachmentCompressionCodec();
+ return attachmentCodecId != NCompression::ECodec::None;
+}
+
+NCompression::ECodec TClientRequest::GetEffectiveAttachmentCompressionCodec() const
+{
+ return EnableLegacyRpcCodecs_ ? NCompression::ECodec::None : RequestCodec_;
+}
+
const TStreamingParameters& TClientRequest::ClientAttachmentsStreamingParameters() const
{
return ClientAttachmentsStreamingParameters_;
@@ -589,7 +600,7 @@ const IInvokerPtr& TClientResponse::GetInvoker()
: TDispatcher::Get()->GetLightInvoker();
}
-void TClientResponse::Deserialize(TSharedRefArray responseMessage)
+TFuture<void> TClientResponse::Deserialize(TSharedRefArray responseMessage) noexcept
{
YT_ASSERT(responseMessage);
YT_ASSERT(!ResponseMessage_);
@@ -597,12 +608,16 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage)
ResponseMessage_ = std::move(responseMessage);
if (ResponseMessage_.Size() < 2) {
- THROW_ERROR_EXCEPTION(NRpc::EErrorCode::ProtocolError, "Too few response message parts: %v < 2",
- ResponseMessage_.Size());
+ return MakeFuture(TError(
+ NRpc::EErrorCode::ProtocolError,
+ "Too few response message parts: %v < 2",
+ ResponseMessage_.Size()));
}
if (!TryParseResponseHeader(ResponseMessage_, &Header_)) {
- THROW_ERROR_EXCEPTION(NRpc::EErrorCode::ProtocolError, "Error deserializing response header");
+ return MakeFuture(TError(
+ NRpc::EErrorCode::ProtocolError,
+ "Error deserializing response header"));
}
// COMPAT(danilalexeev): legacy RPC codecs
@@ -616,7 +631,9 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage)
}
if (!TryDeserializeBody(ResponseMessage_[1], bodyCodecId)) {
- THROW_ERROR_EXCEPTION(NRpc::EErrorCode::ProtocolError, "Error deserializing response body");
+ return MakeFuture(TError(
+ NRpc::EErrorCode::ProtocolError,
+ "Error deserializing response body"));
}
auto compressedAttachments = TRange(ResponseMessage_.Begin() + 2, ResponseMessage_.End());
@@ -624,12 +641,16 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage)
if (attachmentCodecId == NCompression::ECodec::None) {
Attachments_ = compressedAttachments.ToVector();
+ return VoidFuture;
} else {
- Attachments_ = DecompressAttachments(compressedAttachments, attachmentCodecId);
- }
-
- for (auto& attachment : Attachments_) {
- attachment = TrackMemory(memoryUsageTracker, attachment);
+ return AsyncDecompressAttachments(compressedAttachments, attachmentCodecId)
+ .ApplyUnique(BIND([this, this_ = MakeStrong(this)] (std::vector<TSharedRef>&& decompressedAttachments) {
+ Attachments_ = std::move(decompressedAttachments);
+ auto memoryUsageTracker = ClientContext_->GetMemoryUsageTracker();
+ for (auto& attachment : Attachments_) {
+ attachment = TrackMemory(memoryUsageTracker, attachment);
+ }
+ }));
}
}
@@ -657,18 +678,16 @@ void TClientResponse::DoHandleResponse(TSharedRefArray message, const std::strin
Address_ = address;
- try {
- Deserialize(std::move(message));
- Finish({});
- } catch (const std::exception& ex) {
- Finish(ex);
- }
+ Deserialize(std::move(message))
+ .Subscribe(BIND([timer, this, this_ = MakeStrong(this)] (const TError& error) {
+ Finish(error);
- if (!ClientContext_->GetResponseHeavy() && timer.GetElapsedTime() > LightInvokerDurationWarningThreshold) {
- YT_LOG_DEBUG("Handling light response took too long (RequestId: %v, Duration: %v)",
- ClientContext_->GetRequestId(),
- timer.GetElapsedTime());
- }
+ if (!ClientContext_->GetResponseHeavy() && timer.GetElapsedTime() > LightInvokerDurationWarningThreshold) {
+ YT_LOG_DEBUG("Handling light response took too long (RequestId: %v, Duration: %v)",
+ ClientContext_->GetRequestId(),
+ timer.GetElapsedTime());
+ }
+ }));
}
void TClientResponse::HandleStreamingPayload(const TStreamingPayload& payload)
diff --git a/yt/yt/core/rpc/client.h b/yt/yt/core/rpc/client.h
index b87559933a..0dbbe3c67e 100644
--- a/yt/yt/core/rpc/client.h
+++ b/yt/yt/core/rpc/client.h
@@ -39,11 +39,15 @@ namespace NYT::NRpc {
struct IClientRequest
: public virtual TRefCounted
{
+ //! Potentially heavy if IsAttachmentCompressionEnabled() returns true.
+ //! Callers should consider offloading calls to dedicated thread(s).
virtual TSharedRefArray Serialize() = 0;
virtual const NProto::TRequestHeader& Header() const = 0;
virtual NProto::TRequestHeader& Header() = 0;
+ virtual bool IsAttachmentCompressionEnabled() const = 0;
+
virtual bool IsStreamingEnabled() const = 0;
virtual const TStreamingParameters& ClientAttachmentsStreamingParameters() const = 0;
@@ -153,6 +157,8 @@ public:
NProto::TRequestHeader& Header() override;
const NProto::TRequestHeader& Header() const override;
+ bool IsAttachmentCompressionEnabled() const override;
+
bool IsStreamingEnabled() const override;
const TStreamingParameters& ClientAttachmentsStreamingParameters() const override;
@@ -255,6 +261,8 @@ private:
void PrepareHeader();
TSharedRefArray GetHeaderlessMessage() const;
+
+ NCompression::ECodec GetEffectiveAttachmentCompressionCodec() const;
};
DEFINE_REFCOUNTED_TYPE(TClientRequest)
@@ -372,7 +380,7 @@ private:
void DoHandleError(TError error);
void DoHandleResponse(TSharedRefArray message, const std::string& address);
- void Deserialize(TSharedRefArray responseMessage);
+ TFuture<void> Deserialize(TSharedRefArray responseMessage) noexcept;
};
DEFINE_REFCOUNTED_TYPE(TClientResponse)
diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp
index ca7b6221e5..e335ca9e1a 100644
--- a/yt/yt/core/rpc/helpers.cpp
+++ b/yt/yt/core/rpc/helpers.cpp
@@ -599,8 +599,18 @@ std::vector<TSharedRef> CompressAttachments(
if (codecId == NCompression::ECodec::None) {
return attachments.ToVector();
}
- return NConcurrency::WaitFor(AsyncCompressAttachments(attachments, codecId))
- .ValueOrThrow();
+
+ auto* codec = NCompression::GetCodec(codecId);
+ std::vector<TSharedRef> result;
+ result.reserve(std::ssize(attachments));
+ std::transform(
+ attachments.begin(),
+ attachments.end(),
+ std::back_inserter(result),
+ [=] (const TSharedRef& attachment) {
+ return codec->Compress(attachment);
+ });
+ return result;
}
std::vector<TSharedRef> DecompressAttachments(
@@ -610,8 +620,18 @@ std::vector<TSharedRef> DecompressAttachments(
if (codecId == NCompression::ECodec::None) {
return attachments.ToVector();
}
- return NConcurrency::WaitFor(AsyncDecompressAttachments(attachments, codecId))
- .ValueOrThrow();
+
+ auto* codec = NCompression::GetCodec(codecId);
+ std::vector<TSharedRef> result;
+ result.reserve(std::ssize(attachments));
+ std::transform(
+ attachments.begin(),
+ attachments.end(),
+ std::back_inserter(result),
+ [=] (const TSharedRef& compressedAttachment) {
+ return codec->Decompress(compressedAttachment);
+ });
+ return result;
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index 7c43182860..6a5850972c 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -328,7 +328,7 @@ protected:
struct TSerializedResponse
{
TSharedRef Body;
- std::vector<TSharedRef> Attachments;
+ TFuture<std::vector<TSharedRef>> AttachmentsFuture;
};
TSerializedResponse SerializeResponse()
@@ -371,11 +371,11 @@ protected:
}
}
- auto responseAttachments = CompressAttachments(Response_->Attachments(), attachmentCodecId);
+ auto responseAttachmentsFuture = AsyncCompressAttachments(Response_->Attachments(), attachmentCodecId);
return TSerializedResponse{
.Body = std::move(serializedBody),
- .Attachments = std::move(responseAttachments),
+ .AttachmentsFuture = std::move(responseAttachmentsFuture),
};
}
@@ -392,11 +392,18 @@ protected:
return;
}
- underlyingContext->SetResponseBody(std::move(response.Body));
- underlyingContext->ResponseAttachments() = std::move(response.Attachments);
+ response.AttachmentsFuture.SubscribeUnique(
+ BIND([this, this_ = MakeStrong(this), responseBody = std::move(response.Body)] (TErrorOr<std::vector<TSharedRef>>&& compressedAttachments) {
+ const auto& underlyingContext = this->GetUnderlyingContext();
+ if (compressedAttachments.IsOK()) {
+ underlyingContext->SetResponseBody(std::move(responseBody));
+ underlyingContext->ResponseAttachments() = std::move(compressedAttachments.Value());
+ }
+ underlyingContext->Reply(TError(std::move(compressedAttachments)));
+ }));
+ } else {
+ underlyingContext->Reply(error);
}
-
- underlyingContext->Reply(error);
}
};
diff --git a/yt/yt/core/ytree/ypath_client.cpp b/yt/yt/core/ytree/ypath_client.cpp
index 5cb9d8055e..a2996d6028 100644
--- a/yt/yt/core/ytree/ypath_client.cpp
+++ b/yt/yt/core/ytree/ypath_client.cpp
@@ -151,6 +151,11 @@ NRpc::NProto::TRequestHeader& TYPathRequest::Header()
return Header_;
}
+bool TYPathRequest::IsAttachmentCompressionEnabled() const
+{
+ return false;
+}
+
bool TYPathRequest::IsStreamingEnabled() const
{
return false;
diff --git a/yt/yt/core/ytree/ypath_client.h b/yt/yt/core/ytree/ypath_client.h
index 57f860cf70..7efb19cf3b 100644
--- a/yt/yt/core/ytree/ypath_client.h
+++ b/yt/yt/core/ytree/ypath_client.h
@@ -58,6 +58,8 @@ public:
const NRpc::NProto::TRequestHeader& Header() const override;
NRpc::NProto::TRequestHeader& Header() override;
+ bool IsAttachmentCompressionEnabled() const override;
+
bool IsStreamingEnabled() const override;
const NRpc::TStreamingParameters& ClientAttachmentsStreamingParameters() const override;