diff options
| -rw-r--r-- | yt/yt/core/rpc/http/channel.cpp | 95 |
1 files changed, 59 insertions, 36 deletions
diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp index 97fea41b233..1f2d6c1bff5 100644 --- a/yt/yt/core/rpc/http/channel.cpp +++ b/yt/yt/core/rpc/http/channel.cpp @@ -2,10 +2,13 @@ #include "config.h" #include "helpers.h" +#include <yt/yt/core/rpc/dispatcher.h> + #include <yt/yt/core/http/client.h> #include <yt/yt/core/http/http.h> #include <yt/yt/core/http/helpers.h> #include <yt/yt/core/http/private.h> + #include <yt/yt/core/https/config.h> #include <yt/yt/core/https/client.h> @@ -166,19 +169,18 @@ private: { public: TCallHandler( - THttpChannel* parentChannel, - IClientPtr client, - IClientRequestPtr request, + THttpChannel* channel, + const IClientPtr& client, + const IClientRequestPtr& request, IClientResponseHandlerPtr responseHandler) - : Client_(client) { - TSharedRef httpRequestBody; auto httpRequestHeaders = TranslateRequest(request); - auto protocol = parentChannel->IsHttps_ ? "https" : "http"; + auto protocol = channel->IsHttps_ ? "https" : "http"; // See TServer::DoRegisterService(). - auto url = Format("%v://%v/%v/%v", protocol, parentChannel->EndpointAddress_, request->GetService(), request->GetMethod()); + auto url = Format("%v://%v/%v/%v", protocol, channel->EndpointAddress_, request->GetService(), request->GetMethod()); + TSharedRef httpRequestBody; try { auto requestBody = request->Serialize(); THROW_ERROR_EXCEPTION_UNLESS(requestBody.Size() == 2, "Attachments are not supported in HTTP"); @@ -189,33 +191,10 @@ private: return; } - Response_ = Client_->Post(url, httpRequestBody, httpRequestHeaders); - - Response_.Subscribe(BIND([address = parentChannel->EndpointAddress_, requestId = request->GetRequestId(), responseHandler = std::move(responseHandler)] ( - const TErrorOr<IResponsePtr>& result) - { - try { - if (!result.IsOK()) { - responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "HTTP client request failed") << result); - } else if (result.Value()->GetStatusCode() == EStatusCode::NotFound) { - responseHandler->HandleError(TError(NRpc::EErrorCode::NoSuchService, "URL was not resolved to a service")); - } else if (result.Value()->GetStatusCode() == EStatusCode::BadRequest) { - responseHandler->HandleError(ParseYTError(result.Value())); - } else if (result.Value()->GetStatusCode() != EStatusCode::OK) { - responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "Unexpected HTTP status code") - << TErrorAttribute("status", result.Value()->GetStatusCode())); - } else { - NRpc::NProto::TResponseHeader responseHeader; - ToProto(responseHeader.mutable_request_id(), requestId); - auto responseMessage = CreateResponseMessage( - responseHeader, - PushEnvelope(result.Value()->ReadAll(), NCompression::ECodec::None), - {}); - responseHandler->HandleResponse(responseMessage, address); - } - } catch (const NConcurrency::TFiberCanceledException&) { - } - })); + Response_ = client->Post(url, httpRequestBody, httpRequestHeaders); + Response_.Subscribe( + BIND(&TCallHandler::OnResponse, channel->EndpointAddress_, request->GetRequestId(), std::move(responseHandler)) + .Via(NRpc::TDispatcher::Get()->GetHeavyInvoker())); } // IClientRequestControl overrides @@ -235,10 +214,54 @@ private: } private: - const IClientPtr Client_; - TFuture<IResponsePtr> Response_; + static void OnResponse( + const TString& address, + TRequestId requestId, + const IClientResponseHandlerPtr& responseHandler, + const TErrorOr<IResponsePtr>& responseOrError) + { + try { + if (!responseOrError.IsOK()) { + responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "HTTP client request failed") + << responseOrError); + return; + } + + const auto& response = responseOrError.Value(); + if (response->GetStatusCode() == EStatusCode::NotFound) { + responseHandler->HandleError(TError(NRpc::EErrorCode::NoSuchService, "URL was not resolved to a service")); + return; + } + + if (response->GetStatusCode() == EStatusCode::BadRequest) { + responseHandler->HandleError(ParseYTError(response)); + return; + } + + if (response->GetStatusCode() != EStatusCode::OK) { + responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "Unexpected HTTP status code") + << TErrorAttribute("status", response->GetStatusCode())); + return; + } + + auto responseBody = PushEnvelope(response->ReadAll(), NCompression::ECodec::None); + + NRpc::NProto::TResponseHeader responseHeader; + ToProto(responseHeader.mutable_request_id(), requestId); + + auto responseMessage = CreateResponseMessage( + responseHeader, + responseBody, + /*attachments*/ {}); + responseHandler->HandleResponse(responseMessage, address); + } catch (const std::exception& ex) { + responseHandler->HandleError(TError(NRpc::EErrorCode::TransportError, "Response deserialization failed") + << ex); + } + } + // This function does the backwards transformation of NRpc::NHttp::THttpHandler::TranslateRequest(). THeadersPtr TranslateRequest(const IClientRequestPtr& request) { |
