diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-11-20 11:14:58 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-11-20 11:14:58 +0000 |
commit | 31773f157bf8164364649b5f470f52dece0a4317 (patch) | |
tree | 33d0f7eef45303ab68cf08ab381ce5e5e36c5240 /yt | |
parent | 2c7938962d8689e175574fc1e817c05049f27905 (diff) | |
parent | eff600952d5dfe17942f38f510a8ac2b203bb3a5 (diff) | |
download | ydb-31773f157bf8164364649b5f470f52dece0a4317.tar.gz |
Merge branch 'rightlib' into mergelibs-241120-1113
Diffstat (limited to 'yt')
106 files changed, 1295 insertions, 668 deletions
diff --git a/yt/cpp/mapreduce/client/ya.make b/yt/cpp/mapreduce/client/ya.make index 2d118b2442..599bbdc92f 100644 --- a/yt/cpp/mapreduce/client/ya.make +++ b/yt/cpp/mapreduce/client/ya.make @@ -55,9 +55,15 @@ IF (BUILD_TYPE == "PROFILE") yt/yt/library/ytprof ) - SRCS( - job_profiler.cpp - ) + IF (OPENSOURCE) + SRCS( + dummy_job_profiler.cpp + ) + ELSE() + SRCS( + job_profiler.cpp + ) + ENDIF() ELSE() SRCS( dummy_job_profiler.cpp diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp index 9da9241d33..995bb9de4c 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.cpp +++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp @@ -14,20 +14,20 @@ public: { auto g = Guard(Lock_); auto id = NextId_++; - IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_}); + IdToOutage_.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_}); return id; } void StopOutage(TOutageId id) { auto g = Guard(Lock_); - IdToOutage.erase(id); + IdToOutage_.erase(id); } void Add(IAbortableHttpResponse* response) { auto g = Guard(Lock_); - for (auto& [id, entry] : IdToOutage) { + for (auto& [id, entry] : IdToOutage_) { if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) { response->SetLengthLimit(entry.LengthLimit); entry.Counter -= 1; @@ -70,7 +70,7 @@ private: private: TOutageId NextId_ = 0; TIntrusiveList<IAbortableHttpResponse> ResponseList_; - THashMap<TOutageId, TOutageEntry> IdToOutage; + THashMap<TOutageId, TOutageEntry> IdToOutage_; TMutex Lock_; }; @@ -137,11 +137,10 @@ bool TAbortableHttpResponseBase::IsAborted() const //////////////////////////////////////////////////////////////////////////////// TAbortableHttpResponse::TAbortableHttpResponse( + TRequestContext context, IInputStream* socketStream, - const TString& requestId, - const TString& hostName, const TString& url) - : THttpResponse(socketStream, requestId, hostName) + : THttpResponse(std::move(context), socketStream) , TAbortableHttpResponseBase(url) { } diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h index d72bcfa0a6..e9b1483bf7 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.h +++ b/yt/cpp/mapreduce/http/abortable_http_response.h @@ -108,9 +108,8 @@ public: public: TAbortableHttpResponse( + TRequestContext context, IInputStream* socketStream, - const TString& requestId, - const TString& hostName, const TString& url); /// @brief Abort any responses which match `urlPattern` (i.e. contain it in url). diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index f9eb8539b5..ca243a929a 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/core/http/http.h> @@ -39,6 +40,27 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +std::exception_ptr WrapSystemError( + const TRequestContext& context, + const std::exception& ex) +{ + if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) { + return std::make_exception_ptr(errorResponse); + } + + auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method); + TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, { + {"request_id", context.RequestId}, + {"host", context.HostName}, + {"method", context.Method}, + }); + TTransportError errorResponse(std::move(outer)); + + return std::make_exception_ptr(errorResponse); +} + +//////////////////////////////////////////////////////////////////////////////// + class THttpRequest::TRequestStream : public IOutputStream { @@ -92,17 +114,17 @@ private: CheckErrorState(); try { func(); - } catch (const std::exception&) { - HandleWriteException(); + } catch (const std::exception& ex) { + HandleWriteException(ex); } } // In many cases http proxy stops reading request and resets connection // if error has happend. This function tries to read error response // in such cases. - void HandleWriteException() { + void HandleWriteException(const std::exception& ex) { Y_ABORT_UNLESS(WriteError_ == nullptr); - WriteError_ = std::current_exception(); + WriteError_ = WrapSystemError(HttpRequest_->Context_, ex); Y_ABORT_UNLESS(WriteError_ != nullptr); try { HttpRequest_->GetResponseStream(); @@ -130,16 +152,16 @@ private: //////////////////////////////////////////////////////////////////////////////// THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi) - : Method(method) - , Command(command) - , IsApi(isApi) + : Method_(method) + , Command_(command) + , IsApi_(isApi) { } void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite) { - auto it = Parameters.find(key); - if (it == Parameters.end()) { - Parameters.emplace(key, std::move(value)); + auto it = Parameters_.find(key); + if (it == Parameters_.end()) { + Parameters_.emplace(key, std::move(value)); } else { if (overwrite) { it->second = std::move(value); @@ -158,12 +180,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite) void THttpHeader::RemoveParameter(const TString& key) { - Parameters.erase(key); + Parameters_.erase(key); } TNode THttpHeader::GetParameters() const { - return Parameters; + return Parameters_; } void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite) @@ -202,81 +224,81 @@ void THttpHeader::AddMutationId() bool THttpHeader::HasMutationId() const { - return Parameters.contains("mutation_id"); + return Parameters_.contains("mutation_id"); } void THttpHeader::SetToken(const TString& token) { - Token = token; + Token_ = token; } void THttpHeader::SetProxyAddress(const TString& proxyAddress) { - ProxyAddress = proxyAddress; + ProxyAddress_ = proxyAddress; } void THttpHeader::SetHostPort(const TString& hostPort) { - HostPort = hostPort; + HostPort_ = hostPort; } void THttpHeader::SetImpersonationUser(const TString& impersonationUser) { - ImpersonationUser = impersonationUser; + ImpersonationUser_ = impersonationUser; } void THttpHeader::SetServiceTicket(const TString& ticket) { - ServiceTicket = ticket; + ServiceTicket_ = ticket; } void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format) { - InputFormat = format; + InputFormat_ = format; } void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format) { - OutputFormat = format; + OutputFormat_ = format; } TMaybe<TFormat> THttpHeader::GetOutputFormat() const { - return OutputFormat; + return OutputFormat_; } void THttpHeader::SetRequestCompression(const TString& compression) { - RequestCompression = compression; + RequestCompression_ = compression; } void THttpHeader::SetResponseCompression(const TString& compression) { - ResponseCompression = compression; + ResponseCompression_ = compression; } TString THttpHeader::GetCommand() const { - return Command; + return Command_; } TString THttpHeader::GetUrl(bool needProxy) const { TStringStream url; - if (needProxy && !ProxyAddress.empty()) { - url << ProxyAddress << "/"; + if (needProxy && !ProxyAddress_.empty()) { + url << ProxyAddress_ << "/"; return url.Str(); } - if (!ProxyAddress.empty()) { - url << HostPort; + if (!ProxyAddress_.empty()) { + url << HostPort_; } - if (IsApi) { - url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command; + if (IsApi_) { + url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_; } else { - url << "/" << Command; + url << "/" << Command_; } return url.Str(); @@ -284,16 +306,16 @@ TString THttpHeader::GetUrl(bool needProxy) const bool THttpHeader::ShouldAcceptFraming() const { - return TConfig::Get()->CommandsWithFraming.contains(Command); + return TConfig::Get()->CommandsWithFraming.contains(Command_); } TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const { TStringStream result; - result << Method << " " << GetUrl() << " HTTP/1.1\r\n"; + result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n"; - GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result); + GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result); if (ShouldAcceptFraming()) { result << "X-YT-Accept-Framing: 1\r\n"; @@ -311,25 +333,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const headers->Add("Host", hostName); headers->Add("User-Agent", TProcessState::Get()->ClientVersion); - if (!Token.empty()) { - headers->Add("Authorization", "OAuth " + Token); + if (!Token_.empty()) { + headers->Add("Authorization", "OAuth " + Token_); } - if (!ServiceTicket.empty()) { - headers->Add("X-Ya-Service-Ticket", ServiceTicket); + if (!ServiceTicket_.empty()) { + headers->Add("X-Ya-Service-Ticket", ServiceTicket_); } - if (!ImpersonationUser.empty()) { - headers->Add("X-Yt-User-Name", ImpersonationUser); + if (!ImpersonationUser_.empty()) { + headers->Add("X-Yt-User-Name", ImpersonationUser_); } - if (Method == "PUT" || Method == "POST") { + if (Method_ == "PUT" || Method_ == "POST") { headers->Add("Transfer-Encoding", "chunked"); } headers->Add("X-YT-Correlation-Id", requestId); headers->Add("X-YT-Header-Format", "<format=text>yson"); - headers->Add("Content-Encoding", RequestCompression); - headers->Add("Accept-Encoding", ResponseCompression); + headers->Add("Content-Encoding", RequestCompression_); + headers->Add("Accept-Encoding", ResponseCompression_); auto printYTHeader = [&headers] (const char* headerName, const TString& value) { static const size_t maxHttpHeaderSize = 64 << 10; @@ -353,14 +375,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const } while (ptr != finish); }; - if (InputFormat) { - printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config)); + if (InputFormat_) { + printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config)); } - if (OutputFormat) { - printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config)); + if (OutputFormat_) { + printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config)); } if (includeParameters) { - printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters)); + printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_)); } return NHttp::THeadersPtrWrapper(std::move(headers)); @@ -368,7 +390,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const const TString& THttpHeader::GetMethod() const { - return Method; + return Method_; } //////////////////////////////////////////////////////////////////////////////// @@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address) //////////////////////////////////////////////////////////////////////////////// -static TMaybe<TString> GetProxyName(const THttpInput& input) +class THttpResponse::THttpInputWrapped + : public IInputStream { - if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) { - return proxyHeader->Value(); +public: + explicit THttpInputWrapped(TRequestContext context, IInputStream* input) + : Context_(std::move(context)) + , HttpInput_(input) + { } + + const THttpHeaders& Headers() const noexcept + { + return HttpInput_.Headers(); + } + + const TString& FirstLine() const noexcept + { + return HttpInput_.FirstLine(); + } + + bool IsKeepAlive() const noexcept + { + return HttpInput_.IsKeepAlive(); + } + + const TMaybe<THttpHeaders>& Trailers() const noexcept + { + return HttpInput_.Trailers(); } - return Nothing(); -} + +private: + size_t DoRead(void* buf, size_t len) override + { + try { + return HttpInput_.Read(buf, len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + + size_t DoSkip(size_t len) override + { + try { + return HttpInput_.Skip(len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + +private: + const TRequestContext Context_; + THttpInput HttpInput_; +}; THttpResponse::THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName) - : HttpInput_(socketStream) - , RequestId_(requestId) - , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) - , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) -{ - HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine()); + TRequestContext context, + IInputStream* socketStream) + : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream)) + , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing")) + , Context_(std::move(context)) +{ + if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) { + Context_.HostName = proxyHeader->Value(); + } + HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine()); if (HttpCode_ == 200 || HttpCode_ == 202) { return; } - ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_); + ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId); auto logAndSetError = [&] (const TString& rawError) { YT_LOG_ERROR("RSP %v - HTTP %v - %v", - RequestId_, + Context_.RequestId, HttpCode_, rawError.data()); ErrorResponse_->SetRawError(rawError); @@ -705,26 +775,26 @@ THttpResponse::THttpResponse( break; case 500: - logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_); + logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName); break; default: { TStringStream httpHeaders; httpHeaders << "HTTP headers ("; - for (const auto& header : HttpInput_.Headers()) { + for (const auto& header : HttpInput_->Headers()) { httpHeaders << header.Name() << ": " << header.Value() << "; "; } httpHeaders << ")"; auto errorString = Sprintf("RSP %s - HTTP %d - %s", - RequestId_.data(), + Context_.RequestId.data(), HttpCode_, httpHeaders.Str().data()); YT_LOG_ERROR("%v", errorString.data()); - if (auto parsedResponse = ParseError(HttpInput_.Headers())) { + if (auto parsedResponse = ParseError(HttpInput_->Headers())) { ErrorResponse_ = parsedResponse.GetRef(); } else { ErrorResponse_->SetRawError( @@ -735,9 +805,12 @@ THttpResponse::THttpResponse( } } +THttpResponse::~THttpResponse() +{ } + const THttpHeaders& THttpResponse::Headers() const { - return HttpInput_.Headers(); + return HttpInput_->Headers(); } void THttpResponse::CheckErrorResponse() const @@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const const TString& THttpResponse::GetHostName() const { - return HostName_; + return Context_.HostName; } bool THttpResponse::IsKeepAlive() const { - return HttpInput_.IsKeepAlive(); + return HttpInput_->IsKeepAlive(); } TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers) { for (const auto& header : headers) { if (header.Name() == "X-YT-Error") { - TErrorResponse errorResponse(HttpCode_, RequestId_); + TErrorResponse errorResponse(HttpCode_, Context_.RequestId); errorResponse.ParseFromJsonError(header.Value()); if (errorResponse.IsOk()) { return Nothing(); @@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len) if (Unframe_) { read = UnframeRead(buf, len); } else { - read = HttpInput_.Read(buf, len); + read = HttpInput_->Read(buf, len); } if (read == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return read; @@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len) if (Unframe_) { skipped = UnframeSkip(len); } else { - skipped = HttpInput_.Skip(len); + skipped = HttpInput_->Skip(len); } if (skipped == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return skipped; @@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers) if (auto errorResponse = ParseError(trailers)) { errorResponse->SetIsFromTrailers(true); YT_LOG_ERROR("RSP %v - %v", - RequestId_, + Context_.RequestId, errorResponse.GetRef().what()); ythrow errorResponse.GetRef(); } } -static ui32 ReadDataFrameSize(THttpInput* stream) +static ui32 ReadDataFrameSize(IInputStream* stream) { ui32 littleEndianSize; auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize)); @@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary() { while (RemainingFrameSize_ == 0) { ui8 frameTypeByte; - auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte)); + auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte)); if (read == 0) { return false; } @@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary() case EFrameType::KeepAlive: break; case EFrameType::Data: - RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_); + RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get()); break; default: ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); @@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_)); + auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= read; return read; } @@ -879,80 +952,83 @@ size_t THttpResponse::UnframeSkip(size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_)); + auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= skipped; return skipped; } //////////////////////////////////////////////////////////////////////////////// -THttpRequest::THttpRequest() -{ - RequestId = CreateGuidAsString(); -} - -THttpRequest::THttpRequest(const TString& requestId) - : RequestId(requestId) +THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout) + : Context_(TRequestContext{ + .RequestId = std::move(requestId), + .HostName = std::move(hostName), + .Method = header.GetUrl(/*needProxy=*/ false) + }) + , Header_(std::move(header)) + , Url_(Header_.GetUrl(true)) + , SocketTimeout_(socketTimeout) { } THttpRequest::~THttpRequest() { - if (!Connection) { + if (!Connection_) { return; } - if (Input && Input->IsKeepAlive() && Input->IsExhausted()) { + if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) { // We should return to the pool only connections where HTTP response was fully read. // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). - TConnectionPool::Get()->Release(Connection); + TConnectionPool::Get()->Release(Connection_); } else { - TConnectionPool::Get()->Invalidate(HostName, Connection); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); } } TString THttpRequest::GetRequestId() const { - return RequestId; + return Context_.RequestId; } -void THttpRequest::Connect(TString hostName, TDuration socketTimeout) +IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters) { - HostName = std::move(hostName); YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", - RequestId, - HostName); + Context_.RequestId, + Context_.HostName); StartTime_ = TInstant::Now(); - Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout); + + try { + Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } YT_LOG_DEBUG("REQ %v - connection #%v", - RequestId, - Connection->Id); -} + Context_.RequestId, + Connection_->Id); -IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) -{ - auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters); - Url_ = header.GetUrl(true); + auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters); - LogRequest(header, Url_, includeParameters, RequestId, HostName); + LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName); - LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); + LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128); - auto outputFormat = header.GetOutputFormat(); + auto outputFormat = Header_.GetOutputFormat(); if (outputFormat && outputFormat->IsTextYson()) { - LogResponse = true; + LogResponse_ = true; } - RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get()); + RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get()); RequestStream_->Write(strHeader.data(), strHeader.size()); return RequestStream_.Get(); } -IOutputStream* THttpRequest::StartRequest(const THttpHeader& header) +IOutputStream* THttpRequest::StartRequest() { - return StartRequestImpl(header, true); + return StartRequestImpl(true); } void THttpRequest::FinishRequest() @@ -961,16 +1037,16 @@ void THttpRequest::FinishRequest() RequestStream_->Finish(); } -void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body) +void THttpRequest::SmallRequest(TMaybe<TStringBuf> body) { - if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { - const auto& parameters = header.GetParameters(); + if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) { + const auto& parameters = Header_.GetParameters(); auto parametersStr = NodeToYsonString(parameters); - auto* output = StartRequestImpl(header, false); + auto* output = StartRequestImpl(false); output->Write(parametersStr); FinishRequest(); } else { - auto* output = StartRequest(header); + auto* output = StartRequest(); if (body) { output->Write(*body); } @@ -980,17 +1056,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo THttpResponse* THttpRequest::GetResponseStream() { - if (!Input) { - SocketInput.Reset(new TSocketInput(*Connection->Socket.Get())); + if (!Input_) { + SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); if (TConfig::Get()->UseAbortableResponse) { Y_ABORT_UNLESS(!Url_.empty()); - Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_)); + Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_)); } else { - Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName)); + Input_.Reset(new THttpResponse(Context_, SocketInput_.Get())); } - Input->CheckErrorResponse(); + Input_->CheckErrorResponse(); } - return Input.Get(); + return Input_.Get(); } TString THttpRequest::GetResponse() @@ -1003,15 +1079,15 @@ TString THttpRequest::GetResponse() << "HostName: " << GetResponseStream()->GetHostName() << "; " << LoggedAttributes_; - if (LogResponse) { + if (LogResponse_) { constexpr auto sizeLimit = 1 << 7; YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", - RequestId, + Context_.RequestId, TruncateForLogs(result, sizeLimit), loggedAttributes.Str()); } else { YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", - RequestId, + Context_.RequestId, result.size(), loggedAttributes.Str()); } @@ -1024,8 +1100,8 @@ int THttpRequest::GetHttpCode() { void THttpRequest::InvalidateConnection() { - TConnectionPool::Get()->Invalidate(HostName, Connection); - Connection.Reset(); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); + Connection_.Reset(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 95595959ad..618b1e2c22 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -39,6 +39,12 @@ enum class EFrameType KeepAlive = 0x02, }; +struct TRequestContext +{ + TString RequestId; + TString HostName; + TString Method; +}; class THttpHeader { @@ -82,24 +88,23 @@ private: bool ShouldAcceptFraming() const; private: - const TString Method; - const TString Command; - const bool IsApi; - - TNode::TMapType Parameters; - TString ImpersonationUser; - TString Token; - TString ServiceTicket; - TNode Attributes; - TString ProxyAddress; - TString HostPort; - -private: - TMaybe<TFormat> InputFormat = TFormat::YsonText(); - TMaybe<TFormat> OutputFormat = TFormat::YsonText(); - - TString RequestCompression = "identity"; - TString ResponseCompression = "identity"; + const TString Method_; + const TString Command_; + const bool IsApi_; + + TNode::TMapType Parameters_; + TString ImpersonationUser_; + TString Token_; + TString ServiceTicket_; + TNode Attributes_; + TString ProxyAddress_; + TString HostPort_; + + TMaybe<TFormat> InputFormat_ = TFormat::YsonText(); + TMaybe<TFormat> OutputFormat_ = TFormat::YsonText(); + + TString RequestCompression_ = "identity"; + TString ResponseCompression_ = "identity"; }; //////////////////////////////////////////////////////////////////////////////// @@ -172,9 +177,10 @@ public: // 'requestId' and 'hostName' are provided for debug reasons // (they will appear in some error messages). THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName); + TRequestContext context, + IInputStream* socketStream); + + ~THttpResponse(); const THttpHeaders& Headers() const; @@ -196,13 +202,17 @@ private: bool RefreshFrameIfNecessary(); private: - THttpInput HttpInput_; - const TString RequestId_; - const TString HostName_; + class THttpInputWrapped; + +private: + THolder<THttpInputWrapped> HttpInput_; + + const bool Unframe_; + + TRequestContext Context_; int HttpCode_ = 0; TMaybe<TErrorResponse> ErrorResponse_; bool IsExhausted_ = false; - const bool Unframe_; size_t RemainingFrameSize_ = 0; }; @@ -211,18 +221,15 @@ private: class THttpRequest { public: - THttpRequest(); - THttpRequest(const TString& requestId); + THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout); ~THttpRequest(); TString GetRequestId() const; - void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero()); - - IOutputStream* StartRequest(const THttpHeader& header); + IOutputStream* StartRequest(); void FinishRequest(); - void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body); + void SmallRequest(TMaybe<TStringBuf> body); THttpResponse* GetResponseStream(); @@ -233,26 +240,28 @@ public: int GetHttpCode(); private: - IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters); + IOutputStream* StartRequestImpl(bool includeParameters); private: class TRequestStream; private: - TString HostName; - TString RequestId; - TString Url_; + const TRequestContext Context_; + const THttpHeader Header_; + const TString Url_; + const TDuration SocketTimeout_; + TInstant StartTime_; TString LoggedAttributes_; - TConnectionPtr Connection; + TConnectionPtr Connection_; THolder<TRequestStream> RequestStream_; - THolder<TSocketInput> SocketInput; - THolder<THttpResponse> Input; + THolder<TSocketInput> SocketInput_; + THolder<THttpResponse> Input_; - bool LogResponse = false; + bool LogResponse_ = false; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 305d95b06c..7e9d761c3c 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -167,23 +167,23 @@ class TDefaultHttpClient public: IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override { - auto request = std::make_unique<THttpRequest>(requestId); - auto urlRef = NHttp::ParseUrl(url); + auto host = CreateHost(urlRef.Host, urlRef.PortStr); + + auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout); - request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); - request->SmallRequest(header, body); + request->SmallRequest(body); return std::make_unique<TDefaultHttpResponse>(std::move(request)); } IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override { - auto request = std::make_unique<THttpRequest>(requestId); - auto urlRef = NHttp::ParseUrl(url); + auto host = CreateHost(urlRef.Host, urlRef.PortStr); + + auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout); - request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); - auto stream = request->StartRequest(header); + auto stream = request->StartRequest(); return std::make_unique<TDefaultHttpRequest>(std::move(request), stream); } }; diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h index 97321c4c9d..6087eca098 100644 --- a/yt/cpp/mapreduce/http/http_client.h +++ b/yt/cpp/mapreduce/http/http_client.h @@ -11,8 +11,6 @@ #include <util/stream/fwd.h> -#include <memory> - namespace NYT::NHttpClient { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp index 7d95a10bc2..cd610d6493 100644 --- a/yt/cpp/mapreduce/http/requests.cpp +++ b/yt/cpp/mapreduce/http/requests.cpp @@ -2,7 +2,6 @@ #include "context.h" #include "host_manager.h" -#include "retry_request.h" #include <yt/cpp/mapreduce/client/transaction.h> diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 307e310b5b..0c719eeda3 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -114,7 +114,7 @@ TResponseInfo RetryRequestWithPolicy( return Request(context, header, body, requestId, config); } catch (const TErrorResponse& e) { - LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription()); + LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription()); retryWithSameMutationId = e.IsTransportError(); if (!IsRetriable(e)) { diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp index 90196246c5..fa072675fb 100644 --- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp @@ -72,11 +72,10 @@ THolder<TSimpleServer> CreateProxyHttpServer() ui64 port; ParseFirstLine(inputStr, method, host, port, command); - THttpRequest request; const TString hostName = ::TStringBuilder() << host << ":" << port; - request.Connect(hostName); auto header = THttpHeader(method, command); - request.StartRequest(header); + THttpRequest request("0-0-0-0", hostName, header, TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); auto res = request.GetResponseStream(); THttpOutput httpOutput(output); @@ -138,9 +137,8 @@ TEST(TConnectionPool, TestReleaseUnread) const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort(); for (size_t i = 0; i != 10; ++i) { - THttpRequest request; - request.Connect(hostName); - request.StartRequest(THttpHeader("GET", "foo")); + THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); request.GetResponseStream(); } @@ -155,12 +153,12 @@ TEST(TConnectionPool, TestProxy) const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort(); for (size_t i = 0; i != 10; ++i) { - THttpRequest request; - request.Connect(hostName2); auto header = THttpHeader("GET", "foo"); header.SetProxyAddress(hostName2); header.SetHostPort(hostName); - request.StartRequest(header); + + THttpRequest request("0-0-0-0", hostName2, header, TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); request.GetResponseStream(); } @@ -176,9 +174,8 @@ TEST(TConnectionPool, TestConcurrency) const auto func = [&] { for (int i = 0; i != 100; ++i) { - THttpRequest request; - request.Connect(hostName); - request.StartRequest(THttpHeader("GET", "foo")); + THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); auto res = request.GetResponseStream(); res->ReadAll(); diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp index ca260841d0..e41e83c5a0 100644 --- a/yt/cpp/mapreduce/http/ut/http_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp @@ -70,9 +70,8 @@ TEST(TFramingTest, FramingSimple) { auto server = CreateFramingEchoServer(); - THttpRequest request; - request.Connect(server->GetAddress()); - auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero()); + auto requestStream = request.StartRequest(); *requestStream << "Some funny data"; request.FinishRequest(); auto response = request.GetResponseStream()->ReadAll(); @@ -83,9 +82,8 @@ TEST(TFramingTest, FramingLarge) { auto server = CreateFramingEchoServer(); - THttpRequest request; - request.Connect(server->GetAddress()); - auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero()); + auto requestStream = request.StartRequest(); auto data = TString(100000, 'x'); *requestStream << data; request.FinishRequest(); diff --git a/yt/cpp/mapreduce/interface/errors.cpp b/yt/cpp/mapreduce/interface/errors.cpp index ef3d2db4a3..0819c8ca56 100644 --- a/yt/cpp/mapreduce/interface/errors.cpp +++ b/yt/cpp/mapreduce/interface/errors.cpp @@ -20,20 +20,11 @@ using namespace NJson; static void WriteErrorDescription(const TYtError& error, IOutputStream* out) { - (*out) << '\'' << error.GetMessage() << '\''; + (*out) << error.GetMessage(); const auto& innerErrorList = error.InnerErrors(); if (!innerErrorList.empty()) { - (*out) << " { "; - bool first = true; - for (const auto& innerError : innerErrorList) { - if (first) { - first = false; - } else { - (*out) << " ; "; - } - WriteErrorDescription(innerError, out); - } - (*out) << " }"; + (*out) << ": "; + WriteErrorDescription(innerErrorList[0], out); } } @@ -118,9 +109,11 @@ TYtError::TYtError(const TString& message) , Message_(message) { } -TYtError::TYtError(int code, const TString& message) +TYtError::TYtError(int code, TString message, TVector<TYtError> innerError, TNode::TMapType attributes) : Code_(code) , Message_(message) + , InnerErrors_(innerError) + , Attributes_(attributes) { } TYtError::TYtError(const TJsonValue& value) @@ -396,6 +389,11 @@ void TErrorResponse::Setup() *this << Error_.FullDescription(); } +TTransportError::TTransportError(TYtError error) +{ + *this << error.FullDescription(); +} + //////////////////////////////////////////////////////////////////////////////// TOperationFailedError::TOperationFailedError( diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h index afad58ed72..1311dbcf3d 100644 --- a/yt/cpp/mapreduce/interface/errors.h +++ b/yt/cpp/mapreduce/interface/errors.h @@ -6,14 +6,14 @@ /// Errors and exceptions emitted by library. #include "fwd.h" -#include "common.h" #include <library/cpp/yson/node/node.h> #include <util/generic/bt_exception.h> -#include <util/generic/yexception.h> +#include <util/generic/guid.h> #include <util/generic/string.h> #include <util/generic/vector.h> +#include <util/generic/yexception.h> namespace NJson { class TJsonValue; @@ -67,8 +67,8 @@ public: /// Constructs error with NYT::NClusterErrorCodes::Generic code and given message. explicit TYtError(const TString& message); - /// Constructs error with given code and given message. - TYtError(int code, const TString& message); + /// Constructs error from given parameters. + TYtError(int code, TString message, TVector<TYtError> inner = {}, TNode::TMapType attributes = {}); /// Construct error from json representation. TYtError(const ::NJson::TJsonValue& value); @@ -158,7 +158,6 @@ class TErrorResponse { public: TErrorResponse(int httpCode, const TString& requestId); - TErrorResponse(int httpCode, TYtError error); /// Get error object returned by server. const TYtError& GetError() const; @@ -222,6 +221,16 @@ private: //////////////////////////////////////////////////////////////////////////////// +/// @brief System error indicating that response from server cannot be received +class TTransportError + : public yexception +{ +public: + explicit TTransportError(TYtError error); +}; + +//////////////////////////////////////////////////////////////////////////////// + /// Info about failed jobs. /// /// @see NYT::TOperationFailedError diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 59868c599e..98a8aa0792 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -728,7 +728,6 @@ private: } private: - THttpRequest Request_; NHttpClient::IHttpResponsePtr Response_; IInputStream* ResponseStream_; }; diff --git a/yt/yt/client/api/client_common.h b/yt/yt/client/api/client_common.h index a5569fb074..373d4ac61b 100644 --- a/yt/yt/client/api/client_common.h +++ b/yt/yt/client/api/client_common.h @@ -141,20 +141,20 @@ struct TSelectRowsOptionsBase , public TSuppressableAccessTrackingOptions { //! Limits range expanding. - ui64 RangeExpansionLimit = 200000; - //! Limits maximum parallel subqueries. - int MaxSubqueries = std::numeric_limits<int>::max(); + ui64 RangeExpansionLimit = 200'000; //! Limits parallel subqueries by row count. - ui64 MinRowCountPerSubquery = 100'000; + i64 MinRowCountPerSubquery = 100'000; //! Path in Cypress with UDFs. std::optional<TString> UdfRegistryPath; + //! Limits maximum parallel subqueries. + int MaxSubqueries = std::numeric_limits<int>::max(); + //! Query language syntax version. + int SyntaxVersion = 1; //! If |true| then logging is more verbose. bool VerboseLogging = false; // COMPAT(lukyan) //! Use fixed and rewritten range inference. bool NewRangeInference = true; - //! Query language syntax version. - int SyntaxVersion = 1; }; DEFINE_ENUM(EExecutionBackend, @@ -169,16 +169,8 @@ struct TSelectRowsOptions std::optional<i64> InputRowLimit; //! If null then connection defaults are used. std::optional<i64> OutputRowLimit; - //! Allow queries without any condition on key columns. - bool AllowFullScan = true; - //! Allow queries with join condition which implies foreign query with IN operator. - bool AllowJoinWithoutIndex = false; //! Execution pool. std::optional<TString> ExecutionPool; - //! If |true| then incomplete result would lead to a failure. - bool FailOnIncompleteResult = true; - //! Enables generated code caching. - bool EnableCodeCache = true; //! Used to prioritize requests. TUserWorkloadDescriptor WorkloadDescriptor; //! Memory limit per execution node. @@ -189,10 +181,6 @@ struct TSelectRowsOptions NYson::TYsonString PlaceholderValues; //! Native or WebAssembly execution backend. std::optional<EExecutionBackend> ExecutionBackend; - //! Enables canonical SQL behaviour for relational operators, i.e. null </=/> value -> null. - bool UseCanonicalNullRelations = false; - //! Merge versioned rows from different stores when reading. - bool MergeVersionedRows = true; //! Expected schemas for tables in a query (used for replica fallback in replicated tables). using TExpectedTableSchemas = THashMap<NYPath::TYPath, NTableClient::TTableSchemaPtr>; TExpectedTableSchemas ExpectedTableSchemas; @@ -200,6 +188,18 @@ struct TSelectRowsOptions NTableClient::TVersionedReadOptions VersionedReadOptions; //! Explicitly allow or forbid the usage of row cache. std::optional<bool> UseLookupCache; + //! Allow queries without any condition on key columns. + bool AllowFullScan = true; + //! Allow queries with join condition which implies foreign query with IN operator. + bool AllowJoinWithoutIndex = false; + //! If |true| then incomplete result would lead to a failure. + bool FailOnIncompleteResult = true; + //! Enables generated code caching. + bool EnableCodeCache = true; + //! Enables canonical SQL behaviour for relational operators, i.e. null </=/> value -> null. + bool UseCanonicalNullRelations = false; + //! Merge versioned rows from different stores when reading. + bool MergeVersionedRows = true; }; struct TFallbackReplicaOptions diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index f90c95f2c0..cd93d3f214 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -112,7 +112,7 @@ IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(IChannelPtr channel, bool IChannelPtr TClient::CreateNonRetryingChannelByAddress(const std::string& address) const { return CreateCredentialsInjectingChannel( - Connection_->CreateChannelByAddress(TString(address)), + Connection_->CreateChannelByAddress(address), ClientOptions_); } @@ -629,7 +629,7 @@ TFuture<std::vector<TTabletInfo>> TClient::GetTabletInfos( auto& currentReplica = tabletInfo.TableReplicaInfos->emplace_back(); currentReplica.ReplicaId = FromProto<TGuid>(protoReplicaInfo.replica_id()); currentReplica.LastReplicationTimestamp = protoReplicaInfo.last_replication_timestamp(); - currentReplica.Mode = CheckedEnumCast<ETableReplicaMode>(protoReplicaInfo.mode()); + currentReplica.Mode = FromProto<ETableReplicaMode>(protoReplicaInfo.mode()); currentReplica.CurrentReplicationRowIndex = protoReplicaInfo.current_replication_row_index(); currentReplica.CommittedReplicationRowIndex = protoReplicaInfo.committed_replication_row_index(); currentReplica.ReplicationError = FromProto<TError>(protoReplicaInfo.replication_error()); @@ -2066,16 +2066,16 @@ TFuture<TMaintenanceCountsPerTarget> TClient::RemoveMaintenance( counts[EMaintenanceType::DisableTabletCells] = rspValue->disable_tablet_cells(); counts[EMaintenanceType::PendingRestart] = rspValue->pending_restart(); } else { - for (const auto& [type, count] : rspValue->removed_maintenance_counts()) { - counts[CheckedEnumCast<EMaintenanceType>(type)] = count; + for (auto [type, count] : rspValue->removed_maintenance_counts()) { + counts[FromProto<EMaintenanceType>(type)] = count; } } } else { result.reserve(rspValue->removed_maintenance_counts_per_target_size()); for (const auto& [target, protoCounts] : rspValue->removed_maintenance_counts_per_target()) { auto& counts = result[target]; - for (const auto& [type, count] : protoCounts.counts()) { - counts[CheckedEnumCast<EMaintenanceType>(type)] = count; + for (auto [type, count] : protoCounts.counts()) { + counts[FromProto<EMaintenanceType>(type)] = count; } } } diff --git a/yt/yt/client/api/rpc_proxy/config.cpp b/yt/yt/client/api/rpc_proxy/config.cpp index 87dfd2b22e..b3aef0ea36 100644 --- a/yt/yt/client/api/rpc_proxy/config.cpp +++ b/yt/yt/client/api/rpc_proxy/config.cpp @@ -100,9 +100,9 @@ void TConnectionConfig::Register(TRegistrar registrar) .Default(NCompression::ECodec::None); registrar.Parameter("response_codec", &TThis::ResponseCodec) .Default(NCompression::ECodec::None); - // COMPAT(danilalexeev ): legacy RPC codecs + // COMPAT(danilalexeev): legacy RPC codecs registrar.Parameter("enable_legacy_rpc_codecs", &TThis::EnableLegacyRpcCodecs) - .Default(true); + .Default(false); registrar.Parameter("enable_retries", &TThis::EnableRetries) .Default(false); diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp index 3a491d8526..fc365aa9e9 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp @@ -291,9 +291,9 @@ IChannelPtr TConnection::CreateChannel(bool sticky) return CreateRoamingChannel(std::move(provider)); } -IChannelPtr TConnection::CreateChannelByAddress(const TString& address) +IChannelPtr TConnection::CreateChannelByAddress(const std::string& address) { - return CachingChannelFactory_->CreateChannel(address.ConstRef()); + return CachingChannelFactory_->CreateChannel(address); } TClusterTag TConnection::GetClusterTag() const diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h index 3e050989ca..79442d31d3 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.h +++ b/yt/yt/client/api/rpc_proxy/connection_impl.h @@ -27,7 +27,7 @@ public: ~TConnection(); NRpc::IChannelPtr CreateChannel(bool sticky); - NRpc::IChannelPtr CreateChannelByAddress(const TString& address); + NRpc::IChannelPtr CreateChannelByAddress(const std::string& address); const TConnectionConfigPtr& GetConfig(); diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index d37fc8b2bd..88c12aa793 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -523,7 +523,7 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema& ? TColumnStableName(protoSchema.stable_name()) : TColumnStableName(protoSchema.name())); - auto physicalType = CheckedEnumCast<EValueType>(protoSchema.type()); + auto physicalType = FromProto<EValueType>(protoSchema.type()); TLogicalTypePtr columnType; if (protoSchema.has_type_v3()) { @@ -545,7 +545,7 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema& << TErrorAttribute("type", protoSchema.type()); } } else if (protoSchema.has_logical_type()) { - auto logicalType = CheckedEnumCast<ESimpleLogicalValueType>(protoSchema.logical_type()); + auto logicalType = FromProto<ESimpleLogicalValueType>(protoSchema.logical_type()); columnType = MakeLogicalType(logicalType, protoSchema.required()); if (protoSchema.has_type() && GetPhysicalType(logicalType) != physicalType) { THROW_ERROR_EXCEPTION("Fields \"logical_type\" and \"type\" do not match") @@ -565,7 +565,7 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema& schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression)); schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized)); schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate)); - schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>)); + schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, FromProto<ESortOrder>)); schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group)); schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size)); } @@ -609,14 +609,11 @@ void ToProto(NProto::TTabletInfo* protoTabletInfo, const NTabletClient::TTabletI void FromProto(NTabletClient::TTabletInfo* tabletInfo, const NProto::TTabletInfo& protoTabletInfo) { - tabletInfo->TabletId = - FromProto<TTabletId>(protoTabletInfo.tablet_id()); + tabletInfo->TabletId = FromProto<TTabletId>(protoTabletInfo.tablet_id()); tabletInfo->MountRevision = FromProto<NHydra::TRevision>(protoTabletInfo.mount_revision()); - tabletInfo->State = CheckedEnumCast<ETabletState>(protoTabletInfo.state()); + tabletInfo->State = FromProto<ETabletState>(protoTabletInfo.state()); tabletInfo->PivotKey = FromProto<NTableClient::TLegacyOwningKey>(protoTabletInfo.pivot_key()); - if (protoTabletInfo.has_cell_id()) { - tabletInfo->CellId = FromProto<TTabletCellId>(protoTabletInfo.cell_id()); - } + tabletInfo->CellId = FromProto<TTabletCellId>(protoTabletInfo.cell_id()); } void ToProto( @@ -1373,7 +1370,7 @@ void FromProto( (*manifest)->SourcePath = protoManifest.source_path(); (*manifest)->DestinationPath = protoManifest.destination_path(); - (*manifest)->OrderedMode = CheckedEnumCast<EOrderedTableBackupMode>(protoManifest.ordered_mode()); + (*manifest)->OrderedMode = FromProto<EOrderedTableBackupMode>(protoManifest.ordered_mode()); } void ToProto( @@ -2198,10 +2195,10 @@ TTableSchemaPtr DeserializeRowsetSchema( columns[i].SetStableName(TColumnStableName(entry.name())); } if (entry.has_logical_type()) { - auto simpleLogicalType = CheckedEnumCast<NTableClient::ESimpleLogicalValueType>(entry.logical_type()); + auto simpleLogicalType = FromProto<NTableClient::ESimpleLogicalValueType>(entry.logical_type()); columns[i].SetLogicalType(OptionalLogicalType(SimpleLogicalType(simpleLogicalType))); } else if (entry.has_type()) { - auto simpleLogicalType = CheckedEnumCast<NTableClient::ESimpleLogicalValueType>(entry.type()); + auto simpleLogicalType = FromProto<NTableClient::ESimpleLogicalValueType>(entry.type()); columns[i].SetLogicalType(OptionalLogicalType(SimpleLogicalType(simpleLogicalType))); } } diff --git a/yt/yt/client/cache/rpc.cpp b/yt/yt/client/cache/rpc.cpp index da0200d5fc..025d223acf 100644 --- a/yt/yt/client/cache/rpc.cpp +++ b/yt/yt/client/cache/rpc.cpp @@ -68,12 +68,12 @@ NApi::IClientPtr CreateClient(TStringBuf clusterUrl) return CreateClient(clusterUrl, NApi::GetClientOptionsFromEnvStatic()); } -NApi::IClientPtr CreateClient(TStringBuf cluster, TStringBuf proxyRole) +NApi::IClientPtr CreateClient(TStringBuf cluster, std::optional<TStringBuf> proxyRole) { auto config = New<NApi::NRpcProxy::TConnectionConfig>(); config->ClusterUrl = ToString(cluster); - if (!proxyRole.empty()) { - config->ProxyRole = ToString(proxyRole); + if (proxyRole && !proxyRole->empty()) { + config->ProxyRole = ToString(*proxyRole); } config->Postprocess(); return CreateClient(config); diff --git a/yt/yt/client/cache/rpc.h b/yt/yt/client/cache/rpc.h index c2a844bb50..06cb3815e2 100644 --- a/yt/yt/client/cache/rpc.h +++ b/yt/yt/client/cache/rpc.h @@ -27,7 +27,7 @@ NApi::IClientPtr CreateClient(const NApi::NRpcProxy::TConnectionConfigPtr& confi NApi::IClientPtr CreateClient(TStringBuf clusterUrl); //! Allows to specify proxyRole as dedicated option. -NApi::IClientPtr CreateClient(TStringBuf cluster, TStringBuf proxyRole); +NApi::IClientPtr CreateClient(TStringBuf cluster, std::optional<TStringBuf> proxyRole); //! Shortcut to create client with default config and options from env variables (use env:YT_PROXY as serverName). NApi::IClientPtr CreateClient(); diff --git a/yt/yt/client/chaos_client/config.h b/yt/yt/client/chaos_client/config.h index 4454dbb187..9391551f8a 100644 --- a/yt/yt/client/chaos_client/config.h +++ b/yt/yt/client/chaos_client/config.h @@ -10,10 +10,24 @@ namespace NYT::NChaosClient { //////////////////////////////////////////////////////////////////////////////// +class TChaosCacheChannelConfig + : public NRpc::TRetryingChannelConfig + , public NRpc::TBalancingChannelConfig +{ +public: + REGISTER_YSON_STRUCT(TChaosCacheChannelConfig); + + static void Register(TRegistrar /*registrar*/) + { } +}; + +DEFINE_REFCOUNTED_TYPE(TChaosCacheChannelConfig) + +//////////////////////////////////////////////////////////////////////////////// + class TReplicationCardCacheConfig : public TAsyncExpiringCacheConfig - , public NRpc::TBalancingChannelConfig - , public NRpc::TRetryingChannelConfig + , public TChaosCacheChannelConfig { public: bool EnableWatching; diff --git a/yt/yt/client/chaos_client/public.h b/yt/yt/client/chaos_client/public.h index 0493104b9d..28f7c30d5c 100644 --- a/yt/yt/client/chaos_client/public.h +++ b/yt/yt/client/chaos_client/public.h @@ -24,6 +24,7 @@ constexpr int MaxReplicasPerReplicationCard = 128; DECLARE_REFCOUNTED_STRUCT(TReplicationCard) DECLARE_REFCOUNTED_STRUCT(IReplicationCardCache) +DECLARE_REFCOUNTED_CLASS(TChaosCacheChannelConfig) DECLARE_REFCOUNTED_CLASS(TReplicationCardCacheConfig) DECLARE_REFCOUNTED_CLASS(TReplicationCardCacheDynamicConfig) diff --git a/yt/yt/client/driver/command-inl.h b/yt/yt/client/driver/command-inl.h index 4b595a57df..54f0d40f4e 100644 --- a/yt/yt/client/driver/command-inl.h +++ b/yt/yt/client/driver/command-inl.h @@ -392,7 +392,7 @@ void TSelectRowsCommandBase< .GreaterThan(0) .Optional(/*init*/ false); - registrar.template ParameterWithUniversalAccessor<ui64>( + registrar.template ParameterWithUniversalAccessor<i64>( "min_row_count_per_subquery", [] (TThis* command) -> auto& { return command->Options.MinRowCountPerSubquery; diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index 48d6f7d62b..afa22250e6 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -357,7 +357,7 @@ public: REGISTER (TAdvanceQueueConsumerCommand, "advance_queue_consumer", Null, Structured, true, false, ApiVersion4); REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4); REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4); - REGISTER (TPushQueueProducerCommand, "push_queue_producer", Null, Structured, true, false, ApiVersion4); + REGISTER (TPushQueueProducerCommand, "push_queue_producer", Tabular, Structured, true, false, ApiVersion4); REGISTER (TStartQueryCommand, "start_query", Null, Structured, true, false, ApiVersion4); REGISTER (TAbortQueryCommand, "abort_query", Null, Structured, true, false, ApiVersion4); diff --git a/yt/yt/client/driver/etc_commands.h b/yt/yt/client/driver/etc_commands.h index 5d9e8c04b1..789f8690d4 100644 --- a/yt/yt/client/driver/etc_commands.h +++ b/yt/yt/client/driver/etc_commands.h @@ -173,7 +173,7 @@ private: TString PoolTree; NYTree::INodePtr ResourceDelta; - virtual void DoExecute(ICommandContextPtr context) override; + void DoExecute(ICommandContextPtr context) override; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp index 1db333dc5d..8f8040b193 100644 --- a/yt/yt/client/driver/queue_commands.cpp +++ b/yt/yt/client/driver/queue_commands.cpp @@ -163,6 +163,11 @@ void TPullQueueCommand::Register(TRegistrar registrar) void TPullQueueCommand::DoExecute(ICommandContextPtr context) { + YT_LOG_DEBUG("Executing \"pull_queue\" command (QueuePath: %v, Offset: %v, PartitionIndex: %v)", + QueuePath, + Offset, + PartitionIndex); + auto client = context->GetClient(); auto result = WaitFor(client->PullQueue( @@ -226,6 +231,12 @@ void TPullQueueConsumerCommand::Register(TRegistrar registrar) void TPullQueueConsumerCommand::DoExecute(ICommandContextPtr context) { + YT_LOG_DEBUG("Executing \"pull_queue_consumer\" command (ConsumerPath: %v, QueuePath: %v, Offset: %v, PartitionIndex: %v)", + ConsumerPath, + QueuePath, + Offset, + PartitionIndex); + auto client = context->GetClient(); auto result = WaitFor(client->PullQueueConsumer( @@ -263,6 +274,13 @@ void TAdvanceQueueConsumerCommand::Register(TRegistrar registrar) void TAdvanceQueueConsumerCommand::DoExecute(ICommandContextPtr context) { + YT_LOG_DEBUG("Executing \"advance_queue_consumer\" command (ConsumerPath: %v, QueuePath: %v, PartitionIndex: %v, OldOffset: %v, NewOffset: %v)", + ConsumerPath, + QueuePath, + PartitionIndex, + OldOffset, + NewOffset); + auto transaction = GetTransaction(context); if (ClientSide.value_or(false)) { @@ -297,6 +315,11 @@ void TCreateQueueProducerSessionCommand::Register(TRegistrar registrar) void TCreateQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) { + YT_LOG_DEBUG("Executing \"create_queue_producer_session\" command (ProducerPath: %v, QueuePath: %v, SessionId: %v)", + ProducerPath, + QueuePath, + SessionId); + auto client = context->GetClient(); auto result = WaitFor(client->CreateQueueProducerSession( @@ -327,6 +350,11 @@ void TRemoveQueueProducerSessionCommand::Register(TRegistrar registrar) void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context) { + YT_LOG_DEBUG("Executing \"remove_queue_producer_session\" command (ProducerPath: %v, QueuePath: %v, SessionId: %v)", + ProducerPath, + QueuePath, + SessionId); + auto client = context->GetClient(); WaitFor(client->RemoveQueueProducerSession( @@ -366,6 +394,12 @@ void TPushQueueProducerCommand::Register(TRegistrar registrar) void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context) { + YT_LOG_DEBUG("Executing \"push_queue_producer\" command (ProducerPath: %v, QueuePath: %v, SessionId: %v, Epoch: %v)", + ProducerPath, + QueuePath, + SessionId, + Epoch); + auto tableMountCache = context->GetClient()->GetTableMountCache(); auto queueTableInfoFuture = tableMountCache->GetTableInfo(QueuePath.GetPath()); diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h index 224bcd9c0c..d511e76622 100644 --- a/yt/yt/client/driver/table_commands.h +++ b/yt/yt/client/driver/table_commands.h @@ -400,7 +400,7 @@ public: private: NYPath::TRichYPath Path; - virtual void DoExecute(ICommandContextPtr context) override; + void DoExecute(ICommandContextPtr context) override; bool HasResponseParameters() const override; }; @@ -585,7 +585,7 @@ public: private: NApi::TBackupManifestPtr Manifest; - virtual void DoExecute(ICommandContextPtr context) override; + void DoExecute(ICommandContextPtr context) override; }; //////////////////////////////////////////////////////////////////////////////// @@ -601,7 +601,7 @@ public: private: NApi::TBackupManifestPtr Manifest; - virtual void DoExecute(ICommandContextPtr context) override; + void DoExecute(ICommandContextPtr context) override; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp index fd8fb21dc0..2092672c4f 100644 --- a/yt/yt/client/federated/client.cpp +++ b/yt/yt/client/federated/client.cpp @@ -161,17 +161,17 @@ public: return Underlying_->GetStartTimestamp(); } - virtual NTransactionClient::EAtomicity GetAtomicity() const override + NTransactionClient::EAtomicity GetAtomicity() const override { return Underlying_->GetAtomicity(); } - virtual NTransactionClient::EDurability GetDurability() const override + NTransactionClient::EDurability GetDurability() const override { return Underlying_->GetDurability(); } - virtual TDuration GetTimeout() const override + TDuration GetTimeout() const override { return Underlying_->GetTimeout(); } diff --git a/yt/yt/client/formats/versioned_writer.h b/yt/yt/client/formats/versioned_writer.h index 025ba8e1fe..1427d5bf71 100644 --- a/yt/yt/client/formats/versioned_writer.h +++ b/yt/yt/client/formats/versioned_writer.h @@ -26,11 +26,11 @@ public: NTableClient::TTableSchemaPtr schema, const std::function<std::unique_ptr<NYson::IFlushableYsonConsumer>(IZeroCopyOutput*)>& consumerBuilder); - virtual TFuture<void> Close() override; + TFuture<void> Close() override; - virtual bool Write(TRange<NTableClient::TVersionedRow> rows) override; + bool Write(TRange<NTableClient::TVersionedRow> rows) override; - virtual TFuture<void> GetReadyEvent() override; + TFuture<void> GetReadyEvent() override; private: const NConcurrency::IAsyncOutputStreamPtr Stream_; diff --git a/yt/yt/client/scheduler/public.h b/yt/yt/client/scheduler/public.h index 2c0f279ed9..10e930bb93 100644 --- a/yt/yt/client/scheduler/public.h +++ b/yt/yt/client/scheduler/public.h @@ -143,6 +143,7 @@ DEFINE_ENUM(EAbortReason, ((InterruptionFailed) ( 55)) ((OperationIncarnationChanged) ( 56)) ((AddressResolveFailed) ( 57)) + ((UnexpectedNodeJobPhase) ( 58)) ((SchedulingFirst) (100)) ((SchedulingTimeout) (101)) ((SchedulingResourceOvercommit) (102)) diff --git a/yt/yt/client/table_client/column_sort_schema.cpp b/yt/yt/client/table_client/column_sort_schema.cpp index 2bc14f1a9f..b6002b5e77 100644 --- a/yt/yt/client/table_client/column_sort_schema.cpp +++ b/yt/yt/client/table_client/column_sort_schema.cpp @@ -89,11 +89,13 @@ void FromProto( TSortColumns* sortColumns, const NProto::TSortColumnsExt& protoSortColumns) { + using NYT::FromProto; + YT_VERIFY(protoSortColumns.names_size() == protoSortColumns.sort_orders_size()); for (int columnIndex = 0; columnIndex < protoSortColumns.names_size(); ++columnIndex) { TColumnSortSchema sortColumn{ .Name = protoSortColumns.names(columnIndex), - .SortOrder = CheckedEnumCast<ESortOrder>(protoSortColumns.sort_orders(columnIndex)) + .SortOrder = FromProto<ESortOrder>(protoSortColumns.sort_orders(columnIndex)), }; sortColumns->push_back(sortColumn); } diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp index 49898440e2..6bfd4b410c 100644 --- a/yt/yt/client/table_client/config.cpp +++ b/yt/yt/client/table_client/config.cpp @@ -464,6 +464,8 @@ void TChunkWriterOptions::Register(TRegistrar registrar) .Default(false); registrar.Parameter("enable_column_meta_in_chunk_meta", &TThis::EnableColumnMetaInChunkMeta) .Default(true); + registrar.Parameter("consider_min_row_range_data_weight", &TThis::ConsiderMinRowRangeDataWeight) + .Default(true); registrar.Parameter("schema_modification", &TThis::SchemaModification) .Default(ETableSchemaModification::None); diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h index a8accf974b..2f91bcd032 100644 --- a/yt/yt/client/table_client/config.h +++ b/yt/yt/client/table_client/config.h @@ -429,6 +429,7 @@ public: bool EnableRowCountInColumnarStatistics; bool EnableSegmentMetaInBlocks; bool EnableColumnMetaInChunkMeta; + bool ConsiderMinRowRangeDataWeight; NYTree::INodePtr CastAnyToCompositeNode; diff --git a/yt/yt/client/table_client/logical_type.cpp b/yt/yt/client/table_client/logical_type.cpp index 679540a999..4845e37210 100644 --- a/yt/yt/client/table_client/logical_type.cpp +++ b/yt/yt/client/table_client/logical_type.cpp @@ -1123,9 +1123,11 @@ void ToProto(NProto::TLogicalType* protoLogicalType, const TLogicalTypePtr& logi void FromProto(TLogicalTypePtr* logicalType, const NProto::TLogicalType& protoLogicalType) { + using NYT::FromProto; + switch (protoLogicalType.type_case()) { case NProto::TLogicalType::TypeCase::kSimple: - *logicalType = SimpleLogicalType(CheckedEnumCast<ESimpleLogicalValueType>(protoLogicalType.simple())); + *logicalType = SimpleLogicalType(FromProto<ESimpleLogicalValueType>(protoLogicalType.simple())); return; case NProto::TLogicalType::TypeCase::kDecimal: *logicalType = DecimalLogicalType(protoLogicalType.decimal().precision(), protoLogicalType.decimal().scale()); diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index 25a5c29ed9..dda743a92d 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -463,10 +463,10 @@ void FromProto(TColumnSchema* schema, const NProto::TColumnSchema& protoSchema) } else if (protoSchema.has_simple_logical_type()) { schema->SetLogicalType( MakeLogicalType( - CheckedEnumCast<ESimpleLogicalValueType>(protoSchema.simple_logical_type()), + FromProto<ESimpleLogicalValueType>(protoSchema.simple_logical_type()), protoSchema.required())); } else { - auto physicalType = CheckedEnumCast<EValueType>(protoSchema.type()); + auto physicalType = FromProto<EValueType>(protoSchema.type()); schema->SetLogicalType(MakeLogicalType(GetLogicalType(physicalType), protoSchema.required())); } @@ -474,7 +474,7 @@ void FromProto(TColumnSchema* schema, const NProto::TColumnSchema& protoSchema) schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression)); schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized)); schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate)); - schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>)); + schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, FromProto<ESortOrder>)); schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group)); schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size)); } @@ -1514,7 +1514,7 @@ void FromProto(TTableSchema* schema, const NProto::TTableSchemaExt& protoSchema) FromProto<std::vector<TColumnSchema>>(protoSchema.columns()), protoSchema.strict(), protoSchema.unique_keys(), - CheckedEnumCast<ETableSchemaModification>(protoSchema.schema_modification()), + FromProto<ETableSchemaModification>(protoSchema.schema_modification()), FromProto<std::vector<TDeletedColumn>>(protoSchema.deleted_columns())); } diff --git a/yt/yt/client/table_client/versioned_io_options.cpp b/yt/yt/client/table_client/versioned_io_options.cpp index d181197a1e..040a0463e5 100644 --- a/yt/yt/client/table_client/versioned_io_options.cpp +++ b/yt/yt/client/table_client/versioned_io_options.cpp @@ -4,6 +4,7 @@ namespace NYT::NTableClient { +using NYT::FromProto; using NYT::ToProto; //////////////////////////////////////////////////////////////////////////////// @@ -31,7 +32,7 @@ void FromProto( TVersionedReadOptions* options, const NProto::TVersionedReadOptions& protoOptions) { - options->ReadMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.read_mode()); + options->ReadMode = FromProto<EVersionedIOMode>(protoOptions.read_mode()); } void ToProto( @@ -45,7 +46,7 @@ void FromProto( NTableClient::TVersionedWriteOptions* options, const NProto::TVersionedWriteOptions& protoOptions) { - options->WriteMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.write_mode()); + options->WriteMode = FromProto<EVersionedIOMode>(protoOptions.write_mode()); } std::optional<TString> GetTimestampColumnOriginalNameOrNull(TStringBuf name) diff --git a/yt/yt/client/table_client/wire_protocol.cpp b/yt/yt/client/table_client/wire_protocol.cpp index 59638588f2..ec948b7263 100644 --- a/yt/yt/client/table_client/wire_protocol.cpp +++ b/yt/yt/client/table_client/wire_protocol.cpp @@ -536,8 +536,9 @@ class TWireProtocolReader : public IWireProtocolReader { public: - explicit TWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer) + explicit TWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options) : RowBuffer_(rowBuffer ? std::move(rowBuffer) : New<TRowBuffer>(TWireProtocolReaderTag(), ReaderBufferChunkSize)) + , Options_(std::move(options)) , Data_(std::move(data)) , Current_(Data_.Begin()) { } @@ -781,10 +782,12 @@ public: private: const TRowBufferPtr RowBuffer_; + const TWireProtocolOptions Options_; TSharedRef Data_; TIterator Current_; + void ValidateSizeAvailable(size_t size) { if (Current_ + size > Data_.End()) { @@ -857,15 +860,15 @@ private: void DoReadStringData(EValueType type, ui32 length, const char** result, bool captureValues) { - ui32 limit = 0; + i64 limit = 0; if (type == EValueType::String) { - limit = MaxStringValueLength; + limit = Options_.MaxStringValueLength; } if (type == EValueType::Any) { - limit = MaxAnyValueLength; + limit = Options_.MaxAnyValueLength; } if (type == EValueType::Composite) { - limit = MaxCompositeValueLength; + limit = Options_.MaxCompositeValueLength; } if (length > limit) { THROW_ERROR_EXCEPTION("Value of type %Qlv is too long: length %v, limit %v", @@ -994,10 +997,10 @@ private: void ValidateVersionedRowTimestampCount(const TVersionedRowHeader& rowHeader) { - if (rowHeader.WriteTimestampCount > MaxTimestampCountPerRow) { + if (rowHeader.WriteTimestampCount > Options_.MaxTimestampCountPerRow) { THROW_ERROR_EXCEPTION("Too many write timestamps in a versioned row"); } - if (rowHeader.DeleteTimestampCount > MaxTimestampCountPerRow) { + if (rowHeader.DeleteTimestampCount > Options_.MaxTimestampCountPerRow) { THROW_ERROR_EXCEPTION("Too many delete timestamps in a versioned row"); } } @@ -1005,10 +1008,10 @@ private: void ValidateVersionedRowDataWeight(TVersionedRow row) { auto dataWeight = GetDataWeight(row); - if (dataWeight > MaxServerVersionedRowDataWeight) { + if (static_cast<i64>(dataWeight) > Options_.MaxVersionedRowDataWeight) { THROW_ERROR_EXCEPTION("Versioned row data weight is too large: %v > %v", dataWeight, - MaxServerVersionedRowDataWeight) + Options_.MaxVersionedRowDataWeight) << TErrorAttribute("key", ToOwningKey(row)); } } @@ -1052,9 +1055,9 @@ auto IWireProtocolReader::GetSchemaData(const TTableSchema& schema) -> TSchemaDa //////////////////////////////////////////////////////////////////////////////// -std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer) +std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options) { - return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer)); + return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer), std::move(options)); } //////////////////////////////////////////////////////////////////////////////// @@ -1068,12 +1071,14 @@ public: NCompression::ECodec codecId, TTableSchemaPtr schema, bool schemaful, - const NLogging::TLogger& logger) + const NLogging::TLogger& logger, + TWireProtocolOptions options) : CompressedBlocks_(compressedBlocks) , Codec_(NCompression::GetCodec(codecId)) , Schema_(std::move(schema)) , Schemaful_(schemaful) , Logger(logger.WithTag("ReaderId: %v", TGuid::Create())) + , Options_(std::move(options)) { YT_LOG_DEBUG("Wire protocol rowset reader created (BlockCount: %v, TotalCompressedSize: %v)", CompressedBlocks_.size(), @@ -1102,7 +1107,7 @@ public: uncompressedBlock.Size()); auto rowBuffer = New<TRowBuffer>(TWireProtocolReaderTag(), ReaderBufferChunkSize); - WireReader_ = CreateWireProtocolReader(uncompressedBlock, std::move(rowBuffer)); + WireReader_ = CreateWireProtocolReader(uncompressedBlock, std::move(rowBuffer), Options_); if (!SchemaChecked_) { auto actualSchema = WireReader_->ReadTableSchema(); @@ -1164,6 +1169,7 @@ private: const TTableSchemaPtr Schema_; bool Schemaful_; const NLogging::TLogger Logger; + const TWireProtocolOptions Options_; int BlockIndex_ = 0; std::unique_ptr<IWireProtocolReader> WireReader_; @@ -1177,14 +1183,16 @@ IWireProtocolRowsetReaderPtr CreateWireProtocolRowsetReader( NCompression::ECodec codecId, TTableSchemaPtr schema, bool schemaful, - const NLogging::TLogger& logger) + const NLogging::TLogger& logger, + TWireProtocolOptions options) { return New<TWireProtocolRowsetReader>( compressedBlocks, codecId, std::move(schema), schemaful, - logger); + logger, + std::move(options)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/wire_protocol.h b/yt/yt/client/table_client/wire_protocol.h index 340f3d6263..493d03b494 100644 --- a/yt/yt/client/table_client/wire_protocol.h +++ b/yt/yt/client/table_client/wire_protocol.h @@ -280,13 +280,25 @@ public: //////////////////////////////////////////////////////////////////////////////// +struct TWireProtocolOptions +{ + i64 MaxStringValueLength = NTableClient::MaxStringValueLength; + i64 MaxAnyValueLength = NTableClient::MaxAnyValueLength; + i64 MaxCompositeValueLength = NTableClient::MaxCompositeValueLength; + i64 MaxTimestampCountPerRow = NTableClient::MaxTimestampCountPerRow; + i64 MaxVersionedRowDataWeight = NTableClient::MaxServerVersionedRowDataWeight; +}; + +//////////////////////////////////////////////////////////////////////////////// + //! Creates wire protocol reader. /*! * If #rowBuffer is null, a default one is created. */ std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader( TSharedRef data, - TRowBufferPtr rowBuffer = TRowBufferPtr()); + TRowBufferPtr rowBuffer = TRowBufferPtr(), + TWireProtocolOptions options = {}); //////////////////////////////////////////////////////////////////////////////// @@ -301,7 +313,8 @@ IWireProtocolRowsetReaderPtr CreateWireProtocolRowsetReader( NCompression::ECodec codecId, NTableClient::TTableSchemaPtr schema, bool schemaful, - const NLogging::TLogger& logger); + const NLogging::TLogger& logger, + TWireProtocolOptions options = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/actions/cancelable_context.cpp b/yt/yt/core/actions/cancelable_context.cpp index ba24beb035..9993b14cf4 100644 --- a/yt/yt/core/actions/cancelable_context.cpp +++ b/yt/yt/core/actions/cancelable_context.cpp @@ -25,6 +25,7 @@ public: void Invoke(TClosure callback) override { YT_ASSERT(callback); + auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); if (Context_->Canceled_) { @@ -50,9 +51,43 @@ public: })); } + void Invoke(TMutableRange<TClosure> callbacks) override + { + auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); + + std::vector<TClosure> capturedCallbacks; + capturedCallbacks.reserve(callbacks.size()); + for (auto& callback : callbacks) { + capturedCallbacks.push_back(std::move(callback)); + } + + if (Context_->Canceled_) { + capturedCallbacks.clear(); + return; + } + + return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( + [ + this, + this_ = MakeStrong(this), + capturedCallbacks = std::move(capturedCallbacks) + ] () mutable { + auto currentTokenGuard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); + + if (Context_->Canceled_) { + capturedCallbacks.clear(); + return; + } + + TCurrentInvokerGuard guard(this); + for (const auto& callback : capturedCallbacks) { + callback(); + } + })); + } + private: const TCancelableContextPtr Context_; - }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/actions/codicil_guarded_invoker.cpp b/yt/yt/core/actions/codicil_guarded_invoker.cpp new file mode 100644 index 0000000000..b61e0d6fdf --- /dev/null +++ b/yt/yt/core/actions/codicil_guarded_invoker.cpp @@ -0,0 +1,50 @@ +#include "codicil_guarded_invoker.h" + +#include <yt/yt/core/actions/bind.h> +#include <yt/yt/core/actions/current_invoker.h> +#include <yt/yt/core/actions/invoker_detail.h> + +#include <yt/yt/core/misc/codicil.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +class TCodicilGuardedInvoker + : public TInvokerWrapper<false> +{ +public: + TCodicilGuardedInvoker(IInvokerPtr invoker, std::string codicil) + : TInvokerWrapper(std::move(invoker)) + , Codicil_(std::move(codicil)) + { } + + using TInvokerWrapper::Invoke; + + void Invoke(TClosure callback) override + { + UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( + &TCodicilGuardedInvoker::RunCallback, + MakeStrong(this), + Passed(std::move(callback)))); + } + +private: + const std::string Codicil_; + + void RunCallback(TClosure callback) + { + auto currentInvokerGuard = TCurrentInvokerGuard(this); + auto codicilGuard = TCodicilGuard(MakeNonOwningCodicilBuilder(Codicil_)); + callback(); + } +}; + +IInvokerPtr CreateCodicilGuardedInvoker(IInvokerPtr underlyingInvoker, std::string codicil) +{ + return New<TCodicilGuardedInvoker>(std::move(underlyingInvoker), std::move(codicil)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/actions/codicil_guarded_invoker.h b/yt/yt/core/actions/codicil_guarded_invoker.h new file mode 100644 index 0000000000..d0882b9e9f --- /dev/null +++ b/yt/yt/core/actions/codicil_guarded_invoker.h @@ -0,0 +1,19 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/actions/public.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +//! Creates an invoker that creates a codicil guard with a given string before each +//! callback invocation. +IInvokerPtr CreateCodicilGuardedInvoker( + IInvokerPtr underlyingInvoker, + std::string codicil); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/actions/invoker_detail.cpp b/yt/yt/core/actions/invoker_detail.cpp index a36a0a2b93..573165bafb 100644 --- a/yt/yt/core/actions/invoker_detail.cpp +++ b/yt/yt/core/actions/invoker_detail.cpp @@ -18,15 +18,11 @@ TInvokerWrapper<VirtualizeBase>::TInvokerWrapper(IInvokerPtr underlyingInvoker) } template <bool VirtualizeBase> -void TInvokerWrapper<VirtualizeBase>::Invoke(TClosure callback) -{ - return UnderlyingInvoker_->Invoke(std::move(callback)); -} - -template <bool VirtualizeBase> void TInvokerWrapper<VirtualizeBase>::Invoke(TMutableRange<TClosure> callbacks) { - return UnderlyingInvoker_->Invoke(callbacks); + for (auto& callback : callbacks) { + static_cast<IInvoker*>(this)->Invoke(std::move(callback)); + } } template <bool VirtualizeBase> diff --git a/yt/yt/core/actions/invoker_detail.h b/yt/yt/core/actions/invoker_detail.h index 075e992d9d..5f5538a96f 100644 --- a/yt/yt/core/actions/invoker_detail.h +++ b/yt/yt/core/actions/invoker_detail.h @@ -34,7 +34,6 @@ class TInvokerWrapper : public NDetail::TMaybeVirtualInvokerBase<VirtualizeBase> { public: - void Invoke(TClosure callback) override; void Invoke(TMutableRange<TClosure> callbacks) override; NThreading::TThreadId GetThreadId() const override; diff --git a/yt/yt/core/actions/signal.h b/yt/yt/core/actions/signal.h index 4a254dab83..b46615c415 100644 --- a/yt/yt/core/actions/signal.h +++ b/yt/yt/core/actions/signal.h @@ -213,12 +213,12 @@ public: \ protected: \ ::NYT::TCallbackList<TSignature> name##_; \ public: \ - virtual void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override \ + void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override \ { \ name##_.Subscribe(callback); \ } \ \ - virtual void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override \ + void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override \ { \ name##_.Unsubscribe(callback); \ } \ @@ -233,8 +233,8 @@ public: \ ::NYT::TCallbackList<TSignature>* Get##name##Signal() #define DECLARE_SIGNAL_OVERRIDE(TSignature, name) \ - virtual void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override; \ - virtual void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override + void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override; \ + void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override #define DECLARE_INTERFACE_SIGNAL(TSignature, name) \ virtual void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) = 0; \ diff --git a/yt/yt/core/bus/tcp/dispatcher.cpp b/yt/yt/core/bus/tcp/dispatcher.cpp index ac7ffc05a7..ae3f35c251 100644 --- a/yt/yt/core/bus/tcp/dispatcher.cpp +++ b/yt/yt/core/bus/tcp/dispatcher.cpp @@ -43,7 +43,7 @@ bool TTcpDispatcher::IsNetworkingDisabled() return Impl_->IsNetworkingDisabled(); } -const TString& TTcpDispatcher::GetNetworkNameForAddress(const NNet::TNetworkAddress& address) +const std::string& TTcpDispatcher::GetNetworkNameForAddress(const NNet::TNetworkAddress& address) { return Impl_->GetNetworkNameForAddress(address); } diff --git a/yt/yt/core/bus/tcp/dispatcher.h b/yt/yt/core/bus/tcp/dispatcher.h index 711771ba16..5efd5024e9 100644 --- a/yt/yt/core/bus/tcp/dispatcher.h +++ b/yt/yt/core/bus/tcp/dispatcher.h @@ -56,7 +56,7 @@ public: bool IsNetworkingDisabled(); //! Returns the network name for a given #address. - const TString& GetNetworkNameForAddress(const NNet::TNetworkAddress& address); + const std::string& GetNetworkNameForAddress(const NNet::TNetworkAddress& address); //! Returns the TOS level configured for a band. TTosLevel GetTosLevelForBand(EMultiplexingBand band); diff --git a/yt/yt/core/bus/tcp/dispatcher_impl.cpp b/yt/yt/core/bus/tcp/dispatcher_impl.cpp index 95024b9733..26df9ef38c 100644 --- a/yt/yt/core/bus/tcp/dispatcher_impl.cpp +++ b/yt/yt/core/bus/tcp/dispatcher_impl.cpp @@ -64,7 +64,7 @@ const TIntrusivePtr<TTcpDispatcher::TImpl>& TTcpDispatcher::TImpl::Get() return TTcpDispatcher::Get()->Impl_; } -const TBusNetworkCountersPtr& TTcpDispatcher::TImpl::GetCounters(const TString& networkName, bool encrypted) +const TBusNetworkCountersPtr& TTcpDispatcher::TImpl::GetCounters(const std::string& networkName, bool encrypted) { auto [statistics, ok] = NetworkStatistics_.FindOrInsert(networkName, [] { return std::array<TNetworkStatistics, 2>{}; @@ -118,7 +118,7 @@ bool TTcpDispatcher::TImpl::IsNetworkingDisabled() return NetworkingDisabled_.load(); } -const TString& TTcpDispatcher::TImpl::GetNetworkNameForAddress(const TNetworkAddress& address) +const std::string& TTcpDispatcher::TImpl::GetNetworkNameForAddress(const TNetworkAddress& address) { if (address.IsUnix()) { return LocalNetworkName; diff --git a/yt/yt/core/bus/tcp/dispatcher_impl.h b/yt/yt/core/bus/tcp/dispatcher_impl.h index 948bb87d10..09ed61d50d 100644 --- a/yt/yt/core/bus/tcp/dispatcher_impl.h +++ b/yt/yt/core/bus/tcp/dispatcher_impl.h @@ -33,12 +33,12 @@ class TTcpDispatcher::TImpl public: static const TIntrusivePtr<TImpl>& Get(); - const TBusNetworkCountersPtr& GetCounters(const TString& networkName, bool encrypted); + const TBusNetworkCountersPtr& GetCounters(const std::string& networkName, bool encrypted); void DisableNetworking(); bool IsNetworkingDisabled(); - const TString& GetNetworkNameForAddress(const NNet::TNetworkAddress& address); + const std::string& GetNetworkNameForAddress(const NNet::TNetworkAddress& address); TTosLevel GetTosLevelForBand(EMultiplexingBand band); @@ -87,7 +87,7 @@ private: const TBusNetworkCountersPtr Counters = New<TBusNetworkCounters>(); }; - NConcurrency::TSyncMap<TString, std::array<TNetworkStatistics, 2>> NetworkStatistics_; + NConcurrency::TSyncMap<std::string, std::array<TNetworkStatistics, 2>> NetworkStatistics_; YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, PeriodicExecutorsLock_); NConcurrency::TPeriodicExecutorPtr ProfilingExecutor_; @@ -96,7 +96,7 @@ private: std::atomic<bool> NetworkingDisabled_ = false; YT_DECLARE_SPIN_LOCK(NThreading::TForkAwareReaderWriterSpinLock, NetworksLock_); - std::vector<std::pair<NNet::TIP6Network, TString>> Networks_; + std::vector<std::pair<NNet::TIP6Network, std::string>> Networks_; struct TBandDescriptor { diff --git a/yt/yt/core/bus/tcp/server.cpp b/yt/yt/core/bus/tcp/server.cpp index 98903ce583..84c21fef5a 100644 --- a/yt/yt/core/bus/tcp/server.cpp +++ b/yt/yt/core/bus/tcp/server.cpp @@ -177,7 +177,7 @@ protected: } } - int GetTotalServerConnectionCount(const TString& clientNetwork) + int GetTotalServerConnectionCount(const std::string& clientNetwork) { const auto& dispatcher = TTcpDispatcher::TImpl::Get(); int result = 0; diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp index 27e8ee6e0d..73b3021fe0 100644 --- a/yt/yt/core/concurrency/action_queue.cpp +++ b/yt/yt/core/concurrency/action_queue.cpp @@ -11,7 +11,6 @@ #include <yt/yt/core/ypath/token.h> -#include <yt/yt/core/misc/crash_handler.h> #include <yt/yt/core/misc/ring_queue.h> #include <yt/yt/core/misc/shutdown.h> @@ -114,11 +113,16 @@ class TSerializedInvoker , public TInvokerProfileWrapper { public: - TSerializedInvoker(IInvokerPtr underlyingInvoker, const NProfiling::TTagSet& tagSet, NProfiling::IRegistryImplPtr registry) + TSerializedInvoker( + IInvokerPtr underlyingInvoker, + const NProfiling::TTagSet& tagSet, + NProfiling::IRegistryImplPtr registry) : TInvokerWrapper(std::move(underlyingInvoker)) , TInvokerProfileWrapper(std::move(registry), "/serialized", tagSet) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { auto wrappedCallback = WrapCallback(std::move(callback)); @@ -174,7 +178,6 @@ private: private: TIntrusivePtr<TSerializedInvoker> Owner_; bool Activated_ = false; - }; void TrySchedule(TGuard<NThreading::TSpinLock>&& guard) @@ -345,7 +348,12 @@ public: void Invoke(TClosure callback, i64 /*priority*/) override { - return UnderlyingInvoker_->Invoke(std::move(callback)); + Invoke(std::move(callback)); + } + + void Invoke(TClosure callback) override + { + UnderlyingInvoker_->Invoke(std::move(callback)); } }; @@ -378,7 +386,6 @@ public: private: const IPrioritizedInvokerPtr UnderlyingInvoker_; const i64 Priority_; - }; IInvokerPtr CreateFixedPriorityInvoker( @@ -408,6 +415,8 @@ public: , MaxConcurrentInvocations_(maxConcurrentInvocations) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { auto guard = Guard(SpinLock_); @@ -706,41 +715,6 @@ ISuspendableInvokerPtr CreateSuspendableInvoker(IInvokerPtr underlyingInvoker) //////////////////////////////////////////////////////////////////////////////// -class TCodicilGuardedInvoker - : public TInvokerWrapper<false> -{ -public: - TCodicilGuardedInvoker(IInvokerPtr invoker, TString codicil) - : TInvokerWrapper(std::move(invoker)) - , Codicil_(std::move(codicil)) - { } - - void Invoke(TClosure callback) override - { - UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( - &TCodicilGuardedInvoker::RunCallback, - MakeStrong(this), - Passed(std::move(callback)))); - } - -private: - const TString Codicil_; - - void RunCallback(TClosure callback) - { - TCurrentInvokerGuard currentInvokerGuard(this); - TCodicilGuard codicilGuard(Codicil_); - callback(); - } -}; - -IInvokerPtr CreateCodicilGuardedInvoker(IInvokerPtr underlyingInvoker, TString codicil) -{ - return New<TCodicilGuardedInvoker>(std::move(underlyingInvoker), std::move(codicil)); -} - -//////////////////////////////////////////////////////////////////////////////// - class TWatchdogInvoker : public TInvokerWrapper<false> { @@ -754,6 +728,8 @@ public: , Threshold_(DurationToCpuDuration(threshold)) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( @@ -763,8 +739,8 @@ public: } private: - NLogging::TLogger Logger; - TCpuDuration Threshold_; + const NLogging::TLogger Logger; + const TCpuDuration Threshold_; void RunCallback(TClosure callback) { diff --git a/yt/yt/core/concurrency/action_queue.h b/yt/yt/core/concurrency/action_queue.h index 3d2e4d8668..77c92bc490 100644 --- a/yt/yt/core/concurrency/action_queue.h +++ b/yt/yt/core/concurrency/action_queue.h @@ -94,14 +94,6 @@ ISuspendableInvokerPtr CreateSuspendableInvoker(IInvokerPtr underlyingInvoker); //////////////////////////////////////////////////////////////////////////////// -//! Creates an invoker that creates a codicil guard with a given string before each -//! callback invocation. -IInvokerPtr CreateCodicilGuardedInvoker( - IInvokerPtr underlyingInvoker, - TString codicil); - -//////////////////////////////////////////////////////////////////////////////// - //! Creates an invoker that emits warning into #logger when callback executes //! longer than #threshold without interruptions. IInvokerPtr CreateWatchdogInvoker( diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp index 169a86280e..d86e8e1197 100644 --- a/yt/yt/core/concurrency/delayed_executor.cpp +++ b/yt/yt/core/concurrency/delayed_executor.cpp @@ -334,7 +334,10 @@ private: // NB: The callbacks are forwarded to the DelayedExecutor thread to prevent any user-code // from leaking to the Delayed Poller thread, which is, e.g., fiber-unfriendly. auto runAbort = [&] (const TDelayedExecutorEntryPtr& entry) { - RunCallback(entry, /*aborted*/ true); + if (auto callback = TakeCallback(entry)) { + const auto& invoker = entry->Invoker ? entry->Invoker : DelayedInvoker_; + invoker->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*aborted*/ true))); + } }; for (const auto& entry : ScheduledEntries_) { runAbort(entry); @@ -405,32 +408,33 @@ private: } ScheduledCallbacksGauge_.Update(ScheduledEntries_.size()); + THashMap<IInvokerPtr, std::vector<TClosure>> invokerToCallbacks; while (!ScheduledEntries_.empty()) { auto it = ScheduledEntries_.begin(); const auto& entry = *it; + if (entry->Deadline > now + CoalescingInterval) { break; } + if (entry->Deadline + LateWarningThreshold < now) { StaleCallbacksCounter_.Increment(); YT_LOG_DEBUG("Found a late delayed scheduled callback (Deadline: %v, Now: %v)", entry->Deadline, now); } - RunCallback(entry, false); + + if (auto callback = TakeCallback(entry)) { + auto [it, _] = invokerToCallbacks.emplace(std::move(entry->Invoker), std::vector<TClosure>()); + it->second.push_back(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*abort*/ false))); + } + entry->Iterator.reset(); ScheduledEntries_.erase(it); } - } - void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort) - { - if (auto callback = TakeCallback(entry)) { - const auto& invoker = entry->Invoker - ? entry->Invoker - : DelayedInvoker_; - invoker - ->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), abort))); + for (auto& [invoker, callbacks] : invokerToCallbacks) { + (invoker ? invoker : DelayedInvoker_)->Invoke(TMutableRange(callbacks)); } } }; diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp index f30d4e2616..7e5644710a 100644 --- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp @@ -549,6 +549,15 @@ private: } } + void Invoke(TMutableRange<TClosure> callbacks) override + { + if (auto strongParent = Parent_.Lock()) { + for (auto& callback : callbacks) { + strongParent->Enqueue(std::move(callback), Index_); + } + } + } + private: const int Index_; const TWeakPtr<TFairShareInvokerPool> Parent_; diff --git a/yt/yt/core/concurrency/unittests/async_stream_ut.cpp b/yt/yt/core/concurrency/unittests/async_stream_ut.cpp index 48f09107f4..3ada04c164 100644 --- a/yt/yt/core/concurrency/unittests/async_stream_ut.cpp +++ b/yt/yt/core/concurrency/unittests/async_stream_ut.cpp @@ -123,7 +123,7 @@ public: { } protected: - virtual size_t DoRead(void* buf, size_t len) override + size_t DoRead(void* buf, size_t len) override { return InputStream_->Read(buf, std::min(len, MaxBlockSize_)); } diff --git a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp index 188f08e436..498f18ecc5 100644 --- a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp +++ b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp @@ -57,13 +57,15 @@ public: , InvocationCount_(0) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { ++InvocationCount_; if (Bounded_ ) { EXPECT_TRUE(Parent_.Lock()); } - TInvokerWrapper::Invoke(std::move(callback)); + UnderlyingInvoker_->Invoke(std::move(callback)); } void Bound(const IMockInvokerPoolPtr& parent) diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp index 8b4c5978a9..15320fae90 100644 --- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp @@ -1075,6 +1075,8 @@ public: : TInvokerWrapper(std::move(underlyingInvoker)) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { UnderlyingInvoker_->Invoke(BIND( diff --git a/yt/yt/core/http/retrying_client.cpp b/yt/yt/core/http/retrying_client.cpp index f386b33bec..d441f1b7b3 100644 --- a/yt/yt/core/http/retrying_client.cpp +++ b/yt/yt/core/http/retrying_client.cpp @@ -27,13 +27,13 @@ public: : RetryChecker_(retryChecker ? std::move(retryChecker) : BIND(&DefaultRetryChecker)) { } - virtual bool IsRetriableError(const TError& error) override + bool IsRetriableError(const TError& error) override { return RetryChecker_(error); } - virtual TError CheckError(const IResponsePtr& response) override = 0; - virtual NYTree::INodePtr GetFormattedResponse() const override = 0; + TError CheckError(const IResponsePtr& response) override = 0; + NYTree::INodePtr GetFormattedResponse() const override = 0; protected: static bool DefaultRetryChecker(const TError& /*error*/) diff --git a/yt/yt/core/json/json_writer.cpp b/yt/yt/core/json/json_writer.cpp index 6a74bed6c2..267fc0dea7 100644 --- a/yt/yt/core/json/json_writer.cpp +++ b/yt/yt/core/json/json_writer.cpp @@ -20,7 +20,7 @@ class TJsonWriter { public: TJsonWriter(IOutputStream* output, bool isPretty); - virtual ~TJsonWriter() override; + ~TJsonWriter() override; void Flush() override; void OnStringScalar(TStringBuf value) override; diff --git a/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h b/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h index d1abbbbd0a..3dbb3ce3e5 100644 --- a/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h +++ b/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h @@ -25,7 +25,7 @@ inline size_t CompressedUnsignedVectorSizeInWords(ui64 maxValue, size_t count) inline size_t CompressedUnsignedVectorSizeInBytes(ui64 maxValue, size_t count) { - static size_t wordSize = sizeof(ui64); + static constexpr size_t wordSize = sizeof(ui64); return CompressedUnsignedVectorSizeInWords(maxValue, count) * wordSize; } diff --git a/yt/yt/core/misc/bit_packed_unsigned_vector.cpp b/yt/yt/core/misc/bit_packed_unsigned_vector.cpp index 05ff7ed88f..bec799a9f1 100644 --- a/yt/yt/core/misc/bit_packed_unsigned_vector.cpp +++ b/yt/yt/core/misc/bit_packed_unsigned_vector.cpp @@ -8,24 +8,26 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// -void PrepareDiffFromExpected(std::vector<ui32>* values, ui32* expected, ui32* maxDiff) +std::pair<ui32, ui32> PrepareDiffFromExpected(std::vector<ui32>* values) { + ui32 expected = 0; + ui32 maxDiff = 0; + if (values->empty()) { - *expected = 0; - *maxDiff = 0; - return; + return {expected, maxDiff}; } - *expected = DivRound<int>(values->back(), values->size()); + expected = DivRound<int>(values->back(), values->size()); - *maxDiff = 0; i64 expectedValue = 0; for (int i = 0; i < std::ssize(*values); ++i) { - expectedValue += *expected; - i32 diff = values->at(i) - expectedValue; + expectedValue += expected; + i32 diff = (*values)[i] - expectedValue; (*values)[i] = ZigZagEncode32(diff); - *maxDiff = std::max(*maxDiff, (*values)[i]); + maxDiff = std::max(maxDiff, (*values)[i]); } + + return {expected, maxDiff}; } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/bit_packed_unsigned_vector.h b/yt/yt/core/misc/bit_packed_unsigned_vector.h index 5a12aaba56..4f95561267 100644 --- a/yt/yt/core/misc/bit_packed_unsigned_vector.h +++ b/yt/yt/core/misc/bit_packed_unsigned_vector.h @@ -59,7 +59,7 @@ private: //////////////////////////////////////////////////////////////////////////////// -void PrepareDiffFromExpected(std::vector<ui32>* values, ui32* expected, ui32* maxDiff); +std::pair<ui32, ui32> PrepareDiffFromExpected(std::vector<ui32>* values); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/codicil.cpp b/yt/yt/core/misc/codicil.cpp new file mode 100644 index 0000000000..6a43d9465c --- /dev/null +++ b/yt/yt/core/misc/codicil.cpp @@ -0,0 +1,112 @@ +#include "codicil.h" + +#include <yt/yt/core/concurrency/fls.h> + +#include <library/cpp/yt/small_containers/compact_vector.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +using TCodicilBuilderStack = TCompactVector<TCodicilBuilder, 16>; + +NConcurrency::TFlsSlot<TCodicilBuilderStack>& CodicilBuilderStackSlot() +{ + static NConcurrency::TFlsSlot<TCodicilBuilderStack> result; + return result; +} + +} // namespace NDetail + +void PushCodicilBuilder(TCodicilBuilder&& builder) +{ + NDetail::CodicilBuilderStackSlot()->push_back(std::move(builder)); +} + +void PopCodicilBuilder() +{ + auto& stack = *NDetail::CodicilBuilderStackSlot(); + YT_ASSERT(!stack.empty()); + stack.pop_back(); +} + +TRange<TCodicilBuilder> GetCodicilBuilders() +{ + // NB: Don't forcefully construct FLS slot to avoid allocations; + // these may lead to deadlocks if the program crashes during an allocation itself. + if (!NDetail::CodicilBuilderStackSlot().IsInitialized()) { + return {}; + } + return TRange(*NDetail::CodicilBuilderStackSlot()); +} + +std::vector<std::string> BuildCodicils() +{ + auto builders = GetCodicilBuilders(); + std::vector<std::string> result; + result.reserve(builders.size()); + TCodicilFormatter formatter; + for (const auto& builder : builders) { + formatter.Reset(); + builder(&formatter); + result.emplace_back(formatter.GetBuffer()); + } + return result; +} + +TCodicilBuilder MakeNonOwningCodicilBuilder(TStringBuf codicil) +{ + return [codicil] (TCodicilFormatter* formatter) { + formatter->AppendString(codicil); + }; +} + +TCodicilBuilder MakeOwningCodicilBuilder(std::string codicil) +{ + return [codicil = std::move(codicil)] (TCodicilFormatter* formatter) { + formatter->AppendString(codicil); + }; +} + +//////////////////////////////////////////////////////////////////////////////// + +TCodicilGuard::TCodicilGuard(TCodicilBuilder&& builder) + : Active_(true) +{ + PushCodicilBuilder(std::move(builder)); +} + +TCodicilGuard::~TCodicilGuard() +{ + Release(); +} + +TCodicilGuard::TCodicilGuard(TCodicilGuard&& other) + : Active_(other.Active_) +{ + other.Active_ = false; +} + +TCodicilGuard& TCodicilGuard::operator=(TCodicilGuard&& other) +{ + if (this != &other) { + Release(); + Active_ = other.Active_; + other.Active_ = false; + } + return *this; +} + +void TCodicilGuard::Release() +{ + if (Active_) { + PopCodicilBuilder(); + Active_ = false; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/yt/core/misc/codicil.h b/yt/yt/core/misc/codicil.h new file mode 100644 index 0000000000..4b2ed64636 --- /dev/null +++ b/yt/yt/core/misc/codicil.h @@ -0,0 +1,73 @@ +#pragma once + +#include <library/cpp/yt/string/raw_formatter.h> + +#include <library/cpp/yt/memory/range.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +/* + * "Codicils" are short human- and machine-readable strings organized + * into a per-fiber stack. + * + * When the crash handler is invoked, it dumps (alongside with the other + * useful stuff like backtrace) the codicils. + * + * For performance reasons, codicils are constructed lazily when needed. + */ + +constexpr auto MaxCodicilLength = 256; + +//! A formatter used to build codicils. +using TCodicilFormatter = TRawFormatter<MaxCodicilLength>; + +//! A callback that constructs a codicil. +using TCodicilBuilder = std::function<void(TCodicilFormatter* formatter)>; + +//! Installs a new codicil builder onto the stack. +void PushCodicilBuilder(TCodicilBuilder&& builder); + +//! Removes the top codicil builder from the stack. +void PopCodicilBuilder(); + +//! Returns the list of all currently installed codicil builders. +TRange<TCodicilBuilder> GetCodicilBuilders(); + +//! Invokes all registered codicil builders to construct codicils. +//! Not signal-safe; must only be used for testing purposes. +std::vector<std::string> BuildCodicils(); + +//! Creates a codicil builder holding a given string view. +TCodicilBuilder MakeNonOwningCodicilBuilder(TStringBuf codicil); + +//! Creates a codicil builder holding a given string. +TCodicilBuilder MakeOwningCodicilBuilder(std::string codicil); + +//////////////////////////////////////////////////////////////////////////////// + +//! Invokes #PushCodicilBuilder in ctor and #PopCodicilBuilder in dtor. +class TCodicilGuard +{ +public: + TCodicilGuard() = default; + TCodicilGuard(TCodicilBuilder&& builder); + + ~TCodicilGuard(); + + TCodicilGuard(const TCodicilGuard& othter) = delete; + TCodicilGuard(TCodicilGuard&& other); + + TCodicilGuard& operator=(const TCodicilGuard& other) = delete; + TCodicilGuard& operator=(TCodicilGuard&& other); + +private: + bool Active_ = false; + + void Release(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/yt/core/misc/crash_handler.cpp b/yt/yt/core/misc/crash_handler.cpp index 5bc29f0591..4f7eae4440 100644 --- a/yt/yt/core/misc/crash_handler.cpp +++ b/yt/yt/core/misc/crash_handler.cpp @@ -1,8 +1,8 @@ #include "crash_handler.h" -#include "signal_registry.h" #include <yt/yt/core/logging/log_manager.h> +#include <yt/yt/core/misc/codicil.h> #include <yt/yt/core/misc/proc.h> #include <yt/yt/core/concurrency/fls.h> @@ -136,26 +136,21 @@ void DumpTimeInfo() WriteToStderr(formatter); } -using TCodicilStack = std::vector<TString>; - -NConcurrency::TFlsSlot<TCodicilStack>& CodicilStackSlot() -{ - static NConcurrency::TFlsSlot<TCodicilStack> Slot; - return Slot; -} - -//! Dump codicils. +//! Dumps codicils. void DumpCodicils() { - // NB: Avoid constructing FLS slot to avoid allocations; these may lead to deadlocks if the - // program crashes during an allocation itself. - if (CodicilStackSlot().IsInitialized() && !CodicilStackSlot()->empty()) { + auto builders = GetCodicilBuilders(); + if (!builders.empty()) { WriteToStderr("*** Begin codicils\n"); - for (const auto& data : *CodicilStackSlot()) { - TFormatter formatter; - formatter.AppendString(data.c_str()); - formatter.AppendString("\n"); + TCodicilFormatter formatter; + for (const auto& builder : builders) { + formatter.Reset(); + builder(&formatter); WriteToStderr(formatter); + if (formatter.GetBytesRemaining() == 0) { + WriteToStderr(" (truncated)"); + } + WriteToStderr("\n"); } WriteToStderr("*** End codicils\n"); } @@ -555,71 +550,4 @@ void CrashSignalHandler(int /*signal*/) //////////////////////////////////////////////////////////////////////////////// -void PushCodicil(const TString& data) -{ -#ifdef _unix_ - NDetail::CodicilStackSlot()->push_back(data); -#else - Y_UNUSED(data); -#endif -} - -void PopCodicil() -{ -#ifdef _unix_ - YT_VERIFY(!NDetail::CodicilStackSlot()->empty()); - NDetail::CodicilStackSlot()->pop_back(); -#endif -} - -std::vector<TString> GetCodicils() -{ -#ifdef _unix_ - return *NDetail::CodicilStackSlot(); -#else - return {}; -#endif -} - -TCodicilGuard::TCodicilGuard() - : Active_(false) -{ } - -TCodicilGuard::TCodicilGuard(const TString& data) - : Active_(true) -{ - PushCodicil(data); -} - -TCodicilGuard::~TCodicilGuard() -{ - Release(); -} - -TCodicilGuard::TCodicilGuard(TCodicilGuard&& other) - : Active_(other.Active_) -{ - other.Active_ = false; -} - -TCodicilGuard& TCodicilGuard::operator=(TCodicilGuard&& other) -{ - if (this != &other) { - Release(); - Active_ = other.Active_; - other.Active_ = false; - } - return *this; -} - -void TCodicilGuard::Release() -{ - if (Active_) { - PopCodicil(); - Active_ = false; - } -} - -//////////////////////////////////////////////////////////////////////////////// - } // namespace NYT diff --git a/yt/yt/core/misc/crash_handler.h b/yt/yt/core/misc/crash_handler.h index 94f903bd30..6ae6abdc97 100644 --- a/yt/yt/core/misc/crash_handler.h +++ b/yt/yt/core/misc/crash_handler.h @@ -1,6 +1,6 @@ #pragma once -#include <util/generic/string.h> +#include <util/generic/strbuf.h> #ifdef _unix_ #include <signal.h> @@ -29,41 +29,6 @@ void DumpStackTrace(TCallback flushCallback, void* startPC = nullptr); //////////////////////////////////////////////////////////////////////////////// -// "Codicils" are short human- and machine-readable strings organized into a per-fiber stack. -// When the crash handler is invoked, it dumps (alongside with the other -// useful stuff like backtrace) the content of the latter stack. - -//! Installs a new codicil into the stack. -void PushCodicil(const TString& data); - -//! Removes the top codicils from the stack. -void PopCodicil(); - -//! Returns the list of the currently installed codicils. -std::vector<TString> GetCodicils(); - -//! Invokes #PushCodicil in ctor and #PopCodicil in dtor. -class TCodicilGuard -{ -public: - TCodicilGuard(); - explicit TCodicilGuard(const TString& data); - ~TCodicilGuard(); - - TCodicilGuard(const TCodicilGuard& other) = delete; - TCodicilGuard(TCodicilGuard&& other); - - TCodicilGuard& operator=(const TCodicilGuard& other) = delete; - TCodicilGuard& operator=(TCodicilGuard&& other); - -private: - bool Active_; - - void Release(); -}; - -//////////////////////////////////////////////////////////////////////////////// - } // namespace NYT #define CRASH_HANDLER_INL_H_ diff --git a/yt/yt/core/misc/protobuf_helpers-inl.h b/yt/yt/core/misc/protobuf_helpers-inl.h index d603aa3fce..39cf53d165 100644 --- a/yt/yt/core/misc/protobuf_helpers-inl.h +++ b/yt/yt/core/misc/protobuf_helpers-inl.h @@ -10,6 +10,8 @@ #include <library/cpp/yt/assert/assert.h> +#include <library/cpp/yt/misc/cast.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -203,7 +205,7 @@ template <class T> requires TEnumTraits<T>::IsEnum && (!TEnumTraits<T>::IsBitEnum) void FromProto(T* original, int serialized) { - *original = static_cast<T>(serialized); + *original = CheckedEnumCast<T>(serialized); } template <class T> @@ -217,7 +219,7 @@ template <class T> requires TEnumTraits<T>::IsBitEnum void FromProto(T* original, ui64 serialized) { - *original = static_cast<T>(serialized); + *original = CheckedEnumCast<T>(serialized); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/unittests/codicil_ut.cpp b/yt/yt/core/misc/unittests/codicil_ut.cpp index 94d81af601..cd154c8fa6 100644 --- a/yt/yt/core/misc/unittests/codicil_ut.cpp +++ b/yt/yt/core/misc/unittests/codicil_ut.cpp @@ -1,11 +1,12 @@ #include <yt/yt/core/test_framework/framework.h> -#include <yt/yt/core/misc/crash_handler.h> +#include <yt/yt/core/misc/codicil.h> #include <yt/yt/core/concurrency/action_queue.h> #include <yt/yt/core/concurrency/delayed_executor.h> #include <yt/yt/core/actions/bind.h> +#include <yt/yt/core/actions/codicil_guarded_invoker.h> #include <yt/yt/core/actions/future.h> namespace NYT { @@ -17,22 +18,31 @@ using namespace NConcurrency; TEST(TCodicilTest, Simple) { - const auto codicil1 = TString("codicil1"); - const auto codicil2 = TString("codicil2"); - TCodicilGuard guard1(codicil1); - TCodicilGuard guard2(codicil2); - EXPECT_EQ(GetCodicils(), (std::vector{codicil1, codicil2})); + const auto codicil1 = std::string("codicil1"); + const auto codicil2 = std::string("codicil2"); + auto guard1 = TCodicilGuard(MakeOwningCodicilBuilder(codicil1)); + auto guard2 = TCodicilGuard(MakeOwningCodicilBuilder(codicil2)); + EXPECT_EQ(BuildCodicils(), (std::vector{codicil1, codicil2})); +} + +TEST(TCodicilTest, MaxLength) +{ + const auto codicil = std::string(MaxCodicilLength + 1, 'x'); + auto guard = TCodicilGuard(MakeOwningCodicilBuilder(codicil)); + auto codicils = BuildCodicils(); + EXPECT_EQ(std::ssize(codicils), 1); + EXPECT_EQ(codicils[0], codicil.substr(0, MaxCodicilLength)); } TEST(TCodicilTest, CodicilGuardedInvoker) { - const auto codicil = TString("codicil"); + const auto codicil = std::string("codicil"); auto actionQueue = New<TActionQueue>("ActionQueue"); auto invoker = CreateCodicilGuardedInvoker(actionQueue->GetInvoker(), codicil); BIND([&] { - EXPECT_EQ(GetCodicils(), (std::vector{codicil})); + EXPECT_EQ(BuildCodicils(), (std::vector{codicil})); TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); - EXPECT_EQ(GetCodicils(), (std::vector{codicil})); + EXPECT_EQ(BuildCodicils(), (std::vector{codicil})); }) .AsyncVia(invoker) .Run() diff --git a/yt/yt/core/phoenix/type_def-inl.h b/yt/yt/core/phoenix/type_def-inl.h index 34112eaad5..ff39a471b1 100644 --- a/yt/yt/core/phoenix/type_def-inl.h +++ b/yt/yt/core/phoenix/type_def-inl.h @@ -1055,6 +1055,12 @@ struct TSerializer } template <class T, class C> + static void Save(C& context, const TWeakPtr<T>& ptr) + { + SaveImpl(context, ptr.Lock().Get()); + } + + template <class T, class C> static void SaveImpl(C& context, T* ptr) { using NYT::Save; @@ -1127,6 +1133,14 @@ struct TSerializer LoadImpl</*Inplace*/ true>(context, rawPtr); } + template <class T, class C> + static void Load(C& context, TWeakPtr<T>& ptr) + { + T* rawPtr = nullptr; + LoadImpl</*Inplace*/ false>(context, rawPtr); + ptr.Reset(rawPtr); + } + template <bool Inplace, class T, class C> static void LoadImpl(C& context, T*& rawPtr) { @@ -1188,7 +1202,8 @@ template <class T, class C> requires (std::derived_from<C, NPhoenix2::NDetail::TContextBase>) && ( std::same_as<T, TIntrusivePtr<typename T::TUnderlying>> || std::same_as<T, std::unique_ptr<typename T::element_type>> || - std::is_pointer_v<T>) + std::is_pointer_v<T> || + std::same_as<T, TWeakPtr<typename T::TUnderlying>>) struct TSerializerTraits<T, C> { using TSerializer = NPhoenix2::NDetail::TSerializer; diff --git a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp index 950f22eb64..ac4ad2163d 100644 --- a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp +++ b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp @@ -1050,6 +1050,98 @@ TEST(TPhoenixTest, RawPtrCycle2) //////////////////////////////////////////////////////////////////////////////// +namespace NWeakPtrCycle { + +struct A + : public TRefCounted +{ + TWeakPtr<A> X; + + PHOENIX_DECLARE_TYPE(A, 0x5e1325ef); +}; + +void A::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, X)(); +} + +PHOENIX_DEFINE_TYPE(A); + +struct B + : public TRefCounted +{ + TIntrusivePtr<A> L; + TIntrusivePtr<A> R; + + PHOENIX_DECLARE_TYPE(B, 0x7ccd0099); +}; + +void B::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, L)(); + PHOENIX_REGISTER_FIELD(2, R)(); +} + +PHOENIX_DEFINE_TYPE(B); + +} // namespace NWeakPtrCycle + +TEST(TPhoenixTest, WeakPtrCycle1) +{ + using namespace NWeakPtrCycle; + + auto a1 = New<A>(); + EXPECT_EQ(a1->GetWeakRefCount(), 1); + + a1->X = a1; + + auto a2 = Deserialize<TIntrusivePtr<A>>(Serialize(a1)); + EXPECT_EQ(a2->GetRefCount(), 1); + EXPECT_EQ(a2->GetWeakRefCount(), 2); + EXPECT_TRUE(a2->X.Lock()); + EXPECT_EQ(a2->X.Lock(), a2); +} + +TEST(TPhoenixTest, WeakPtrCycle2) +{ + using namespace NWeakPtrCycle; + + auto a1 = New<A>(); + auto a2 = New<A>(); + a1->X = a2; + a2->X = a1; + a2 = nullptr; + + auto a3 = Deserialize<TIntrusivePtr<A>>(Serialize(a1)); + EXPECT_EQ(a3->GetRefCount(), 1); + EXPECT_EQ(a3->GetWeakRefCount(), 1); + EXPECT_TRUE(!a3->X.Lock()); + + auto a4 = Deserialize<TWeakPtr<A>>(Serialize(a1->X)); + EXPECT_TRUE(!a4.Lock()); +} + +TEST(TPhoenixTest, WeakPtrCycle3) +{ + using namespace NWeakPtrCycle; + + auto b1 = New<B>(); + b1->L = New<A>(); + b1->R = New<A>(); + b1->L->X = b1->R; + b1->R->X = b1->L; + + auto b2 = Deserialize<TIntrusivePtr<B>>(Serialize(b1)); + EXPECT_EQ(b2->L->GetRefCount(), 1); + EXPECT_EQ(b2->L->GetWeakRefCount(), 2); + EXPECT_EQ(b2->R->GetRefCount(), 1); + EXPECT_EQ(b2->R->GetWeakRefCount(), 2); + EXPECT_EQ(b2->L->X.Lock(), b2->R); + EXPECT_EQ(b2->R->X.Lock(), b2->L); +} + +//////////////////////////////////////////////////////////////////////////////// + namespace NIntrusiveAndRawPtr { struct A; @@ -1107,6 +1199,104 @@ TEST(TPhoenixTest, IntrusiveAndRawPtr) //////////////////////////////////////////////////////////////////////////////// +namespace NIntrusiveAndWeakPtr { + +struct A; +struct B; + +struct A + : public TRefCounted +{ + B* X = nullptr; + TIntrusivePtr<B> Y; + + PHOENIX_DECLARE_TYPE(A, 0xab7d77a9); +}; + +void A::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, X)(); + PHOENIX_REGISTER_FIELD(2, Y)(); +} + +PHOENIX_DEFINE_TYPE(A); + +struct B + : public TRefCounted +{ + int V = -1; + TWeakPtr<A> W; + + PHOENIX_DECLARE_TYPE(B, 0xea924741); +}; + +void B::RegisterMetadata(auto&& registrar) +{ + registrar.template Field<1, &B::V>("v")(); + PHOENIX_REGISTER_FIELD(2, W)(); +} + +PHOENIX_DEFINE_TYPE(B); + +struct C + : public TRefCounted +{ + TIntrusivePtr<A> APtr; + TIntrusivePtr<B> BPtr; + TWeakPtr<B> BWeakPtr; + + PHOENIX_DECLARE_TYPE(C, 0xea038112); +}; + +void C::RegisterMetadata(auto&& registrar) +{ + PHOENIX_REGISTER_FIELD(1, APtr)(); + PHOENIX_REGISTER_FIELD(2, BPtr)(); + PHOENIX_REGISTER_FIELD(3, BWeakPtr)(); +} + +PHOENIX_DEFINE_TYPE(C); + +} // namespace NIntrusiveAndWeakPtr + +TEST(TPhoenixTest, IntrusiveAndWeakPtr) +{ + using namespace NIntrusiveAndWeakPtr; + + auto c1 = New<C>(); + c1->APtr = New<A>(); + c1->BPtr = New<B>(); + c1->BWeakPtr = c1->BPtr; + c1->BPtr->W = c1->APtr; + c1->APtr->Y = c1->BPtr; + EXPECT_EQ(c1->BPtr->GetRefCount(), 2); + EXPECT_EQ(c1->APtr->Y->GetRefCount(), 2); + + c1->APtr->Y->V = 7; + c1->APtr->X = c1->APtr->Y.Get(); + + auto c2 = Deserialize<TIntrusivePtr<C>>(Serialize(c1)); + EXPECT_EQ(c2->APtr->GetRefCount(), 1); + EXPECT_EQ(c2->APtr->GetWeakRefCount(), c1->APtr->GetWeakRefCount()); + EXPECT_EQ(c2->APtr->Y->GetRefCount(), 2); + EXPECT_EQ(c2->APtr->Y->GetWeakRefCount(), c1->APtr->Y->GetWeakRefCount()); + EXPECT_EQ(c2->APtr->Y->W.Lock(), c2->APtr); + EXPECT_EQ(c2->APtr->Y->V, 7); + EXPECT_EQ(c2->APtr->X, c2->APtr->Y); + + EXPECT_EQ(c2->BPtr->GetRefCount(), 2); + EXPECT_EQ(c2->BPtr->GetWeakRefCount(), c1->BPtr->GetWeakRefCount()); + EXPECT_EQ(c2->BPtr->W.Lock(), c2->APtr); + EXPECT_EQ(c2->BPtr->W.Lock()->Y, c2->BPtr); + EXPECT_EQ(c2->BPtr->V, 7); + + EXPECT_EQ(c2->BWeakPtr.Lock()->GetWeakRefCount(), c1->BPtr->GetWeakRefCount()); + EXPECT_EQ(c2->BWeakPtr.Lock()->GetRefCount(), 3); + EXPECT_EQ(c2->BWeakPtr.Lock(), c2->BPtr); +} + +//////////////////////////////////////////////////////////////////////////////// + namespace NPersistentPolymorphic { struct TBase diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp index 935a9af200..35b7ee60f7 100644 --- a/yt/yt/core/rpc/client.cpp +++ b/yt/yt/core/rpc/client.cpp @@ -609,7 +609,7 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage) std::optional<NCompression::ECodec> bodyCodecId; NCompression::ECodec attachmentCodecId; if (Header_.has_codec()) { - bodyCodecId = attachmentCodecId = CheckedEnumCast<NCompression::ECodec>(Header_.codec()); + bodyCodecId = attachmentCodecId = FromProto<NCompression::ECodec>(Header_.codec()); } else { bodyCodecId = std::nullopt; attachmentCodecId = NCompression::ECodec::None; diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp index 81a17d4d01..b6ea1fd384 100644 --- a/yt/yt/core/rpc/grpc/channel.cpp +++ b/yt/yt/core/rpc/grpc/channel.cpp @@ -126,7 +126,7 @@ DEFINE_ENUM(EClientCallStage, ); class TChannel - : public IChannel + : public NYT::NRpc::NGrpc::IGrpcChannel { public: explicit TChannel(TChannelConfigPtr config) @@ -221,6 +221,11 @@ public: return MemoryUsageTracker_; } + grpc_connectivity_state CheckConnectivityState(bool tryToConnect) override + { + return grpc_channel_check_connectivity_state(Channel_.Unwrap(), tryToConnect); + } + private: const TChannelConfigPtr Config_; const TString EndpointAddress_; @@ -736,7 +741,7 @@ public: } // namespace -IChannelPtr CreateGrpcChannel(TChannelConfigPtr config) +IGrpcChannelPtr CreateGrpcChannel(TChannelConfigPtr config) { return New<TChannel>(std::move(config)); } diff --git a/yt/yt/core/rpc/grpc/channel.h b/yt/yt/core/rpc/grpc/channel.h index 1a11b35254..9e5c08f655 100644 --- a/yt/yt/core/rpc/grpc/channel.h +++ b/yt/yt/core/rpc/grpc/channel.h @@ -3,13 +3,27 @@ #include "public.h" #include <yt/yt/core/rpc/public.h> +#include <yt/yt/core/rpc/channel.h> + +#include <contrib/libs/grpc/include/grpc/impl/connectivity_state.h> namespace NYT::NRpc::NGrpc { //////////////////////////////////////////////////////////////////////////////// +struct IGrpcChannel + : public NRpc::IChannel +{ +public: + virtual grpc_connectivity_state CheckConnectivityState(bool tryToConnect) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IGrpcChannel) + +//////////////////////////////////////////////////////////////////////////////// + //! Creates a channel implemented via GRPC. -NRpc::IChannelPtr CreateGrpcChannel(TChannelConfigPtr config); +IGrpcChannelPtr CreateGrpcChannel(TChannelConfigPtr config); //! Returns the factory for creating GRPC channels. NRpc::IChannelFactoryPtr GetGrpcChannelFactory(); diff --git a/yt/yt/core/rpc/grpc/helpers.cpp b/yt/yt/core/rpc/grpc/helpers.cpp index bfe01bf540..42b7819013 100644 --- a/yt/yt/core/rpc/grpc/helpers.cpp +++ b/yt/yt/core/rpc/grpc/helpers.cpp @@ -413,7 +413,7 @@ TSharedRef ExtractMessageFromEnvelopedMessage(const TSharedRef& data) auto compressedMessage = data.Slice(sourceMessage, sourceMessage + fixedHeader->MessageSize); - auto codecId = CheckedEnumCast<NCompression::ECodec>(envelope.codec()); + auto codecId = FromProto<NCompression::ECodec>(envelope.codec()); auto* codec = NCompression::GetCodec(codecId); return codec->Decompress(compressedMessage); } diff --git a/yt/yt/core/rpc/grpc/helpers.h b/yt/yt/core/rpc/grpc/helpers.h index b6b775c483..a9cff4f292 100644 --- a/yt/yt/core/rpc/grpc/helpers.h +++ b/yt/yt/core/rpc/grpc/helpers.h @@ -244,7 +244,7 @@ private: size_t AvailableBytes_ = 0; size_t RemainingBytes_; - virtual size_t DoRead(void* buf, size_t len) override; + size_t DoRead(void* buf, size_t len) override; bool ReadNextSlice(); }; diff --git a/yt/yt/core/rpc/grpc/public.h b/yt/yt/core/rpc/grpc/public.h index a34e2e768a..7cb846bf33 100644 --- a/yt/yt/core/rpc/grpc/public.h +++ b/yt/yt/core/rpc/grpc/public.h @@ -15,6 +15,8 @@ DECLARE_REFCOUNTED_CLASS(TChannelCredentialsConfig) DECLARE_REFCOUNTED_CLASS(TChannelConfigTemplate) DECLARE_REFCOUNTED_CLASS(TChannelConfig) +DECLARE_REFCOUNTED_STRUCT(IGrpcChannel) + //////////////////////////////////////////////////////////////////////////////// extern const char* const TracingTraceIdMetadataKey; diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp index 1f2d6c1bff..c4b5200c16 100644 --- a/yt/yt/core/rpc/http/channel.cpp +++ b/yt/yt/core/rpc/http/channel.cpp @@ -272,7 +272,7 @@ private: THeadersPtr httpHeaders = New<THeaders>(); if (rpcHeader.has_request_format()) { - auto format = CheckedEnumCast<EMessageFormat>(rpcHeader.request_format()); + auto format = FromProto<EMessageFormat>(rpcHeader.request_format()); httpHeaders->Add(ContentTypeHeaderName, ToHttpContentType(format)); } @@ -281,7 +281,7 @@ private: } if (rpcHeader.has_response_format()) { - auto format = CheckedEnumCast<EMessageFormat>(rpcHeader.response_format()); + auto format = FromProto<EMessageFormat>(rpcHeader.response_format()); httpHeaders->Add(AcceptHeaderName, ToHttpContentType(format)); } diff --git a/yt/yt/core/rpc/http/server.cpp b/yt/yt/core/rpc/http/server.cpp index e762424218..e0caa56bb7 100644 --- a/yt/yt/core/rpc/http/server.cpp +++ b/yt/yt/core/rpc/http/server.cpp @@ -116,7 +116,7 @@ public: YT_VERIFY(message.Size() >= 2); if (responseHeader.has_format()) { - auto format = CheckedEnumCast<EMessageFormat>(responseHeader.format()); + auto format = FromProto<EMessageFormat>(responseHeader.format()); Rsp_->GetHeaders()->Add("Content-Type", ToHttpContentType(format)); } diff --git a/yt/yt/core/rpc/response_keeper.cpp b/yt/yt/core/rpc/response_keeper.cpp index 2e1891bea4..f354c67590 100644 --- a/yt/yt/core/rpc/response_keeper.cpp +++ b/yt/yt/core/rpc/response_keeper.cpp @@ -380,7 +380,7 @@ bool ValidateHeaderAndParseRememberOption(const TSharedRefArray& responseMessage { NProto::TResponseHeader header; YT_VERIFY(TryParseResponseHeader(responseMessage, &header)); - return FromProto<EErrorCode>(header.error().code()) != EErrorCode::Unavailable; + return header.error().code() != ToUnderlying(EErrorCode::Unavailable); } void ValidateRetry(TMutationId mutationId, bool isRetry) diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp index 94c992054d..ce9802a9dc 100644 --- a/yt/yt/core/rpc/server_detail.cpp +++ b/yt/yt/core/rpc/server_detail.cpp @@ -70,6 +70,10 @@ void TServiceContextBase::Initialize() RequestId_ = FromProto<TRequestId>(RequestHeader_->request_id()); RealmId_ = FromProto<TRealmId>(RequestHeader_->realm_id()); + MutationId_ = FromProto<TMutationId>(RequestHeader_->mutation_id()); + ServiceName_ = FromProto<std::string>(RequestHeader_->service()); + MethodName_ = FromProto<std::string>(RequestHeader_->method()); + AuthenticationIdentity_.User = RequestHeader_->has_user() ? RequestHeader_->user() : RootUserName; AuthenticationIdentity_.UserTag = RequestHeader_->has_user_tag() ? RequestHeader_->user_tag() : AuthenticationIdentity_.User; @@ -208,12 +212,7 @@ TSharedRefArray TServiceContextBase::BuildResponseMessage() // COMPAT(danilalexeev): legacy RPC codecs. if (IsResponseBodySerializedWithCompression()) { - if (RequestHeader_->has_response_codec()) { - header.set_codec(ToProto(ResponseCodec_)); - } else { - ResponseBody_ = PushEnvelope(ResponseBody_, ResponseCodec_); - ResponseAttachments_ = DecompressAttachments(ResponseAttachments_, ResponseCodec_); - } + header.set_codec(ToProto(ResponseCodec_)); } auto message = Error_.IsOK() @@ -223,7 +222,7 @@ TSharedRefArray TServiceContextBase::BuildResponseMessage() ResponseAttachments_) : CreateErrorResponseMessage(header); - auto responseMessageError = CheckBusMessageLimits(ResponseMessage_); + auto responseMessageError = CheckBusMessageLimits(message); if (!responseMessageError.IsOK()) { return CreateErrorResponseMessage(responseMessageError); } @@ -399,14 +398,14 @@ TMutationId TServiceContextBase::GetMutationId() const return FromProto<TMutationId>(RequestHeader_->mutation_id()); } -std::string TServiceContextBase::GetService() const +const std::string& TServiceContextBase::GetService() const { - return FromProto<std::string>(RequestHeader_->service()); + return ServiceName_; } -std::string TServiceContextBase::GetMethod() const +const std::string& TServiceContextBase::GetMethod() const { - return FromProto<std::string>(RequestHeader_->method()); + return MethodName_; } TRealmId TServiceContextBase::GetRealmId() const @@ -611,12 +610,12 @@ TMutationId TServiceContextWrapper::GetMutationId() const return UnderlyingContext_->GetMutationId(); } -std::string TServiceContextWrapper::GetService() const +const std::string& TServiceContextWrapper::GetService() const { return UnderlyingContext_->GetService(); } -std::string TServiceContextWrapper::GetMethod() const +const std::string& TServiceContextWrapper::GetMethod() const { return UnderlyingContext_->GetMethod(); } diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h index 3683ca7477..ef4eeeeb0d 100644 --- a/yt/yt/core/rpc/server_detail.h +++ b/yt/yt/core/rpc/server_detail.h @@ -53,8 +53,8 @@ public: bool IsRetry() const override; TMutationId GetMutationId() const override; - std::string GetService() const override; - std::string GetMethod() const override; + const std::string& GetService() const override; + const std::string& GetMethod() const override; TRealmId GetRealmId() const override; const TAuthenticationIdentity& GetAuthenticationIdentity() const override; @@ -69,7 +69,6 @@ public: //! \note Thread affinity: any TFuture<TSharedRefArray> GetAsyncResponseMessage() const override; - const TSharedRefArray& GetResponseMessage() const override; using TCanceledCallback = TCallback<void(const TError&)>; @@ -131,6 +130,9 @@ protected: bool LoggingEnabled_; TRequestId RequestId_; TRealmId RealmId_; + TMutationId MutationId_; + std::string ServiceName_; + std::string MethodName_; TAuthenticationIdentity AuthenticationIdentity_; @@ -217,8 +219,8 @@ public: bool IsRetry() const override; TMutationId GetMutationId() const override; - std::string GetService() const override; - std::string GetMethod() const override; + const std::string& GetService() const override; + const std::string& GetMethod() const override; TRealmId GetRealmId() const override; const TAuthenticationIdentity& GetAuthenticationIdentity() const override; diff --git a/yt/yt/core/rpc/service.h b/yt/yt/core/rpc/service.h index def1b8dbf0..b9bd49473e 100644 --- a/yt/yt/core/rpc/service.h +++ b/yt/yt/core/rpc/service.h @@ -100,12 +100,10 @@ struct IServiceContext virtual TMutationId GetMutationId() const = 0; //! Returns request service name. - // NB: Service name is supposed to be short, so SSO should work. - virtual std::string GetService() const = 0; + virtual const std::string& GetService() const = 0; //! Returns request method name. - // NB: Method name is supposed to be short, so SSO should work. - virtual std::string GetMethod() const = 0; + virtual const std::string& GetMethod() const = 0; //! Returns request realm id. virtual TRealmId GetRealmId() const = 0; diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index d6c062b9c9..2cf6b573cd 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -22,7 +22,7 @@ #include <yt/yt/core/logging/log_manager.h> -#include <yt/yt/core/misc/crash_handler.h> +#include <yt/yt/core/misc/codicil.h> #include <yt/yt/core/misc/finally.h> #include <yt/yt/core/net/address.h> @@ -402,7 +402,6 @@ public: std::move(logger), incomingRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed)) , Service_(std::move(service)) - , RequestId_(incomingRequest.RequestId) , ReplyBus_(std::move(incomingRequest.ReplyBus)) , RuntimeInfo_(incomingRequest.RuntimeInfo) , TraceContext_(std::move(incomingRequest.TraceContext)) @@ -561,8 +560,7 @@ public: RequestId_); if (RuntimeInfo_->Descriptor.StreamingEnabled) { - static const auto CanceledError = TError(NYT::EErrorCode::Canceled, "Request canceled"); - AbortStreamsUnlessClosed(CanceledError); + AbortStreamsUnlessClosed(TError(NYT::EErrorCode::Canceled, "Request canceled")); } CancelInstant_ = NProfiling::GetInstant(); @@ -586,8 +584,7 @@ public: stage); if (RuntimeInfo_->Descriptor.StreamingEnabled) { - static const auto TimedOutError = TError(NYT::EErrorCode::Timeout, "Request timed out"); - AbortStreamsUnlessClosed(TimedOutError); + AbortStreamsUnlessClosed(TError(NYT::EErrorCode::Timeout, "Request timed out")); } CanceledList_.Fire(GetCanceledError()); @@ -744,7 +741,6 @@ public: private: const TServiceBasePtr Service_; - const TRequestId RequestId_; const IBusPtr ReplyBus_; TRuntimeMethodInfo* const RuntimeInfo_; const TTraceContextPtr TraceContext_; @@ -800,10 +796,10 @@ private: { // COMPAT(danilalexeev): legacy RPC codecs RequestCodec_ = RequestHeader_->has_request_codec() - ? CheckedEnumCast<NCompression::ECodec>(RequestHeader_->request_codec()) + ? FromProto<NCompression::ECodec>(RequestHeader_->request_codec()) : NCompression::ECodec::None; ResponseCodec_ = RequestHeader_->has_response_codec() - ? CheckedEnumCast<NCompression::ECodec>(RequestHeader_->response_codec()) + ? FromProto<NCompression::ECodec>(RequestHeader_->response_codec()) : NCompression::ECodec::None; Service_->IncrementActiveRequestCount(); @@ -895,8 +891,7 @@ private: } if (RuntimeInfo_->Descriptor.StreamingEnabled) { - static const auto FinishedError = TError("Request finished"); - AbortStreamsUnlessClosed(Error_.IsOK() ? Error_ : FinishedError); + AbortStreamsUnlessClosed(Error_.IsOK() ? Error_ : TError("Request finished")); } DoSetComplete(); @@ -996,11 +991,20 @@ private: { const auto& authenticationIdentity = GetAuthenticationIdentity(); - TCodicilGuard codicilGuard(Format("RequestId: %v, Method: %v.%v, AuthenticationIdentity: %v", - GetRequestId(), - GetService(), - GetMethod(), - authenticationIdentity)); + TCodicilGuard codicilGuard([&] (TCodicilFormatter* formatter) { + formatter->AppendString("RequestId: "); + formatter->AppendGuid(RequestId_); + formatter->AppendString(", Service: "); + formatter->AppendString(GetService()); + formatter->AppendString(", Method: "); + formatter->AppendString(GetMethod()); + formatter->AppendString(", User: "); + formatter->AppendString(authenticationIdentity.User); + if (!authenticationIdentity.UserTag.empty() && authenticationIdentity.UserTag != authenticationIdentity.User) { + formatter->AppendString(", UserTag: "); + formatter->AppendString(authenticationIdentity.UserTag); + } + }); TCurrentAuthenticationIdentityGuard identityGuard(&authenticationIdentity); handler(this, descriptor.Options); } @@ -1110,9 +1114,9 @@ private: { TNullTraceContextGuard nullGuard; - YT_LOG_DEBUG("Request logging suppressed (RequestId: %v)", GetRequestId()); + YT_LOG_DEBUG("Request logging suppressed (RequestId: %v)", RequestId_); } - NLogging::TLogManager::Get()->SuppressRequest(GetRequestId()); + NLogging::TLogManager::Get()->SuppressRequest(RequestId_); } void DoSetComplete() @@ -1275,10 +1279,10 @@ private: NProto::TStreamingPayloadHeader header; ToProto(header.mutable_request_id(), RequestId_); - ToProto(header.mutable_service(), GetService()); - ToProto(header.mutable_method(), GetMethod()); - if (GetRealmId()) { - ToProto(header.mutable_realm_id(), GetRealmId()); + ToProto(header.mutable_service(), ServiceName_); + ToProto(header.mutable_method(), MethodName_); + if (RealmId_) { + ToProto(header.mutable_realm_id(), RealmId_); } header.set_sequence_number(payload->SequenceNumber); header.set_codec(ToProto(payload->Codec)); @@ -1317,10 +1321,10 @@ private: NProto::TStreamingFeedbackHeader header; ToProto(header.mutable_request_id(), RequestId_); - header.set_service(GetService()); - header.set_method(GetMethod()); - if (GetRealmId()) { - ToProto(header.mutable_realm_id(), GetRealmId()); + header.set_service(ToProto(ServiceName_)); + header.set_method(ToProto(MethodName_)); + if (RealmId_) { + ToProto(header.mutable_realm_id(), RealmId_); } header.set_read_position(feedback.ReadPosition); diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index b12bc45ebe..7c43182860 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -336,9 +336,17 @@ protected: const auto& underlyingContext = this->GetUnderlyingContext(); const auto& requestHeader = underlyingContext->GetRequestHeader(); - auto codecId = underlyingContext->GetResponseCodec(); - auto serializedBody = SerializeProtoToRefWithCompression(*Response_, codecId); - underlyingContext->SetResponseBodySerializedWithCompression(); + // COMPAT(danilalexeev): legacy RPC codecs. + NCompression::ECodec attachmentCodecId = NCompression::ECodec::None; + auto bodyCodecId = underlyingContext->GetResponseCodec(); + TSharedRef serializedBody; + if (requestHeader.has_response_codec()) { + serializedBody = SerializeProtoToRefWithCompression(*Response_, bodyCodecId); + attachmentCodecId = bodyCodecId; + underlyingContext->SetResponseBodySerializedWithCompression(); + } else { + serializedBody = SerializeProtoToRefWithEnvelope(*Response_, bodyCodecId); + } if (requestHeader.has_response_format()) { auto format = TryCheckedEnumCast<EMessageFormat>(requestHeader.response_format()); @@ -363,7 +371,7 @@ protected: } } - auto responseAttachments = CompressAttachments(Response_->Attachments(), codecId); + auto responseAttachments = CompressAttachments(Response_->Attachments(), attachmentCodecId); return TSerializedResponse{ .Body = std::move(serializedBody), diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp index e71ffffc1a..f6477b361e 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.cpp +++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp @@ -18,6 +18,8 @@ namespace NYT::NRpc { using namespace NConcurrency; +using NYT::FromProto; + //////////////////////////////////////////////////////////////////////////////// YT_DEFINE_GLOBAL(std::unique_ptr<NThreading::TEvent>, Latch_); @@ -140,7 +142,7 @@ public: DECLARE_RPC_SERVICE_METHOD(NTestRpc, Compression) { - auto requestCodecId = CheckedEnumCast<NCompression::ECodec>(request->request_codec()); + auto requestCodecId = FromProto<NCompression::ECodec>(request->request_codec()); auto serializedRequestBody = SerializeProtoToRefWithCompression(*request, requestCodecId); const auto& compressedRequestBody = context->GetRequestBody(); EXPECT_TRUE(TRef::AreBitwiseEqual(serializedRequestBody, compressedRequestBody)); diff --git a/yt/yt/core/rpc/unittests/mock/service.h b/yt/yt/core/rpc/unittests/mock/service.h index 72c13d7910..152e515794 100644 --- a/yt/yt/core/rpc/unittests/mock/service.h +++ b/yt/yt/core/rpc/unittests/mock/service.h @@ -123,13 +123,13 @@ public: (const, override)); MOCK_METHOD( - std::string, + const std::string&, GetService, (), (const, override)); MOCK_METHOD( - std::string, + const std::string&, GetMethod, (), (const, override)); diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h index 8bdf90aabd..44f618a261 100644 --- a/yt/yt/core/tracing/trace_context.h +++ b/yt/yt/core/tracing/trace_context.h @@ -278,7 +278,7 @@ TTraceContext* GetCurrentTraceContext(); //! Flushes the elapsed time of the current trace context (if any). void FlushCurrentTraceContextElapsedTime(); -//! +//! Returns a trace context from #storage (null if there is none). TTraceContext* TryGetTraceContextFromPropagatingStorage(const NConcurrency::TPropagatingStorage& storage); //! Creates a new trace context. If the current trace context exists, it becomes the parent of the diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 1948e8d250..59362bed77 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -14,6 +14,7 @@ NO_LTO() SRCS( actions/cancelation_token.cpp actions/cancelable_context.cpp + actions/codicil_guarded_invoker.cpp actions/current_invoker.cpp actions/future.cpp actions/invoker_detail.cpp @@ -118,6 +119,7 @@ SRCS( misc/blob_output.cpp misc/bloom_filter.cpp misc/checksum.cpp + misc/codicil.cpp misc/config.cpp misc/coro_pipe.cpp misc/crash_handler.cpp diff --git a/yt/yt/core/yson/unittests/yson_writer_ut.cpp b/yt/yt/core/yson/unittests/yson_writer_ut.cpp index 99414f0d94..118781fd44 100644 --- a/yt/yt/core/yson/unittests/yson_writer_ut.cpp +++ b/yt/yt/core/yson/unittests/yson_writer_ut.cpp @@ -6,6 +6,9 @@ #include <yt/yt/core/yson/parser.h> #include <yt/yt/core/yson/stream.h> +#include <yt/yt/core/ytree/ephemeral_node_factory.h> +#include <yt/yt/core/ytree/tree_builder.h> + #include <util/string/escape.h> namespace NYT::NYson { @@ -457,5 +460,20 @@ TEST(TYsonFragmentWriterTest, NoFirstIndent) //////////////////////////////////////////////////////////////////////////////// +TEST(TYsonTreeBuilderTest, MaxListSize) +{ + TString input = + "\"a1\" = {\n" + " \"key\" = 42;\n" + "};\n"; + + auto builder = NYTree::CreateBuilderFromFactory(NYTree::GetEphemeralNodeFactory(), /*treeSizeLimit*/ 0); + builder->BeginTree(); + TStringStream stream(input); + EXPECT_THROW(ParseYson(TYsonInput(&stream), builder.get()), std::exception); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace } // namespace NYT::NYson diff --git a/yt/yt/core/ytree/ephemeral_node_factory.cpp b/yt/yt/core/ytree/ephemeral_node_factory.cpp index 878837a093..789fa6ace4 100644 --- a/yt/yt/core/ytree/ephemeral_node_factory.cpp +++ b/yt/yt/core/ytree/ephemeral_node_factory.cpp @@ -451,7 +451,7 @@ public: : ShouldHideAttributes_(shouldHideAttributes) { } - virtual ~TEphemeralNodeFactory() override + ~TEphemeralNodeFactory() override { RollbackIfNeeded(); } diff --git a/yt/yt/core/ytree/node_detail.cpp b/yt/yt/core/ytree/node_detail.cpp index cf785433d1..5660cc43e7 100644 --- a/yt/yt/core/ytree/node_detail.cpp +++ b/yt/yt/core/ytree/node_detail.cpp @@ -559,7 +559,7 @@ void TListNodeMixin::SetChild( void TSupportsSetSelfMixin::SetSelf( TReqSet* request, TRspSet* /*response*/, - const TCtxSetPtr &context) + const TCtxSetPtr& context) { bool force = request->force(); context->SetRequestInfo("Force: %v", force); diff --git a/yt/yt/core/ytree/ypath_client.cpp b/yt/yt/core/ytree/ypath_client.cpp index 810557403a..5cb9d8055e 100644 --- a/yt/yt/core/ytree/ypath_client.cpp +++ b/yt/yt/core/ytree/ypath_client.cpp @@ -225,7 +225,7 @@ void TYPathResponse::Deserialize(const TSharedRefArray& message) // COMPAT(danilalexeev): legacy RPC codecs auto codecId = header.has_codec() - ? std::make_optional(CheckedEnumCast<NCompression::ECodec>(header.codec())) + ? std::make_optional(FromProto<NCompression::ECodec>(header.codec())) : std::nullopt; if (!TryDeserializeBody(message[1], codecId)) { diff --git a/yt/yt/core/ytree/ypath_detail.cpp b/yt/yt/core/ytree/ypath_detail.cpp index 23a1610ca8..bcc24ea64b 100644 --- a/yt/yt/core/ytree/ypath_detail.cpp +++ b/yt/yt/core/ytree/ypath_detail.cpp @@ -1278,7 +1278,7 @@ class TNodeSetter private: \ I##name##Node* const Node_; \ \ - virtual ENodeType GetExpectedType() override \ + ENodeType GetExpectedType() override \ { \ return ENodeType::name; \ } diff --git a/yt/yt/core/ytree/ypath_detail.h b/yt/yt/core/ytree/ypath_detail.h index 797fff24d1..290da3a1fc 100644 --- a/yt/yt/core/ytree/ypath_detail.h +++ b/yt/yt/core/ytree/ypath_detail.h @@ -154,11 +154,12 @@ protected: DEFINE_RPC_SERVICE_METHOD(TSupports##method, method) \ { \ NYPath::TTokenizer tokenizer(GetRequestTargetYPath(context->RequestHeader())); \ - if (tokenizer.Advance() == NYPath::ETokenType::EndOfStream) { \ + tokenizer.Advance(); \ + tokenizer.Skip(NYPath::ETokenType::Ampersand); \ + if (tokenizer.GetType() == NYPath::ETokenType::EndOfStream) { \ method##Self(request, response, context); \ return; \ } \ - tokenizer.Skip(NYPath::ETokenType::Ampersand); \ if (tokenizer.GetType() != NYPath::ETokenType::Slash) { \ onPathError \ return; \ diff --git a/yt/yt/library/column_converters/string_column_converter.cpp b/yt/yt/library/column_converters/string_column_converter.cpp index 60b4e28337..797f64f4ee 100644 --- a/yt/yt/library/column_converters/string_column_converter.cpp +++ b/yt/yt/library/column_converters/string_column_converter.cpp @@ -128,9 +128,7 @@ private: auto offsets = GetDirectDenseOffsets(); // Save offsets as diff from expected. - ui32 expectedLength; - ui32 maxDiff; - PrepareDiffFromExpected(&offsets, &expectedLength, &maxDiff); + auto [expectedLength, maxDiff] = PrepareDiffFromExpected(&offsets); auto directDataSize = DirectBuffer_->GetSize(); auto directData = DirectBuffer_->Finish(); @@ -213,9 +211,7 @@ private: auto idsRef = TSharedRef::MakeCopy<TConverterTag>(TRef(ids.data(), sizeof(ui32) * ids.size())); // 2. Dictionary offsets. - ui32 expectedLength; - ui32 maxDiff; - PrepareDiffFromExpected(&dictionaryOffsets, &expectedLength, &maxDiff); + auto [expectedLength, maxDiff] = PrepareDiffFromExpected(&dictionaryOffsets); auto dictionaryOffsetsRef = TSharedRef::MakeCopy<TConverterTag>(TRef(dictionaryOffsets.data(), sizeof(ui32) * dictionaryOffsets.size())); auto primaryColumn = std::make_shared<TBatchColumn>(); diff --git a/yt/yt/library/formats/yamr_parser_base.h b/yt/yt/library/formats/yamr_parser_base.h index 56968d4908..66e5b306a8 100644 --- a/yt/yt/library/formats/yamr_parser_base.h +++ b/yt/yt/library/formats/yamr_parser_base.h @@ -28,7 +28,7 @@ class TYamrConsumerBase { public: explicit TYamrConsumerBase(NYson::IYsonConsumer* consumer); - virtual void SwitchTable(i64 tableIndex) override; + void SwitchTable(i64 tableIndex) override; protected: NYson::IYsonConsumer* Consumer; @@ -53,8 +53,8 @@ public: bool enableKeyEscaping, bool enableValueEscaping); - virtual void Read(TStringBuf data) override; - virtual void Finish() override; + void Read(TStringBuf data) override; + void Finish() override; private: using EState = EYamrDelimitedBaseParserState; @@ -123,8 +123,8 @@ public: bool enableSubkey, bool enableEom); - virtual void Read(TStringBuf data) override; - virtual void Finish() override; + void Read(TStringBuf data) override; + void Finish() override; private: using EState = EYamrLenvalBaseParserState; diff --git a/yt/yt/library/formats/yson_map_to_unversioned_value.h b/yt/yt/library/formats/yson_map_to_unversioned_value.h index 023a0600a3..656bf576d0 100644 --- a/yt/yt/library/formats/yson_map_to_unversioned_value.h +++ b/yt/yt/library/formats/yson_map_to_unversioned_value.h @@ -22,28 +22,28 @@ public: NTableClient::IValueConsumer* valueConsumer); void Reset(); - virtual void OnStringScalar(TStringBuf value) override; - virtual void OnInt64Scalar(i64 value) override; - virtual void OnUint64Scalar(ui64 value) override; - virtual void OnDoubleScalar(double value) override; - virtual void OnBooleanScalar(bool value) override; - virtual void OnEntity() override; - virtual void OnBeginList() override; - virtual void OnListItem() override; - virtual void OnBeginMap() override; - virtual void OnKeyedItem(TStringBuf name) override; - virtual void OnEndMap() override; - virtual void OnBeginAttributes() override; - virtual void OnEndList() override; - virtual void OnEndAttributes() override; + void OnStringScalar(TStringBuf value) override; + void OnInt64Scalar(i64 value) override; + void OnUint64Scalar(ui64 value) override; + void OnDoubleScalar(double value) override; + void OnBooleanScalar(bool value) override; + void OnEntity() override; + void OnBeginList() override; + void OnListItem() override; + void OnBeginMap() override; + void OnKeyedItem(TStringBuf name) override; + void OnEndMap() override; + void OnBeginAttributes() override; + void OnEndList() override; + void OnEndAttributes() override; private: - virtual const NTableClient::TNameTablePtr& GetNameTable() const override; - virtual bool GetAllowUnknownColumns() const override; - virtual void OnBeginRow() override; - virtual void OnValue(const NTableClient::TUnversionedValue& value) override; - virtual void OnEndRow() override; - virtual const NTableClient::TTableSchemaPtr& GetSchema() const override; + const NTableClient::TNameTablePtr& GetNameTable() const override; + bool GetAllowUnknownColumns() const override; + void OnBeginRow() override; + void OnValue(const NTableClient::TUnversionedValue& value) override; + void OnEndRow() override; + const NTableClient::TTableSchemaPtr& GetSchema() const override; private: NTableClient::IValueConsumer* const Consumer_; diff --git a/yt/yt/library/oom/oom.h b/yt/yt/library/oom/oom.h index 33a9206398..1fa01194dc 100644 --- a/yt/yt/library/oom/oom.h +++ b/yt/yt/library/oom/oom.h @@ -26,7 +26,7 @@ struct TTCMallocLimitHandlerOptions // Files structure would have the following form: // HeapDumpDirectory/<ActualName>_FilenameSuffix_Timestamp.ext. - TString FilenameSuffix = ""; + std::optional<TString> FilenameSuffix; TDuration Timeout = TDuration::Minutes(5); }; diff --git a/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp b/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp index ca624aaea0..cad154d211 100644 --- a/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp +++ b/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp @@ -35,11 +35,15 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +namespace { + TString MakeIncompletePath(const TString& path) { return NYT::Format("%v_incomplete", path); } +} // namespace + //////////////////////////////////////////////////////////////////////////////// void CollectAndDumpMemoryProfile(const TString& memoryProfilePath, tcmalloc::ProfileType profileType) @@ -205,7 +209,7 @@ private: { return NYT::MakeFormatterWrapper([this, ×tamp] (TStringBuilderBase* builder) { if (Options_.FilenameSuffix) { - builder->AppendFormat("%v_", Options_.FilenameSuffix); + builder->AppendFormat("%v_", *Options_.FilenameSuffix); } FormatValue(builder, timestamp, "v"); }); diff --git a/yt/yt/library/process/unittests/process_ut.cpp b/yt/yt/library/process/unittests/process_ut.cpp index 01a1468e59..15f807561f 100644 --- a/yt/yt/library/process/unittests/process_ut.cpp +++ b/yt/yt/library/process/unittests/process_ut.cpp @@ -209,6 +209,9 @@ TEST(TProcessTest, KillFinished) TEST(TProcessTest, KillZombie) { + // TODO(arkady-e1ppa): This code is for debugging test failures purposes + // remove it when investigation is complete. + ::signal(SIGCHLD, SIG_DFL); auto p = New<TSimpleProcess>("/bin/bash"); p->AddArgument("-c"); p->AddArgument("/bin/sleep 1; /bin/true"); diff --git a/yt/yt/library/profiling/solomon/cube.cpp b/yt/yt/library/profiling/solomon/cube.cpp index 93b54496b2..52a106556f 100644 --- a/yt/yt/library/profiling/solomon/cube.cpp +++ b/yt/yt/library/profiling/solomon/cube.cpp @@ -435,7 +435,7 @@ int TCube<T>::ReadSensors( }; if constexpr (std::is_same_v<T, i64> || std::is_same_v<T, TDuration>) { - if (options.ConvertCountersToRateGauge) { + if (options.ConvertCountersToRateGauge || options.ConvertCountersToDeltaGauge) { consumer->OnMetricBegin(NMonitoring::EMetricType::GAUGE); } else { consumer->OnMetricBegin(NMonitoring::EMetricType::RATE); @@ -455,6 +455,12 @@ int TCube<T>::ReadSensors( } else { consumer->OnDouble(time, value.SecondsFloat() / options.RateDenominator); } + } else if (options.ConvertCountersToDeltaGauge) { + if constexpr (std::is_same_v<T, i64>) { + consumer->OnDouble(time, value); + } else { + consumer->OnDouble(time, value.SecondsFloat()); + } } else { // TODO(prime@): RATE is incompatible with windowed read. if constexpr (std::is_same_v<T, i64>) { diff --git a/yt/yt/library/profiling/solomon/cube.h b/yt/yt/library/profiling/solomon/cube.h index 39c638fcc0..061ca24bc1 100644 --- a/yt/yt/library/profiling/solomon/cube.h +++ b/yt/yt/library/profiling/solomon/cube.h @@ -24,6 +24,7 @@ struct TReadOptions std::function<bool(const std::string&)> SensorFilter; bool ConvertCountersToRateGauge = false; + bool ConvertCountersToDeltaGauge = false; bool RenameConvertedCounters = true; double RateDenominator = 1.0; bool EnableHistogramCompat = false; diff --git a/yt/yt/library/profiling/solomon/exporter.cpp b/yt/yt/library/profiling/solomon/exporter.cpp index 913fbcfe55..225f7682ff 100644 --- a/yt/yt/library/profiling/solomon/exporter.cpp +++ b/yt/yt/library/profiling/solomon/exporter.cpp @@ -79,6 +79,8 @@ void TSolomonExporterConfig::Register(TRegistrar registrar) .Default(true); registrar.Parameter("rename_converted_counters", &TThis::RenameConvertedCounters) .Default(true); + registrar.Parameter("convert_counters_to_delta_gauge", &TThis::ConvertCountersToDeltaGauge) + .Default(false); registrar.Parameter("export_summary", &TThis::ExportSummary) .Default(false); @@ -134,6 +136,12 @@ void TSolomonExporterConfig::Register(TRegistrar registrar) }); registrar.Postprocessor([] (TThis* config) { + if (config->ConvertCountersToRateForSolomon && config->ConvertCountersToDeltaGauge) { + THROW_ERROR_EXCEPTION("\"convert_counters_to_rate_for_solomon\" and \"convert_counters_to_delta_gauge\" both set to true"); + } + }); + + registrar.Postprocessor([] (TThis* config) { for (const auto& [name, shard] : config->Shards) { if (!shard->GridStep) { continue; @@ -780,6 +788,9 @@ void TSolomonExporter::DoHandleShard( options.RateDenominator = readGridStep->SecondsFloat(); } } + if (Config_->ConvertCountersToDeltaGauge && outputEncodingContext.IsSolomonPull) { + options.ConvertCountersToDeltaGauge = true; + } options.EnableSolomonAggregationWorkaround = outputEncodingContext.IsSolomonPull; options.Times = readWindow; diff --git a/yt/yt/library/profiling/solomon/exporter.h b/yt/yt/library/profiling/solomon/exporter.h index bd4aa8b7ea..46bb7f37cb 100644 --- a/yt/yt/library/profiling/solomon/exporter.h +++ b/yt/yt/library/profiling/solomon/exporter.h @@ -55,6 +55,7 @@ struct TSolomonExporterConfig bool ConvertCountersToRateForSolomon; bool RenameConvertedCounters; + bool ConvertCountersToDeltaGauge; bool ExportSummary; bool ExportSummaryAsMax; diff --git a/yt/yt/library/ytprof/http/handler.cpp b/yt/yt/library/ytprof/http/handler.cpp index 2dbc631957..7ad74d3171 100644 --- a/yt/yt/library/ytprof/http/handler.cpp +++ b/yt/yt/library/ytprof/http/handler.cpp @@ -180,7 +180,7 @@ public: } private: - tcmalloc::ProfileType ProfileType_; + const tcmalloc::ProfileType ProfileType_; }; class TTCMallocAllocationProfilerHandler @@ -206,7 +206,7 @@ class TTCMallocStatHandler : public IHttpHandler { public: - void HandleRequest(const IRequestPtr& /* req */, const IResponseWriterPtr& rsp) override + void HandleRequest(const IRequestPtr& /*req*/, const IResponseWriterPtr& rsp) override { auto stat = tcmalloc::MallocExtension::GetStats(); rsp->SetStatus(EStatusCode::OK); @@ -258,7 +258,7 @@ class TVersionHandler : public IHttpHandler { public: - void HandleRequest(const IRequestPtr& /* req */, const IResponseWriterPtr& rsp) override + void HandleRequest(const IRequestPtr& /*req*/, const IResponseWriterPtr& rsp) override { rsp->SetStatus(EStatusCode::OK); WaitFor(rsp->WriteBody(TSharedRef::FromString(GetVersion()))) @@ -270,7 +270,7 @@ class TBuildIdHandler : public IHttpHandler { public: - void HandleRequest(const IRequestPtr& /* req */, const IResponseWriterPtr& rsp) override + void HandleRequest(const IRequestPtr& /*req*/, const IResponseWriterPtr& rsp) override { rsp->SetStatus(EStatusCode::OK); WaitFor(rsp->WriteBody(TSharedRef::FromString(GetVersion()))) @@ -291,22 +291,28 @@ void Register( const TString& prefix, const TBuildInfo& buildInfo) { - handlers->Add(prefix + "/profile", New<TCpuProfilerHandler>(buildInfo)); + handlers->Add(prefix + "/cpu/profile", New<TCpuProfilerHandler>(buildInfo)); - handlers->Add(prefix + "/lock", New<TSpinlockProfilerHandler>(buildInfo, false)); - handlers->Add(prefix + "/block", New<TSpinlockProfilerHandler>(buildInfo, true)); + handlers->Add(prefix + "/spinlock/lock", New<TSpinlockProfilerHandler>(buildInfo, false)); + handlers->Add(prefix + "/spinlock/block", New<TSpinlockProfilerHandler>(buildInfo, true)); - handlers->Add(prefix + "/heap", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kHeap)); - handlers->Add(prefix + "/peak", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kPeakHeap)); - handlers->Add(prefix + "/fragmentation", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kFragmentation)); - handlers->Add(prefix + "/allocations", New<TTCMallocAllocationProfilerHandler>(buildInfo)); - - handlers->Add(prefix + "/tcmalloc", New<TTCMallocStatHandler>()); + handlers->Add(prefix + "/tcmalloc/current", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kHeap)); + handlers->Add(prefix + "/tcmalloc/peak", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kPeakHeap)); + handlers->Add(prefix + "/tcmalloc/fragmentation", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kFragmentation)); + handlers->Add(prefix + "/tcmalloc/allocation", New<TTCMallocAllocationProfilerHandler>(buildInfo)); + handlers->Add(prefix + "/tcmalloc/stat", New<TTCMallocStatHandler>()); handlers->Add(prefix + "/binary", New<TBinaryHandler>()); - + handlers->Add(prefix + "/build_id", New<TBuildIdHandler>()); handlers->Add(prefix + "/version", New<TVersionHandler>()); + + // COMPAT(babenko): consider dropping these + handlers->Add(prefix + "/profile", New<TCpuProfilerHandler>(buildInfo)); handlers->Add(prefix + "/buildid", New<TBuildIdHandler>()); + handlers->Add(prefix + "/heap", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kHeap)); + handlers->Add(prefix + "/peak", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kPeakHeap)); + handlers->Add(prefix + "/fragmentation", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kFragmentation)); + handlers->Add(prefix + "/allocations", New<TTCMallocAllocationProfilerHandler>(buildInfo)); } //////////////////////////////////////////////////////////////////////////////// |