diff options
author | Gleb Vishnevsky <vishnevskiygl@yandex-team.ru> | 2023-08-02 15:15:47 +0000 |
---|---|---|
committer | ignat <ignat@yandex-team.com> | 2023-08-02 18:15:47 +0300 |
commit | b38f6372c7c647fbb80ca490ffb22ccf0d382f38 (patch) | |
tree | 81a55ca4ee060c52a72da9eef3215cea95f2b6d2 | |
parent | fcc71300fab6bc2d18bea6e57af226daa644eb1e (diff) | |
download | ydb-b38f6372c7c647fbb80ca490ffb22ccf0d382f38.tar.gz |
Add oauth authenticator
Pull Request resolved: #51
-rw-r--r-- | yt/yt/core/http/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/http/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/http/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/http/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/http/client.cpp | 18 | ||||
-rw-r--r-- | yt/yt/core/http/config.cpp | 14 | ||||
-rw-r--r-- | yt/yt/core/http/config.h | 18 | ||||
-rw-r--r-- | yt/yt/core/http/helpers.cpp | 11 | ||||
-rw-r--r-- | yt/yt/core/http/helpers.h | 2 | ||||
-rw-r--r-- | yt/yt/core/http/public.h | 3 | ||||
-rw-r--r-- | yt/yt/core/http/retriable_client.cpp | 235 | ||||
-rw-r--r-- | yt/yt/core/http/retriable_client.h | 75 | ||||
-rw-r--r-- | yt/yt/core/http/ya.make | 1 |
13 files changed, 368 insertions, 13 deletions
diff --git a/yt/yt/core/http/CMakeLists.darwin-x86_64.txt b/yt/yt/core/http/CMakeLists.darwin-x86_64.txt index 8b5195db728..c7a5c2a2b28 100644 --- a/yt/yt/core/http/CMakeLists.darwin-x86_64.txt +++ b/yt/yt/core/http/CMakeLists.darwin-x86_64.txt @@ -23,6 +23,7 @@ target_sources(yt-core-http PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp diff --git a/yt/yt/core/http/CMakeLists.linux-aarch64.txt b/yt/yt/core/http/CMakeLists.linux-aarch64.txt index 96274c67f37..61bc495008b 100644 --- a/yt/yt/core/http/CMakeLists.linux-aarch64.txt +++ b/yt/yt/core/http/CMakeLists.linux-aarch64.txt @@ -24,6 +24,7 @@ target_sources(yt-core-http PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp diff --git a/yt/yt/core/http/CMakeLists.linux-x86_64.txt b/yt/yt/core/http/CMakeLists.linux-x86_64.txt index 96274c67f37..61bc495008b 100644 --- a/yt/yt/core/http/CMakeLists.linux-x86_64.txt +++ b/yt/yt/core/http/CMakeLists.linux-x86_64.txt @@ -24,6 +24,7 @@ target_sources(yt-core-http PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp diff --git a/yt/yt/core/http/CMakeLists.windows-x86_64.txt b/yt/yt/core/http/CMakeLists.windows-x86_64.txt index 47398129ec0..79a0874891a 100644 --- a/yt/yt/core/http/CMakeLists.windows-x86_64.txt +++ b/yt/yt/core/http/CMakeLists.windows-x86_64.txt @@ -20,6 +20,7 @@ target_sources(yt-core-http PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp diff --git a/yt/yt/core/http/client.cpp b/yt/yt/core/http/client.cpp index 40237c0fcb4..67091afd792 100644 --- a/yt/yt/core/http/client.cpp +++ b/yt/yt/core/http/client.cpp @@ -5,6 +5,7 @@ #include "config.h" #include "stream.h" #include "private.h" +#include "helpers.h" #include <yt/yt/core/net/dialer.h> #include <yt/yt/core/net/config.h> @@ -167,21 +168,10 @@ private: } } - TString SanitizeUrl(const TString& url) - { - // Do not expose URL parameters in error attributes. - auto urlRef = ParseUrl(url); - if (urlRef.PortStr.empty()) { - return TString(urlRef.Host) + urlRef.Path; - } else { - return Format("%v:%v%v", urlRef.Host, urlRef.PortStr, urlRef.Path); - } - } - template <typename T> TFuture<T> WrapError(const TString& url, TCallback<T()> action) { - return BIND([=, this, this_ = MakeStrong(this)] { + return BIND([=, this_ = MakeStrong(this)] { try { return action(); } catch (const std::exception& ex) { @@ -257,7 +247,9 @@ private: request->SetHeaders(headers); } - auto requestPath = Format("%v?%v", urlRef.Path, urlRef.RawQuery); + auto requestPath = urlRef.RawQuery.empty() + ? TString(urlRef.Path) + : Format("%v?%v", urlRef.Path, urlRef.RawQuery); request->WriteRequest(method, requestPath); return {std::move(request), std::move(response)}; diff --git a/yt/yt/core/http/config.cpp b/yt/yt/core/http/config.cpp index 91b6acce910..c9423a37bc9 100644 --- a/yt/yt/core/http/config.cpp +++ b/yt/yt/core/http/config.cpp @@ -66,6 +66,20 @@ void TClientConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TRetrialbeClientConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("request_timeout", &TThis::RequestTimeout) + .Default(TDuration::Seconds(15)); + registrar.Parameter("attempt_timeout", &TThis::AttemptTimeout) + .Default(TDuration::Seconds(10)); + registrar.Parameter("backoff_timeout", &TThis::BackoffTimeout) + .Default(TDuration::Seconds(1)); + registrar.Parameter("max_attempt_count", &TThis::MaxAttemptCount) + .Default(3); +} + +//////////////////////////////////////////////////////////////////////////////// + void TCorsConfig::Register(TRegistrar registrar) { registrar.Parameter("disable_cors_check", &TThis::DisableCorsCheck) diff --git a/yt/yt/core/http/config.h b/yt/yt/core/http/config.h index a18a480ff89..7468a03462b 100644 --- a/yt/yt/core/http/config.h +++ b/yt/yt/core/http/config.h @@ -86,6 +86,24 @@ DEFINE_REFCOUNTED_TYPE(TClientConfig) //////////////////////////////////////////////////////////////////////////////// +class TRetrialbeClientConfig + : public NYTree::TYsonStruct +{ +public: + TDuration RequestTimeout; + TDuration AttemptTimeout; + TDuration BackoffTimeout; + int MaxAttemptCount; + + REGISTER_YSON_STRUCT(TRetrialbeClientConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TRetrialbeClientConfig); + +//////////////////////////////////////////////////////////////////////////////// + class TCorsConfig : public NYTree::TYsonStruct { diff --git a/yt/yt/core/http/helpers.cpp b/yt/yt/core/http/helpers.cpp index 71f96dc28ef..fd96bd5c3f8 100644 --- a/yt/yt/core/http/helpers.cpp +++ b/yt/yt/core/http/helpers.cpp @@ -461,6 +461,17 @@ void SetBytesRange(const THeadersPtr& headers, std::pair<i64, i64> range) headers->Set(ContentRangeHeaderName, Format("bytes %v-%v/*", range.first, range.second)); } +TString SanitizeUrl(const TString& url) +{ + // Do not expose URL parameters in error attributes. + auto urlRef = ParseUrl(url); + if (urlRef.PortStr.empty()) { + return TString(urlRef.Host) + urlRef.Path; + } else { + return Format("%v:%v%v", urlRef.Host, urlRef.PortStr, urlRef.Path); + } +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NHttp diff --git a/yt/yt/core/http/helpers.h b/yt/yt/core/http/helpers.h index 09216c651e7..6507640d535 100644 --- a/yt/yt/core/http/helpers.h +++ b/yt/yt/core/http/helpers.h @@ -56,6 +56,8 @@ NTracing::TTraceContextPtr GetOrCreateTraceContext(const IRequestPtr& req); std::optional<std::pair<i64, i64>> FindBytesRange(const THeadersPtr& headers); void SetBytesRange(const THeadersPtr& headers, std::pair<i64, i64> range); +TString SanitizeUrl(const TString& url); + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NHttp diff --git a/yt/yt/core/http/public.h b/yt/yt/core/http/public.h index 93148a0d920..694da6f5f30 100644 --- a/yt/yt/core/http/public.h +++ b/yt/yt/core/http/public.h @@ -15,12 +15,15 @@ DECLARE_REFCOUNTED_CLASS(IActiveRequest) DECLARE_REFCOUNTED_STRUCT(IServer) DECLARE_REFCOUNTED_STRUCT(IClient) +DECLARE_REFCOUNTED_STRUCT(IRetriableClient) +DECLARE_REFCOUNTED_STRUCT(IResponseChecker) DECLARE_REFCOUNTED_STRUCT(IHttpHandler) DECLARE_REFCOUNTED_CLASS(THttpIOConfig) DECLARE_REFCOUNTED_CLASS(TServerConfig) DECLARE_REFCOUNTED_CLASS(TClientConfig) +DECLARE_REFCOUNTED_CLASS(TRetrialbeClientConfig) DECLARE_REFCOUNTED_CLASS(TCorsConfig) DECLARE_REFCOUNTED_CLASS(TConnectionPool) DECLARE_REFCOUNTED_CLASS(IRequestPathMatcher) diff --git a/yt/yt/core/http/retriable_client.cpp b/yt/yt/core/http/retriable_client.cpp new file mode 100644 index 00000000000..400af9931bf --- /dev/null +++ b/yt/yt/core/http/retriable_client.cpp @@ -0,0 +1,235 @@ +#include "config.h" + +#include "retriable_client.h" +#include "private.h" + +#include <yt/yt/core/http/client.h> +#include <yt/yt/core/http/helpers.h> +#include <yt/yt/core/http/public.h> +#include <yt/yt/core/json/json_parser.h> + +namespace NYT::NHttp { + +using namespace NNet; +using namespace NYTree; +using namespace NConcurrency; + +static const auto& Logger = HttpLogger; + +//////////////////////////////////////////////////////////////////////////////// + +class TJsonResponseChecker + : public IResponseChecker +{ +public: + TJsonResponseChecker( + TJsonErrorChecker errorChecker, + NJson::TJsonFormatConfigPtr jsonFormatConfig) + : JsonFormatConfig_(std::move(jsonFormatConfig)) + , ErrorChecker_(std::move(errorChecker)) + { } + + TError CheckError(const IResponsePtr& response) override + { + try { + auto body = response->ReadAll(); + TMemoryInput stream(body.Begin(), body.Size()); + auto factory = NYTree::CreateEphemeralNodeFactory(); + auto builder = NYTree::CreateBuilderFromFactory(factory.get()); + NJson::ParseJson(&stream, builder.get(), JsonFormatConfig_); + Json_ = builder->EndTree(); + } catch (const std::exception& ex) { + return TError("Error parsing response") + << ex; + } + + if (!Json_) { + return TError("Got empty result"); + } + + try { + auto result = ErrorChecker_(response, Json_); + return result; + } catch (const std::exception& err) { + return err; + } + } + + NYTree::INodePtr GetFormattedResponse() const override + { + return Json_; + } + + +private: + const NJson::TJsonFormatConfigPtr JsonFormatConfig_; + INodePtr Json_; + TJsonErrorChecker ErrorChecker_; + TError Error_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IResponseCheckerPtr CreateJsonResponseChecker( + TJsonErrorChecker errorChecker, + const NJson::TJsonFormatConfigPtr& jsonFormatConfig) +{ + return New<TJsonResponseChecker>(std::move(errorChecker), jsonFormatConfig); +} + +//////////////////////////////////////////////////////////////////////////////// + +class TRetrialbeClient + : public IRetriableClient +{ +public: + TRetrialbeClient( + TRetrialbeClientConfigPtr config, + IClientPtr client, + IInvokerPtr invoker) + : Config_(std::move(config)) + , Invoker_(std::move(invoker)) + , UnderlyingClient_(std::move(client)) + { } + + TFuture<IResponsePtr> Get( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const THeadersPtr& headers) override + { + return MakeRequest(&IClient::Get, responseChecker, url, headers); + } + + TFuture<IResponsePtr> Post( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const TSharedRef& body, + const THeadersPtr& headers) override + { + return MakeRequest(&IClient::Post, responseChecker, url, body, headers); + } + + TFuture<IResponsePtr> Patch( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const TSharedRef& body, + const THeadersPtr& headers) override + { + return MakeRequest(&IClient::Patch, responseChecker, url, body, headers); + } + + TFuture<IResponsePtr> Put( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const TSharedRef& body, + const THeadersPtr& headers) override + { + return MakeRequest(&IClient::Put, responseChecker, url, body, headers); + } + + TFuture<IResponsePtr> Delete( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const THeadersPtr& headers) override + { + return MakeRequest(&IClient::Delete, responseChecker, url, headers); + } + +private: + const TRetrialbeClientConfigPtr Config_; + const IInvokerPtr Invoker_; + const IClientPtr UnderlyingClient_; + +private: + template <typename TCallable, typename... Args> + TFuture<IResponsePtr> MakeRequest( + TCallable&& func, + const IResponseCheckerPtr& responseChecker, + const TString& url, + Args&&... args) + { + return BIND([=, this, this_ = MakeStrong(this), func = std::move(func), ...args = std::move(args)] () { + return DoMakeRequest(std::move(func), responseChecker, url, std::forward<Args>(args)...); + }).AsyncVia(Invoker_).Run(); + } + + template <typename TCallable, typename... Args> + IResponsePtr DoMakeRequest( + TCallable&& func, + const IResponseCheckerPtr& responseChecker, + const TString& url, + Args&&... args) + { + const auto deadline = TInstant::Now() + Config_->RequestTimeout; + const auto sanitizedUrl = SanitizeUrl(url); + + YT_LOG_DEBUG("Making request (Url: %v, Deadline: %v, MaxAttemptCount: %v)", + sanitizedUrl, + deadline, + Config_->MaxAttemptCount); + std::vector<TError> accumulatedErrors; + + int attempt = 0; + while (attempt == 0 || (TInstant::Now() < deadline && attempt < Config_->MaxAttemptCount)) { + ++attempt; + auto future = BIND(func, UnderlyingClient_, url, std::forward<Args>(args)...)(); + auto rspOrError = WaitFor(future.WithTimeout(Config_->AttemptTimeout)); + if (!rspOrError.IsOK()) { + auto error = TError("Request attempt %v failed", attempt) + << rspOrError + << TErrorAttribute("attempt", attempt); + + YT_LOG_WARNING( + error, + "Request attempt failed (Url: %v, Attempt: %v)", + sanitizedUrl, + attempt); + accumulatedErrors.push_back(std::move(error)); + continue; + } + + auto& rsp = rspOrError.Value(); + const auto checkError = responseChecker->CheckError(rsp); + if (checkError.IsOK()) { + return rsp; + } + + auto error = TError("Error checking response") + << checkError + << TErrorAttribute("attempt", attempt); + YT_LOG_WARNING( + error, + "Request attempt failed while checking response (Url: %v, Attempt: %v)", + sanitizedUrl, + attempt); + accumulatedErrors.push_back(std::move(error)); + + auto now = TInstant::Now(); + if (now > deadline) { + break; + } + TDelayedExecutor::WaitForDuration(std::min(Config_->BackoffTimeout, deadline - now)); + } + + THROW_ERROR_EXCEPTION("HTTP request failed") + << std::move(accumulatedErrors) + << TErrorAttribute("url", sanitizedUrl) + << TErrorAttribute("attempt_count", attempt) + << TErrorAttribute("max_attempt_count", Config_->MaxAttemptCount); + } + +}; + +//////////////////////////////////////////////////////////////////////////////// + +IRetriableClientPtr CreateRetriableClient( + TRetrialbeClientConfigPtr config, + IClientPtr client, + IInvokerPtr invoker) +{ + return New<TRetrialbeClient>(std::move(config), std::move(client), std::move(invoker)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} diff --git a/yt/yt/core/http/retriable_client.h b/yt/yt/core/http/retriable_client.h new file mode 100644 index 00000000000..9103389cd65 --- /dev/null +++ b/yt/yt/core/http/retriable_client.h @@ -0,0 +1,75 @@ +#pragma once + +#include <yt/yt/core/http/public.h> + +#include <yt/yt/core/json/public.h> + +#include <yt/yt/core/ytree/yson_struct.h> + +namespace NYT::NHttp { + +//////////////////////////////////////////////////////////////////////////////// + +struct IResponseChecker + : public virtual TRefCounted +{ + virtual TError CheckError(const IResponsePtr& response) = 0; + virtual NYTree::INodePtr GetFormattedResponse() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IResponseChecker) + +//////////////////////////////////////////////////////////////////////////////// + +using TJsonErrorChecker = TCallback<TError(const IResponsePtr&, const NYTree::INodePtr&)>; + +IResponseCheckerPtr CreateJsonResponseChecker( + TJsonErrorChecker errorChecker, + const NJson::TJsonFormatConfigPtr& jsonFormatConfig); + +//////////////////////////////////////////////////////////////////////////////// + +struct IRetriableClient + : public virtual TRefCounted +{ + virtual TFuture<IResponsePtr> Get( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const THeadersPtr& headers = nullptr) = 0; + + virtual TFuture<IResponsePtr> Post( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const TSharedRef& body, + const THeadersPtr& headers = nullptr) = 0; + + virtual TFuture<IResponsePtr> Patch( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const TSharedRef& body, + const THeadersPtr& headers = nullptr) = 0; + + virtual TFuture<IResponsePtr> Put( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const TSharedRef& body, + const THeadersPtr& headers = nullptr) = 0; + + virtual TFuture<IResponsePtr> Delete( + const IResponseCheckerPtr& responseChecker, + const TString& url, + const THeadersPtr& headers = nullptr) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IRetriableClient) + +//////////////////////////////////////////////////////////////////////////////// + +IRetriableClientPtr CreateRetriableClient( + TRetrialbeClientConfigPtr config, + IClientPtr client, + IInvokerPtr invoker); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NHttp diff --git a/yt/yt/core/http/ya.make b/yt/yt/core/http/ya.make index c3cf704d566..fe346db002f 100644 --- a/yt/yt/core/http/ya.make +++ b/yt/yt/core/http/ya.make @@ -8,6 +8,7 @@ SRCS( connection_pool.cpp connection_reuse_helpers.cpp http.cpp + retriable_client.cpp server.cpp stream.cpp helpers.cpp |