diff options
author | ermolovd <ermolovd@yandex-team.com> | 2024-11-18 18:37:13 +0300 |
---|---|---|
committer | ermolovd <ermolovd@yandex-team.com> | 2024-11-18 19:07:19 +0300 |
commit | eb9ebcd66823f2331380c62c209c41382fcdabdb (patch) | |
tree | cbb1bf8cac576fa446fd44f96086f1fbf8e1a609 /yt/cpp/mapreduce | |
parent | 2af8e95bd4d3f701a8dd6d255b1af4ef4a205cf4 (diff) | |
download | ydb-eb9ebcd66823f2331380c62c209c41382fcdabdb.tar.gz |
cpp/mapreduce: fix style
commit_hash:4b9b636737f162b91295356e0ac17f0ea2beab20
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 153 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.h | 55 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http_client.cpp | 10 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http_client.h | 2 |
4 files changed, 111 insertions, 109 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index f9eb8539b5..12aa33ff29 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -130,16 +130,16 @@ private: //////////////////////////////////////////////////////////////////////////////// THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi) - : Method(method) - , Command(command) - , IsApi(isApi) + : Method_(method) + , Command_(command) + , IsApi_(isApi) { } void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite) { - auto it = Parameters.find(key); - if (it == Parameters.end()) { - Parameters.emplace(key, std::move(value)); + auto it = Parameters_.find(key); + if (it == Parameters_.end()) { + Parameters_.emplace(key, std::move(value)); } else { if (overwrite) { it->second = std::move(value); @@ -158,12 +158,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite) void THttpHeader::RemoveParameter(const TString& key) { - Parameters.erase(key); + Parameters_.erase(key); } TNode THttpHeader::GetParameters() const { - return Parameters; + return Parameters_; } void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite) @@ -202,81 +202,81 @@ void THttpHeader::AddMutationId() bool THttpHeader::HasMutationId() const { - return Parameters.contains("mutation_id"); + return Parameters_.contains("mutation_id"); } void THttpHeader::SetToken(const TString& token) { - Token = token; + Token_ = token; } void THttpHeader::SetProxyAddress(const TString& proxyAddress) { - ProxyAddress = proxyAddress; + ProxyAddress_ = proxyAddress; } void THttpHeader::SetHostPort(const TString& hostPort) { - HostPort = hostPort; + HostPort_ = hostPort; } void THttpHeader::SetImpersonationUser(const TString& impersonationUser) { - ImpersonationUser = impersonationUser; + ImpersonationUser_ = impersonationUser; } void THttpHeader::SetServiceTicket(const TString& ticket) { - ServiceTicket = ticket; + ServiceTicket_ = ticket; } void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format) { - InputFormat = format; + InputFormat_ = format; } void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format) { - OutputFormat = format; + OutputFormat_ = format; } TMaybe<TFormat> THttpHeader::GetOutputFormat() const { - return OutputFormat; + return OutputFormat_; } void THttpHeader::SetRequestCompression(const TString& compression) { - RequestCompression = compression; + RequestCompression_ = compression; } void THttpHeader::SetResponseCompression(const TString& compression) { - ResponseCompression = compression; + ResponseCompression_ = compression; } TString THttpHeader::GetCommand() const { - return Command; + return Command_; } TString THttpHeader::GetUrl(bool needProxy) const { TStringStream url; - if (needProxy && !ProxyAddress.empty()) { - url << ProxyAddress << "/"; + if (needProxy && !ProxyAddress_.empty()) { + url << ProxyAddress_ << "/"; return url.Str(); } - if (!ProxyAddress.empty()) { - url << HostPort; + if (!ProxyAddress_.empty()) { + url << HostPort_; } - if (IsApi) { - url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command; + if (IsApi_) { + url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_; } else { - url << "/" << Command; + url << "/" << Command_; } return url.Str(); @@ -284,16 +284,16 @@ TString THttpHeader::GetUrl(bool needProxy) const bool THttpHeader::ShouldAcceptFraming() const { - return TConfig::Get()->CommandsWithFraming.contains(Command); + return TConfig::Get()->CommandsWithFraming.contains(Command_); } TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const { TStringStream result; - result << Method << " " << GetUrl() << " HTTP/1.1\r\n"; + result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n"; - GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result); + GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result); if (ShouldAcceptFraming()) { result << "X-YT-Accept-Framing: 1\r\n"; @@ -311,25 +311,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const headers->Add("Host", hostName); headers->Add("User-Agent", TProcessState::Get()->ClientVersion); - if (!Token.empty()) { - headers->Add("Authorization", "OAuth " + Token); + if (!Token_.empty()) { + headers->Add("Authorization", "OAuth " + Token_); } - if (!ServiceTicket.empty()) { - headers->Add("X-Ya-Service-Ticket", ServiceTicket); + if (!ServiceTicket_.empty()) { + headers->Add("X-Ya-Service-Ticket", ServiceTicket_); } - if (!ImpersonationUser.empty()) { - headers->Add("X-Yt-User-Name", ImpersonationUser); + if (!ImpersonationUser_.empty()) { + headers->Add("X-Yt-User-Name", ImpersonationUser_); } - if (Method == "PUT" || Method == "POST") { + if (Method_ == "PUT" || Method_ == "POST") { headers->Add("Transfer-Encoding", "chunked"); } headers->Add("X-YT-Correlation-Id", requestId); headers->Add("X-YT-Header-Format", "<format=text>yson"); - headers->Add("Content-Encoding", RequestCompression); - headers->Add("Accept-Encoding", ResponseCompression); + headers->Add("Content-Encoding", RequestCompression_); + headers->Add("Accept-Encoding", ResponseCompression_); auto printYTHeader = [&headers] (const char* headerName, const TString& value) { static const size_t maxHttpHeaderSize = 64 << 10; @@ -353,14 +353,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const } while (ptr != finish); }; - if (InputFormat) { - printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config)); + if (InputFormat_) { + printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config)); } - if (OutputFormat) { - printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config)); + if (OutputFormat_) { + printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config)); } if (includeParameters) { - printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters)); + printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_)); } return NHttp::THeadersPtrWrapper(std::move(headers)); @@ -368,7 +368,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const const TString& THttpHeader::GetMethod() const { - return Method; + return Method_; } //////////////////////////////////////////////////////////////////////////////// @@ -679,8 +679,8 @@ THttpResponse::THttpResponse( IInputStream* socketStream, const TString& requestId, const TString& hostName) - : HttpInput_(socketStream) - , RequestId_(requestId) + : RequestId_(requestId) + , HttpInput_(socketStream) , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) { @@ -887,64 +887,63 @@ size_t THttpResponse::UnframeSkip(size_t len) //////////////////////////////////////////////////////////////////////////////// THttpRequest::THttpRequest() -{ - RequestId = CreateGuidAsString(); -} + : RequestId_(CreateGuidAsString()) +{ } THttpRequest::THttpRequest(const TString& requestId) - : RequestId(requestId) + : RequestId_(requestId) { } THttpRequest::~THttpRequest() { - if (!Connection) { + if (!Connection_) { return; } - if (Input && Input->IsKeepAlive() && Input->IsExhausted()) { + if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) { // We should return to the pool only connections where HTTP response was fully read. // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). - TConnectionPool::Get()->Release(Connection); + TConnectionPool::Get()->Release(Connection_); } else { - TConnectionPool::Get()->Invalidate(HostName, Connection); + TConnectionPool::Get()->Invalidate(HostName_, Connection_); } } TString THttpRequest::GetRequestId() const { - return RequestId; + return RequestId_; } void THttpRequest::Connect(TString hostName, TDuration socketTimeout) { - HostName = std::move(hostName); + HostName_ = std::move(hostName); YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", - RequestId, - HostName); + RequestId_, + HostName_); StartTime_ = TInstant::Now(); - Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout); + Connection_ = TConnectionPool::Get()->Connect(HostName_, socketTimeout); YT_LOG_DEBUG("REQ %v - connection #%v", - RequestId, - Connection->Id); + RequestId_, + Connection_->Id); } IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) { - auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters); + auto strHeader = header.GetHeaderAsString(HostName_, RequestId_, includeParameters); Url_ = header.GetUrl(true); - LogRequest(header, Url_, includeParameters, RequestId, HostName); + LogRequest(header, Url_, includeParameters, RequestId_, HostName_); LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); auto outputFormat = header.GetOutputFormat(); if (outputFormat && outputFormat->IsTextYson()) { - LogResponse = true; + LogResponse_ = true; } - RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get()); + RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get()); RequestStream_->Write(strHeader.data(), strHeader.size()); return RequestStream_.Get(); @@ -980,17 +979,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo THttpResponse* THttpRequest::GetResponseStream() { - if (!Input) { - SocketInput.Reset(new TSocketInput(*Connection->Socket.Get())); + if (!Input_) { + SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); if (TConfig::Get()->UseAbortableResponse) { Y_ABORT_UNLESS(!Url_.empty()); - Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_)); + Input_.Reset(new TAbortableHttpResponse(SocketInput_.Get(), RequestId_, HostName_, Url_)); } else { - Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName)); + Input_.Reset(new THttpResponse(SocketInput_.Get(), RequestId_, HostName_)); } - Input->CheckErrorResponse(); + Input_->CheckErrorResponse(); } - return Input.Get(); + return Input_.Get(); } TString THttpRequest::GetResponse() @@ -1003,15 +1002,15 @@ TString THttpRequest::GetResponse() << "HostName: " << GetResponseStream()->GetHostName() << "; " << LoggedAttributes_; - if (LogResponse) { + if (LogResponse_) { constexpr auto sizeLimit = 1 << 7; YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", - RequestId, + RequestId_, TruncateForLogs(result, sizeLimit), loggedAttributes.Str()); } else { YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", - RequestId, + RequestId_, result.size(), loggedAttributes.Str()); } @@ -1024,8 +1023,8 @@ int THttpRequest::GetHttpCode() { void THttpRequest::InvalidateConnection() { - TConnectionPool::Get()->Invalidate(HostName, Connection); - Connection.Reset(); + TConnectionPool::Get()->Invalidate(HostName_, Connection_); + Connection_.Reset(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 95595959ad..013b40aecb 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -82,24 +82,23 @@ private: bool ShouldAcceptFraming() const; private: - const TString Method; - const TString Command; - const bool IsApi; - - TNode::TMapType Parameters; - TString ImpersonationUser; - TString Token; - TString ServiceTicket; - TNode Attributes; - TString ProxyAddress; - TString HostPort; - -private: - TMaybe<TFormat> InputFormat = TFormat::YsonText(); - TMaybe<TFormat> OutputFormat = TFormat::YsonText(); - - TString RequestCompression = "identity"; - TString ResponseCompression = "identity"; + const TString Method_; + const TString Command_; + const bool IsApi_; + + TNode::TMapType Parameters_; + TString ImpersonationUser_; + TString Token_; + TString ServiceTicket_; + TNode Attributes_; + TString ProxyAddress_; + TString HostPort_; + + TMaybe<TFormat> InputFormat_ = TFormat::YsonText(); + TMaybe<TFormat> OutputFormat_ = TFormat::YsonText(); + + TString RequestCompression_ = "identity"; + TString ResponseCompression_ = "identity"; }; //////////////////////////////////////////////////////////////////////////////// @@ -196,13 +195,16 @@ private: bool RefreshFrameIfNecessary(); private: - THttpInput HttpInput_; const TString RequestId_; + + THttpInput HttpInput_; + const TString HostName_; + const bool Unframe_; + int HttpCode_ = 0; TMaybe<TErrorResponse> ErrorResponse_; bool IsExhausted_ = false; - const bool Unframe_; size_t RemainingFrameSize_ = 0; }; @@ -239,20 +241,21 @@ private: class TRequestStream; private: - TString HostName; - TString RequestId; + const TString RequestId_; + + TString HostName_; TString Url_; TInstant StartTime_; TString LoggedAttributes_; - TConnectionPtr Connection; + TConnectionPtr Connection_; THolder<TRequestStream> RequestStream_; - THolder<TSocketInput> SocketInput; - THolder<THttpResponse> Input; + THolder<TSocketInput> SocketInput_; + THolder<THttpResponse> Input_; - bool LogResponse = false; + bool LogResponse_ = false; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 305d95b06c..97bf4a7703 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -170,19 +170,21 @@ public: auto request = std::make_unique<THttpRequest>(requestId); auto urlRef = NHttp::ParseUrl(url); + auto host = CreateHost(urlRef.Host, urlRef.PortStr); - request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); + request->Connect(host, config.SocketTimeout); request->SmallRequest(header, body); return std::make_unique<TDefaultHttpResponse>(std::move(request)); } IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override { - auto request = std::make_unique<THttpRequest>(requestId); - auto urlRef = NHttp::ParseUrl(url); + auto host = CreateHost(urlRef.Host, urlRef.PortStr); + + auto request = std::make_unique<THttpRequest>(requestId); - request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); + request->Connect(host, config.SocketTimeout); auto stream = request->StartRequest(header); return std::make_unique<TDefaultHttpRequest>(std::move(request), stream); } diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h index 97321c4c9d..6087eca098 100644 --- a/yt/cpp/mapreduce/http/http_client.h +++ b/yt/cpp/mapreduce/http/http_client.h @@ -11,8 +11,6 @@ #include <util/stream/fwd.h> -#include <memory> - namespace NYT::NHttpClient { //////////////////////////////////////////////////////////////////////////////// |