aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce
diff options
context:
space:
mode:
authorermolovd <ermolovd@yandex-team.com>2024-11-18 18:37:13 +0300
committerermolovd <ermolovd@yandex-team.com>2024-11-18 19:07:19 +0300
commiteb9ebcd66823f2331380c62c209c41382fcdabdb (patch)
treecbb1bf8cac576fa446fd44f96086f1fbf8e1a609 /yt/cpp/mapreduce
parent2af8e95bd4d3f701a8dd6d255b1af4ef4a205cf4 (diff)
downloadydb-eb9ebcd66823f2331380c62c209c41382fcdabdb.tar.gz
cpp/mapreduce: fix style
commit_hash:4b9b636737f162b91295356e0ac17f0ea2beab20
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r--yt/cpp/mapreduce/http/http.cpp153
-rw-r--r--yt/cpp/mapreduce/http/http.h55
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp10
-rw-r--r--yt/cpp/mapreduce/http/http_client.h2
4 files changed, 111 insertions, 109 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index f9eb8539b5..12aa33ff29 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -130,16 +130,16 @@ private:
////////////////////////////////////////////////////////////////////////////////
THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
- : Method(method)
- , Command(command)
- , IsApi(isApi)
+ : Method_(method)
+ , Command_(command)
+ , IsApi_(isApi)
{ }
void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
{
- auto it = Parameters.find(key);
- if (it == Parameters.end()) {
- Parameters.emplace(key, std::move(value));
+ auto it = Parameters_.find(key);
+ if (it == Parameters_.end()) {
+ Parameters_.emplace(key, std::move(value));
} else {
if (overwrite) {
it->second = std::move(value);
@@ -158,12 +158,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
void THttpHeader::RemoveParameter(const TString& key)
{
- Parameters.erase(key);
+ Parameters_.erase(key);
}
TNode THttpHeader::GetParameters() const
{
- return Parameters;
+ return Parameters_;
}
void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
@@ -202,81 +202,81 @@ void THttpHeader::AddMutationId()
bool THttpHeader::HasMutationId() const
{
- return Parameters.contains("mutation_id");
+ return Parameters_.contains("mutation_id");
}
void THttpHeader::SetToken(const TString& token)
{
- Token = token;
+ Token_ = token;
}
void THttpHeader::SetProxyAddress(const TString& proxyAddress)
{
- ProxyAddress = proxyAddress;
+ ProxyAddress_ = proxyAddress;
}
void THttpHeader::SetHostPort(const TString& hostPort)
{
- HostPort = hostPort;
+ HostPort_ = hostPort;
}
void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
{
- ImpersonationUser = impersonationUser;
+ ImpersonationUser_ = impersonationUser;
}
void THttpHeader::SetServiceTicket(const TString& ticket)
{
- ServiceTicket = ticket;
+ ServiceTicket_ = ticket;
}
void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
{
- InputFormat = format;
+ InputFormat_ = format;
}
void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
{
- OutputFormat = format;
+ OutputFormat_ = format;
}
TMaybe<TFormat> THttpHeader::GetOutputFormat() const
{
- return OutputFormat;
+ return OutputFormat_;
}
void THttpHeader::SetRequestCompression(const TString& compression)
{
- RequestCompression = compression;
+ RequestCompression_ = compression;
}
void THttpHeader::SetResponseCompression(const TString& compression)
{
- ResponseCompression = compression;
+ ResponseCompression_ = compression;
}
TString THttpHeader::GetCommand() const
{
- return Command;
+ return Command_;
}
TString THttpHeader::GetUrl(bool needProxy) const
{
TStringStream url;
- if (needProxy && !ProxyAddress.empty()) {
- url << ProxyAddress << "/";
+ if (needProxy && !ProxyAddress_.empty()) {
+ url << ProxyAddress_ << "/";
return url.Str();
}
- if (!ProxyAddress.empty()) {
- url << HostPort;
+ if (!ProxyAddress_.empty()) {
+ url << HostPort_;
}
- if (IsApi) {
- url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command;
+ if (IsApi_) {
+ url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_;
} else {
- url << "/" << Command;
+ url << "/" << Command_;
}
return url.Str();
@@ -284,16 +284,16 @@ TString THttpHeader::GetUrl(bool needProxy) const
bool THttpHeader::ShouldAcceptFraming() const
{
- return TConfig::Get()->CommandsWithFraming.contains(Command);
+ return TConfig::Get()->CommandsWithFraming.contains(Command_);
}
TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
{
TStringStream result;
- result << Method << " " << GetUrl() << " HTTP/1.1\r\n";
+ result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n";
- GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result);
+ GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result);
if (ShouldAcceptFraming()) {
result << "X-YT-Accept-Framing: 1\r\n";
@@ -311,25 +311,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
headers->Add("Host", hostName);
headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
- if (!Token.empty()) {
- headers->Add("Authorization", "OAuth " + Token);
+ if (!Token_.empty()) {
+ headers->Add("Authorization", "OAuth " + Token_);
}
- if (!ServiceTicket.empty()) {
- headers->Add("X-Ya-Service-Ticket", ServiceTicket);
+ if (!ServiceTicket_.empty()) {
+ headers->Add("X-Ya-Service-Ticket", ServiceTicket_);
}
- if (!ImpersonationUser.empty()) {
- headers->Add("X-Yt-User-Name", ImpersonationUser);
+ if (!ImpersonationUser_.empty()) {
+ headers->Add("X-Yt-User-Name", ImpersonationUser_);
}
- if (Method == "PUT" || Method == "POST") {
+ if (Method_ == "PUT" || Method_ == "POST") {
headers->Add("Transfer-Encoding", "chunked");
}
headers->Add("X-YT-Correlation-Id", requestId);
headers->Add("X-YT-Header-Format", "<format=text>yson");
- headers->Add("Content-Encoding", RequestCompression);
- headers->Add("Accept-Encoding", ResponseCompression);
+ headers->Add("Content-Encoding", RequestCompression_);
+ headers->Add("Accept-Encoding", ResponseCompression_);
auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
static const size_t maxHttpHeaderSize = 64 << 10;
@@ -353,14 +353,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
} while (ptr != finish);
};
- if (InputFormat) {
- printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config));
+ if (InputFormat_) {
+ printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config));
}
- if (OutputFormat) {
- printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config));
+ if (OutputFormat_) {
+ printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config));
}
if (includeParameters) {
- printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters));
+ printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_));
}
return NHttp::THeadersPtrWrapper(std::move(headers));
@@ -368,7 +368,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
const TString& THttpHeader::GetMethod() const
{
- return Method;
+ return Method_;
}
////////////////////////////////////////////////////////////////////////////////
@@ -679,8 +679,8 @@ THttpResponse::THttpResponse(
IInputStream* socketStream,
const TString& requestId,
const TString& hostName)
- : HttpInput_(socketStream)
- , RequestId_(requestId)
+ : RequestId_(requestId)
+ , HttpInput_(socketStream)
, HostName_(GetProxyName(HttpInput_).GetOrElse(hostName))
, Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing"))
{
@@ -887,64 +887,63 @@ size_t THttpResponse::UnframeSkip(size_t len)
////////////////////////////////////////////////////////////////////////////////
THttpRequest::THttpRequest()
-{
- RequestId = CreateGuidAsString();
-}
+ : RequestId_(CreateGuidAsString())
+{ }
THttpRequest::THttpRequest(const TString& requestId)
- : RequestId(requestId)
+ : RequestId_(requestId)
{ }
THttpRequest::~THttpRequest()
{
- if (!Connection) {
+ if (!Connection_) {
return;
}
- if (Input && Input->IsKeepAlive() && Input->IsExhausted()) {
+ if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) {
// We should return to the pool only connections where HTTP response was fully read.
// Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
- TConnectionPool::Get()->Release(Connection);
+ TConnectionPool::Get()->Release(Connection_);
} else {
- TConnectionPool::Get()->Invalidate(HostName, Connection);
+ TConnectionPool::Get()->Invalidate(HostName_, Connection_);
}
}
TString THttpRequest::GetRequestId() const
{
- return RequestId;
+ return RequestId_;
}
void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
{
- HostName = std::move(hostName);
+ HostName_ = std::move(hostName);
YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
- RequestId,
- HostName);
+ RequestId_,
+ HostName_);
StartTime_ = TInstant::Now();
- Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout);
+ Connection_ = TConnectionPool::Get()->Connect(HostName_, socketTimeout);
YT_LOG_DEBUG("REQ %v - connection #%v",
- RequestId,
- Connection->Id);
+ RequestId_,
+ Connection_->Id);
}
IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
{
- auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
+ auto strHeader = header.GetHeaderAsString(HostName_, RequestId_, includeParameters);
Url_ = header.GetUrl(true);
- LogRequest(header, Url_, includeParameters, RequestId, HostName);
+ LogRequest(header, Url_, includeParameters, RequestId_, HostName_);
LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128);
auto outputFormat = header.GetOutputFormat();
if (outputFormat && outputFormat->IsTextYson()) {
- LogResponse = true;
+ LogResponse_ = true;
}
- RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get());
+ RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get());
RequestStream_->Write(strHeader.data(), strHeader.size());
return RequestStream_.Get();
@@ -980,17 +979,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo
THttpResponse* THttpRequest::GetResponseStream()
{
- if (!Input) {
- SocketInput.Reset(new TSocketInput(*Connection->Socket.Get()));
+ if (!Input_) {
+ SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get()));
if (TConfig::Get()->UseAbortableResponse) {
Y_ABORT_UNLESS(!Url_.empty());
- Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_));
+ Input_.Reset(new TAbortableHttpResponse(SocketInput_.Get(), RequestId_, HostName_, Url_));
} else {
- Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName));
+ Input_.Reset(new THttpResponse(SocketInput_.Get(), RequestId_, HostName_));
}
- Input->CheckErrorResponse();
+ Input_->CheckErrorResponse();
}
- return Input.Get();
+ return Input_.Get();
}
TString THttpRequest::GetResponse()
@@ -1003,15 +1002,15 @@ TString THttpRequest::GetResponse()
<< "HostName: " << GetResponseStream()->GetHostName() << "; "
<< LoggedAttributes_;
- if (LogResponse) {
+ if (LogResponse_) {
constexpr auto sizeLimit = 1 << 7;
YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
- RequestId,
+ RequestId_,
TruncateForLogs(result, sizeLimit),
loggedAttributes.Str());
} else {
YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
- RequestId,
+ RequestId_,
result.size(),
loggedAttributes.Str());
}
@@ -1024,8 +1023,8 @@ int THttpRequest::GetHttpCode() {
void THttpRequest::InvalidateConnection()
{
- TConnectionPool::Get()->Invalidate(HostName, Connection);
- Connection.Reset();
+ TConnectionPool::Get()->Invalidate(HostName_, Connection_);
+ Connection_.Reset();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index 95595959ad..013b40aecb 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -82,24 +82,23 @@ private:
bool ShouldAcceptFraming() const;
private:
- const TString Method;
- const TString Command;
- const bool IsApi;
-
- TNode::TMapType Parameters;
- TString ImpersonationUser;
- TString Token;
- TString ServiceTicket;
- TNode Attributes;
- TString ProxyAddress;
- TString HostPort;
-
-private:
- TMaybe<TFormat> InputFormat = TFormat::YsonText();
- TMaybe<TFormat> OutputFormat = TFormat::YsonText();
-
- TString RequestCompression = "identity";
- TString ResponseCompression = "identity";
+ const TString Method_;
+ const TString Command_;
+ const bool IsApi_;
+
+ TNode::TMapType Parameters_;
+ TString ImpersonationUser_;
+ TString Token_;
+ TString ServiceTicket_;
+ TNode Attributes_;
+ TString ProxyAddress_;
+ TString HostPort_;
+
+ TMaybe<TFormat> InputFormat_ = TFormat::YsonText();
+ TMaybe<TFormat> OutputFormat_ = TFormat::YsonText();
+
+ TString RequestCompression_ = "identity";
+ TString ResponseCompression_ = "identity";
};
////////////////////////////////////////////////////////////////////////////////
@@ -196,13 +195,16 @@ private:
bool RefreshFrameIfNecessary();
private:
- THttpInput HttpInput_;
const TString RequestId_;
+
+ THttpInput HttpInput_;
+
const TString HostName_;
+ const bool Unframe_;
+
int HttpCode_ = 0;
TMaybe<TErrorResponse> ErrorResponse_;
bool IsExhausted_ = false;
- const bool Unframe_;
size_t RemainingFrameSize_ = 0;
};
@@ -239,20 +241,21 @@ private:
class TRequestStream;
private:
- TString HostName;
- TString RequestId;
+ const TString RequestId_;
+
+ TString HostName_;
TString Url_;
TInstant StartTime_;
TString LoggedAttributes_;
- TConnectionPtr Connection;
+ TConnectionPtr Connection_;
THolder<TRequestStream> RequestStream_;
- THolder<TSocketInput> SocketInput;
- THolder<THttpResponse> Input;
+ THolder<TSocketInput> SocketInput_;
+ THolder<THttpResponse> Input_;
- bool LogResponse = false;
+ bool LogResponse_ = false;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
index 305d95b06c..97bf4a7703 100644
--- a/yt/cpp/mapreduce/http/http_client.cpp
+++ b/yt/cpp/mapreduce/http/http_client.cpp
@@ -170,19 +170,21 @@ public:
auto request = std::make_unique<THttpRequest>(requestId);
auto urlRef = NHttp::ParseUrl(url);
+ auto host = CreateHost(urlRef.Host, urlRef.PortStr);
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
+ request->Connect(host, config.SocketTimeout);
request->SmallRequest(header, body);
return std::make_unique<TDefaultHttpResponse>(std::move(request));
}
IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override
{
- auto request = std::make_unique<THttpRequest>(requestId);
-
auto urlRef = NHttp::ParseUrl(url);
+ auto host = CreateHost(urlRef.Host, urlRef.PortStr);
+
+ auto request = std::make_unique<THttpRequest>(requestId);
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
+ request->Connect(host, config.SocketTimeout);
auto stream = request->StartRequest(header);
return std::make_unique<TDefaultHttpRequest>(std::move(request), stream);
}
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
index 97321c4c9d..6087eca098 100644
--- a/yt/cpp/mapreduce/http/http_client.h
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -11,8 +11,6 @@
#include <util/stream/fwd.h>
-#include <memory>
-
namespace NYT::NHttpClient {
////////////////////////////////////////////////////////////////////////////////