aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGleb Vishnevsky <vishnevskiygl@yandex-team.ru>2023-08-02 15:15:47 +0000
committerignat <ignat@yandex-team.com>2023-08-02 18:15:47 +0300
commitb38f6372c7c647fbb80ca490ffb22ccf0d382f38 (patch)
tree81a55ca4ee060c52a72da9eef3215cea95f2b6d2
parentfcc71300fab6bc2d18bea6e57af226daa644eb1e (diff)
downloadydb-b38f6372c7c647fbb80ca490ffb22ccf0d382f38.tar.gz
Add oauth authenticator
Pull Request resolved: #51
-rw-r--r--yt/yt/core/http/CMakeLists.darwin-x86_64.txt1
-rw-r--r--yt/yt/core/http/CMakeLists.linux-aarch64.txt1
-rw-r--r--yt/yt/core/http/CMakeLists.linux-x86_64.txt1
-rw-r--r--yt/yt/core/http/CMakeLists.windows-x86_64.txt1
-rw-r--r--yt/yt/core/http/client.cpp18
-rw-r--r--yt/yt/core/http/config.cpp14
-rw-r--r--yt/yt/core/http/config.h18
-rw-r--r--yt/yt/core/http/helpers.cpp11
-rw-r--r--yt/yt/core/http/helpers.h2
-rw-r--r--yt/yt/core/http/public.h3
-rw-r--r--yt/yt/core/http/retriable_client.cpp235
-rw-r--r--yt/yt/core/http/retriable_client.h75
-rw-r--r--yt/yt/core/http/ya.make1
13 files changed, 368 insertions, 13 deletions
diff --git a/yt/yt/core/http/CMakeLists.darwin-x86_64.txt b/yt/yt/core/http/CMakeLists.darwin-x86_64.txt
index 8b5195db728..c7a5c2a2b28 100644
--- a/yt/yt/core/http/CMakeLists.darwin-x86_64.txt
+++ b/yt/yt/core/http/CMakeLists.darwin-x86_64.txt
@@ -23,6 +23,7 @@ target_sources(yt-core-http PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp
diff --git a/yt/yt/core/http/CMakeLists.linux-aarch64.txt b/yt/yt/core/http/CMakeLists.linux-aarch64.txt
index 96274c67f37..61bc495008b 100644
--- a/yt/yt/core/http/CMakeLists.linux-aarch64.txt
+++ b/yt/yt/core/http/CMakeLists.linux-aarch64.txt
@@ -24,6 +24,7 @@ target_sources(yt-core-http PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp
diff --git a/yt/yt/core/http/CMakeLists.linux-x86_64.txt b/yt/yt/core/http/CMakeLists.linux-x86_64.txt
index 96274c67f37..61bc495008b 100644
--- a/yt/yt/core/http/CMakeLists.linux-x86_64.txt
+++ b/yt/yt/core/http/CMakeLists.linux-x86_64.txt
@@ -24,6 +24,7 @@ target_sources(yt-core-http PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp
diff --git a/yt/yt/core/http/CMakeLists.windows-x86_64.txt b/yt/yt/core/http/CMakeLists.windows-x86_64.txt
index 47398129ec0..79a0874891a 100644
--- a/yt/yt/core/http/CMakeLists.windows-x86_64.txt
+++ b/yt/yt/core/http/CMakeLists.windows-x86_64.txt
@@ -20,6 +20,7 @@ target_sources(yt-core-http PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/connection_reuse_helpers.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/http/retriable_client.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/server.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/stream.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/http/helpers.cpp
diff --git a/yt/yt/core/http/client.cpp b/yt/yt/core/http/client.cpp
index 40237c0fcb4..67091afd792 100644
--- a/yt/yt/core/http/client.cpp
+++ b/yt/yt/core/http/client.cpp
@@ -5,6 +5,7 @@
#include "config.h"
#include "stream.h"
#include "private.h"
+#include "helpers.h"
#include <yt/yt/core/net/dialer.h>
#include <yt/yt/core/net/config.h>
@@ -167,21 +168,10 @@ private:
}
}
- TString SanitizeUrl(const TString& url)
- {
- // Do not expose URL parameters in error attributes.
- auto urlRef = ParseUrl(url);
- if (urlRef.PortStr.empty()) {
- return TString(urlRef.Host) + urlRef.Path;
- } else {
- return Format("%v:%v%v", urlRef.Host, urlRef.PortStr, urlRef.Path);
- }
- }
-
template <typename T>
TFuture<T> WrapError(const TString& url, TCallback<T()> action)
{
- return BIND([=, this, this_ = MakeStrong(this)] {
+ return BIND([=, this_ = MakeStrong(this)] {
try {
return action();
} catch (const std::exception& ex) {
@@ -257,7 +247,9 @@ private:
request->SetHeaders(headers);
}
- auto requestPath = Format("%v?%v", urlRef.Path, urlRef.RawQuery);
+ auto requestPath = urlRef.RawQuery.empty()
+ ? TString(urlRef.Path)
+ : Format("%v?%v", urlRef.Path, urlRef.RawQuery);
request->WriteRequest(method, requestPath);
return {std::move(request), std::move(response)};
diff --git a/yt/yt/core/http/config.cpp b/yt/yt/core/http/config.cpp
index 91b6acce910..c9423a37bc9 100644
--- a/yt/yt/core/http/config.cpp
+++ b/yt/yt/core/http/config.cpp
@@ -66,6 +66,20 @@ void TClientConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TRetrialbeClientConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("request_timeout", &TThis::RequestTimeout)
+ .Default(TDuration::Seconds(15));
+ registrar.Parameter("attempt_timeout", &TThis::AttemptTimeout)
+ .Default(TDuration::Seconds(10));
+ registrar.Parameter("backoff_timeout", &TThis::BackoffTimeout)
+ .Default(TDuration::Seconds(1));
+ registrar.Parameter("max_attempt_count", &TThis::MaxAttemptCount)
+ .Default(3);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TCorsConfig::Register(TRegistrar registrar)
{
registrar.Parameter("disable_cors_check", &TThis::DisableCorsCheck)
diff --git a/yt/yt/core/http/config.h b/yt/yt/core/http/config.h
index a18a480ff89..7468a03462b 100644
--- a/yt/yt/core/http/config.h
+++ b/yt/yt/core/http/config.h
@@ -86,6 +86,24 @@ DEFINE_REFCOUNTED_TYPE(TClientConfig)
////////////////////////////////////////////////////////////////////////////////
+class TRetrialbeClientConfig
+ : public NYTree::TYsonStruct
+{
+public:
+ TDuration RequestTimeout;
+ TDuration AttemptTimeout;
+ TDuration BackoffTimeout;
+ int MaxAttemptCount;
+
+ REGISTER_YSON_STRUCT(TRetrialbeClientConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TRetrialbeClientConfig);
+
+////////////////////////////////////////////////////////////////////////////////
+
class TCorsConfig
: public NYTree::TYsonStruct
{
diff --git a/yt/yt/core/http/helpers.cpp b/yt/yt/core/http/helpers.cpp
index 71f96dc28ef..fd96bd5c3f8 100644
--- a/yt/yt/core/http/helpers.cpp
+++ b/yt/yt/core/http/helpers.cpp
@@ -461,6 +461,17 @@ void SetBytesRange(const THeadersPtr& headers, std::pair<i64, i64> range)
headers->Set(ContentRangeHeaderName, Format("bytes %v-%v/*", range.first, range.second));
}
+TString SanitizeUrl(const TString& url)
+{
+ // Do not expose URL parameters in error attributes.
+ auto urlRef = ParseUrl(url);
+ if (urlRef.PortStr.empty()) {
+ return TString(urlRef.Host) + urlRef.Path;
+ } else {
+ return Format("%v:%v%v", urlRef.Host, urlRef.PortStr, urlRef.Path);
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NHttp
diff --git a/yt/yt/core/http/helpers.h b/yt/yt/core/http/helpers.h
index 09216c651e7..6507640d535 100644
--- a/yt/yt/core/http/helpers.h
+++ b/yt/yt/core/http/helpers.h
@@ -56,6 +56,8 @@ NTracing::TTraceContextPtr GetOrCreateTraceContext(const IRequestPtr& req);
std::optional<std::pair<i64, i64>> FindBytesRange(const THeadersPtr& headers);
void SetBytesRange(const THeadersPtr& headers, std::pair<i64, i64> range);
+TString SanitizeUrl(const TString& url);
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NHttp
diff --git a/yt/yt/core/http/public.h b/yt/yt/core/http/public.h
index 93148a0d920..694da6f5f30 100644
--- a/yt/yt/core/http/public.h
+++ b/yt/yt/core/http/public.h
@@ -15,12 +15,15 @@ DECLARE_REFCOUNTED_CLASS(IActiveRequest)
DECLARE_REFCOUNTED_STRUCT(IServer)
DECLARE_REFCOUNTED_STRUCT(IClient)
+DECLARE_REFCOUNTED_STRUCT(IRetriableClient)
+DECLARE_REFCOUNTED_STRUCT(IResponseChecker)
DECLARE_REFCOUNTED_STRUCT(IHttpHandler)
DECLARE_REFCOUNTED_CLASS(THttpIOConfig)
DECLARE_REFCOUNTED_CLASS(TServerConfig)
DECLARE_REFCOUNTED_CLASS(TClientConfig)
+DECLARE_REFCOUNTED_CLASS(TRetrialbeClientConfig)
DECLARE_REFCOUNTED_CLASS(TCorsConfig)
DECLARE_REFCOUNTED_CLASS(TConnectionPool)
DECLARE_REFCOUNTED_CLASS(IRequestPathMatcher)
diff --git a/yt/yt/core/http/retriable_client.cpp b/yt/yt/core/http/retriable_client.cpp
new file mode 100644
index 00000000000..400af9931bf
--- /dev/null
+++ b/yt/yt/core/http/retriable_client.cpp
@@ -0,0 +1,235 @@
+#include "config.h"
+
+#include "retriable_client.h"
+#include "private.h"
+
+#include <yt/yt/core/http/client.h>
+#include <yt/yt/core/http/helpers.h>
+#include <yt/yt/core/http/public.h>
+#include <yt/yt/core/json/json_parser.h>
+
+namespace NYT::NHttp {
+
+using namespace NNet;
+using namespace NYTree;
+using namespace NConcurrency;
+
+static const auto& Logger = HttpLogger;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TJsonResponseChecker
+ : public IResponseChecker
+{
+public:
+ TJsonResponseChecker(
+ TJsonErrorChecker errorChecker,
+ NJson::TJsonFormatConfigPtr jsonFormatConfig)
+ : JsonFormatConfig_(std::move(jsonFormatConfig))
+ , ErrorChecker_(std::move(errorChecker))
+ { }
+
+ TError CheckError(const IResponsePtr& response) override
+ {
+ try {
+ auto body = response->ReadAll();
+ TMemoryInput stream(body.Begin(), body.Size());
+ auto factory = NYTree::CreateEphemeralNodeFactory();
+ auto builder = NYTree::CreateBuilderFromFactory(factory.get());
+ NJson::ParseJson(&stream, builder.get(), JsonFormatConfig_);
+ Json_ = builder->EndTree();
+ } catch (const std::exception& ex) {
+ return TError("Error parsing response")
+ << ex;
+ }
+
+ if (!Json_) {
+ return TError("Got empty result");
+ }
+
+ try {
+ auto result = ErrorChecker_(response, Json_);
+ return result;
+ } catch (const std::exception& err) {
+ return err;
+ }
+ }
+
+ NYTree::INodePtr GetFormattedResponse() const override
+ {
+ return Json_;
+ }
+
+
+private:
+ const NJson::TJsonFormatConfigPtr JsonFormatConfig_;
+ INodePtr Json_;
+ TJsonErrorChecker ErrorChecker_;
+ TError Error_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IResponseCheckerPtr CreateJsonResponseChecker(
+ TJsonErrorChecker errorChecker,
+ const NJson::TJsonFormatConfigPtr& jsonFormatConfig)
+{
+ return New<TJsonResponseChecker>(std::move(errorChecker), jsonFormatConfig);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRetrialbeClient
+ : public IRetriableClient
+{
+public:
+ TRetrialbeClient(
+ TRetrialbeClientConfigPtr config,
+ IClientPtr client,
+ IInvokerPtr invoker)
+ : Config_(std::move(config))
+ , Invoker_(std::move(invoker))
+ , UnderlyingClient_(std::move(client))
+ { }
+
+ TFuture<IResponsePtr> Get(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const THeadersPtr& headers) override
+ {
+ return MakeRequest(&IClient::Get, responseChecker, url, headers);
+ }
+
+ TFuture<IResponsePtr> Post(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const TSharedRef& body,
+ const THeadersPtr& headers) override
+ {
+ return MakeRequest(&IClient::Post, responseChecker, url, body, headers);
+ }
+
+ TFuture<IResponsePtr> Patch(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const TSharedRef& body,
+ const THeadersPtr& headers) override
+ {
+ return MakeRequest(&IClient::Patch, responseChecker, url, body, headers);
+ }
+
+ TFuture<IResponsePtr> Put(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const TSharedRef& body,
+ const THeadersPtr& headers) override
+ {
+ return MakeRequest(&IClient::Put, responseChecker, url, body, headers);
+ }
+
+ TFuture<IResponsePtr> Delete(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const THeadersPtr& headers) override
+ {
+ return MakeRequest(&IClient::Delete, responseChecker, url, headers);
+ }
+
+private:
+ const TRetrialbeClientConfigPtr Config_;
+ const IInvokerPtr Invoker_;
+ const IClientPtr UnderlyingClient_;
+
+private:
+ template <typename TCallable, typename... Args>
+ TFuture<IResponsePtr> MakeRequest(
+ TCallable&& func,
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ Args&&... args)
+ {
+ return BIND([=, this, this_ = MakeStrong(this), func = std::move(func), ...args = std::move(args)] () {
+ return DoMakeRequest(std::move(func), responseChecker, url, std::forward<Args>(args)...);
+ }).AsyncVia(Invoker_).Run();
+ }
+
+ template <typename TCallable, typename... Args>
+ IResponsePtr DoMakeRequest(
+ TCallable&& func,
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ Args&&... args)
+ {
+ const auto deadline = TInstant::Now() + Config_->RequestTimeout;
+ const auto sanitizedUrl = SanitizeUrl(url);
+
+ YT_LOG_DEBUG("Making request (Url: %v, Deadline: %v, MaxAttemptCount: %v)",
+ sanitizedUrl,
+ deadline,
+ Config_->MaxAttemptCount);
+ std::vector<TError> accumulatedErrors;
+
+ int attempt = 0;
+ while (attempt == 0 || (TInstant::Now() < deadline && attempt < Config_->MaxAttemptCount)) {
+ ++attempt;
+ auto future = BIND(func, UnderlyingClient_, url, std::forward<Args>(args)...)();
+ auto rspOrError = WaitFor(future.WithTimeout(Config_->AttemptTimeout));
+ if (!rspOrError.IsOK()) {
+ auto error = TError("Request attempt %v failed", attempt)
+ << rspOrError
+ << TErrorAttribute("attempt", attempt);
+
+ YT_LOG_WARNING(
+ error,
+ "Request attempt failed (Url: %v, Attempt: %v)",
+ sanitizedUrl,
+ attempt);
+ accumulatedErrors.push_back(std::move(error));
+ continue;
+ }
+
+ auto& rsp = rspOrError.Value();
+ const auto checkError = responseChecker->CheckError(rsp);
+ if (checkError.IsOK()) {
+ return rsp;
+ }
+
+ auto error = TError("Error checking response")
+ << checkError
+ << TErrorAttribute("attempt", attempt);
+ YT_LOG_WARNING(
+ error,
+ "Request attempt failed while checking response (Url: %v, Attempt: %v)",
+ sanitizedUrl,
+ attempt);
+ accumulatedErrors.push_back(std::move(error));
+
+ auto now = TInstant::Now();
+ if (now > deadline) {
+ break;
+ }
+ TDelayedExecutor::WaitForDuration(std::min(Config_->BackoffTimeout, deadline - now));
+ }
+
+ THROW_ERROR_EXCEPTION("HTTP request failed")
+ << std::move(accumulatedErrors)
+ << TErrorAttribute("url", sanitizedUrl)
+ << TErrorAttribute("attempt_count", attempt)
+ << TErrorAttribute("max_attempt_count", Config_->MaxAttemptCount);
+ }
+
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRetriableClientPtr CreateRetriableClient(
+ TRetrialbeClientConfigPtr config,
+ IClientPtr client,
+ IInvokerPtr invoker)
+{
+ return New<TRetrialbeClient>(std::move(config), std::move(client), std::move(invoker));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+}
diff --git a/yt/yt/core/http/retriable_client.h b/yt/yt/core/http/retriable_client.h
new file mode 100644
index 00000000000..9103389cd65
--- /dev/null
+++ b/yt/yt/core/http/retriable_client.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include <yt/yt/core/http/public.h>
+
+#include <yt/yt/core/json/public.h>
+
+#include <yt/yt/core/ytree/yson_struct.h>
+
+namespace NYT::NHttp {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IResponseChecker
+ : public virtual TRefCounted
+{
+ virtual TError CheckError(const IResponsePtr& response) = 0;
+ virtual NYTree::INodePtr GetFormattedResponse() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IResponseChecker)
+
+////////////////////////////////////////////////////////////////////////////////
+
+using TJsonErrorChecker = TCallback<TError(const IResponsePtr&, const NYTree::INodePtr&)>;
+
+IResponseCheckerPtr CreateJsonResponseChecker(
+ TJsonErrorChecker errorChecker,
+ const NJson::TJsonFormatConfigPtr& jsonFormatConfig);
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IRetriableClient
+ : public virtual TRefCounted
+{
+ virtual TFuture<IResponsePtr> Get(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const THeadersPtr& headers = nullptr) = 0;
+
+ virtual TFuture<IResponsePtr> Post(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const TSharedRef& body,
+ const THeadersPtr& headers = nullptr) = 0;
+
+ virtual TFuture<IResponsePtr> Patch(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const TSharedRef& body,
+ const THeadersPtr& headers = nullptr) = 0;
+
+ virtual TFuture<IResponsePtr> Put(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const TSharedRef& body,
+ const THeadersPtr& headers = nullptr) = 0;
+
+ virtual TFuture<IResponsePtr> Delete(
+ const IResponseCheckerPtr& responseChecker,
+ const TString& url,
+ const THeadersPtr& headers = nullptr) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IRetriableClient)
+
+////////////////////////////////////////////////////////////////////////////////
+
+IRetriableClientPtr CreateRetriableClient(
+ TRetrialbeClientConfigPtr config,
+ IClientPtr client,
+ IInvokerPtr invoker);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NHttp
diff --git a/yt/yt/core/http/ya.make b/yt/yt/core/http/ya.make
index c3cf704d566..fe346db002f 100644
--- a/yt/yt/core/http/ya.make
+++ b/yt/yt/core/http/ya.make
@@ -8,6 +8,7 @@ SRCS(
connection_pool.cpp
connection_reuse_helpers.cpp
http.cpp
+ retriable_client.cpp
server.cpp
stream.cpp
helpers.cpp