aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorannashest18 <annashest18@yandex-team.com>2023-10-22 13:19:53 +0300
committerannashest18 <annashest18@yandex-team.com>2023-10-22 13:40:13 +0300
commit24a44df2ab01109cefd64d4bfad05c66e7ebd2af (patch)
tree1ec1ca95a2a8aceaa383fde2e19e61db9ab4fe02
parenta991b019f74cbfe6efb40c3b68b46a1ac2a9bd09 (diff)
downloadydb-24a44df2ab01109cefd64d4bfad05c66e7ebd2af.tar.gz
add using http-proxy for reading table from YT
add using http-proxy for reading table from YT Нам нужна возможность ходить в YT через HTTP proxy для чтения таблиц, используя С++ клиент не из контура Яндекса, к сожалению, сейчас такой возможности нет. В этом ПР черновик изменения, которого нам достаточно https://a.yandex-team.ru/review/4676436/details - тут это же изменение в YT + коммит с тем, как мы планируем использовать
-rw-r--r--yt/cpp/mapreduce/client/client.cpp1
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp3
-rw-r--r--yt/cpp/mapreduce/client/file_reader.cpp4
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.cpp2
-rw-r--r--yt/cpp/mapreduce/client/retryless_writer.h1
-rw-r--r--yt/cpp/mapreduce/http/context.cpp3
-rw-r--r--yt/cpp/mapreduce/http/context.h1
-rw-r--r--yt/cpp/mapreduce/http/helpers.cpp17
-rw-r--r--yt/cpp/mapreduce/http/helpers.h4
-rw-r--r--yt/cpp/mapreduce/http/host_manager.cpp6
-rw-r--r--yt/cpp/mapreduce/http/http.cpp25
-rw-r--r--yt/cpp/mapreduce/http/http.h6
-rw-r--r--yt/cpp/mapreduce/http/requests.cpp2
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp6
-rw-r--r--yt/cpp/mapreduce/interface/client_method_options.h3
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp2
16 files changed, 75 insertions, 11 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 8279c1a38b..29d134ad3d 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1296,6 +1296,7 @@ TClientPtr CreateClientImpl(
context.Config = options.Config_ ? options.Config_ : TConfig::Get();
context.TvmOnly = options.TvmOnly_;
context.UseTLS = options.UseTLS_;
+ context.ProxyAddress = options.ProxyAddress_;
context.ServerName = serverName;
if (serverName.find('.') == TString::npos &&
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index f1c1231220..606ac1c297 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -191,7 +191,8 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
try {
const auto proxyName = GetProxyForHeavyRequest(Context_);
- Response_ = Context_.HttpClient->Request(GetFullUrl(proxyName, Context_, header), requestId, header);
+ UpdateHeaderForProxyIfNeed(proxyName, Context_, header);
+ Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header);
Input_ = Response_->GetResponseStream();
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index 5454b9b11f..57131e5330 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -149,6 +149,8 @@ NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context
header.SetImpersonationUser(*context.ImpersonationUser);
}
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
+
header.AddTransactionId(transactionId);
header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
@@ -208,6 +210,8 @@ NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& co
header.SetImpersonationUser(*context.ImpersonationUser);
}
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
+
header.AddTransactionId(transactionId);
header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
index 03a1bcfccf..c72f42089f 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
@@ -51,6 +51,8 @@ void RetryHeavyWriteRequest(
auto hostName = GetProxyForHeavyRequest(context);
requestId = CreateGuidAsString();
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
+
header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true);
header.SetRequestCompression(ToString(context.Config->ContentEncoding));
diff --git a/yt/cpp/mapreduce/client/retryless_writer.h b/yt/cpp/mapreduce/client/retryless_writer.h
index 835fdaea3b..0a1b3865ee 100644
--- a/yt/cpp/mapreduce/client/retryless_writer.h
+++ b/yt/cpp/mapreduce/client/retryless_writer.h
@@ -55,6 +55,7 @@ public:
TString requestId = CreateGuidAsString();
auto hostName = GetProxyForHeavyRequest(context);
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
}
diff --git a/yt/cpp/mapreduce/http/context.cpp b/yt/cpp/mapreduce/http/context.cpp
index 1c016263c5..d158a55195 100644
--- a/yt/cpp/mapreduce/http/context.cpp
+++ b/yt/cpp/mapreduce/http/context.cpp
@@ -12,7 +12,8 @@ bool operator==(const TClientContext& lhs, const TClientContext& rhs)
lhs.ServiceTicketAuth == rhs.ServiceTicketAuth &&
lhs.HttpClient == rhs.HttpClient &&
lhs.UseTLS == rhs.UseTLS &&
- lhs.TvmOnly == rhs.TvmOnly;
+ lhs.TvmOnly == rhs.TvmOnly &&
+ lhs.ProxyAddress == rhs.ProxyAddress;
}
bool operator!=(const TClientContext& lhs, const TClientContext& rhs)
diff --git a/yt/cpp/mapreduce/http/context.h b/yt/cpp/mapreduce/http/context.h
index 3926373e17..9c730f1c3a 100644
--- a/yt/cpp/mapreduce/http/context.h
+++ b/yt/cpp/mapreduce/http/context.h
@@ -21,6 +21,7 @@ struct TClientContext
bool TvmOnly = false;
bool UseTLS = false;
TConfigPtr Config = TConfig::Get();
+ TMaybe<TString> ProxyAddress;
};
bool operator==(const TClientContext& lhs, const TClientContext& rhs);
diff --git a/yt/cpp/mapreduce/http/helpers.cpp b/yt/cpp/mapreduce/http/helpers.cpp
index 8a5fe16138..18f74b5958 100644
--- a/yt/cpp/mapreduce/http/helpers.cpp
+++ b/yt/cpp/mapreduce/http/helpers.cpp
@@ -41,6 +41,23 @@ TString GetFullUrl(const TString& hostName, const TClientContext& context, THttp
return Format("http://%v%v", hostName, header.GetUrl());
}
+void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header)
+{
+ if (context.ProxyAddress) {
+ header.SetHostPort(Format("http://%v", hostName));
+ header.SetProxyAddress(*context.ProxyAddress);
+ }
+}
+
+TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header)
+{
+ if (context.ProxyAddress) {
+ THttpHeader emptyHeader(header.GetMethod(), "", false);
+ return GetFullUrl(*context.ProxyAddress, context, emptyHeader);
+ }
+ return GetFullUrl(hostName, context, header);
+}
+
static TString GetParametersDebugString(const THttpHeader& header)
{
const auto& parameters = header.GetParameters();
diff --git a/yt/cpp/mapreduce/http/helpers.h b/yt/cpp/mapreduce/http/helpers.h
index 0c510fa2e8..82ef799ca1 100644
--- a/yt/cpp/mapreduce/http/helpers.h
+++ b/yt/cpp/mapreduce/http/helpers.h
@@ -14,6 +14,10 @@ TString CreateHostNameWithPort(const TString& name, const TClientContext& contex
TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header);
+void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header);
+
+TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header);
+
TString TruncateForLogs(const TString& text, size_t maxSize);
TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit);
diff --git a/yt/cpp/mapreduce/http/host_manager.cpp b/yt/cpp/mapreduce/http/host_manager.cpp
index a239dde769..e9716ec7a6 100644
--- a/yt/cpp/mapreduce/http/host_manager.cpp
+++ b/yt/cpp/mapreduce/http/host_manager.cpp
@@ -94,7 +94,7 @@ void THostManager::Reset()
TString THostManager::GetProxyForHeavyRequest(const TClientContext& context)
{
- auto cluster = context.ServerName;
+ auto cluster = context.ProxyAddress ? *context.ProxyAddress : context.ServerName;
{
auto guard = Guard(Lock_);
auto it = ClusterHosts_.find(cluster);
@@ -121,10 +121,10 @@ THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& cont
THttpHeader header("GET", hostsEndpoint, false);
try {
- auto hostName = context.ServerName;
auto requestId = CreateGuidAsString();
// TODO: we need to set socket timeout here
- auto response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
+ UpdateHeaderForProxyIfNeed(context.ServerName, context, header);
+ auto response = context.HttpClient->Request(GetFullUrlForProxy(context.ServerName, context, header), requestId, header);
auto hosts = ParseJsonStringArray(response->GetResponse());
for (auto& host : hosts) {
host = CreateHostNameWithPort(host, context);
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index c5c358c46c..987b6daf4a 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -210,6 +210,16 @@ void THttpHeader::SetToken(const TString& token)
Token = token;
}
+void THttpHeader::SetProxyAddress(const TString& proxyAddress)
+{
+ ProxyAddress = proxyAddress;
+}
+
+void THttpHeader::SetHostPort(const TString& hostPort)
+{
+ HostPort = hostPort;
+}
+
void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
{
ImpersonationUser = impersonationUser;
@@ -250,10 +260,19 @@ TString THttpHeader::GetCommand() const
return Command;
}
-TString THttpHeader::GetUrl() const
+TString THttpHeader::GetUrl(bool needProxy) const
{
TStringStream url;
+ if (needProxy && !ProxyAddress.empty()) {
+ url << ProxyAddress << "/";
+ return url.Str();
+ }
+
+ if (!ProxyAddress.empty()) {
+ url << HostPort;
+ }
+
if (IsApi) {
url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command;
} else {
@@ -274,7 +293,7 @@ TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& r
result << Method << " " << GetUrl() << " HTTP/1.1\r\n";
- GetHeader(hostName, requestId, includeParameters).Get()->WriteTo(&result);
+ GetHeader(HostPort.Empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result);
if (ShouldAcceptFraming()) {
result << "X-YT-Accept-Framing: 1\r\n";
@@ -914,7 +933,7 @@ void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
{
auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
- Url_ = header.GetUrl();
+ Url_ = header.GetUrl(true);
LogRequest(header, Url_, includeParameters, RequestId, HostName);
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index ee8783088d..8bfa00de1d 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -57,6 +57,8 @@ public:
bool HasMutationId() const;
void SetToken(const TString& token);
+ void SetProxyAddress(const TString& proxyAddress);
+ void SetHostPort(const TString& hostPort);
void SetImpersonationUser(const TString& impersonationUser);
void SetServiceTicket(const TString& ticket);
@@ -70,7 +72,7 @@ public:
void SetResponseCompression(const TString& compression);
TString GetCommand() const;
- TString GetUrl() const;
+ TString GetUrl(bool needProxy = false) const;
TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const;
NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const;
@@ -89,6 +91,8 @@ private:
TString Token;
TString ServiceTicket;
TNode Attributes;
+ TString ProxyAddress;
+ TString HostPort;
private:
TMaybe<TFormat> InputFormat = TFormat::YsonText();
diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp
index 7cf0f673bb..7d95a10bc2 100644
--- a/yt/cpp/mapreduce/http/requests.cpp
+++ b/yt/cpp/mapreduce/http/requests.cpp
@@ -41,7 +41,7 @@ TGUID ParseGuidFromResponse(const TString& response)
TString GetProxyForHeavyRequest(const TClientContext& context)
{
if (!context.Config->UseHosts) {
- return context.ServerName;
+ return context.ProxyAddress ? *context.ProxyAddress : context.ServerName;
}
return NPrivate::THostManager::Get().GetProxyForHeavyRequest(context);
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
index 383b3f16d5..63a36c2f78 100644
--- a/yt/cpp/mapreduce/http/retry_request.cpp
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -34,7 +34,9 @@ static TResponseInfo Request(
hostName = context.ServerName;
}
- auto url = GetFullUrl(hostName, context, header);
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
+
+ auto url = GetFullUrlForProxy(hostName, context, header);
auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body);
@@ -83,6 +85,8 @@ TResponseInfo RetryRequestWithPolicy(
header.SetToken(context.Token);
}
+ UpdateHeaderForProxyIfNeed(context.ServerName, context, header);
+
if (context.ImpersonationUser) {
header.SetImpersonationUser(*context.ImpersonationUser);
}
diff --git a/yt/cpp/mapreduce/interface/client_method_options.h b/yt/cpp/mapreduce/interface/client_method_options.h
index 772eeff847..f6e9e5791e 100644
--- a/yt/cpp/mapreduce/interface/client_method_options.h
+++ b/yt/cpp/mapreduce/interface/client_method_options.h
@@ -1071,6 +1071,9 @@ struct TCreateClientOptions
/// access token, api version and more.
/// @see NYT::TConfig
FLUENT_FIELD_DEFAULT(TConfigPtr, Config, nullptr);
+
+ /// @brief Proxy Address to be used for connection
+ FLUENT_FIELD_OPTION(TString, ProxyAddress);
};
///
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index bba605ae4c..4a7304572b 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -676,6 +676,8 @@ public:
auto hostName = GetProxyForHeavyRequest(context);
auto requestId = CreateGuidAsString();
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
+
Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
ResponseStream_ = Response_->GetResponseStream();
}