summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/yt/core/rpc/http/channel.cpp95
1 files changed, 59 insertions, 36 deletions
diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp
index 97fea41b233..1f2d6c1bff5 100644
--- a/yt/yt/core/rpc/http/channel.cpp
+++ b/yt/yt/core/rpc/http/channel.cpp
@@ -2,10 +2,13 @@
#include "config.h"
#include "helpers.h"
+#include <yt/yt/core/rpc/dispatcher.h>
+
#include <yt/yt/core/http/client.h>
#include <yt/yt/core/http/http.h>
#include <yt/yt/core/http/helpers.h>
#include <yt/yt/core/http/private.h>
+
#include <yt/yt/core/https/config.h>
#include <yt/yt/core/https/client.h>
@@ -166,19 +169,18 @@ private:
{
public:
TCallHandler(
- THttpChannel* parentChannel,
- IClientPtr client,
- IClientRequestPtr request,
+ THttpChannel* channel,
+ const IClientPtr& client,
+ const IClientRequestPtr& request,
IClientResponseHandlerPtr responseHandler)
- : Client_(client)
{
- TSharedRef httpRequestBody;
auto httpRequestHeaders = TranslateRequest(request);
- auto protocol = parentChannel->IsHttps_ ? "https" : "http";
+ auto protocol = channel->IsHttps_ ? "https" : "http";
// See TServer::DoRegisterService().
- auto url = Format("%v://%v/%v/%v", protocol, parentChannel->EndpointAddress_, request->GetService(), request->GetMethod());
+ auto url = Format("%v://%v/%v/%v", protocol, channel->EndpointAddress_, request->GetService(), request->GetMethod());
+ TSharedRef httpRequestBody;
try {
auto requestBody = request->Serialize();
THROW_ERROR_EXCEPTION_UNLESS(requestBody.Size() == 2, "Attachments are not supported in HTTP");
@@ -189,33 +191,10 @@ private:
return;
}
- Response_ = Client_->Post(url, httpRequestBody, httpRequestHeaders);
-
- Response_.Subscribe(BIND([address = parentChannel->EndpointAddress_, requestId = request->GetRequestId(), responseHandler = std::move(responseHandler)] (
- const TErrorOr<IResponsePtr>& result)
- {
- try {
- if (!result.IsOK()) {
- responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "HTTP client request failed") << result);
- } else if (result.Value()->GetStatusCode() == EStatusCode::NotFound) {
- responseHandler->HandleError(TError(NRpc::EErrorCode::NoSuchService, "URL was not resolved to a service"));
- } else if (result.Value()->GetStatusCode() == EStatusCode::BadRequest) {
- responseHandler->HandleError(ParseYTError(result.Value()));
- } else if (result.Value()->GetStatusCode() != EStatusCode::OK) {
- responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "Unexpected HTTP status code")
- << TErrorAttribute("status", result.Value()->GetStatusCode()));
- } else {
- NRpc::NProto::TResponseHeader responseHeader;
- ToProto(responseHeader.mutable_request_id(), requestId);
- auto responseMessage = CreateResponseMessage(
- responseHeader,
- PushEnvelope(result.Value()->ReadAll(), NCompression::ECodec::None),
- {});
- responseHandler->HandleResponse(responseMessage, address);
- }
- } catch (const NConcurrency::TFiberCanceledException&) {
- }
- }));
+ Response_ = client->Post(url, httpRequestBody, httpRequestHeaders);
+ Response_.Subscribe(
+ BIND(&TCallHandler::OnResponse, channel->EndpointAddress_, request->GetRequestId(), std::move(responseHandler))
+ .Via(NRpc::TDispatcher::Get()->GetHeavyInvoker()));
}
// IClientRequestControl overrides
@@ -235,10 +214,54 @@ private:
}
private:
- const IClientPtr Client_;
-
TFuture<IResponsePtr> Response_;
+ static void OnResponse(
+ const TString& address,
+ TRequestId requestId,
+ const IClientResponseHandlerPtr& responseHandler,
+ const TErrorOr<IResponsePtr>& responseOrError)
+ {
+ try {
+ if (!responseOrError.IsOK()) {
+ responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "HTTP client request failed")
+ << responseOrError);
+ return;
+ }
+
+ const auto& response = responseOrError.Value();
+ if (response->GetStatusCode() == EStatusCode::NotFound) {
+ responseHandler->HandleError(TError(NRpc::EErrorCode::NoSuchService, "URL was not resolved to a service"));
+ return;
+ }
+
+ if (response->GetStatusCode() == EStatusCode::BadRequest) {
+ responseHandler->HandleError(ParseYTError(response));
+ return;
+ }
+
+ if (response->GetStatusCode() != EStatusCode::OK) {
+ responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "Unexpected HTTP status code")
+ << TErrorAttribute("status", response->GetStatusCode()));
+ return;
+ }
+
+ auto responseBody = PushEnvelope(response->ReadAll(), NCompression::ECodec::None);
+
+ NRpc::NProto::TResponseHeader responseHeader;
+ ToProto(responseHeader.mutable_request_id(), requestId);
+
+ auto responseMessage = CreateResponseMessage(
+ responseHeader,
+ responseBody,
+ /*attachments*/ {});
+ responseHandler->HandleResponse(responseMessage, address);
+ } catch (const std::exception& ex) {
+ responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "Response deserialization failed")
+ << ex);
+ }
+ }
+
// This function does the backwards transformation of NRpc::NHttp::THttpHandler::TranslateRequest().
THeadersPtr TranslateRequest(const IClientRequestPtr& request)
{