diff options
author | annashest18 <annashest18@yandex-team.com> | 2023-10-22 13:19:53 +0300 |
---|---|---|
committer | annashest18 <annashest18@yandex-team.com> | 2023-10-22 13:40:13 +0300 |
commit | 24a44df2ab01109cefd64d4bfad05c66e7ebd2af (patch) | |
tree | 1ec1ca95a2a8aceaa383fde2e19e61db9ab4fe02 /yt/cpp/mapreduce/http | |
parent | a991b019f74cbfe6efb40c3b68b46a1ac2a9bd09 (diff) | |
download | ydb-24a44df2ab01109cefd64d4bfad05c66e7ebd2af.tar.gz |
add using http-proxy for reading table from YT
add using http-proxy for reading table from YT
Нам нужна возможность ходить в YT через HTTP proxy для чтения таблиц, используя С++ клиент не из контура Яндекса, к сожалению, сейчас такой возможности нет. В этом ПР черновик изменения, которого нам достаточно
https://a.yandex-team.ru/review/4676436/details - тут это же изменение в YT + коммит с тем, как мы планируем использовать
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r-- | yt/cpp/mapreduce/http/context.cpp | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/context.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/helpers.cpp | 17 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/helpers.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/host_manager.cpp | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 25 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.h | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/requests.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/retry_request.cpp | 6 |
9 files changed, 60 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/http/context.cpp b/yt/cpp/mapreduce/http/context.cpp index 1c016263c5..d158a55195 100644 --- a/yt/cpp/mapreduce/http/context.cpp +++ b/yt/cpp/mapreduce/http/context.cpp @@ -12,7 +12,8 @@ bool operator==(const TClientContext& lhs, const TClientContext& rhs) lhs.ServiceTicketAuth == rhs.ServiceTicketAuth && lhs.HttpClient == rhs.HttpClient && lhs.UseTLS == rhs.UseTLS && - lhs.TvmOnly == rhs.TvmOnly; + lhs.TvmOnly == rhs.TvmOnly && + lhs.ProxyAddress == rhs.ProxyAddress; } bool operator!=(const TClientContext& lhs, const TClientContext& rhs) diff --git a/yt/cpp/mapreduce/http/context.h b/yt/cpp/mapreduce/http/context.h index 3926373e17..9c730f1c3a 100644 --- a/yt/cpp/mapreduce/http/context.h +++ b/yt/cpp/mapreduce/http/context.h @@ -21,6 +21,7 @@ struct TClientContext bool TvmOnly = false; bool UseTLS = false; TConfigPtr Config = TConfig::Get(); + TMaybe<TString> ProxyAddress; }; bool operator==(const TClientContext& lhs, const TClientContext& rhs); diff --git a/yt/cpp/mapreduce/http/helpers.cpp b/yt/cpp/mapreduce/http/helpers.cpp index 8a5fe16138..18f74b5958 100644 --- a/yt/cpp/mapreduce/http/helpers.cpp +++ b/yt/cpp/mapreduce/http/helpers.cpp @@ -41,6 +41,23 @@ TString GetFullUrl(const TString& hostName, const TClientContext& context, THttp return Format("http://%v%v", hostName, header.GetUrl()); } +void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header) +{ + if (context.ProxyAddress) { + header.SetHostPort(Format("http://%v", hostName)); + header.SetProxyAddress(*context.ProxyAddress); + } +} + +TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header) +{ + if (context.ProxyAddress) { + THttpHeader emptyHeader(header.GetMethod(), "", false); + return GetFullUrl(*context.ProxyAddress, context, emptyHeader); + } + return GetFullUrl(hostName, context, header); +} + static TString GetParametersDebugString(const THttpHeader& header) { const auto& parameters = header.GetParameters(); diff --git a/yt/cpp/mapreduce/http/helpers.h b/yt/cpp/mapreduce/http/helpers.h index 0c510fa2e8..82ef799ca1 100644 --- a/yt/cpp/mapreduce/http/helpers.h +++ b/yt/cpp/mapreduce/http/helpers.h @@ -14,6 +14,10 @@ TString CreateHostNameWithPort(const TString& name, const TClientContext& contex TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header); +void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header); + +TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header); + TString TruncateForLogs(const TString& text, size_t maxSize); TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit); diff --git a/yt/cpp/mapreduce/http/host_manager.cpp b/yt/cpp/mapreduce/http/host_manager.cpp index a239dde769..e9716ec7a6 100644 --- a/yt/cpp/mapreduce/http/host_manager.cpp +++ b/yt/cpp/mapreduce/http/host_manager.cpp @@ -94,7 +94,7 @@ void THostManager::Reset() TString THostManager::GetProxyForHeavyRequest(const TClientContext& context) { - auto cluster = context.ServerName; + auto cluster = context.ProxyAddress ? *context.ProxyAddress : context.ServerName; { auto guard = Guard(Lock_); auto it = ClusterHosts_.find(cluster); @@ -121,10 +121,10 @@ THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& cont THttpHeader header("GET", hostsEndpoint, false); try { - auto hostName = context.ServerName; auto requestId = CreateGuidAsString(); // TODO: we need to set socket timeout here - auto response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); + UpdateHeaderForProxyIfNeed(context.ServerName, context, header); + auto response = context.HttpClient->Request(GetFullUrlForProxy(context.ServerName, context, header), requestId, header); auto hosts = ParseJsonStringArray(response->GetResponse()); for (auto& host : hosts) { host = CreateHostNameWithPort(host, context); diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index c5c358c46c..987b6daf4a 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -210,6 +210,16 @@ void THttpHeader::SetToken(const TString& token) Token = token; } +void THttpHeader::SetProxyAddress(const TString& proxyAddress) +{ + ProxyAddress = proxyAddress; +} + +void THttpHeader::SetHostPort(const TString& hostPort) +{ + HostPort = hostPort; +} + void THttpHeader::SetImpersonationUser(const TString& impersonationUser) { ImpersonationUser = impersonationUser; @@ -250,10 +260,19 @@ TString THttpHeader::GetCommand() const return Command; } -TString THttpHeader::GetUrl() const +TString THttpHeader::GetUrl(bool needProxy) const { TStringStream url; + if (needProxy && !ProxyAddress.empty()) { + url << ProxyAddress << "/"; + return url.Str(); + } + + if (!ProxyAddress.empty()) { + url << HostPort; + } + if (IsApi) { url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command; } else { @@ -274,7 +293,7 @@ TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& r result << Method << " " << GetUrl() << " HTTP/1.1\r\n"; - GetHeader(hostName, requestId, includeParameters).Get()->WriteTo(&result); + GetHeader(HostPort.Empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result); if (ShouldAcceptFraming()) { result << "X-YT-Accept-Framing: 1\r\n"; @@ -914,7 +933,7 @@ void THttpRequest::Connect(TString hostName, TDuration socketTimeout) IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) { auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters); - Url_ = header.GetUrl(); + Url_ = header.GetUrl(true); LogRequest(header, Url_, includeParameters, RequestId, HostName); diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index ee8783088d..8bfa00de1d 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -57,6 +57,8 @@ public: bool HasMutationId() const; void SetToken(const TString& token); + void SetProxyAddress(const TString& proxyAddress); + void SetHostPort(const TString& hostPort); void SetImpersonationUser(const TString& impersonationUser); void SetServiceTicket(const TString& ticket); @@ -70,7 +72,7 @@ public: void SetResponseCompression(const TString& compression); TString GetCommand() const; - TString GetUrl() const; + TString GetUrl(bool needProxy = false) const; TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const; NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const; @@ -89,6 +91,8 @@ private: TString Token; TString ServiceTicket; TNode Attributes; + TString ProxyAddress; + TString HostPort; private: TMaybe<TFormat> InputFormat = TFormat::YsonText(); diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp index 7cf0f673bb..7d95a10bc2 100644 --- a/yt/cpp/mapreduce/http/requests.cpp +++ b/yt/cpp/mapreduce/http/requests.cpp @@ -41,7 +41,7 @@ TGUID ParseGuidFromResponse(const TString& response) TString GetProxyForHeavyRequest(const TClientContext& context) { if (!context.Config->UseHosts) { - return context.ServerName; + return context.ProxyAddress ? *context.ProxyAddress : context.ServerName; } return NPrivate::THostManager::Get().GetProxyForHeavyRequest(context); diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 383b3f16d5..63a36c2f78 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -34,7 +34,9 @@ static TResponseInfo Request( hostName = context.ServerName; } - auto url = GetFullUrl(hostName, context, header); + UpdateHeaderForProxyIfNeed(hostName, context, header); + + auto url = GetFullUrlForProxy(hostName, context, header); auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body); @@ -83,6 +85,8 @@ TResponseInfo RetryRequestWithPolicy( header.SetToken(context.Token); } + UpdateHeaderForProxyIfNeed(context.ServerName, context, header); + if (context.ImpersonationUser) { header.SetImpersonationUser(*context.ImpersonationUser); } |