aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-11-20 11:14:58 +0000
committerAlexander Smirnov <alex@ydb.tech>2024-11-20 11:14:58 +0000
commit31773f157bf8164364649b5f470f52dece0a4317 (patch)
tree33d0f7eef45303ab68cf08ab381ce5e5e36c5240 /yt
parent2c7938962d8689e175574fc1e817c05049f27905 (diff)
parenteff600952d5dfe17942f38f510a8ac2b203bb3a5 (diff)
downloadydb-31773f157bf8164364649b5f470f52dece0a4317.tar.gz
Merge branch 'rightlib' into mergelibs-241120-1113
Diffstat (limited to 'yt')
-rw-r--r--yt/cpp/mapreduce/client/ya.make12
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.cpp13
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.h3
-rw-r--r--yt/cpp/mapreduce/http/http.cpp340
-rw-r--r--yt/cpp/mapreduce/http/http.h87
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp16
-rw-r--r--yt/cpp/mapreduce/http/http_client.h2
-rw-r--r--yt/cpp/mapreduce/http/requests.cpp1
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp2
-rw-r--r--yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp21
-rw-r--r--yt/cpp/mapreduce/http/ut/http_ut.cpp10
-rw-r--r--yt/cpp/mapreduce/interface/errors.cpp24
-rw-r--r--yt/cpp/mapreduce/interface/errors.h19
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp1
-rw-r--r--yt/yt/client/api/client_common.h36
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp12
-rw-r--r--yt/yt/client/api/rpc_proxy/config.cpp4
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.cpp4
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.h2
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp21
-rw-r--r--yt/yt/client/cache/rpc.cpp6
-rw-r--r--yt/yt/client/cache/rpc.h2
-rw-r--r--yt/yt/client/chaos_client/config.h18
-rw-r--r--yt/yt/client/chaos_client/public.h1
-rw-r--r--yt/yt/client/driver/command-inl.h2
-rw-r--r--yt/yt/client/driver/driver.cpp2
-rw-r--r--yt/yt/client/driver/etc_commands.h2
-rw-r--r--yt/yt/client/driver/queue_commands.cpp34
-rw-r--r--yt/yt/client/driver/table_commands.h6
-rw-r--r--yt/yt/client/federated/client.cpp6
-rw-r--r--yt/yt/client/formats/versioned_writer.h6
-rw-r--r--yt/yt/client/scheduler/public.h1
-rw-r--r--yt/yt/client/table_client/column_sort_schema.cpp4
-rw-r--r--yt/yt/client/table_client/config.cpp2
-rw-r--r--yt/yt/client/table_client/config.h1
-rw-r--r--yt/yt/client/table_client/logical_type.cpp4
-rw-r--r--yt/yt/client/table_client/schema.cpp8
-rw-r--r--yt/yt/client/table_client/versioned_io_options.cpp5
-rw-r--r--yt/yt/client/table_client/wire_protocol.cpp38
-rw-r--r--yt/yt/client/table_client/wire_protocol.h17
-rw-r--r--yt/yt/core/actions/cancelable_context.cpp37
-rw-r--r--yt/yt/core/actions/codicil_guarded_invoker.cpp50
-rw-r--r--yt/yt/core/actions/codicil_guarded_invoker.h19
-rw-r--r--yt/yt/core/actions/invoker_detail.cpp10
-rw-r--r--yt/yt/core/actions/invoker_detail.h1
-rw-r--r--yt/yt/core/actions/signal.h8
-rw-r--r--yt/yt/core/bus/tcp/dispatcher.cpp2
-rw-r--r--yt/yt/core/bus/tcp/dispatcher.h2
-rw-r--r--yt/yt/core/bus/tcp/dispatcher_impl.cpp4
-rw-r--r--yt/yt/core/bus/tcp/dispatcher_impl.h8
-rw-r--r--yt/yt/core/bus/tcp/server.cpp2
-rw-r--r--yt/yt/core/concurrency/action_queue.cpp60
-rw-r--r--yt/yt/core/concurrency/action_queue.h8
-rw-r--r--yt/yt/core/concurrency/delayed_executor.cpp26
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_pool.cpp9
-rw-r--r--yt/yt/core/concurrency/unittests/async_stream_ut.cpp2
-rw-r--r--yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp4
-rw-r--r--yt/yt/core/concurrency/unittests/scheduler_ut.cpp2
-rw-r--r--yt/yt/core/http/retrying_client.cpp6
-rw-r--r--yt/yt/core/json/json_writer.cpp2
-rw-r--r--yt/yt/core/misc/bit_packed_unsigned_vector-inl.h2
-rw-r--r--yt/yt/core/misc/bit_packed_unsigned_vector.cpp20
-rw-r--r--yt/yt/core/misc/bit_packed_unsigned_vector.h2
-rw-r--r--yt/yt/core/misc/codicil.cpp112
-rw-r--r--yt/yt/core/misc/codicil.h73
-rw-r--r--yt/yt/core/misc/crash_handler.cpp96
-rw-r--r--yt/yt/core/misc/crash_handler.h37
-rw-r--r--yt/yt/core/misc/protobuf_helpers-inl.h6
-rw-r--r--yt/yt/core/misc/unittests/codicil_ut.cpp28
-rw-r--r--yt/yt/core/phoenix/type_def-inl.h17
-rw-r--r--yt/yt/core/phoenix/unittests/phoenix_ut.cpp190
-rw-r--r--yt/yt/core/rpc/client.cpp2
-rw-r--r--yt/yt/core/rpc/grpc/channel.cpp9
-rw-r--r--yt/yt/core/rpc/grpc/channel.h16
-rw-r--r--yt/yt/core/rpc/grpc/helpers.cpp2
-rw-r--r--yt/yt/core/rpc/grpc/helpers.h2
-rw-r--r--yt/yt/core/rpc/grpc/public.h2
-rw-r--r--yt/yt/core/rpc/http/channel.cpp4
-rw-r--r--yt/yt/core/rpc/http/server.cpp2
-rw-r--r--yt/yt/core/rpc/response_keeper.cpp2
-rw-r--r--yt/yt/core/rpc/server_detail.cpp25
-rw-r--r--yt/yt/core/rpc/server_detail.h12
-rw-r--r--yt/yt/core/rpc/service.h6
-rw-r--r--yt/yt/core/rpc/service_detail.cpp56
-rw-r--r--yt/yt/core/rpc/service_detail.h16
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.cpp4
-rw-r--r--yt/yt/core/rpc/unittests/mock/service.h4
-rw-r--r--yt/yt/core/tracing/trace_context.h2
-rw-r--r--yt/yt/core/ya.make2
-rw-r--r--yt/yt/core/yson/unittests/yson_writer_ut.cpp18
-rw-r--r--yt/yt/core/ytree/ephemeral_node_factory.cpp2
-rw-r--r--yt/yt/core/ytree/node_detail.cpp2
-rw-r--r--yt/yt/core/ytree/ypath_client.cpp2
-rw-r--r--yt/yt/core/ytree/ypath_detail.cpp2
-rw-r--r--yt/yt/core/ytree/ypath_detail.h5
-rw-r--r--yt/yt/library/column_converters/string_column_converter.cpp8
-rw-r--r--yt/yt/library/formats/yamr_parser_base.h10
-rw-r--r--yt/yt/library/formats/yson_map_to_unversioned_value.h40
-rw-r--r--yt/yt/library/oom/oom.h2
-rw-r--r--yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp6
-rw-r--r--yt/yt/library/process/unittests/process_ut.cpp3
-rw-r--r--yt/yt/library/profiling/solomon/cube.cpp8
-rw-r--r--yt/yt/library/profiling/solomon/cube.h1
-rw-r--r--yt/yt/library/profiling/solomon/exporter.cpp11
-rw-r--r--yt/yt/library/profiling/solomon/exporter.h1
-rw-r--r--yt/yt/library/ytprof/http/handler.cpp34
106 files changed, 1295 insertions, 668 deletions
diff --git a/yt/cpp/mapreduce/client/ya.make b/yt/cpp/mapreduce/client/ya.make
index 2d118b2442..599bbdc92f 100644
--- a/yt/cpp/mapreduce/client/ya.make
+++ b/yt/cpp/mapreduce/client/ya.make
@@ -55,9 +55,15 @@ IF (BUILD_TYPE == "PROFILE")
yt/yt/library/ytprof
)
- SRCS(
- job_profiler.cpp
- )
+ IF (OPENSOURCE)
+ SRCS(
+ dummy_job_profiler.cpp
+ )
+ ELSE()
+ SRCS(
+ job_profiler.cpp
+ )
+ ENDIF()
ELSE()
SRCS(
dummy_job_profiler.cpp
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp
index 9da9241d33..995bb9de4c 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.cpp
+++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp
@@ -14,20 +14,20 @@ public:
{
auto g = Guard(Lock_);
auto id = NextId_++;
- IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
+ IdToOutage_.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
return id;
}
void StopOutage(TOutageId id)
{
auto g = Guard(Lock_);
- IdToOutage.erase(id);
+ IdToOutage_.erase(id);
}
void Add(IAbortableHttpResponse* response)
{
auto g = Guard(Lock_);
- for (auto& [id, entry] : IdToOutage) {
+ for (auto& [id, entry] : IdToOutage_) {
if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) {
response->SetLengthLimit(entry.LengthLimit);
entry.Counter -= 1;
@@ -70,7 +70,7 @@ private:
private:
TOutageId NextId_ = 0;
TIntrusiveList<IAbortableHttpResponse> ResponseList_;
- THashMap<TOutageId, TOutageEntry> IdToOutage;
+ THashMap<TOutageId, TOutageEntry> IdToOutage_;
TMutex Lock_;
};
@@ -137,11 +137,10 @@ bool TAbortableHttpResponseBase::IsAborted() const
////////////////////////////////////////////////////////////////////////////////
TAbortableHttpResponse::TAbortableHttpResponse(
+ TRequestContext context,
IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName,
const TString& url)
- : THttpResponse(socketStream, requestId, hostName)
+ : THttpResponse(std::move(context), socketStream)
, TAbortableHttpResponseBase(url)
{
}
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h
index d72bcfa0a6..e9b1483bf7 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.h
+++ b/yt/cpp/mapreduce/http/abortable_http_response.h
@@ -108,9 +108,8 @@ public:
public:
TAbortableHttpResponse(
+ TRequestContext context,
IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName,
const TString& url);
/// @brief Abort any responses which match `urlPattern` (i.e. contain it in url).
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index f9eb8539b5..ca243a929a 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -10,6 +10,7 @@
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/yt/core/http/http.h>
@@ -39,6 +40,27 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+std::exception_ptr WrapSystemError(
+ const TRequestContext& context,
+ const std::exception& ex)
+{
+ if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) {
+ return std::make_exception_ptr(errorResponse);
+ }
+
+ auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method);
+ TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, {
+ {"request_id", context.RequestId},
+ {"host", context.HostName},
+ {"method", context.Method},
+ });
+ TTransportError errorResponse(std::move(outer));
+
+ return std::make_exception_ptr(errorResponse);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
class THttpRequest::TRequestStream
: public IOutputStream
{
@@ -92,17 +114,17 @@ private:
CheckErrorState();
try {
func();
- } catch (const std::exception&) {
- HandleWriteException();
+ } catch (const std::exception& ex) {
+ HandleWriteException(ex);
}
}
// In many cases http proxy stops reading request and resets connection
// if error has happend. This function tries to read error response
// in such cases.
- void HandleWriteException() {
+ void HandleWriteException(const std::exception& ex) {
Y_ABORT_UNLESS(WriteError_ == nullptr);
- WriteError_ = std::current_exception();
+ WriteError_ = WrapSystemError(HttpRequest_->Context_, ex);
Y_ABORT_UNLESS(WriteError_ != nullptr);
try {
HttpRequest_->GetResponseStream();
@@ -130,16 +152,16 @@ private:
////////////////////////////////////////////////////////////////////////////////
THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
- : Method(method)
- , Command(command)
- , IsApi(isApi)
+ : Method_(method)
+ , Command_(command)
+ , IsApi_(isApi)
{ }
void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
{
- auto it = Parameters.find(key);
- if (it == Parameters.end()) {
- Parameters.emplace(key, std::move(value));
+ auto it = Parameters_.find(key);
+ if (it == Parameters_.end()) {
+ Parameters_.emplace(key, std::move(value));
} else {
if (overwrite) {
it->second = std::move(value);
@@ -158,12 +180,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
void THttpHeader::RemoveParameter(const TString& key)
{
- Parameters.erase(key);
+ Parameters_.erase(key);
}
TNode THttpHeader::GetParameters() const
{
- return Parameters;
+ return Parameters_;
}
void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
@@ -202,81 +224,81 @@ void THttpHeader::AddMutationId()
bool THttpHeader::HasMutationId() const
{
- return Parameters.contains("mutation_id");
+ return Parameters_.contains("mutation_id");
}
void THttpHeader::SetToken(const TString& token)
{
- Token = token;
+ Token_ = token;
}
void THttpHeader::SetProxyAddress(const TString& proxyAddress)
{
- ProxyAddress = proxyAddress;
+ ProxyAddress_ = proxyAddress;
}
void THttpHeader::SetHostPort(const TString& hostPort)
{
- HostPort = hostPort;
+ HostPort_ = hostPort;
}
void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
{
- ImpersonationUser = impersonationUser;
+ ImpersonationUser_ = impersonationUser;
}
void THttpHeader::SetServiceTicket(const TString& ticket)
{
- ServiceTicket = ticket;
+ ServiceTicket_ = ticket;
}
void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
{
- InputFormat = format;
+ InputFormat_ = format;
}
void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
{
- OutputFormat = format;
+ OutputFormat_ = format;
}
TMaybe<TFormat> THttpHeader::GetOutputFormat() const
{
- return OutputFormat;
+ return OutputFormat_;
}
void THttpHeader::SetRequestCompression(const TString& compression)
{
- RequestCompression = compression;
+ RequestCompression_ = compression;
}
void THttpHeader::SetResponseCompression(const TString& compression)
{
- ResponseCompression = compression;
+ ResponseCompression_ = compression;
}
TString THttpHeader::GetCommand() const
{
- return Command;
+ return Command_;
}
TString THttpHeader::GetUrl(bool needProxy) const
{
TStringStream url;
- if (needProxy && !ProxyAddress.empty()) {
- url << ProxyAddress << "/";
+ if (needProxy && !ProxyAddress_.empty()) {
+ url << ProxyAddress_ << "/";
return url.Str();
}
- if (!ProxyAddress.empty()) {
- url << HostPort;
+ if (!ProxyAddress_.empty()) {
+ url << HostPort_;
}
- if (IsApi) {
- url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command;
+ if (IsApi_) {
+ url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_;
} else {
- url << "/" << Command;
+ url << "/" << Command_;
}
return url.Str();
@@ -284,16 +306,16 @@ TString THttpHeader::GetUrl(bool needProxy) const
bool THttpHeader::ShouldAcceptFraming() const
{
- return TConfig::Get()->CommandsWithFraming.contains(Command);
+ return TConfig::Get()->CommandsWithFraming.contains(Command_);
}
TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
{
TStringStream result;
- result << Method << " " << GetUrl() << " HTTP/1.1\r\n";
+ result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n";
- GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result);
+ GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result);
if (ShouldAcceptFraming()) {
result << "X-YT-Accept-Framing: 1\r\n";
@@ -311,25 +333,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
headers->Add("Host", hostName);
headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
- if (!Token.empty()) {
- headers->Add("Authorization", "OAuth " + Token);
+ if (!Token_.empty()) {
+ headers->Add("Authorization", "OAuth " + Token_);
}
- if (!ServiceTicket.empty()) {
- headers->Add("X-Ya-Service-Ticket", ServiceTicket);
+ if (!ServiceTicket_.empty()) {
+ headers->Add("X-Ya-Service-Ticket", ServiceTicket_);
}
- if (!ImpersonationUser.empty()) {
- headers->Add("X-Yt-User-Name", ImpersonationUser);
+ if (!ImpersonationUser_.empty()) {
+ headers->Add("X-Yt-User-Name", ImpersonationUser_);
}
- if (Method == "PUT" || Method == "POST") {
+ if (Method_ == "PUT" || Method_ == "POST") {
headers->Add("Transfer-Encoding", "chunked");
}
headers->Add("X-YT-Correlation-Id", requestId);
headers->Add("X-YT-Header-Format", "<format=text>yson");
- headers->Add("Content-Encoding", RequestCompression);
- headers->Add("Accept-Encoding", ResponseCompression);
+ headers->Add("Content-Encoding", RequestCompression_);
+ headers->Add("Accept-Encoding", ResponseCompression_);
auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
static const size_t maxHttpHeaderSize = 64 << 10;
@@ -353,14 +375,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
} while (ptr != finish);
};
- if (InputFormat) {
- printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config));
+ if (InputFormat_) {
+ printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config));
}
- if (OutputFormat) {
- printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config));
+ if (OutputFormat_) {
+ printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config));
}
if (includeParameters) {
- printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters));
+ printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_));
}
return NHttp::THeadersPtrWrapper(std::move(headers));
@@ -368,7 +390,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
const TString& THttpHeader::GetMethod() const
{
- return Method;
+ return Method_;
}
////////////////////////////////////////////////////////////////////////////////
@@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address)
////////////////////////////////////////////////////////////////////////////////
-static TMaybe<TString> GetProxyName(const THttpInput& input)
+class THttpResponse::THttpInputWrapped
+ : public IInputStream
{
- if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) {
- return proxyHeader->Value();
+public:
+ explicit THttpInputWrapped(TRequestContext context, IInputStream* input)
+ : Context_(std::move(context))
+ , HttpInput_(input)
+ { }
+
+ const THttpHeaders& Headers() const noexcept
+ {
+ return HttpInput_.Headers();
+ }
+
+ const TString& FirstLine() const noexcept
+ {
+ return HttpInput_.FirstLine();
+ }
+
+ bool IsKeepAlive() const noexcept
+ {
+ return HttpInput_.IsKeepAlive();
+ }
+
+ const TMaybe<THttpHeaders>& Trailers() const noexcept
+ {
+ return HttpInput_.Trailers();
}
- return Nothing();
-}
+
+private:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ try {
+ return HttpInput_.Read(buf, len);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
+ }
+
+ size_t DoSkip(size_t len) override
+ {
+ try {
+ return HttpInput_.Skip(len);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
+ }
+
+private:
+ const TRequestContext Context_;
+ THttpInput HttpInput_;
+};
THttpResponse::THttpResponse(
- IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName)
- : HttpInput_(socketStream)
- , RequestId_(requestId)
- , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName))
- , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing"))
-{
- HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine());
+ TRequestContext context,
+ IInputStream* socketStream)
+ : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream))
+ , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing"))
+ , Context_(std::move(context))
+{
+ if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) {
+ Context_.HostName = proxyHeader->Value();
+ }
+ HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine());
if (HttpCode_ == 200 || HttpCode_ == 202) {
return;
}
- ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_);
+ ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId);
auto logAndSetError = [&] (const TString& rawError) {
YT_LOG_ERROR("RSP %v - HTTP %v - %v",
- RequestId_,
+ Context_.RequestId,
HttpCode_,
rawError.data());
ErrorResponse_->SetRawError(rawError);
@@ -705,26 +775,26 @@ THttpResponse::THttpResponse(
break;
case 500:
- logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_);
+ logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName);
break;
default: {
TStringStream httpHeaders;
httpHeaders << "HTTP headers (";
- for (const auto& header : HttpInput_.Headers()) {
+ for (const auto& header : HttpInput_->Headers()) {
httpHeaders << header.Name() << ": " << header.Value() << "; ";
}
httpHeaders << ")";
auto errorString = Sprintf("RSP %s - HTTP %d - %s",
- RequestId_.data(),
+ Context_.RequestId.data(),
HttpCode_,
httpHeaders.Str().data());
YT_LOG_ERROR("%v",
errorString.data());
- if (auto parsedResponse = ParseError(HttpInput_.Headers())) {
+ if (auto parsedResponse = ParseError(HttpInput_->Headers())) {
ErrorResponse_ = parsedResponse.GetRef();
} else {
ErrorResponse_->SetRawError(
@@ -735,9 +805,12 @@ THttpResponse::THttpResponse(
}
}
+THttpResponse::~THttpResponse()
+{ }
+
const THttpHeaders& THttpResponse::Headers() const
{
- return HttpInput_.Headers();
+ return HttpInput_->Headers();
}
void THttpResponse::CheckErrorResponse() const
@@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const
const TString& THttpResponse::GetHostName() const
{
- return HostName_;
+ return Context_.HostName;
}
bool THttpResponse::IsKeepAlive() const
{
- return HttpInput_.IsKeepAlive();
+ return HttpInput_->IsKeepAlive();
}
TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers)
{
for (const auto& header : headers) {
if (header.Name() == "X-YT-Error") {
- TErrorResponse errorResponse(HttpCode_, RequestId_);
+ TErrorResponse errorResponse(HttpCode_, Context_.RequestId);
errorResponse.ParseFromJsonError(header.Value());
if (errorResponse.IsOk()) {
return Nothing();
@@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len)
if (Unframe_) {
read = UnframeRead(buf, len);
} else {
- read = HttpInput_.Read(buf, len);
+ read = HttpInput_->Read(buf, len);
}
if (read == 0 && len != 0) {
// THttpInput MUST return defined (but may be empty)
// trailers when it is exhausted.
- Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
+ Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
"trailers MUST be defined for exhausted stream");
- CheckTrailers(HttpInput_.Trailers().GetRef());
+ CheckTrailers(HttpInput_->Trailers().GetRef());
IsExhausted_ = true;
}
return read;
@@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len)
if (Unframe_) {
skipped = UnframeSkip(len);
} else {
- skipped = HttpInput_.Skip(len);
+ skipped = HttpInput_->Skip(len);
}
if (skipped == 0 && len != 0) {
// THttpInput MUST return defined (but may be empty)
// trailers when it is exhausted.
- Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
+ Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
"trailers MUST be defined for exhausted stream");
- CheckTrailers(HttpInput_.Trailers().GetRef());
+ CheckTrailers(HttpInput_->Trailers().GetRef());
IsExhausted_ = true;
}
return skipped;
@@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers)
if (auto errorResponse = ParseError(trailers)) {
errorResponse->SetIsFromTrailers(true);
YT_LOG_ERROR("RSP %v - %v",
- RequestId_,
+ Context_.RequestId,
errorResponse.GetRef().what());
ythrow errorResponse.GetRef();
}
}
-static ui32 ReadDataFrameSize(THttpInput* stream)
+static ui32 ReadDataFrameSize(IInputStream* stream)
{
ui32 littleEndianSize;
auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize));
@@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary()
{
while (RemainingFrameSize_ == 0) {
ui8 frameTypeByte;
- auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte));
+ auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte));
if (read == 0) {
return false;
}
@@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary()
case EFrameType::KeepAlive:
break;
case EFrameType::Data:
- RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_);
+ RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get());
break;
default:
ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
@@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len)
if (!RefreshFrameIfNecessary()) {
return 0;
}
- auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_));
+ auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_));
RemainingFrameSize_ -= read;
return read;
}
@@ -879,80 +952,83 @@ size_t THttpResponse::UnframeSkip(size_t len)
if (!RefreshFrameIfNecessary()) {
return 0;
}
- auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_));
+ auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_));
RemainingFrameSize_ -= skipped;
return skipped;
}
////////////////////////////////////////////////////////////////////////////////
-THttpRequest::THttpRequest()
-{
- RequestId = CreateGuidAsString();
-}
-
-THttpRequest::THttpRequest(const TString& requestId)
- : RequestId(requestId)
+THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout)
+ : Context_(TRequestContext{
+ .RequestId = std::move(requestId),
+ .HostName = std::move(hostName),
+ .Method = header.GetUrl(/*needProxy=*/ false)
+ })
+ , Header_(std::move(header))
+ , Url_(Header_.GetUrl(true))
+ , SocketTimeout_(socketTimeout)
{ }
THttpRequest::~THttpRequest()
{
- if (!Connection) {
+ if (!Connection_) {
return;
}
- if (Input && Input->IsKeepAlive() && Input->IsExhausted()) {
+ if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) {
// We should return to the pool only connections where HTTP response was fully read.
// Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
- TConnectionPool::Get()->Release(Connection);
+ TConnectionPool::Get()->Release(Connection_);
} else {
- TConnectionPool::Get()->Invalidate(HostName, Connection);
+ TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
}
}
TString THttpRequest::GetRequestId() const
{
- return RequestId;
+ return Context_.RequestId;
}
-void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
+IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters)
{
- HostName = std::move(hostName);
YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
- RequestId,
- HostName);
+ Context_.RequestId,
+ Context_.HostName);
StartTime_ = TInstant::Now();
- Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout);
+
+ try {
+ Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
YT_LOG_DEBUG("REQ %v - connection #%v",
- RequestId,
- Connection->Id);
-}
+ Context_.RequestId,
+ Connection_->Id);
-IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
-{
- auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
- Url_ = header.GetUrl(true);
+ auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters);
- LogRequest(header, Url_, includeParameters, RequestId, HostName);
+ LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName);
- LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128);
+ LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128);
- auto outputFormat = header.GetOutputFormat();
+ auto outputFormat = Header_.GetOutputFormat();
if (outputFormat && outputFormat->IsTextYson()) {
- LogResponse = true;
+ LogResponse_ = true;
}
- RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get());
+ RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get());
RequestStream_->Write(strHeader.data(), strHeader.size());
return RequestStream_.Get();
}
-IOutputStream* THttpRequest::StartRequest(const THttpHeader& header)
+IOutputStream* THttpRequest::StartRequest()
{
- return StartRequestImpl(header, true);
+ return StartRequestImpl(true);
}
void THttpRequest::FinishRequest()
@@ -961,16 +1037,16 @@ void THttpRequest::FinishRequest()
RequestStream_->Finish();
}
-void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body)
+void THttpRequest::SmallRequest(TMaybe<TStringBuf> body)
{
- if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) {
- const auto& parameters = header.GetParameters();
+ if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) {
+ const auto& parameters = Header_.GetParameters();
auto parametersStr = NodeToYsonString(parameters);
- auto* output = StartRequestImpl(header, false);
+ auto* output = StartRequestImpl(false);
output->Write(parametersStr);
FinishRequest();
} else {
- auto* output = StartRequest(header);
+ auto* output = StartRequest();
if (body) {
output->Write(*body);
}
@@ -980,17 +1056,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo
THttpResponse* THttpRequest::GetResponseStream()
{
- if (!Input) {
- SocketInput.Reset(new TSocketInput(*Connection->Socket.Get()));
+ if (!Input_) {
+ SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get()));
if (TConfig::Get()->UseAbortableResponse) {
Y_ABORT_UNLESS(!Url_.empty());
- Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_));
+ Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_));
} else {
- Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName));
+ Input_.Reset(new THttpResponse(Context_, SocketInput_.Get()));
}
- Input->CheckErrorResponse();
+ Input_->CheckErrorResponse();
}
- return Input.Get();
+ return Input_.Get();
}
TString THttpRequest::GetResponse()
@@ -1003,15 +1079,15 @@ TString THttpRequest::GetResponse()
<< "HostName: " << GetResponseStream()->GetHostName() << "; "
<< LoggedAttributes_;
- if (LogResponse) {
+ if (LogResponse_) {
constexpr auto sizeLimit = 1 << 7;
YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
- RequestId,
+ Context_.RequestId,
TruncateForLogs(result, sizeLimit),
loggedAttributes.Str());
} else {
YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
- RequestId,
+ Context_.RequestId,
result.size(),
loggedAttributes.Str());
}
@@ -1024,8 +1100,8 @@ int THttpRequest::GetHttpCode() {
void THttpRequest::InvalidateConnection()
{
- TConnectionPool::Get()->Invalidate(HostName, Connection);
- Connection.Reset();
+ TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
+ Connection_.Reset();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index 95595959ad..618b1e2c22 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -39,6 +39,12 @@ enum class EFrameType
KeepAlive = 0x02,
};
+struct TRequestContext
+{
+ TString RequestId;
+ TString HostName;
+ TString Method;
+};
class THttpHeader
{
@@ -82,24 +88,23 @@ private:
bool ShouldAcceptFraming() const;
private:
- const TString Method;
- const TString Command;
- const bool IsApi;
-
- TNode::TMapType Parameters;
- TString ImpersonationUser;
- TString Token;
- TString ServiceTicket;
- TNode Attributes;
- TString ProxyAddress;
- TString HostPort;
-
-private:
- TMaybe<TFormat> InputFormat = TFormat::YsonText();
- TMaybe<TFormat> OutputFormat = TFormat::YsonText();
-
- TString RequestCompression = "identity";
- TString ResponseCompression = "identity";
+ const TString Method_;
+ const TString Command_;
+ const bool IsApi_;
+
+ TNode::TMapType Parameters_;
+ TString ImpersonationUser_;
+ TString Token_;
+ TString ServiceTicket_;
+ TNode Attributes_;
+ TString ProxyAddress_;
+ TString HostPort_;
+
+ TMaybe<TFormat> InputFormat_ = TFormat::YsonText();
+ TMaybe<TFormat> OutputFormat_ = TFormat::YsonText();
+
+ TString RequestCompression_ = "identity";
+ TString ResponseCompression_ = "identity";
};
////////////////////////////////////////////////////////////////////////////////
@@ -172,9 +177,10 @@ public:
// 'requestId' and 'hostName' are provided for debug reasons
// (they will appear in some error messages).
THttpResponse(
- IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName);
+ TRequestContext context,
+ IInputStream* socketStream);
+
+ ~THttpResponse();
const THttpHeaders& Headers() const;
@@ -196,13 +202,17 @@ private:
bool RefreshFrameIfNecessary();
private:
- THttpInput HttpInput_;
- const TString RequestId_;
- const TString HostName_;
+ class THttpInputWrapped;
+
+private:
+ THolder<THttpInputWrapped> HttpInput_;
+
+ const bool Unframe_;
+
+ TRequestContext Context_;
int HttpCode_ = 0;
TMaybe<TErrorResponse> ErrorResponse_;
bool IsExhausted_ = false;
- const bool Unframe_;
size_t RemainingFrameSize_ = 0;
};
@@ -211,18 +221,15 @@ private:
class THttpRequest
{
public:
- THttpRequest();
- THttpRequest(const TString& requestId);
+ THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout);
~THttpRequest();
TString GetRequestId() const;
- void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero());
-
- IOutputStream* StartRequest(const THttpHeader& header);
+ IOutputStream* StartRequest();
void FinishRequest();
- void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body);
+ void SmallRequest(TMaybe<TStringBuf> body);
THttpResponse* GetResponseStream();
@@ -233,26 +240,28 @@ public:
int GetHttpCode();
private:
- IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters);
+ IOutputStream* StartRequestImpl(bool includeParameters);
private:
class TRequestStream;
private:
- TString HostName;
- TString RequestId;
- TString Url_;
+ const TRequestContext Context_;
+ const THttpHeader Header_;
+ const TString Url_;
+ const TDuration SocketTimeout_;
+
TInstant StartTime_;
TString LoggedAttributes_;
- TConnectionPtr Connection;
+ TConnectionPtr Connection_;
THolder<TRequestStream> RequestStream_;
- THolder<TSocketInput> SocketInput;
- THolder<THttpResponse> Input;
+ THolder<TSocketInput> SocketInput_;
+ THolder<THttpResponse> Input_;
- bool LogResponse = false;
+ bool LogResponse_ = false;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
index 305d95b06c..7e9d761c3c 100644
--- a/yt/cpp/mapreduce/http/http_client.cpp
+++ b/yt/cpp/mapreduce/http/http_client.cpp
@@ -167,23 +167,23 @@ class TDefaultHttpClient
public:
IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override
{
- auto request = std::make_unique<THttpRequest>(requestId);
-
auto urlRef = NHttp::ParseUrl(url);
+ auto host = CreateHost(urlRef.Host, urlRef.PortStr);
+
+ auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout);
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
- request->SmallRequest(header, body);
+ request->SmallRequest(body);
return std::make_unique<TDefaultHttpResponse>(std::move(request));
}
IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override
{
- auto request = std::make_unique<THttpRequest>(requestId);
-
auto urlRef = NHttp::ParseUrl(url);
+ auto host = CreateHost(urlRef.Host, urlRef.PortStr);
+
+ auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout);
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
- auto stream = request->StartRequest(header);
+ auto stream = request->StartRequest();
return std::make_unique<TDefaultHttpRequest>(std::move(request), stream);
}
};
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
index 97321c4c9d..6087eca098 100644
--- a/yt/cpp/mapreduce/http/http_client.h
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -11,8 +11,6 @@
#include <util/stream/fwd.h>
-#include <memory>
-
namespace NYT::NHttpClient {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp
index 7d95a10bc2..cd610d6493 100644
--- a/yt/cpp/mapreduce/http/requests.cpp
+++ b/yt/cpp/mapreduce/http/requests.cpp
@@ -2,7 +2,6 @@
#include "context.h"
#include "host_manager.h"
-#include "retry_request.h"
#include <yt/cpp/mapreduce/client/transaction.h>
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
index 307e310b5b..0c719eeda3 100644
--- a/yt/cpp/mapreduce/http/retry_request.cpp
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -114,7 +114,7 @@ TResponseInfo RetryRequestWithPolicy(
return Request(context, header, body, requestId, config);
} catch (const TErrorResponse& e) {
- LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription());
+ LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription());
retryWithSameMutationId = e.IsTransportError();
if (!IsRetriable(e)) {
diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
index 90196246c5..fa072675fb 100644
--- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
+++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
@@ -72,11 +72,10 @@ THolder<TSimpleServer> CreateProxyHttpServer()
ui64 port;
ParseFirstLine(inputStr, method, host, port, command);
- THttpRequest request;
const TString hostName = ::TStringBuilder() << host << ":" << port;
- request.Connect(hostName);
auto header = THttpHeader(method, command);
- request.StartRequest(header);
+ THttpRequest request("0-0-0-0", hostName, header, TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
auto res = request.GetResponseStream();
THttpOutput httpOutput(output);
@@ -138,9 +137,8 @@ TEST(TConnectionPool, TestReleaseUnread)
const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
for (size_t i = 0; i != 10; ++i) {
- THttpRequest request;
- request.Connect(hostName);
- request.StartRequest(THttpHeader("GET", "foo"));
+ THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
request.GetResponseStream();
}
@@ -155,12 +153,12 @@ TEST(TConnectionPool, TestProxy)
const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort();
for (size_t i = 0; i != 10; ++i) {
- THttpRequest request;
- request.Connect(hostName2);
auto header = THttpHeader("GET", "foo");
header.SetProxyAddress(hostName2);
header.SetHostPort(hostName);
- request.StartRequest(header);
+
+ THttpRequest request("0-0-0-0", hostName2, header, TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
request.GetResponseStream();
}
@@ -176,9 +174,8 @@ TEST(TConnectionPool, TestConcurrency)
const auto func = [&] {
for (int i = 0; i != 100; ++i) {
- THttpRequest request;
- request.Connect(hostName);
- request.StartRequest(THttpHeader("GET", "foo"));
+ THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
auto res = request.GetResponseStream();
res->ReadAll();
diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp
index ca260841d0..e41e83c5a0 100644
--- a/yt/cpp/mapreduce/http/ut/http_ut.cpp
+++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp
@@ -70,9 +70,8 @@ TEST(TFramingTest, FramingSimple)
{
auto server = CreateFramingEchoServer();
- THttpRequest request;
- request.Connect(server->GetAddress());
- auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate"));
+ THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero());
+ auto requestStream = request.StartRequest();
*requestStream << "Some funny data";
request.FinishRequest();
auto response = request.GetResponseStream()->ReadAll();
@@ -83,9 +82,8 @@ TEST(TFramingTest, FramingLarge)
{
auto server = CreateFramingEchoServer();
- THttpRequest request;
- request.Connect(server->GetAddress());
- auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate"));
+ THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero());
+ auto requestStream = request.StartRequest();
auto data = TString(100000, 'x');
*requestStream << data;
request.FinishRequest();
diff --git a/yt/cpp/mapreduce/interface/errors.cpp b/yt/cpp/mapreduce/interface/errors.cpp
index ef3d2db4a3..0819c8ca56 100644
--- a/yt/cpp/mapreduce/interface/errors.cpp
+++ b/yt/cpp/mapreduce/interface/errors.cpp
@@ -20,20 +20,11 @@ using namespace NJson;
static void WriteErrorDescription(const TYtError& error, IOutputStream* out)
{
- (*out) << '\'' << error.GetMessage() << '\'';
+ (*out) << error.GetMessage();
const auto& innerErrorList = error.InnerErrors();
if (!innerErrorList.empty()) {
- (*out) << " { ";
- bool first = true;
- for (const auto& innerError : innerErrorList) {
- if (first) {
- first = false;
- } else {
- (*out) << " ; ";
- }
- WriteErrorDescription(innerError, out);
- }
- (*out) << " }";
+ (*out) << ": ";
+ WriteErrorDescription(innerErrorList[0], out);
}
}
@@ -118,9 +109,11 @@ TYtError::TYtError(const TString& message)
, Message_(message)
{ }
-TYtError::TYtError(int code, const TString& message)
+TYtError::TYtError(int code, TString message, TVector<TYtError> innerError, TNode::TMapType attributes)
: Code_(code)
, Message_(message)
+ , InnerErrors_(innerError)
+ , Attributes_(attributes)
{ }
TYtError::TYtError(const TJsonValue& value)
@@ -396,6 +389,11 @@ void TErrorResponse::Setup()
*this << Error_.FullDescription();
}
+TTransportError::TTransportError(TYtError error)
+{
+ *this << error.FullDescription();
+}
+
////////////////////////////////////////////////////////////////////////////////
TOperationFailedError::TOperationFailedError(
diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h
index afad58ed72..1311dbcf3d 100644
--- a/yt/cpp/mapreduce/interface/errors.h
+++ b/yt/cpp/mapreduce/interface/errors.h
@@ -6,14 +6,14 @@
/// Errors and exceptions emitted by library.
#include "fwd.h"
-#include "common.h"
#include <library/cpp/yson/node/node.h>
#include <util/generic/bt_exception.h>
-#include <util/generic/yexception.h>
+#include <util/generic/guid.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
namespace NJson {
class TJsonValue;
@@ -67,8 +67,8 @@ public:
/// Constructs error with NYT::NClusterErrorCodes::Generic code and given message.
explicit TYtError(const TString& message);
- /// Constructs error with given code and given message.
- TYtError(int code, const TString& message);
+ /// Constructs error from given parameters.
+ TYtError(int code, TString message, TVector<TYtError> inner = {}, TNode::TMapType attributes = {});
/// Construct error from json representation.
TYtError(const ::NJson::TJsonValue& value);
@@ -158,7 +158,6 @@ class TErrorResponse
{
public:
TErrorResponse(int httpCode, const TString& requestId);
- TErrorResponse(int httpCode, TYtError error);
/// Get error object returned by server.
const TYtError& GetError() const;
@@ -222,6 +221,16 @@ private:
////////////////////////////////////////////////////////////////////////////////
+/// @brief System error indicating that response from server cannot be received
+class TTransportError
+ : public yexception
+{
+public:
+ explicit TTransportError(TYtError error);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
/// Info about failed jobs.
///
/// @see NYT::TOperationFailedError
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 59868c599e..98a8aa0792 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -728,7 +728,6 @@ private:
}
private:
- THttpRequest Request_;
NHttpClient::IHttpResponsePtr Response_;
IInputStream* ResponseStream_;
};
diff --git a/yt/yt/client/api/client_common.h b/yt/yt/client/api/client_common.h
index a5569fb074..373d4ac61b 100644
--- a/yt/yt/client/api/client_common.h
+++ b/yt/yt/client/api/client_common.h
@@ -141,20 +141,20 @@ struct TSelectRowsOptionsBase
, public TSuppressableAccessTrackingOptions
{
//! Limits range expanding.
- ui64 RangeExpansionLimit = 200000;
- //! Limits maximum parallel subqueries.
- int MaxSubqueries = std::numeric_limits<int>::max();
+ ui64 RangeExpansionLimit = 200'000;
//! Limits parallel subqueries by row count.
- ui64 MinRowCountPerSubquery = 100'000;
+ i64 MinRowCountPerSubquery = 100'000;
//! Path in Cypress with UDFs.
std::optional<TString> UdfRegistryPath;
+ //! Limits maximum parallel subqueries.
+ int MaxSubqueries = std::numeric_limits<int>::max();
+ //! Query language syntax version.
+ int SyntaxVersion = 1;
//! If |true| then logging is more verbose.
bool VerboseLogging = false;
// COMPAT(lukyan)
//! Use fixed and rewritten range inference.
bool NewRangeInference = true;
- //! Query language syntax version.
- int SyntaxVersion = 1;
};
DEFINE_ENUM(EExecutionBackend,
@@ -169,16 +169,8 @@ struct TSelectRowsOptions
std::optional<i64> InputRowLimit;
//! If null then connection defaults are used.
std::optional<i64> OutputRowLimit;
- //! Allow queries without any condition on key columns.
- bool AllowFullScan = true;
- //! Allow queries with join condition which implies foreign query with IN operator.
- bool AllowJoinWithoutIndex = false;
//! Execution pool.
std::optional<TString> ExecutionPool;
- //! If |true| then incomplete result would lead to a failure.
- bool FailOnIncompleteResult = true;
- //! Enables generated code caching.
- bool EnableCodeCache = true;
//! Used to prioritize requests.
TUserWorkloadDescriptor WorkloadDescriptor;
//! Memory limit per execution node.
@@ -189,10 +181,6 @@ struct TSelectRowsOptions
NYson::TYsonString PlaceholderValues;
//! Native or WebAssembly execution backend.
std::optional<EExecutionBackend> ExecutionBackend;
- //! Enables canonical SQL behaviour for relational operators, i.e. null </=/> value -> null.
- bool UseCanonicalNullRelations = false;
- //! Merge versioned rows from different stores when reading.
- bool MergeVersionedRows = true;
//! Expected schemas for tables in a query (used for replica fallback in replicated tables).
using TExpectedTableSchemas = THashMap<NYPath::TYPath, NTableClient::TTableSchemaPtr>;
TExpectedTableSchemas ExpectedTableSchemas;
@@ -200,6 +188,18 @@ struct TSelectRowsOptions
NTableClient::TVersionedReadOptions VersionedReadOptions;
//! Explicitly allow or forbid the usage of row cache.
std::optional<bool> UseLookupCache;
+ //! Allow queries without any condition on key columns.
+ bool AllowFullScan = true;
+ //! Allow queries with join condition which implies foreign query with IN operator.
+ bool AllowJoinWithoutIndex = false;
+ //! If |true| then incomplete result would lead to a failure.
+ bool FailOnIncompleteResult = true;
+ //! Enables generated code caching.
+ bool EnableCodeCache = true;
+ //! Enables canonical SQL behaviour for relational operators, i.e. null </=/> value -> null.
+ bool UseCanonicalNullRelations = false;
+ //! Merge versioned rows from different stores when reading.
+ bool MergeVersionedRows = true;
};
struct TFallbackReplicaOptions
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index f90c95f2c0..cd93d3f214 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -112,7 +112,7 @@ IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(IChannelPtr channel, bool
IChannelPtr TClient::CreateNonRetryingChannelByAddress(const std::string& address) const
{
return CreateCredentialsInjectingChannel(
- Connection_->CreateChannelByAddress(TString(address)),
+ Connection_->CreateChannelByAddress(address),
ClientOptions_);
}
@@ -629,7 +629,7 @@ TFuture<std::vector<TTabletInfo>> TClient::GetTabletInfos(
auto& currentReplica = tabletInfo.TableReplicaInfos->emplace_back();
currentReplica.ReplicaId = FromProto<TGuid>(protoReplicaInfo.replica_id());
currentReplica.LastReplicationTimestamp = protoReplicaInfo.last_replication_timestamp();
- currentReplica.Mode = CheckedEnumCast<ETableReplicaMode>(protoReplicaInfo.mode());
+ currentReplica.Mode = FromProto<ETableReplicaMode>(protoReplicaInfo.mode());
currentReplica.CurrentReplicationRowIndex = protoReplicaInfo.current_replication_row_index();
currentReplica.CommittedReplicationRowIndex = protoReplicaInfo.committed_replication_row_index();
currentReplica.ReplicationError = FromProto<TError>(protoReplicaInfo.replication_error());
@@ -2066,16 +2066,16 @@ TFuture<TMaintenanceCountsPerTarget> TClient::RemoveMaintenance(
counts[EMaintenanceType::DisableTabletCells] = rspValue->disable_tablet_cells();
counts[EMaintenanceType::PendingRestart] = rspValue->pending_restart();
} else {
- for (const auto& [type, count] : rspValue->removed_maintenance_counts()) {
- counts[CheckedEnumCast<EMaintenanceType>(type)] = count;
+ for (auto [type, count] : rspValue->removed_maintenance_counts()) {
+ counts[FromProto<EMaintenanceType>(type)] = count;
}
}
} else {
result.reserve(rspValue->removed_maintenance_counts_per_target_size());
for (const auto& [target, protoCounts] : rspValue->removed_maintenance_counts_per_target()) {
auto& counts = result[target];
- for (const auto& [type, count] : protoCounts.counts()) {
- counts[CheckedEnumCast<EMaintenanceType>(type)] = count;
+ for (auto [type, count] : protoCounts.counts()) {
+ counts[FromProto<EMaintenanceType>(type)] = count;
}
}
}
diff --git a/yt/yt/client/api/rpc_proxy/config.cpp b/yt/yt/client/api/rpc_proxy/config.cpp
index 87dfd2b22e..b3aef0ea36 100644
--- a/yt/yt/client/api/rpc_proxy/config.cpp
+++ b/yt/yt/client/api/rpc_proxy/config.cpp
@@ -100,9 +100,9 @@ void TConnectionConfig::Register(TRegistrar registrar)
.Default(NCompression::ECodec::None);
registrar.Parameter("response_codec", &TThis::ResponseCodec)
.Default(NCompression::ECodec::None);
- // COMPAT(danilalexeev ): legacy RPC codecs
+ // COMPAT(danilalexeev): legacy RPC codecs
registrar.Parameter("enable_legacy_rpc_codecs", &TThis::EnableLegacyRpcCodecs)
- .Default(true);
+ .Default(false);
registrar.Parameter("enable_retries", &TThis::EnableRetries)
.Default(false);
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
index 3a491d8526..fc365aa9e9 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
@@ -291,9 +291,9 @@ IChannelPtr TConnection::CreateChannel(bool sticky)
return CreateRoamingChannel(std::move(provider));
}
-IChannelPtr TConnection::CreateChannelByAddress(const TString& address)
+IChannelPtr TConnection::CreateChannelByAddress(const std::string& address)
{
- return CachingChannelFactory_->CreateChannel(address.ConstRef());
+ return CachingChannelFactory_->CreateChannel(address);
}
TClusterTag TConnection::GetClusterTag() const
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h
index 3e050989ca..79442d31d3 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.h
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.h
@@ -27,7 +27,7 @@ public:
~TConnection();
NRpc::IChannelPtr CreateChannel(bool sticky);
- NRpc::IChannelPtr CreateChannelByAddress(const TString& address);
+ NRpc::IChannelPtr CreateChannelByAddress(const std::string& address);
const TConnectionConfigPtr& GetConfig();
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index d37fc8b2bd..88c12aa793 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -523,7 +523,7 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema&
? TColumnStableName(protoSchema.stable_name())
: TColumnStableName(protoSchema.name()));
- auto physicalType = CheckedEnumCast<EValueType>(protoSchema.type());
+ auto physicalType = FromProto<EValueType>(protoSchema.type());
TLogicalTypePtr columnType;
if (protoSchema.has_type_v3()) {
@@ -545,7 +545,7 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema&
<< TErrorAttribute("type", protoSchema.type());
}
} else if (protoSchema.has_logical_type()) {
- auto logicalType = CheckedEnumCast<ESimpleLogicalValueType>(protoSchema.logical_type());
+ auto logicalType = FromProto<ESimpleLogicalValueType>(protoSchema.logical_type());
columnType = MakeLogicalType(logicalType, protoSchema.required());
if (protoSchema.has_type() && GetPhysicalType(logicalType) != physicalType) {
THROW_ERROR_EXCEPTION("Fields \"logical_type\" and \"type\" do not match")
@@ -565,7 +565,7 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema&
schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression));
schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized));
schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate));
- schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>));
+ schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, FromProto<ESortOrder>));
schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group));
schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size));
}
@@ -609,14 +609,11 @@ void ToProto(NProto::TTabletInfo* protoTabletInfo, const NTabletClient::TTabletI
void FromProto(NTabletClient::TTabletInfo* tabletInfo, const NProto::TTabletInfo& protoTabletInfo)
{
- tabletInfo->TabletId =
- FromProto<TTabletId>(protoTabletInfo.tablet_id());
+ tabletInfo->TabletId = FromProto<TTabletId>(protoTabletInfo.tablet_id());
tabletInfo->MountRevision = FromProto<NHydra::TRevision>(protoTabletInfo.mount_revision());
- tabletInfo->State = CheckedEnumCast<ETabletState>(protoTabletInfo.state());
+ tabletInfo->State = FromProto<ETabletState>(protoTabletInfo.state());
tabletInfo->PivotKey = FromProto<NTableClient::TLegacyOwningKey>(protoTabletInfo.pivot_key());
- if (protoTabletInfo.has_cell_id()) {
- tabletInfo->CellId = FromProto<TTabletCellId>(protoTabletInfo.cell_id());
- }
+ tabletInfo->CellId = FromProto<TTabletCellId>(protoTabletInfo.cell_id());
}
void ToProto(
@@ -1373,7 +1370,7 @@ void FromProto(
(*manifest)->SourcePath = protoManifest.source_path();
(*manifest)->DestinationPath = protoManifest.destination_path();
- (*manifest)->OrderedMode = CheckedEnumCast<EOrderedTableBackupMode>(protoManifest.ordered_mode());
+ (*manifest)->OrderedMode = FromProto<EOrderedTableBackupMode>(protoManifest.ordered_mode());
}
void ToProto(
@@ -2198,10 +2195,10 @@ TTableSchemaPtr DeserializeRowsetSchema(
columns[i].SetStableName(TColumnStableName(entry.name()));
}
if (entry.has_logical_type()) {
- auto simpleLogicalType = CheckedEnumCast<NTableClient::ESimpleLogicalValueType>(entry.logical_type());
+ auto simpleLogicalType = FromProto<NTableClient::ESimpleLogicalValueType>(entry.logical_type());
columns[i].SetLogicalType(OptionalLogicalType(SimpleLogicalType(simpleLogicalType)));
} else if (entry.has_type()) {
- auto simpleLogicalType = CheckedEnumCast<NTableClient::ESimpleLogicalValueType>(entry.type());
+ auto simpleLogicalType = FromProto<NTableClient::ESimpleLogicalValueType>(entry.type());
columns[i].SetLogicalType(OptionalLogicalType(SimpleLogicalType(simpleLogicalType)));
}
}
diff --git a/yt/yt/client/cache/rpc.cpp b/yt/yt/client/cache/rpc.cpp
index da0200d5fc..025d223acf 100644
--- a/yt/yt/client/cache/rpc.cpp
+++ b/yt/yt/client/cache/rpc.cpp
@@ -68,12 +68,12 @@ NApi::IClientPtr CreateClient(TStringBuf clusterUrl)
return CreateClient(clusterUrl, NApi::GetClientOptionsFromEnvStatic());
}
-NApi::IClientPtr CreateClient(TStringBuf cluster, TStringBuf proxyRole)
+NApi::IClientPtr CreateClient(TStringBuf cluster, std::optional<TStringBuf> proxyRole)
{
auto config = New<NApi::NRpcProxy::TConnectionConfig>();
config->ClusterUrl = ToString(cluster);
- if (!proxyRole.empty()) {
- config->ProxyRole = ToString(proxyRole);
+ if (proxyRole && !proxyRole->empty()) {
+ config->ProxyRole = ToString(*proxyRole);
}
config->Postprocess();
return CreateClient(config);
diff --git a/yt/yt/client/cache/rpc.h b/yt/yt/client/cache/rpc.h
index c2a844bb50..06cb3815e2 100644
--- a/yt/yt/client/cache/rpc.h
+++ b/yt/yt/client/cache/rpc.h
@@ -27,7 +27,7 @@ NApi::IClientPtr CreateClient(const NApi::NRpcProxy::TConnectionConfigPtr& confi
NApi::IClientPtr CreateClient(TStringBuf clusterUrl);
//! Allows to specify proxyRole as dedicated option.
-NApi::IClientPtr CreateClient(TStringBuf cluster, TStringBuf proxyRole);
+NApi::IClientPtr CreateClient(TStringBuf cluster, std::optional<TStringBuf> proxyRole);
//! Shortcut to create client with default config and options from env variables (use env:YT_PROXY as serverName).
NApi::IClientPtr CreateClient();
diff --git a/yt/yt/client/chaos_client/config.h b/yt/yt/client/chaos_client/config.h
index 4454dbb187..9391551f8a 100644
--- a/yt/yt/client/chaos_client/config.h
+++ b/yt/yt/client/chaos_client/config.h
@@ -10,10 +10,24 @@ namespace NYT::NChaosClient {
////////////////////////////////////////////////////////////////////////////////
+class TChaosCacheChannelConfig
+ : public NRpc::TRetryingChannelConfig
+ , public NRpc::TBalancingChannelConfig
+{
+public:
+ REGISTER_YSON_STRUCT(TChaosCacheChannelConfig);
+
+ static void Register(TRegistrar /*registrar*/)
+ { }
+};
+
+DEFINE_REFCOUNTED_TYPE(TChaosCacheChannelConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TReplicationCardCacheConfig
: public TAsyncExpiringCacheConfig
- , public NRpc::TBalancingChannelConfig
- , public NRpc::TRetryingChannelConfig
+ , public TChaosCacheChannelConfig
{
public:
bool EnableWatching;
diff --git a/yt/yt/client/chaos_client/public.h b/yt/yt/client/chaos_client/public.h
index 0493104b9d..28f7c30d5c 100644
--- a/yt/yt/client/chaos_client/public.h
+++ b/yt/yt/client/chaos_client/public.h
@@ -24,6 +24,7 @@ constexpr int MaxReplicasPerReplicationCard = 128;
DECLARE_REFCOUNTED_STRUCT(TReplicationCard)
DECLARE_REFCOUNTED_STRUCT(IReplicationCardCache)
+DECLARE_REFCOUNTED_CLASS(TChaosCacheChannelConfig)
DECLARE_REFCOUNTED_CLASS(TReplicationCardCacheConfig)
DECLARE_REFCOUNTED_CLASS(TReplicationCardCacheDynamicConfig)
diff --git a/yt/yt/client/driver/command-inl.h b/yt/yt/client/driver/command-inl.h
index 4b595a57df..54f0d40f4e 100644
--- a/yt/yt/client/driver/command-inl.h
+++ b/yt/yt/client/driver/command-inl.h
@@ -392,7 +392,7 @@ void TSelectRowsCommandBase<
.GreaterThan(0)
.Optional(/*init*/ false);
- registrar.template ParameterWithUniversalAccessor<ui64>(
+ registrar.template ParameterWithUniversalAccessor<i64>(
"min_row_count_per_subquery",
[] (TThis* command) -> auto& {
return command->Options.MinRowCountPerSubquery;
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index 48d6f7d62b..afa22250e6 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -357,7 +357,7 @@ public:
REGISTER (TAdvanceQueueConsumerCommand, "advance_queue_consumer", Null, Structured, true, false, ApiVersion4);
REGISTER (TCreateQueueProducerSessionCommand, "create_queue_producer_session", Null, Structured, true, false, ApiVersion4);
REGISTER (TRemoveQueueProducerSessionCommand, "remove_queue_producer_session", Null, Structured, true, false, ApiVersion4);
- REGISTER (TPushQueueProducerCommand, "push_queue_producer", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TPushQueueProducerCommand, "push_queue_producer", Tabular, Structured, true, false, ApiVersion4);
REGISTER (TStartQueryCommand, "start_query", Null, Structured, true, false, ApiVersion4);
REGISTER (TAbortQueryCommand, "abort_query", Null, Structured, true, false, ApiVersion4);
diff --git a/yt/yt/client/driver/etc_commands.h b/yt/yt/client/driver/etc_commands.h
index 5d9e8c04b1..789f8690d4 100644
--- a/yt/yt/client/driver/etc_commands.h
+++ b/yt/yt/client/driver/etc_commands.h
@@ -173,7 +173,7 @@ private:
TString PoolTree;
NYTree::INodePtr ResourceDelta;
- virtual void DoExecute(ICommandContextPtr context) override;
+ void DoExecute(ICommandContextPtr context) override;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/driver/queue_commands.cpp b/yt/yt/client/driver/queue_commands.cpp
index 1db333dc5d..8f8040b193 100644
--- a/yt/yt/client/driver/queue_commands.cpp
+++ b/yt/yt/client/driver/queue_commands.cpp
@@ -163,6 +163,11 @@ void TPullQueueCommand::Register(TRegistrar registrar)
void TPullQueueCommand::DoExecute(ICommandContextPtr context)
{
+ YT_LOG_DEBUG("Executing \"pull_queue\" command (QueuePath: %v, Offset: %v, PartitionIndex: %v)",
+ QueuePath,
+ Offset,
+ PartitionIndex);
+
auto client = context->GetClient();
auto result = WaitFor(client->PullQueue(
@@ -226,6 +231,12 @@ void TPullQueueConsumerCommand::Register(TRegistrar registrar)
void TPullQueueConsumerCommand::DoExecute(ICommandContextPtr context)
{
+ YT_LOG_DEBUG("Executing \"pull_queue_consumer\" command (ConsumerPath: %v, QueuePath: %v, Offset: %v, PartitionIndex: %v)",
+ ConsumerPath,
+ QueuePath,
+ Offset,
+ PartitionIndex);
+
auto client = context->GetClient();
auto result = WaitFor(client->PullQueueConsumer(
@@ -263,6 +274,13 @@ void TAdvanceQueueConsumerCommand::Register(TRegistrar registrar)
void TAdvanceQueueConsumerCommand::DoExecute(ICommandContextPtr context)
{
+ YT_LOG_DEBUG("Executing \"advance_queue_consumer\" command (ConsumerPath: %v, QueuePath: %v, PartitionIndex: %v, OldOffset: %v, NewOffset: %v)",
+ ConsumerPath,
+ QueuePath,
+ PartitionIndex,
+ OldOffset,
+ NewOffset);
+
auto transaction = GetTransaction(context);
if (ClientSide.value_or(false)) {
@@ -297,6 +315,11 @@ void TCreateQueueProducerSessionCommand::Register(TRegistrar registrar)
void TCreateQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
{
+ YT_LOG_DEBUG("Executing \"create_queue_producer_session\" command (ProducerPath: %v, QueuePath: %v, SessionId: %v)",
+ ProducerPath,
+ QueuePath,
+ SessionId);
+
auto client = context->GetClient();
auto result = WaitFor(client->CreateQueueProducerSession(
@@ -327,6 +350,11 @@ void TRemoveQueueProducerSessionCommand::Register(TRegistrar registrar)
void TRemoveQueueProducerSessionCommand::DoExecute(ICommandContextPtr context)
{
+ YT_LOG_DEBUG("Executing \"remove_queue_producer_session\" command (ProducerPath: %v, QueuePath: %v, SessionId: %v)",
+ ProducerPath,
+ QueuePath,
+ SessionId);
+
auto client = context->GetClient();
WaitFor(client->RemoveQueueProducerSession(
@@ -366,6 +394,12 @@ void TPushQueueProducerCommand::Register(TRegistrar registrar)
void TPushQueueProducerCommand::DoExecute(ICommandContextPtr context)
{
+ YT_LOG_DEBUG("Executing \"push_queue_producer\" command (ProducerPath: %v, QueuePath: %v, SessionId: %v, Epoch: %v)",
+ ProducerPath,
+ QueuePath,
+ SessionId,
+ Epoch);
+
auto tableMountCache = context->GetClient()->GetTableMountCache();
auto queueTableInfoFuture = tableMountCache->GetTableInfo(QueuePath.GetPath());
diff --git a/yt/yt/client/driver/table_commands.h b/yt/yt/client/driver/table_commands.h
index 224bcd9c0c..d511e76622 100644
--- a/yt/yt/client/driver/table_commands.h
+++ b/yt/yt/client/driver/table_commands.h
@@ -400,7 +400,7 @@ public:
private:
NYPath::TRichYPath Path;
- virtual void DoExecute(ICommandContextPtr context) override;
+ void DoExecute(ICommandContextPtr context) override;
bool HasResponseParameters() const override;
};
@@ -585,7 +585,7 @@ public:
private:
NApi::TBackupManifestPtr Manifest;
- virtual void DoExecute(ICommandContextPtr context) override;
+ void DoExecute(ICommandContextPtr context) override;
};
////////////////////////////////////////////////////////////////////////////////
@@ -601,7 +601,7 @@ public:
private:
NApi::TBackupManifestPtr Manifest;
- virtual void DoExecute(ICommandContextPtr context) override;
+ void DoExecute(ICommandContextPtr context) override;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/federated/client.cpp b/yt/yt/client/federated/client.cpp
index fd8fb21dc0..2092672c4f 100644
--- a/yt/yt/client/federated/client.cpp
+++ b/yt/yt/client/federated/client.cpp
@@ -161,17 +161,17 @@ public:
return Underlying_->GetStartTimestamp();
}
- virtual NTransactionClient::EAtomicity GetAtomicity() const override
+ NTransactionClient::EAtomicity GetAtomicity() const override
{
return Underlying_->GetAtomicity();
}
- virtual NTransactionClient::EDurability GetDurability() const override
+ NTransactionClient::EDurability GetDurability() const override
{
return Underlying_->GetDurability();
}
- virtual TDuration GetTimeout() const override
+ TDuration GetTimeout() const override
{
return Underlying_->GetTimeout();
}
diff --git a/yt/yt/client/formats/versioned_writer.h b/yt/yt/client/formats/versioned_writer.h
index 025ba8e1fe..1427d5bf71 100644
--- a/yt/yt/client/formats/versioned_writer.h
+++ b/yt/yt/client/formats/versioned_writer.h
@@ -26,11 +26,11 @@ public:
NTableClient::TTableSchemaPtr schema,
const std::function<std::unique_ptr<NYson::IFlushableYsonConsumer>(IZeroCopyOutput*)>& consumerBuilder);
- virtual TFuture<void> Close() override;
+ TFuture<void> Close() override;
- virtual bool Write(TRange<NTableClient::TVersionedRow> rows) override;
+ bool Write(TRange<NTableClient::TVersionedRow> rows) override;
- virtual TFuture<void> GetReadyEvent() override;
+ TFuture<void> GetReadyEvent() override;
private:
const NConcurrency::IAsyncOutputStreamPtr Stream_;
diff --git a/yt/yt/client/scheduler/public.h b/yt/yt/client/scheduler/public.h
index 2c0f279ed9..10e930bb93 100644
--- a/yt/yt/client/scheduler/public.h
+++ b/yt/yt/client/scheduler/public.h
@@ -143,6 +143,7 @@ DEFINE_ENUM(EAbortReason,
((InterruptionFailed) ( 55))
((OperationIncarnationChanged) ( 56))
((AddressResolveFailed) ( 57))
+ ((UnexpectedNodeJobPhase) ( 58))
((SchedulingFirst) (100))
((SchedulingTimeout) (101))
((SchedulingResourceOvercommit) (102))
diff --git a/yt/yt/client/table_client/column_sort_schema.cpp b/yt/yt/client/table_client/column_sort_schema.cpp
index 2bc14f1a9f..b6002b5e77 100644
--- a/yt/yt/client/table_client/column_sort_schema.cpp
+++ b/yt/yt/client/table_client/column_sort_schema.cpp
@@ -89,11 +89,13 @@ void FromProto(
TSortColumns* sortColumns,
const NProto::TSortColumnsExt& protoSortColumns)
{
+ using NYT::FromProto;
+
YT_VERIFY(protoSortColumns.names_size() == protoSortColumns.sort_orders_size());
for (int columnIndex = 0; columnIndex < protoSortColumns.names_size(); ++columnIndex) {
TColumnSortSchema sortColumn{
.Name = protoSortColumns.names(columnIndex),
- .SortOrder = CheckedEnumCast<ESortOrder>(protoSortColumns.sort_orders(columnIndex))
+ .SortOrder = FromProto<ESortOrder>(protoSortColumns.sort_orders(columnIndex)),
};
sortColumns->push_back(sortColumn);
}
diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp
index 49898440e2..6bfd4b410c 100644
--- a/yt/yt/client/table_client/config.cpp
+++ b/yt/yt/client/table_client/config.cpp
@@ -464,6 +464,8 @@ void TChunkWriterOptions::Register(TRegistrar registrar)
.Default(false);
registrar.Parameter("enable_column_meta_in_chunk_meta", &TThis::EnableColumnMetaInChunkMeta)
.Default(true);
+ registrar.Parameter("consider_min_row_range_data_weight", &TThis::ConsiderMinRowRangeDataWeight)
+ .Default(true);
registrar.Parameter("schema_modification", &TThis::SchemaModification)
.Default(ETableSchemaModification::None);
diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h
index a8accf974b..2f91bcd032 100644
--- a/yt/yt/client/table_client/config.h
+++ b/yt/yt/client/table_client/config.h
@@ -429,6 +429,7 @@ public:
bool EnableRowCountInColumnarStatistics;
bool EnableSegmentMetaInBlocks;
bool EnableColumnMetaInChunkMeta;
+ bool ConsiderMinRowRangeDataWeight;
NYTree::INodePtr CastAnyToCompositeNode;
diff --git a/yt/yt/client/table_client/logical_type.cpp b/yt/yt/client/table_client/logical_type.cpp
index 679540a999..4845e37210 100644
--- a/yt/yt/client/table_client/logical_type.cpp
+++ b/yt/yt/client/table_client/logical_type.cpp
@@ -1123,9 +1123,11 @@ void ToProto(NProto::TLogicalType* protoLogicalType, const TLogicalTypePtr& logi
void FromProto(TLogicalTypePtr* logicalType, const NProto::TLogicalType& protoLogicalType)
{
+ using NYT::FromProto;
+
switch (protoLogicalType.type_case()) {
case NProto::TLogicalType::TypeCase::kSimple:
- *logicalType = SimpleLogicalType(CheckedEnumCast<ESimpleLogicalValueType>(protoLogicalType.simple()));
+ *logicalType = SimpleLogicalType(FromProto<ESimpleLogicalValueType>(protoLogicalType.simple()));
return;
case NProto::TLogicalType::TypeCase::kDecimal:
*logicalType = DecimalLogicalType(protoLogicalType.decimal().precision(), protoLogicalType.decimal().scale());
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index 25a5c29ed9..dda743a92d 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -463,10 +463,10 @@ void FromProto(TColumnSchema* schema, const NProto::TColumnSchema& protoSchema)
} else if (protoSchema.has_simple_logical_type()) {
schema->SetLogicalType(
MakeLogicalType(
- CheckedEnumCast<ESimpleLogicalValueType>(protoSchema.simple_logical_type()),
+ FromProto<ESimpleLogicalValueType>(protoSchema.simple_logical_type()),
protoSchema.required()));
} else {
- auto physicalType = CheckedEnumCast<EValueType>(protoSchema.type());
+ auto physicalType = FromProto<EValueType>(protoSchema.type());
schema->SetLogicalType(MakeLogicalType(GetLogicalType(physicalType), protoSchema.required()));
}
@@ -474,7 +474,7 @@ void FromProto(TColumnSchema* schema, const NProto::TColumnSchema& protoSchema)
schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression));
schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized));
schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate));
- schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>));
+ schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, FromProto<ESortOrder>));
schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group));
schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size));
}
@@ -1514,7 +1514,7 @@ void FromProto(TTableSchema* schema, const NProto::TTableSchemaExt& protoSchema)
FromProto<std::vector<TColumnSchema>>(protoSchema.columns()),
protoSchema.strict(),
protoSchema.unique_keys(),
- CheckedEnumCast<ETableSchemaModification>(protoSchema.schema_modification()),
+ FromProto<ETableSchemaModification>(protoSchema.schema_modification()),
FromProto<std::vector<TDeletedColumn>>(protoSchema.deleted_columns()));
}
diff --git a/yt/yt/client/table_client/versioned_io_options.cpp b/yt/yt/client/table_client/versioned_io_options.cpp
index d181197a1e..040a0463e5 100644
--- a/yt/yt/client/table_client/versioned_io_options.cpp
+++ b/yt/yt/client/table_client/versioned_io_options.cpp
@@ -4,6 +4,7 @@
namespace NYT::NTableClient {
+using NYT::FromProto;
using NYT::ToProto;
////////////////////////////////////////////////////////////////////////////////
@@ -31,7 +32,7 @@ void FromProto(
TVersionedReadOptions* options,
const NProto::TVersionedReadOptions& protoOptions)
{
- options->ReadMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.read_mode());
+ options->ReadMode = FromProto<EVersionedIOMode>(protoOptions.read_mode());
}
void ToProto(
@@ -45,7 +46,7 @@ void FromProto(
NTableClient::TVersionedWriteOptions* options,
const NProto::TVersionedWriteOptions& protoOptions)
{
- options->WriteMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.write_mode());
+ options->WriteMode = FromProto<EVersionedIOMode>(protoOptions.write_mode());
}
std::optional<TString> GetTimestampColumnOriginalNameOrNull(TStringBuf name)
diff --git a/yt/yt/client/table_client/wire_protocol.cpp b/yt/yt/client/table_client/wire_protocol.cpp
index 59638588f2..ec948b7263 100644
--- a/yt/yt/client/table_client/wire_protocol.cpp
+++ b/yt/yt/client/table_client/wire_protocol.cpp
@@ -536,8 +536,9 @@ class TWireProtocolReader
: public IWireProtocolReader
{
public:
- explicit TWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer)
+ explicit TWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options)
: RowBuffer_(rowBuffer ? std::move(rowBuffer) : New<TRowBuffer>(TWireProtocolReaderTag(), ReaderBufferChunkSize))
+ , Options_(std::move(options))
, Data_(std::move(data))
, Current_(Data_.Begin())
{ }
@@ -781,10 +782,12 @@ public:
private:
const TRowBufferPtr RowBuffer_;
+ const TWireProtocolOptions Options_;
TSharedRef Data_;
TIterator Current_;
+
void ValidateSizeAvailable(size_t size)
{
if (Current_ + size > Data_.End()) {
@@ -857,15 +860,15 @@ private:
void DoReadStringData(EValueType type, ui32 length, const char** result, bool captureValues)
{
- ui32 limit = 0;
+ i64 limit = 0;
if (type == EValueType::String) {
- limit = MaxStringValueLength;
+ limit = Options_.MaxStringValueLength;
}
if (type == EValueType::Any) {
- limit = MaxAnyValueLength;
+ limit = Options_.MaxAnyValueLength;
}
if (type == EValueType::Composite) {
- limit = MaxCompositeValueLength;
+ limit = Options_.MaxCompositeValueLength;
}
if (length > limit) {
THROW_ERROR_EXCEPTION("Value of type %Qlv is too long: length %v, limit %v",
@@ -994,10 +997,10 @@ private:
void ValidateVersionedRowTimestampCount(const TVersionedRowHeader& rowHeader)
{
- if (rowHeader.WriteTimestampCount > MaxTimestampCountPerRow) {
+ if (rowHeader.WriteTimestampCount > Options_.MaxTimestampCountPerRow) {
THROW_ERROR_EXCEPTION("Too many write timestamps in a versioned row");
}
- if (rowHeader.DeleteTimestampCount > MaxTimestampCountPerRow) {
+ if (rowHeader.DeleteTimestampCount > Options_.MaxTimestampCountPerRow) {
THROW_ERROR_EXCEPTION("Too many delete timestamps in a versioned row");
}
}
@@ -1005,10 +1008,10 @@ private:
void ValidateVersionedRowDataWeight(TVersionedRow row)
{
auto dataWeight = GetDataWeight(row);
- if (dataWeight > MaxServerVersionedRowDataWeight) {
+ if (static_cast<i64>(dataWeight) > Options_.MaxVersionedRowDataWeight) {
THROW_ERROR_EXCEPTION("Versioned row data weight is too large: %v > %v",
dataWeight,
- MaxServerVersionedRowDataWeight)
+ Options_.MaxVersionedRowDataWeight)
<< TErrorAttribute("key", ToOwningKey(row));
}
}
@@ -1052,9 +1055,9 @@ auto IWireProtocolReader::GetSchemaData(const TTableSchema& schema) -> TSchemaDa
////////////////////////////////////////////////////////////////////////////////
-std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer)
+std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options)
{
- return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer));
+ return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer), std::move(options));
}
////////////////////////////////////////////////////////////////////////////////
@@ -1068,12 +1071,14 @@ public:
NCompression::ECodec codecId,
TTableSchemaPtr schema,
bool schemaful,
- const NLogging::TLogger& logger)
+ const NLogging::TLogger& logger,
+ TWireProtocolOptions options)
: CompressedBlocks_(compressedBlocks)
, Codec_(NCompression::GetCodec(codecId))
, Schema_(std::move(schema))
, Schemaful_(schemaful)
, Logger(logger.WithTag("ReaderId: %v", TGuid::Create()))
+ , Options_(std::move(options))
{
YT_LOG_DEBUG("Wire protocol rowset reader created (BlockCount: %v, TotalCompressedSize: %v)",
CompressedBlocks_.size(),
@@ -1102,7 +1107,7 @@ public:
uncompressedBlock.Size());
auto rowBuffer = New<TRowBuffer>(TWireProtocolReaderTag(), ReaderBufferChunkSize);
- WireReader_ = CreateWireProtocolReader(uncompressedBlock, std::move(rowBuffer));
+ WireReader_ = CreateWireProtocolReader(uncompressedBlock, std::move(rowBuffer), Options_);
if (!SchemaChecked_) {
auto actualSchema = WireReader_->ReadTableSchema();
@@ -1164,6 +1169,7 @@ private:
const TTableSchemaPtr Schema_;
bool Schemaful_;
const NLogging::TLogger Logger;
+ const TWireProtocolOptions Options_;
int BlockIndex_ = 0;
std::unique_ptr<IWireProtocolReader> WireReader_;
@@ -1177,14 +1183,16 @@ IWireProtocolRowsetReaderPtr CreateWireProtocolRowsetReader(
NCompression::ECodec codecId,
TTableSchemaPtr schema,
bool schemaful,
- const NLogging::TLogger& logger)
+ const NLogging::TLogger& logger,
+ TWireProtocolOptions options)
{
return New<TWireProtocolRowsetReader>(
compressedBlocks,
codecId,
std::move(schema),
schemaful,
- logger);
+ logger,
+ std::move(options));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/table_client/wire_protocol.h b/yt/yt/client/table_client/wire_protocol.h
index 340f3d6263..493d03b494 100644
--- a/yt/yt/client/table_client/wire_protocol.h
+++ b/yt/yt/client/table_client/wire_protocol.h
@@ -280,13 +280,25 @@ public:
////////////////////////////////////////////////////////////////////////////////
+struct TWireProtocolOptions
+{
+ i64 MaxStringValueLength = NTableClient::MaxStringValueLength;
+ i64 MaxAnyValueLength = NTableClient::MaxAnyValueLength;
+ i64 MaxCompositeValueLength = NTableClient::MaxCompositeValueLength;
+ i64 MaxTimestampCountPerRow = NTableClient::MaxTimestampCountPerRow;
+ i64 MaxVersionedRowDataWeight = NTableClient::MaxServerVersionedRowDataWeight;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
//! Creates wire protocol reader.
/*!
* If #rowBuffer is null, a default one is created.
*/
std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(
TSharedRef data,
- TRowBufferPtr rowBuffer = TRowBufferPtr());
+ TRowBufferPtr rowBuffer = TRowBufferPtr(),
+ TWireProtocolOptions options = {});
////////////////////////////////////////////////////////////////////////////////
@@ -301,7 +313,8 @@ IWireProtocolRowsetReaderPtr CreateWireProtocolRowsetReader(
NCompression::ECodec codecId,
NTableClient::TTableSchemaPtr schema,
bool schemaful,
- const NLogging::TLogger& logger);
+ const NLogging::TLogger& logger,
+ TWireProtocolOptions options = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/actions/cancelable_context.cpp b/yt/yt/core/actions/cancelable_context.cpp
index ba24beb035..9993b14cf4 100644
--- a/yt/yt/core/actions/cancelable_context.cpp
+++ b/yt/yt/core/actions/cancelable_context.cpp
@@ -25,6 +25,7 @@ public:
void Invoke(TClosure callback) override
{
YT_ASSERT(callback);
+
auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
if (Context_->Canceled_) {
@@ -50,9 +51,43 @@ public:
}));
}
+ void Invoke(TMutableRange<TClosure> callbacks) override
+ {
+ auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
+
+ std::vector<TClosure> capturedCallbacks;
+ capturedCallbacks.reserve(callbacks.size());
+ for (auto& callback : callbacks) {
+ capturedCallbacks.push_back(std::move(callback));
+ }
+
+ if (Context_->Canceled_) {
+ capturedCallbacks.clear();
+ return;
+ }
+
+ return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
+ [
+ this,
+ this_ = MakeStrong(this),
+ capturedCallbacks = std::move(capturedCallbacks)
+ ] () mutable {
+ auto currentTokenGuard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
+
+ if (Context_->Canceled_) {
+ capturedCallbacks.clear();
+ return;
+ }
+
+ TCurrentInvokerGuard guard(this);
+ for (const auto& callback : capturedCallbacks) {
+ callback();
+ }
+ }));
+ }
+
private:
const TCancelableContextPtr Context_;
-
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/actions/codicil_guarded_invoker.cpp b/yt/yt/core/actions/codicil_guarded_invoker.cpp
new file mode 100644
index 0000000000..b61e0d6fdf
--- /dev/null
+++ b/yt/yt/core/actions/codicil_guarded_invoker.cpp
@@ -0,0 +1,50 @@
+#include "codicil_guarded_invoker.h"
+
+#include <yt/yt/core/actions/bind.h>
+#include <yt/yt/core/actions/current_invoker.h>
+#include <yt/yt/core/actions/invoker_detail.h>
+
+#include <yt/yt/core/misc/codicil.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TCodicilGuardedInvoker
+ : public TInvokerWrapper<false>
+{
+public:
+ TCodicilGuardedInvoker(IInvokerPtr invoker, std::string codicil)
+ : TInvokerWrapper(std::move(invoker))
+ , Codicil_(std::move(codicil))
+ { }
+
+ using TInvokerWrapper::Invoke;
+
+ void Invoke(TClosure callback) override
+ {
+ UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
+ &TCodicilGuardedInvoker::RunCallback,
+ MakeStrong(this),
+ Passed(std::move(callback))));
+ }
+
+private:
+ const std::string Codicil_;
+
+ void RunCallback(TClosure callback)
+ {
+ auto currentInvokerGuard = TCurrentInvokerGuard(this);
+ auto codicilGuard = TCodicilGuard(MakeNonOwningCodicilBuilder(Codicil_));
+ callback();
+ }
+};
+
+IInvokerPtr CreateCodicilGuardedInvoker(IInvokerPtr underlyingInvoker, std::string codicil)
+{
+ return New<TCodicilGuardedInvoker>(std::move(underlyingInvoker), std::move(codicil));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/actions/codicil_guarded_invoker.h b/yt/yt/core/actions/codicil_guarded_invoker.h
new file mode 100644
index 0000000000..d0882b9e9f
--- /dev/null
+++ b/yt/yt/core/actions/codicil_guarded_invoker.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/actions/public.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Creates an invoker that creates a codicil guard with a given string before each
+//! callback invocation.
+IInvokerPtr CreateCodicilGuardedInvoker(
+ IInvokerPtr underlyingInvoker,
+ std::string codicil);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/actions/invoker_detail.cpp b/yt/yt/core/actions/invoker_detail.cpp
index a36a0a2b93..573165bafb 100644
--- a/yt/yt/core/actions/invoker_detail.cpp
+++ b/yt/yt/core/actions/invoker_detail.cpp
@@ -18,15 +18,11 @@ TInvokerWrapper<VirtualizeBase>::TInvokerWrapper(IInvokerPtr underlyingInvoker)
}
template <bool VirtualizeBase>
-void TInvokerWrapper<VirtualizeBase>::Invoke(TClosure callback)
-{
- return UnderlyingInvoker_->Invoke(std::move(callback));
-}
-
-template <bool VirtualizeBase>
void TInvokerWrapper<VirtualizeBase>::Invoke(TMutableRange<TClosure> callbacks)
{
- return UnderlyingInvoker_->Invoke(callbacks);
+ for (auto& callback : callbacks) {
+ static_cast<IInvoker*>(this)->Invoke(std::move(callback));
+ }
}
template <bool VirtualizeBase>
diff --git a/yt/yt/core/actions/invoker_detail.h b/yt/yt/core/actions/invoker_detail.h
index 075e992d9d..5f5538a96f 100644
--- a/yt/yt/core/actions/invoker_detail.h
+++ b/yt/yt/core/actions/invoker_detail.h
@@ -34,7 +34,6 @@ class TInvokerWrapper
: public NDetail::TMaybeVirtualInvokerBase<VirtualizeBase>
{
public:
- void Invoke(TClosure callback) override;
void Invoke(TMutableRange<TClosure> callbacks) override;
NThreading::TThreadId GetThreadId() const override;
diff --git a/yt/yt/core/actions/signal.h b/yt/yt/core/actions/signal.h
index 4a254dab83..b46615c415 100644
--- a/yt/yt/core/actions/signal.h
+++ b/yt/yt/core/actions/signal.h
@@ -213,12 +213,12 @@ public: \
protected: \
::NYT::TCallbackList<TSignature> name##_; \
public: \
- virtual void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override \
+ void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override \
{ \
name##_.Subscribe(callback); \
} \
\
- virtual void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override \
+ void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override \
{ \
name##_.Unsubscribe(callback); \
} \
@@ -233,8 +233,8 @@ public: \
::NYT::TCallbackList<TSignature>* Get##name##Signal()
#define DECLARE_SIGNAL_OVERRIDE(TSignature, name) \
- virtual void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override; \
- virtual void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override
+ void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) override; \
+ void Unsubscribe##name(const ::NYT::TCallback<TSignature>& callback) override
#define DECLARE_INTERFACE_SIGNAL(TSignature, name) \
virtual void Subscribe##name(const ::NYT::TCallback<TSignature>& callback) = 0; \
diff --git a/yt/yt/core/bus/tcp/dispatcher.cpp b/yt/yt/core/bus/tcp/dispatcher.cpp
index ac7ffc05a7..ae3f35c251 100644
--- a/yt/yt/core/bus/tcp/dispatcher.cpp
+++ b/yt/yt/core/bus/tcp/dispatcher.cpp
@@ -43,7 +43,7 @@ bool TTcpDispatcher::IsNetworkingDisabled()
return Impl_->IsNetworkingDisabled();
}
-const TString& TTcpDispatcher::GetNetworkNameForAddress(const NNet::TNetworkAddress& address)
+const std::string& TTcpDispatcher::GetNetworkNameForAddress(const NNet::TNetworkAddress& address)
{
return Impl_->GetNetworkNameForAddress(address);
}
diff --git a/yt/yt/core/bus/tcp/dispatcher.h b/yt/yt/core/bus/tcp/dispatcher.h
index 711771ba16..5efd5024e9 100644
--- a/yt/yt/core/bus/tcp/dispatcher.h
+++ b/yt/yt/core/bus/tcp/dispatcher.h
@@ -56,7 +56,7 @@ public:
bool IsNetworkingDisabled();
//! Returns the network name for a given #address.
- const TString& GetNetworkNameForAddress(const NNet::TNetworkAddress& address);
+ const std::string& GetNetworkNameForAddress(const NNet::TNetworkAddress& address);
//! Returns the TOS level configured for a band.
TTosLevel GetTosLevelForBand(EMultiplexingBand band);
diff --git a/yt/yt/core/bus/tcp/dispatcher_impl.cpp b/yt/yt/core/bus/tcp/dispatcher_impl.cpp
index 95024b9733..26df9ef38c 100644
--- a/yt/yt/core/bus/tcp/dispatcher_impl.cpp
+++ b/yt/yt/core/bus/tcp/dispatcher_impl.cpp
@@ -64,7 +64,7 @@ const TIntrusivePtr<TTcpDispatcher::TImpl>& TTcpDispatcher::TImpl::Get()
return TTcpDispatcher::Get()->Impl_;
}
-const TBusNetworkCountersPtr& TTcpDispatcher::TImpl::GetCounters(const TString& networkName, bool encrypted)
+const TBusNetworkCountersPtr& TTcpDispatcher::TImpl::GetCounters(const std::string& networkName, bool encrypted)
{
auto [statistics, ok] = NetworkStatistics_.FindOrInsert(networkName, [] {
return std::array<TNetworkStatistics, 2>{};
@@ -118,7 +118,7 @@ bool TTcpDispatcher::TImpl::IsNetworkingDisabled()
return NetworkingDisabled_.load();
}
-const TString& TTcpDispatcher::TImpl::GetNetworkNameForAddress(const TNetworkAddress& address)
+const std::string& TTcpDispatcher::TImpl::GetNetworkNameForAddress(const TNetworkAddress& address)
{
if (address.IsUnix()) {
return LocalNetworkName;
diff --git a/yt/yt/core/bus/tcp/dispatcher_impl.h b/yt/yt/core/bus/tcp/dispatcher_impl.h
index 948bb87d10..09ed61d50d 100644
--- a/yt/yt/core/bus/tcp/dispatcher_impl.h
+++ b/yt/yt/core/bus/tcp/dispatcher_impl.h
@@ -33,12 +33,12 @@ class TTcpDispatcher::TImpl
public:
static const TIntrusivePtr<TImpl>& Get();
- const TBusNetworkCountersPtr& GetCounters(const TString& networkName, bool encrypted);
+ const TBusNetworkCountersPtr& GetCounters(const std::string& networkName, bool encrypted);
void DisableNetworking();
bool IsNetworkingDisabled();
- const TString& GetNetworkNameForAddress(const NNet::TNetworkAddress& address);
+ const std::string& GetNetworkNameForAddress(const NNet::TNetworkAddress& address);
TTosLevel GetTosLevelForBand(EMultiplexingBand band);
@@ -87,7 +87,7 @@ private:
const TBusNetworkCountersPtr Counters = New<TBusNetworkCounters>();
};
- NConcurrency::TSyncMap<TString, std::array<TNetworkStatistics, 2>> NetworkStatistics_;
+ NConcurrency::TSyncMap<std::string, std::array<TNetworkStatistics, 2>> NetworkStatistics_;
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, PeriodicExecutorsLock_);
NConcurrency::TPeriodicExecutorPtr ProfilingExecutor_;
@@ -96,7 +96,7 @@ private:
std::atomic<bool> NetworkingDisabled_ = false;
YT_DECLARE_SPIN_LOCK(NThreading::TForkAwareReaderWriterSpinLock, NetworksLock_);
- std::vector<std::pair<NNet::TIP6Network, TString>> Networks_;
+ std::vector<std::pair<NNet::TIP6Network, std::string>> Networks_;
struct TBandDescriptor
{
diff --git a/yt/yt/core/bus/tcp/server.cpp b/yt/yt/core/bus/tcp/server.cpp
index 98903ce583..84c21fef5a 100644
--- a/yt/yt/core/bus/tcp/server.cpp
+++ b/yt/yt/core/bus/tcp/server.cpp
@@ -177,7 +177,7 @@ protected:
}
}
- int GetTotalServerConnectionCount(const TString& clientNetwork)
+ int GetTotalServerConnectionCount(const std::string& clientNetwork)
{
const auto& dispatcher = TTcpDispatcher::TImpl::Get();
int result = 0;
diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp
index 27e8ee6e0d..73b3021fe0 100644
--- a/yt/yt/core/concurrency/action_queue.cpp
+++ b/yt/yt/core/concurrency/action_queue.cpp
@@ -11,7 +11,6 @@
#include <yt/yt/core/ypath/token.h>
-#include <yt/yt/core/misc/crash_handler.h>
#include <yt/yt/core/misc/ring_queue.h>
#include <yt/yt/core/misc/shutdown.h>
@@ -114,11 +113,16 @@ class TSerializedInvoker
, public TInvokerProfileWrapper
{
public:
- TSerializedInvoker(IInvokerPtr underlyingInvoker, const NProfiling::TTagSet& tagSet, NProfiling::IRegistryImplPtr registry)
+ TSerializedInvoker(
+ IInvokerPtr underlyingInvoker,
+ const NProfiling::TTagSet& tagSet,
+ NProfiling::IRegistryImplPtr registry)
: TInvokerWrapper(std::move(underlyingInvoker))
, TInvokerProfileWrapper(std::move(registry), "/serialized", tagSet)
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
auto wrappedCallback = WrapCallback(std::move(callback));
@@ -174,7 +178,6 @@ private:
private:
TIntrusivePtr<TSerializedInvoker> Owner_;
bool Activated_ = false;
-
};
void TrySchedule(TGuard<NThreading::TSpinLock>&& guard)
@@ -345,7 +348,12 @@ public:
void Invoke(TClosure callback, i64 /*priority*/) override
{
- return UnderlyingInvoker_->Invoke(std::move(callback));
+ Invoke(std::move(callback));
+ }
+
+ void Invoke(TClosure callback) override
+ {
+ UnderlyingInvoker_->Invoke(std::move(callback));
}
};
@@ -378,7 +386,6 @@ public:
private:
const IPrioritizedInvokerPtr UnderlyingInvoker_;
const i64 Priority_;
-
};
IInvokerPtr CreateFixedPriorityInvoker(
@@ -408,6 +415,8 @@ public:
, MaxConcurrentInvocations_(maxConcurrentInvocations)
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
auto guard = Guard(SpinLock_);
@@ -706,41 +715,6 @@ ISuspendableInvokerPtr CreateSuspendableInvoker(IInvokerPtr underlyingInvoker)
////////////////////////////////////////////////////////////////////////////////
-class TCodicilGuardedInvoker
- : public TInvokerWrapper<false>
-{
-public:
- TCodicilGuardedInvoker(IInvokerPtr invoker, TString codicil)
- : TInvokerWrapper(std::move(invoker))
- , Codicil_(std::move(codicil))
- { }
-
- void Invoke(TClosure callback) override
- {
- UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
- &TCodicilGuardedInvoker::RunCallback,
- MakeStrong(this),
- Passed(std::move(callback))));
- }
-
-private:
- const TString Codicil_;
-
- void RunCallback(TClosure callback)
- {
- TCurrentInvokerGuard currentInvokerGuard(this);
- TCodicilGuard codicilGuard(Codicil_);
- callback();
- }
-};
-
-IInvokerPtr CreateCodicilGuardedInvoker(IInvokerPtr underlyingInvoker, TString codicil)
-{
- return New<TCodicilGuardedInvoker>(std::move(underlyingInvoker), std::move(codicil));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
class TWatchdogInvoker
: public TInvokerWrapper<false>
{
@@ -754,6 +728,8 @@ public:
, Threshold_(DurationToCpuDuration(threshold))
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
@@ -763,8 +739,8 @@ public:
}
private:
- NLogging::TLogger Logger;
- TCpuDuration Threshold_;
+ const NLogging::TLogger Logger;
+ const TCpuDuration Threshold_;
void RunCallback(TClosure callback)
{
diff --git a/yt/yt/core/concurrency/action_queue.h b/yt/yt/core/concurrency/action_queue.h
index 3d2e4d8668..77c92bc490 100644
--- a/yt/yt/core/concurrency/action_queue.h
+++ b/yt/yt/core/concurrency/action_queue.h
@@ -94,14 +94,6 @@ ISuspendableInvokerPtr CreateSuspendableInvoker(IInvokerPtr underlyingInvoker);
////////////////////////////////////////////////////////////////////////////////
-//! Creates an invoker that creates a codicil guard with a given string before each
-//! callback invocation.
-IInvokerPtr CreateCodicilGuardedInvoker(
- IInvokerPtr underlyingInvoker,
- TString codicil);
-
-////////////////////////////////////////////////////////////////////////////////
-
//! Creates an invoker that emits warning into #logger when callback executes
//! longer than #threshold without interruptions.
IInvokerPtr CreateWatchdogInvoker(
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp
index 169a86280e..d86e8e1197 100644
--- a/yt/yt/core/concurrency/delayed_executor.cpp
+++ b/yt/yt/core/concurrency/delayed_executor.cpp
@@ -334,7 +334,10 @@ private:
// NB: The callbacks are forwarded to the DelayedExecutor thread to prevent any user-code
// from leaking to the Delayed Poller thread, which is, e.g., fiber-unfriendly.
auto runAbort = [&] (const TDelayedExecutorEntryPtr& entry) {
- RunCallback(entry, /*aborted*/ true);
+ if (auto callback = TakeCallback(entry)) {
+ const auto& invoker = entry->Invoker ? entry->Invoker : DelayedInvoker_;
+ invoker->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*aborted*/ true)));
+ }
};
for (const auto& entry : ScheduledEntries_) {
runAbort(entry);
@@ -405,32 +408,33 @@ private:
}
ScheduledCallbacksGauge_.Update(ScheduledEntries_.size());
+ THashMap<IInvokerPtr, std::vector<TClosure>> invokerToCallbacks;
while (!ScheduledEntries_.empty()) {
auto it = ScheduledEntries_.begin();
const auto& entry = *it;
+
if (entry->Deadline > now + CoalescingInterval) {
break;
}
+
if (entry->Deadline + LateWarningThreshold < now) {
StaleCallbacksCounter_.Increment();
YT_LOG_DEBUG("Found a late delayed scheduled callback (Deadline: %v, Now: %v)",
entry->Deadline,
now);
}
- RunCallback(entry, false);
+
+ if (auto callback = TakeCallback(entry)) {
+ auto [it, _] = invokerToCallbacks.emplace(std::move(entry->Invoker), std::vector<TClosure>());
+ it->second.push_back(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*abort*/ false)));
+ }
+
entry->Iterator.reset();
ScheduledEntries_.erase(it);
}
- }
- void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort)
- {
- if (auto callback = TakeCallback(entry)) {
- const auto& invoker = entry->Invoker
- ? entry->Invoker
- : DelayedInvoker_;
- invoker
- ->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), abort)));
+ for (auto& [invoker, callbacks] : invokerToCallbacks) {
+ (invoker ? invoker : DelayedInvoker_)->Invoke(TMutableRange(callbacks));
}
}
};
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
index f30d4e2616..7e5644710a 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
@@ -549,6 +549,15 @@ private:
}
}
+ void Invoke(TMutableRange<TClosure> callbacks) override
+ {
+ if (auto strongParent = Parent_.Lock()) {
+ for (auto& callback : callbacks) {
+ strongParent->Enqueue(std::move(callback), Index_);
+ }
+ }
+ }
+
private:
const int Index_;
const TWeakPtr<TFairShareInvokerPool> Parent_;
diff --git a/yt/yt/core/concurrency/unittests/async_stream_ut.cpp b/yt/yt/core/concurrency/unittests/async_stream_ut.cpp
index 48f09107f4..3ada04c164 100644
--- a/yt/yt/core/concurrency/unittests/async_stream_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/async_stream_ut.cpp
@@ -123,7 +123,7 @@ public:
{ }
protected:
- virtual size_t DoRead(void* buf, size_t len) override
+ size_t DoRead(void* buf, size_t len) override
{
return InputStream_->Read(buf, std::min(len, MaxBlockSize_));
}
diff --git a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp
index 188f08e436..498f18ecc5 100644
--- a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp
@@ -57,13 +57,15 @@ public:
, InvocationCount_(0)
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
++InvocationCount_;
if (Bounded_ ) {
EXPECT_TRUE(Parent_.Lock());
}
- TInvokerWrapper::Invoke(std::move(callback));
+ UnderlyingInvoker_->Invoke(std::move(callback));
}
void Bound(const IMockInvokerPoolPtr& parent)
diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
index 8b4c5978a9..15320fae90 100644
--- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
@@ -1075,6 +1075,8 @@ public:
: TInvokerWrapper(std::move(underlyingInvoker))
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
UnderlyingInvoker_->Invoke(BIND(
diff --git a/yt/yt/core/http/retrying_client.cpp b/yt/yt/core/http/retrying_client.cpp
index f386b33bec..d441f1b7b3 100644
--- a/yt/yt/core/http/retrying_client.cpp
+++ b/yt/yt/core/http/retrying_client.cpp
@@ -27,13 +27,13 @@ public:
: RetryChecker_(retryChecker ? std::move(retryChecker) : BIND(&DefaultRetryChecker))
{ }
- virtual bool IsRetriableError(const TError& error) override
+ bool IsRetriableError(const TError& error) override
{
return RetryChecker_(error);
}
- virtual TError CheckError(const IResponsePtr& response) override = 0;
- virtual NYTree::INodePtr GetFormattedResponse() const override = 0;
+ TError CheckError(const IResponsePtr& response) override = 0;
+ NYTree::INodePtr GetFormattedResponse() const override = 0;
protected:
static bool DefaultRetryChecker(const TError& /*error*/)
diff --git a/yt/yt/core/json/json_writer.cpp b/yt/yt/core/json/json_writer.cpp
index 6a74bed6c2..267fc0dea7 100644
--- a/yt/yt/core/json/json_writer.cpp
+++ b/yt/yt/core/json/json_writer.cpp
@@ -20,7 +20,7 @@ class TJsonWriter
{
public:
TJsonWriter(IOutputStream* output, bool isPretty);
- virtual ~TJsonWriter() override;
+ ~TJsonWriter() override;
void Flush() override;
void OnStringScalar(TStringBuf value) override;
diff --git a/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h b/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h
index d1abbbbd0a..3dbb3ce3e5 100644
--- a/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h
+++ b/yt/yt/core/misc/bit_packed_unsigned_vector-inl.h
@@ -25,7 +25,7 @@ inline size_t CompressedUnsignedVectorSizeInWords(ui64 maxValue, size_t count)
inline size_t CompressedUnsignedVectorSizeInBytes(ui64 maxValue, size_t count)
{
- static size_t wordSize = sizeof(ui64);
+ static constexpr size_t wordSize = sizeof(ui64);
return CompressedUnsignedVectorSizeInWords(maxValue, count) * wordSize;
}
diff --git a/yt/yt/core/misc/bit_packed_unsigned_vector.cpp b/yt/yt/core/misc/bit_packed_unsigned_vector.cpp
index 05ff7ed88f..bec799a9f1 100644
--- a/yt/yt/core/misc/bit_packed_unsigned_vector.cpp
+++ b/yt/yt/core/misc/bit_packed_unsigned_vector.cpp
@@ -8,24 +8,26 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
-void PrepareDiffFromExpected(std::vector<ui32>* values, ui32* expected, ui32* maxDiff)
+std::pair<ui32, ui32> PrepareDiffFromExpected(std::vector<ui32>* values)
{
+ ui32 expected = 0;
+ ui32 maxDiff = 0;
+
if (values->empty()) {
- *expected = 0;
- *maxDiff = 0;
- return;
+ return {expected, maxDiff};
}
- *expected = DivRound<int>(values->back(), values->size());
+ expected = DivRound<int>(values->back(), values->size());
- *maxDiff = 0;
i64 expectedValue = 0;
for (int i = 0; i < std::ssize(*values); ++i) {
- expectedValue += *expected;
- i32 diff = values->at(i) - expectedValue;
+ expectedValue += expected;
+ i32 diff = (*values)[i] - expectedValue;
(*values)[i] = ZigZagEncode32(diff);
- *maxDiff = std::max(*maxDiff, (*values)[i]);
+ maxDiff = std::max(maxDiff, (*values)[i]);
}
+
+ return {expected, maxDiff};
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/bit_packed_unsigned_vector.h b/yt/yt/core/misc/bit_packed_unsigned_vector.h
index 5a12aaba56..4f95561267 100644
--- a/yt/yt/core/misc/bit_packed_unsigned_vector.h
+++ b/yt/yt/core/misc/bit_packed_unsigned_vector.h
@@ -59,7 +59,7 @@ private:
////////////////////////////////////////////////////////////////////////////////
-void PrepareDiffFromExpected(std::vector<ui32>* values, ui32* expected, ui32* maxDiff);
+std::pair<ui32, ui32> PrepareDiffFromExpected(std::vector<ui32>* values);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/codicil.cpp b/yt/yt/core/misc/codicil.cpp
new file mode 100644
index 0000000000..6a43d9465c
--- /dev/null
+++ b/yt/yt/core/misc/codicil.cpp
@@ -0,0 +1,112 @@
+#include "codicil.h"
+
+#include <yt/yt/core/concurrency/fls.h>
+
+#include <library/cpp/yt/small_containers/compact_vector.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+using TCodicilBuilderStack = TCompactVector<TCodicilBuilder, 16>;
+
+NConcurrency::TFlsSlot<TCodicilBuilderStack>& CodicilBuilderStackSlot()
+{
+ static NConcurrency::TFlsSlot<TCodicilBuilderStack> result;
+ return result;
+}
+
+} // namespace NDetail
+
+void PushCodicilBuilder(TCodicilBuilder&& builder)
+{
+ NDetail::CodicilBuilderStackSlot()->push_back(std::move(builder));
+}
+
+void PopCodicilBuilder()
+{
+ auto& stack = *NDetail::CodicilBuilderStackSlot();
+ YT_ASSERT(!stack.empty());
+ stack.pop_back();
+}
+
+TRange<TCodicilBuilder> GetCodicilBuilders()
+{
+ // NB: Don't forcefully construct FLS slot to avoid allocations;
+ // these may lead to deadlocks if the program crashes during an allocation itself.
+ if (!NDetail::CodicilBuilderStackSlot().IsInitialized()) {
+ return {};
+ }
+ return TRange(*NDetail::CodicilBuilderStackSlot());
+}
+
+std::vector<std::string> BuildCodicils()
+{
+ auto builders = GetCodicilBuilders();
+ std::vector<std::string> result;
+ result.reserve(builders.size());
+ TCodicilFormatter formatter;
+ for (const auto& builder : builders) {
+ formatter.Reset();
+ builder(&formatter);
+ result.emplace_back(formatter.GetBuffer());
+ }
+ return result;
+}
+
+TCodicilBuilder MakeNonOwningCodicilBuilder(TStringBuf codicil)
+{
+ return [codicil] (TCodicilFormatter* formatter) {
+ formatter->AppendString(codicil);
+ };
+}
+
+TCodicilBuilder MakeOwningCodicilBuilder(std::string codicil)
+{
+ return [codicil = std::move(codicil)] (TCodicilFormatter* formatter) {
+ formatter->AppendString(codicil);
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCodicilGuard::TCodicilGuard(TCodicilBuilder&& builder)
+ : Active_(true)
+{
+ PushCodicilBuilder(std::move(builder));
+}
+
+TCodicilGuard::~TCodicilGuard()
+{
+ Release();
+}
+
+TCodicilGuard::TCodicilGuard(TCodicilGuard&& other)
+ : Active_(other.Active_)
+{
+ other.Active_ = false;
+}
+
+TCodicilGuard& TCodicilGuard::operator=(TCodicilGuard&& other)
+{
+ if (this != &other) {
+ Release();
+ Active_ = other.Active_;
+ other.Active_ = false;
+ }
+ return *this;
+}
+
+void TCodicilGuard::Release()
+{
+ if (Active_) {
+ PopCodicilBuilder();
+ Active_ = false;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/core/misc/codicil.h b/yt/yt/core/misc/codicil.h
new file mode 100644
index 0000000000..4b2ed64636
--- /dev/null
+++ b/yt/yt/core/misc/codicil.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include <library/cpp/yt/string/raw_formatter.h>
+
+#include <library/cpp/yt/memory/range.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/*
+ * "Codicils" are short human- and machine-readable strings organized
+ * into a per-fiber stack.
+ *
+ * When the crash handler is invoked, it dumps (alongside with the other
+ * useful stuff like backtrace) the codicils.
+ *
+ * For performance reasons, codicils are constructed lazily when needed.
+ */
+
+constexpr auto MaxCodicilLength = 256;
+
+//! A formatter used to build codicils.
+using TCodicilFormatter = TRawFormatter<MaxCodicilLength>;
+
+//! A callback that constructs a codicil.
+using TCodicilBuilder = std::function<void(TCodicilFormatter* formatter)>;
+
+//! Installs a new codicil builder onto the stack.
+void PushCodicilBuilder(TCodicilBuilder&& builder);
+
+//! Removes the top codicil builder from the stack.
+void PopCodicilBuilder();
+
+//! Returns the list of all currently installed codicil builders.
+TRange<TCodicilBuilder> GetCodicilBuilders();
+
+//! Invokes all registered codicil builders to construct codicils.
+//! Not signal-safe; must only be used for testing purposes.
+std::vector<std::string> BuildCodicils();
+
+//! Creates a codicil builder holding a given string view.
+TCodicilBuilder MakeNonOwningCodicilBuilder(TStringBuf codicil);
+
+//! Creates a codicil builder holding a given string.
+TCodicilBuilder MakeOwningCodicilBuilder(std::string codicil);
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Invokes #PushCodicilBuilder in ctor and #PopCodicilBuilder in dtor.
+class TCodicilGuard
+{
+public:
+ TCodicilGuard() = default;
+ TCodicilGuard(TCodicilBuilder&& builder);
+
+ ~TCodicilGuard();
+
+ TCodicilGuard(const TCodicilGuard& othter) = delete;
+ TCodicilGuard(TCodicilGuard&& other);
+
+ TCodicilGuard& operator=(const TCodicilGuard& other) = delete;
+ TCodicilGuard& operator=(TCodicilGuard&& other);
+
+private:
+ bool Active_ = false;
+
+ void Release();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/core/misc/crash_handler.cpp b/yt/yt/core/misc/crash_handler.cpp
index 5bc29f0591..4f7eae4440 100644
--- a/yt/yt/core/misc/crash_handler.cpp
+++ b/yt/yt/core/misc/crash_handler.cpp
@@ -1,8 +1,8 @@
#include "crash_handler.h"
-#include "signal_registry.h"
#include <yt/yt/core/logging/log_manager.h>
+#include <yt/yt/core/misc/codicil.h>
#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/concurrency/fls.h>
@@ -136,26 +136,21 @@ void DumpTimeInfo()
WriteToStderr(formatter);
}
-using TCodicilStack = std::vector<TString>;
-
-NConcurrency::TFlsSlot<TCodicilStack>& CodicilStackSlot()
-{
- static NConcurrency::TFlsSlot<TCodicilStack> Slot;
- return Slot;
-}
-
-//! Dump codicils.
+//! Dumps codicils.
void DumpCodicils()
{
- // NB: Avoid constructing FLS slot to avoid allocations; these may lead to deadlocks if the
- // program crashes during an allocation itself.
- if (CodicilStackSlot().IsInitialized() && !CodicilStackSlot()->empty()) {
+ auto builders = GetCodicilBuilders();
+ if (!builders.empty()) {
WriteToStderr("*** Begin codicils\n");
- for (const auto& data : *CodicilStackSlot()) {
- TFormatter formatter;
- formatter.AppendString(data.c_str());
- formatter.AppendString("\n");
+ TCodicilFormatter formatter;
+ for (const auto& builder : builders) {
+ formatter.Reset();
+ builder(&formatter);
WriteToStderr(formatter);
+ if (formatter.GetBytesRemaining() == 0) {
+ WriteToStderr(" (truncated)");
+ }
+ WriteToStderr("\n");
}
WriteToStderr("*** End codicils\n");
}
@@ -555,71 +550,4 @@ void CrashSignalHandler(int /*signal*/)
////////////////////////////////////////////////////////////////////////////////
-void PushCodicil(const TString& data)
-{
-#ifdef _unix_
- NDetail::CodicilStackSlot()->push_back(data);
-#else
- Y_UNUSED(data);
-#endif
-}
-
-void PopCodicil()
-{
-#ifdef _unix_
- YT_VERIFY(!NDetail::CodicilStackSlot()->empty());
- NDetail::CodicilStackSlot()->pop_back();
-#endif
-}
-
-std::vector<TString> GetCodicils()
-{
-#ifdef _unix_
- return *NDetail::CodicilStackSlot();
-#else
- return {};
-#endif
-}
-
-TCodicilGuard::TCodicilGuard()
- : Active_(false)
-{ }
-
-TCodicilGuard::TCodicilGuard(const TString& data)
- : Active_(true)
-{
- PushCodicil(data);
-}
-
-TCodicilGuard::~TCodicilGuard()
-{
- Release();
-}
-
-TCodicilGuard::TCodicilGuard(TCodicilGuard&& other)
- : Active_(other.Active_)
-{
- other.Active_ = false;
-}
-
-TCodicilGuard& TCodicilGuard::operator=(TCodicilGuard&& other)
-{
- if (this != &other) {
- Release();
- Active_ = other.Active_;
- other.Active_ = false;
- }
- return *this;
-}
-
-void TCodicilGuard::Release()
-{
- if (Active_) {
- PopCodicil();
- Active_ = false;
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
} // namespace NYT
diff --git a/yt/yt/core/misc/crash_handler.h b/yt/yt/core/misc/crash_handler.h
index 94f903bd30..6ae6abdc97 100644
--- a/yt/yt/core/misc/crash_handler.h
+++ b/yt/yt/core/misc/crash_handler.h
@@ -1,6 +1,6 @@
#pragma once
-#include <util/generic/string.h>
+#include <util/generic/strbuf.h>
#ifdef _unix_
#include <signal.h>
@@ -29,41 +29,6 @@ void DumpStackTrace(TCallback flushCallback, void* startPC = nullptr);
////////////////////////////////////////////////////////////////////////////////
-// "Codicils" are short human- and machine-readable strings organized into a per-fiber stack.
-// When the crash handler is invoked, it dumps (alongside with the other
-// useful stuff like backtrace) the content of the latter stack.
-
-//! Installs a new codicil into the stack.
-void PushCodicil(const TString& data);
-
-//! Removes the top codicils from the stack.
-void PopCodicil();
-
-//! Returns the list of the currently installed codicils.
-std::vector<TString> GetCodicils();
-
-//! Invokes #PushCodicil in ctor and #PopCodicil in dtor.
-class TCodicilGuard
-{
-public:
- TCodicilGuard();
- explicit TCodicilGuard(const TString& data);
- ~TCodicilGuard();
-
- TCodicilGuard(const TCodicilGuard& other) = delete;
- TCodicilGuard(TCodicilGuard&& other);
-
- TCodicilGuard& operator=(const TCodicilGuard& other) = delete;
- TCodicilGuard& operator=(TCodicilGuard&& other);
-
-private:
- bool Active_;
-
- void Release();
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
} // namespace NYT
#define CRASH_HANDLER_INL_H_
diff --git a/yt/yt/core/misc/protobuf_helpers-inl.h b/yt/yt/core/misc/protobuf_helpers-inl.h
index d603aa3fce..39cf53d165 100644
--- a/yt/yt/core/misc/protobuf_helpers-inl.h
+++ b/yt/yt/core/misc/protobuf_helpers-inl.h
@@ -10,6 +10,8 @@
#include <library/cpp/yt/assert/assert.h>
+#include <library/cpp/yt/misc/cast.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -203,7 +205,7 @@ template <class T>
requires TEnumTraits<T>::IsEnum && (!TEnumTraits<T>::IsBitEnum)
void FromProto(T* original, int serialized)
{
- *original = static_cast<T>(serialized);
+ *original = CheckedEnumCast<T>(serialized);
}
template <class T>
@@ -217,7 +219,7 @@ template <class T>
requires TEnumTraits<T>::IsBitEnum
void FromProto(T* original, ui64 serialized)
{
- *original = static_cast<T>(serialized);
+ *original = CheckedEnumCast<T>(serialized);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/unittests/codicil_ut.cpp b/yt/yt/core/misc/unittests/codicil_ut.cpp
index 94d81af601..cd154c8fa6 100644
--- a/yt/yt/core/misc/unittests/codicil_ut.cpp
+++ b/yt/yt/core/misc/unittests/codicil_ut.cpp
@@ -1,11 +1,12 @@
#include <yt/yt/core/test_framework/framework.h>
-#include <yt/yt/core/misc/crash_handler.h>
+#include <yt/yt/core/misc/codicil.h>
#include <yt/yt/core/concurrency/action_queue.h>
#include <yt/yt/core/concurrency/delayed_executor.h>
#include <yt/yt/core/actions/bind.h>
+#include <yt/yt/core/actions/codicil_guarded_invoker.h>
#include <yt/yt/core/actions/future.h>
namespace NYT {
@@ -17,22 +18,31 @@ using namespace NConcurrency;
TEST(TCodicilTest, Simple)
{
- const auto codicil1 = TString("codicil1");
- const auto codicil2 = TString("codicil2");
- TCodicilGuard guard1(codicil1);
- TCodicilGuard guard2(codicil2);
- EXPECT_EQ(GetCodicils(), (std::vector{codicil1, codicil2}));
+ const auto codicil1 = std::string("codicil1");
+ const auto codicil2 = std::string("codicil2");
+ auto guard1 = TCodicilGuard(MakeOwningCodicilBuilder(codicil1));
+ auto guard2 = TCodicilGuard(MakeOwningCodicilBuilder(codicil2));
+ EXPECT_EQ(BuildCodicils(), (std::vector{codicil1, codicil2}));
+}
+
+TEST(TCodicilTest, MaxLength)
+{
+ const auto codicil = std::string(MaxCodicilLength + 1, 'x');
+ auto guard = TCodicilGuard(MakeOwningCodicilBuilder(codicil));
+ auto codicils = BuildCodicils();
+ EXPECT_EQ(std::ssize(codicils), 1);
+ EXPECT_EQ(codicils[0], codicil.substr(0, MaxCodicilLength));
}
TEST(TCodicilTest, CodicilGuardedInvoker)
{
- const auto codicil = TString("codicil");
+ const auto codicil = std::string("codicil");
auto actionQueue = New<TActionQueue>("ActionQueue");
auto invoker = CreateCodicilGuardedInvoker(actionQueue->GetInvoker(), codicil);
BIND([&] {
- EXPECT_EQ(GetCodicils(), (std::vector{codicil}));
+ EXPECT_EQ(BuildCodicils(), (std::vector{codicil}));
TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
- EXPECT_EQ(GetCodicils(), (std::vector{codicil}));
+ EXPECT_EQ(BuildCodicils(), (std::vector{codicil}));
})
.AsyncVia(invoker)
.Run()
diff --git a/yt/yt/core/phoenix/type_def-inl.h b/yt/yt/core/phoenix/type_def-inl.h
index 34112eaad5..ff39a471b1 100644
--- a/yt/yt/core/phoenix/type_def-inl.h
+++ b/yt/yt/core/phoenix/type_def-inl.h
@@ -1055,6 +1055,12 @@ struct TSerializer
}
template <class T, class C>
+ static void Save(C& context, const TWeakPtr<T>& ptr)
+ {
+ SaveImpl(context, ptr.Lock().Get());
+ }
+
+ template <class T, class C>
static void SaveImpl(C& context, T* ptr)
{
using NYT::Save;
@@ -1127,6 +1133,14 @@ struct TSerializer
LoadImpl</*Inplace*/ true>(context, rawPtr);
}
+ template <class T, class C>
+ static void Load(C& context, TWeakPtr<T>& ptr)
+ {
+ T* rawPtr = nullptr;
+ LoadImpl</*Inplace*/ false>(context, rawPtr);
+ ptr.Reset(rawPtr);
+ }
+
template <bool Inplace, class T, class C>
static void LoadImpl(C& context, T*& rawPtr)
{
@@ -1188,7 +1202,8 @@ template <class T, class C>
requires (std::derived_from<C, NPhoenix2::NDetail::TContextBase>) && (
std::same_as<T, TIntrusivePtr<typename T::TUnderlying>> ||
std::same_as<T, std::unique_ptr<typename T::element_type>> ||
- std::is_pointer_v<T>)
+ std::is_pointer_v<T> ||
+ std::same_as<T, TWeakPtr<typename T::TUnderlying>>)
struct TSerializerTraits<T, C>
{
using TSerializer = NPhoenix2::NDetail::TSerializer;
diff --git a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp
index 950f22eb64..ac4ad2163d 100644
--- a/yt/yt/core/phoenix/unittests/phoenix_ut.cpp
+++ b/yt/yt/core/phoenix/unittests/phoenix_ut.cpp
@@ -1050,6 +1050,98 @@ TEST(TPhoenixTest, RawPtrCycle2)
////////////////////////////////////////////////////////////////////////////////
+namespace NWeakPtrCycle {
+
+struct A
+ : public TRefCounted
+{
+ TWeakPtr<A> X;
+
+ PHOENIX_DECLARE_TYPE(A, 0x5e1325ef);
+};
+
+void A::RegisterMetadata(auto&& registrar)
+{
+ PHOENIX_REGISTER_FIELD(1, X)();
+}
+
+PHOENIX_DEFINE_TYPE(A);
+
+struct B
+ : public TRefCounted
+{
+ TIntrusivePtr<A> L;
+ TIntrusivePtr<A> R;
+
+ PHOENIX_DECLARE_TYPE(B, 0x7ccd0099);
+};
+
+void B::RegisterMetadata(auto&& registrar)
+{
+ PHOENIX_REGISTER_FIELD(1, L)();
+ PHOENIX_REGISTER_FIELD(2, R)();
+}
+
+PHOENIX_DEFINE_TYPE(B);
+
+} // namespace NWeakPtrCycle
+
+TEST(TPhoenixTest, WeakPtrCycle1)
+{
+ using namespace NWeakPtrCycle;
+
+ auto a1 = New<A>();
+ EXPECT_EQ(a1->GetWeakRefCount(), 1);
+
+ a1->X = a1;
+
+ auto a2 = Deserialize<TIntrusivePtr<A>>(Serialize(a1));
+ EXPECT_EQ(a2->GetRefCount(), 1);
+ EXPECT_EQ(a2->GetWeakRefCount(), 2);
+ EXPECT_TRUE(a2->X.Lock());
+ EXPECT_EQ(a2->X.Lock(), a2);
+}
+
+TEST(TPhoenixTest, WeakPtrCycle2)
+{
+ using namespace NWeakPtrCycle;
+
+ auto a1 = New<A>();
+ auto a2 = New<A>();
+ a1->X = a2;
+ a2->X = a1;
+ a2 = nullptr;
+
+ auto a3 = Deserialize<TIntrusivePtr<A>>(Serialize(a1));
+ EXPECT_EQ(a3->GetRefCount(), 1);
+ EXPECT_EQ(a3->GetWeakRefCount(), 1);
+ EXPECT_TRUE(!a3->X.Lock());
+
+ auto a4 = Deserialize<TWeakPtr<A>>(Serialize(a1->X));
+ EXPECT_TRUE(!a4.Lock());
+}
+
+TEST(TPhoenixTest, WeakPtrCycle3)
+{
+ using namespace NWeakPtrCycle;
+
+ auto b1 = New<B>();
+ b1->L = New<A>();
+ b1->R = New<A>();
+ b1->L->X = b1->R;
+ b1->R->X = b1->L;
+
+ auto b2 = Deserialize<TIntrusivePtr<B>>(Serialize(b1));
+ EXPECT_EQ(b2->L->GetRefCount(), 1);
+ EXPECT_EQ(b2->L->GetWeakRefCount(), 2);
+ EXPECT_EQ(b2->R->GetRefCount(), 1);
+ EXPECT_EQ(b2->R->GetWeakRefCount(), 2);
+ EXPECT_EQ(b2->L->X.Lock(), b2->R);
+ EXPECT_EQ(b2->R->X.Lock(), b2->L);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
namespace NIntrusiveAndRawPtr {
struct A;
@@ -1107,6 +1199,104 @@ TEST(TPhoenixTest, IntrusiveAndRawPtr)
////////////////////////////////////////////////////////////////////////////////
+namespace NIntrusiveAndWeakPtr {
+
+struct A;
+struct B;
+
+struct A
+ : public TRefCounted
+{
+ B* X = nullptr;
+ TIntrusivePtr<B> Y;
+
+ PHOENIX_DECLARE_TYPE(A, 0xab7d77a9);
+};
+
+void A::RegisterMetadata(auto&& registrar)
+{
+ PHOENIX_REGISTER_FIELD(1, X)();
+ PHOENIX_REGISTER_FIELD(2, Y)();
+}
+
+PHOENIX_DEFINE_TYPE(A);
+
+struct B
+ : public TRefCounted
+{
+ int V = -1;
+ TWeakPtr<A> W;
+
+ PHOENIX_DECLARE_TYPE(B, 0xea924741);
+};
+
+void B::RegisterMetadata(auto&& registrar)
+{
+ registrar.template Field<1, &B::V>("v")();
+ PHOENIX_REGISTER_FIELD(2, W)();
+}
+
+PHOENIX_DEFINE_TYPE(B);
+
+struct C
+ : public TRefCounted
+{
+ TIntrusivePtr<A> APtr;
+ TIntrusivePtr<B> BPtr;
+ TWeakPtr<B> BWeakPtr;
+
+ PHOENIX_DECLARE_TYPE(C, 0xea038112);
+};
+
+void C::RegisterMetadata(auto&& registrar)
+{
+ PHOENIX_REGISTER_FIELD(1, APtr)();
+ PHOENIX_REGISTER_FIELD(2, BPtr)();
+ PHOENIX_REGISTER_FIELD(3, BWeakPtr)();
+}
+
+PHOENIX_DEFINE_TYPE(C);
+
+} // namespace NIntrusiveAndWeakPtr
+
+TEST(TPhoenixTest, IntrusiveAndWeakPtr)
+{
+ using namespace NIntrusiveAndWeakPtr;
+
+ auto c1 = New<C>();
+ c1->APtr = New<A>();
+ c1->BPtr = New<B>();
+ c1->BWeakPtr = c1->BPtr;
+ c1->BPtr->W = c1->APtr;
+ c1->APtr->Y = c1->BPtr;
+ EXPECT_EQ(c1->BPtr->GetRefCount(), 2);
+ EXPECT_EQ(c1->APtr->Y->GetRefCount(), 2);
+
+ c1->APtr->Y->V = 7;
+ c1->APtr->X = c1->APtr->Y.Get();
+
+ auto c2 = Deserialize<TIntrusivePtr<C>>(Serialize(c1));
+ EXPECT_EQ(c2->APtr->GetRefCount(), 1);
+ EXPECT_EQ(c2->APtr->GetWeakRefCount(), c1->APtr->GetWeakRefCount());
+ EXPECT_EQ(c2->APtr->Y->GetRefCount(), 2);
+ EXPECT_EQ(c2->APtr->Y->GetWeakRefCount(), c1->APtr->Y->GetWeakRefCount());
+ EXPECT_EQ(c2->APtr->Y->W.Lock(), c2->APtr);
+ EXPECT_EQ(c2->APtr->Y->V, 7);
+ EXPECT_EQ(c2->APtr->X, c2->APtr->Y);
+
+ EXPECT_EQ(c2->BPtr->GetRefCount(), 2);
+ EXPECT_EQ(c2->BPtr->GetWeakRefCount(), c1->BPtr->GetWeakRefCount());
+ EXPECT_EQ(c2->BPtr->W.Lock(), c2->APtr);
+ EXPECT_EQ(c2->BPtr->W.Lock()->Y, c2->BPtr);
+ EXPECT_EQ(c2->BPtr->V, 7);
+
+ EXPECT_EQ(c2->BWeakPtr.Lock()->GetWeakRefCount(), c1->BPtr->GetWeakRefCount());
+ EXPECT_EQ(c2->BWeakPtr.Lock()->GetRefCount(), 3);
+ EXPECT_EQ(c2->BWeakPtr.Lock(), c2->BPtr);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
namespace NPersistentPolymorphic {
struct TBase
diff --git a/yt/yt/core/rpc/client.cpp b/yt/yt/core/rpc/client.cpp
index 935a9af200..35b7ee60f7 100644
--- a/yt/yt/core/rpc/client.cpp
+++ b/yt/yt/core/rpc/client.cpp
@@ -609,7 +609,7 @@ void TClientResponse::Deserialize(TSharedRefArray responseMessage)
std::optional<NCompression::ECodec> bodyCodecId;
NCompression::ECodec attachmentCodecId;
if (Header_.has_codec()) {
- bodyCodecId = attachmentCodecId = CheckedEnumCast<NCompression::ECodec>(Header_.codec());
+ bodyCodecId = attachmentCodecId = FromProto<NCompression::ECodec>(Header_.codec());
} else {
bodyCodecId = std::nullopt;
attachmentCodecId = NCompression::ECodec::None;
diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp
index 81a17d4d01..b6ea1fd384 100644
--- a/yt/yt/core/rpc/grpc/channel.cpp
+++ b/yt/yt/core/rpc/grpc/channel.cpp
@@ -126,7 +126,7 @@ DEFINE_ENUM(EClientCallStage,
);
class TChannel
- : public IChannel
+ : public NYT::NRpc::NGrpc::IGrpcChannel
{
public:
explicit TChannel(TChannelConfigPtr config)
@@ -221,6 +221,11 @@ public:
return MemoryUsageTracker_;
}
+ grpc_connectivity_state CheckConnectivityState(bool tryToConnect) override
+ {
+ return grpc_channel_check_connectivity_state(Channel_.Unwrap(), tryToConnect);
+ }
+
private:
const TChannelConfigPtr Config_;
const TString EndpointAddress_;
@@ -736,7 +741,7 @@ public:
} // namespace
-IChannelPtr CreateGrpcChannel(TChannelConfigPtr config)
+IGrpcChannelPtr CreateGrpcChannel(TChannelConfigPtr config)
{
return New<TChannel>(std::move(config));
}
diff --git a/yt/yt/core/rpc/grpc/channel.h b/yt/yt/core/rpc/grpc/channel.h
index 1a11b35254..9e5c08f655 100644
--- a/yt/yt/core/rpc/grpc/channel.h
+++ b/yt/yt/core/rpc/grpc/channel.h
@@ -3,13 +3,27 @@
#include "public.h"
#include <yt/yt/core/rpc/public.h>
+#include <yt/yt/core/rpc/channel.h>
+
+#include <contrib/libs/grpc/include/grpc/impl/connectivity_state.h>
namespace NYT::NRpc::NGrpc {
////////////////////////////////////////////////////////////////////////////////
+struct IGrpcChannel
+ : public NRpc::IChannel
+{
+public:
+ virtual grpc_connectivity_state CheckConnectivityState(bool tryToConnect) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IGrpcChannel)
+
+////////////////////////////////////////////////////////////////////////////////
+
//! Creates a channel implemented via GRPC.
-NRpc::IChannelPtr CreateGrpcChannel(TChannelConfigPtr config);
+IGrpcChannelPtr CreateGrpcChannel(TChannelConfigPtr config);
//! Returns the factory for creating GRPC channels.
NRpc::IChannelFactoryPtr GetGrpcChannelFactory();
diff --git a/yt/yt/core/rpc/grpc/helpers.cpp b/yt/yt/core/rpc/grpc/helpers.cpp
index bfe01bf540..42b7819013 100644
--- a/yt/yt/core/rpc/grpc/helpers.cpp
+++ b/yt/yt/core/rpc/grpc/helpers.cpp
@@ -413,7 +413,7 @@ TSharedRef ExtractMessageFromEnvelopedMessage(const TSharedRef& data)
auto compressedMessage = data.Slice(sourceMessage, sourceMessage + fixedHeader->MessageSize);
- auto codecId = CheckedEnumCast<NCompression::ECodec>(envelope.codec());
+ auto codecId = FromProto<NCompression::ECodec>(envelope.codec());
auto* codec = NCompression::GetCodec(codecId);
return codec->Decompress(compressedMessage);
}
diff --git a/yt/yt/core/rpc/grpc/helpers.h b/yt/yt/core/rpc/grpc/helpers.h
index b6b775c483..a9cff4f292 100644
--- a/yt/yt/core/rpc/grpc/helpers.h
+++ b/yt/yt/core/rpc/grpc/helpers.h
@@ -244,7 +244,7 @@ private:
size_t AvailableBytes_ = 0;
size_t RemainingBytes_;
- virtual size_t DoRead(void* buf, size_t len) override;
+ size_t DoRead(void* buf, size_t len) override;
bool ReadNextSlice();
};
diff --git a/yt/yt/core/rpc/grpc/public.h b/yt/yt/core/rpc/grpc/public.h
index a34e2e768a..7cb846bf33 100644
--- a/yt/yt/core/rpc/grpc/public.h
+++ b/yt/yt/core/rpc/grpc/public.h
@@ -15,6 +15,8 @@ DECLARE_REFCOUNTED_CLASS(TChannelCredentialsConfig)
DECLARE_REFCOUNTED_CLASS(TChannelConfigTemplate)
DECLARE_REFCOUNTED_CLASS(TChannelConfig)
+DECLARE_REFCOUNTED_STRUCT(IGrpcChannel)
+
////////////////////////////////////////////////////////////////////////////////
extern const char* const TracingTraceIdMetadataKey;
diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp
index 1f2d6c1bff..c4b5200c16 100644
--- a/yt/yt/core/rpc/http/channel.cpp
+++ b/yt/yt/core/rpc/http/channel.cpp
@@ -272,7 +272,7 @@ private:
THeadersPtr httpHeaders = New<THeaders>();
if (rpcHeader.has_request_format()) {
- auto format = CheckedEnumCast<EMessageFormat>(rpcHeader.request_format());
+ auto format = FromProto<EMessageFormat>(rpcHeader.request_format());
httpHeaders->Add(ContentTypeHeaderName, ToHttpContentType(format));
}
@@ -281,7 +281,7 @@ private:
}
if (rpcHeader.has_response_format()) {
- auto format = CheckedEnumCast<EMessageFormat>(rpcHeader.response_format());
+ auto format = FromProto<EMessageFormat>(rpcHeader.response_format());
httpHeaders->Add(AcceptHeaderName, ToHttpContentType(format));
}
diff --git a/yt/yt/core/rpc/http/server.cpp b/yt/yt/core/rpc/http/server.cpp
index e762424218..e0caa56bb7 100644
--- a/yt/yt/core/rpc/http/server.cpp
+++ b/yt/yt/core/rpc/http/server.cpp
@@ -116,7 +116,7 @@ public:
YT_VERIFY(message.Size() >= 2);
if (responseHeader.has_format()) {
- auto format = CheckedEnumCast<EMessageFormat>(responseHeader.format());
+ auto format = FromProto<EMessageFormat>(responseHeader.format());
Rsp_->GetHeaders()->Add("Content-Type", ToHttpContentType(format));
}
diff --git a/yt/yt/core/rpc/response_keeper.cpp b/yt/yt/core/rpc/response_keeper.cpp
index 2e1891bea4..f354c67590 100644
--- a/yt/yt/core/rpc/response_keeper.cpp
+++ b/yt/yt/core/rpc/response_keeper.cpp
@@ -380,7 +380,7 @@ bool ValidateHeaderAndParseRememberOption(const TSharedRefArray& responseMessage
{
NProto::TResponseHeader header;
YT_VERIFY(TryParseResponseHeader(responseMessage, &header));
- return FromProto<EErrorCode>(header.error().code()) != EErrorCode::Unavailable;
+ return header.error().code() != ToUnderlying(EErrorCode::Unavailable);
}
void ValidateRetry(TMutationId mutationId, bool isRetry)
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp
index 94c992054d..ce9802a9dc 100644
--- a/yt/yt/core/rpc/server_detail.cpp
+++ b/yt/yt/core/rpc/server_detail.cpp
@@ -70,6 +70,10 @@ void TServiceContextBase::Initialize()
RequestId_ = FromProto<TRequestId>(RequestHeader_->request_id());
RealmId_ = FromProto<TRealmId>(RequestHeader_->realm_id());
+ MutationId_ = FromProto<TMutationId>(RequestHeader_->mutation_id());
+ ServiceName_ = FromProto<std::string>(RequestHeader_->service());
+ MethodName_ = FromProto<std::string>(RequestHeader_->method());
+
AuthenticationIdentity_.User = RequestHeader_->has_user() ? RequestHeader_->user() : RootUserName;
AuthenticationIdentity_.UserTag = RequestHeader_->has_user_tag() ? RequestHeader_->user_tag() : AuthenticationIdentity_.User;
@@ -208,12 +212,7 @@ TSharedRefArray TServiceContextBase::BuildResponseMessage()
// COMPAT(danilalexeev): legacy RPC codecs.
if (IsResponseBodySerializedWithCompression()) {
- if (RequestHeader_->has_response_codec()) {
- header.set_codec(ToProto(ResponseCodec_));
- } else {
- ResponseBody_ = PushEnvelope(ResponseBody_, ResponseCodec_);
- ResponseAttachments_ = DecompressAttachments(ResponseAttachments_, ResponseCodec_);
- }
+ header.set_codec(ToProto(ResponseCodec_));
}
auto message = Error_.IsOK()
@@ -223,7 +222,7 @@ TSharedRefArray TServiceContextBase::BuildResponseMessage()
ResponseAttachments_)
: CreateErrorResponseMessage(header);
- auto responseMessageError = CheckBusMessageLimits(ResponseMessage_);
+ auto responseMessageError = CheckBusMessageLimits(message);
if (!responseMessageError.IsOK()) {
return CreateErrorResponseMessage(responseMessageError);
}
@@ -399,14 +398,14 @@ TMutationId TServiceContextBase::GetMutationId() const
return FromProto<TMutationId>(RequestHeader_->mutation_id());
}
-std::string TServiceContextBase::GetService() const
+const std::string& TServiceContextBase::GetService() const
{
- return FromProto<std::string>(RequestHeader_->service());
+ return ServiceName_;
}
-std::string TServiceContextBase::GetMethod() const
+const std::string& TServiceContextBase::GetMethod() const
{
- return FromProto<std::string>(RequestHeader_->method());
+ return MethodName_;
}
TRealmId TServiceContextBase::GetRealmId() const
@@ -611,12 +610,12 @@ TMutationId TServiceContextWrapper::GetMutationId() const
return UnderlyingContext_->GetMutationId();
}
-std::string TServiceContextWrapper::GetService() const
+const std::string& TServiceContextWrapper::GetService() const
{
return UnderlyingContext_->GetService();
}
-std::string TServiceContextWrapper::GetMethod() const
+const std::string& TServiceContextWrapper::GetMethod() const
{
return UnderlyingContext_->GetMethod();
}
diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h
index 3683ca7477..ef4eeeeb0d 100644
--- a/yt/yt/core/rpc/server_detail.h
+++ b/yt/yt/core/rpc/server_detail.h
@@ -53,8 +53,8 @@ public:
bool IsRetry() const override;
TMutationId GetMutationId() const override;
- std::string GetService() const override;
- std::string GetMethod() const override;
+ const std::string& GetService() const override;
+ const std::string& GetMethod() const override;
TRealmId GetRealmId() const override;
const TAuthenticationIdentity& GetAuthenticationIdentity() const override;
@@ -69,7 +69,6 @@ public:
//! \note Thread affinity: any
TFuture<TSharedRefArray> GetAsyncResponseMessage() const override;
-
const TSharedRefArray& GetResponseMessage() const override;
using TCanceledCallback = TCallback<void(const TError&)>;
@@ -131,6 +130,9 @@ protected:
bool LoggingEnabled_;
TRequestId RequestId_;
TRealmId RealmId_;
+ TMutationId MutationId_;
+ std::string ServiceName_;
+ std::string MethodName_;
TAuthenticationIdentity AuthenticationIdentity_;
@@ -217,8 +219,8 @@ public:
bool IsRetry() const override;
TMutationId GetMutationId() const override;
- std::string GetService() const override;
- std::string GetMethod() const override;
+ const std::string& GetService() const override;
+ const std::string& GetMethod() const override;
TRealmId GetRealmId() const override;
const TAuthenticationIdentity& GetAuthenticationIdentity() const override;
diff --git a/yt/yt/core/rpc/service.h b/yt/yt/core/rpc/service.h
index def1b8dbf0..b9bd49473e 100644
--- a/yt/yt/core/rpc/service.h
+++ b/yt/yt/core/rpc/service.h
@@ -100,12 +100,10 @@ struct IServiceContext
virtual TMutationId GetMutationId() const = 0;
//! Returns request service name.
- // NB: Service name is supposed to be short, so SSO should work.
- virtual std::string GetService() const = 0;
+ virtual const std::string& GetService() const = 0;
//! Returns request method name.
- // NB: Method name is supposed to be short, so SSO should work.
- virtual std::string GetMethod() const = 0;
+ virtual const std::string& GetMethod() const = 0;
//! Returns request realm id.
virtual TRealmId GetRealmId() const = 0;
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index d6c062b9c9..2cf6b573cd 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -22,7 +22,7 @@
#include <yt/yt/core/logging/log_manager.h>
-#include <yt/yt/core/misc/crash_handler.h>
+#include <yt/yt/core/misc/codicil.h>
#include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/net/address.h>
@@ -402,7 +402,6 @@ public:
std::move(logger),
incomingRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed))
, Service_(std::move(service))
- , RequestId_(incomingRequest.RequestId)
, ReplyBus_(std::move(incomingRequest.ReplyBus))
, RuntimeInfo_(incomingRequest.RuntimeInfo)
, TraceContext_(std::move(incomingRequest.TraceContext))
@@ -561,8 +560,7 @@ public:
RequestId_);
if (RuntimeInfo_->Descriptor.StreamingEnabled) {
- static const auto CanceledError = TError(NYT::EErrorCode::Canceled, "Request canceled");
- AbortStreamsUnlessClosed(CanceledError);
+ AbortStreamsUnlessClosed(TError(NYT::EErrorCode::Canceled, "Request canceled"));
}
CancelInstant_ = NProfiling::GetInstant();
@@ -586,8 +584,7 @@ public:
stage);
if (RuntimeInfo_->Descriptor.StreamingEnabled) {
- static const auto TimedOutError = TError(NYT::EErrorCode::Timeout, "Request timed out");
- AbortStreamsUnlessClosed(TimedOutError);
+ AbortStreamsUnlessClosed(TError(NYT::EErrorCode::Timeout, "Request timed out"));
}
CanceledList_.Fire(GetCanceledError());
@@ -744,7 +741,6 @@ public:
private:
const TServiceBasePtr Service_;
- const TRequestId RequestId_;
const IBusPtr ReplyBus_;
TRuntimeMethodInfo* const RuntimeInfo_;
const TTraceContextPtr TraceContext_;
@@ -800,10 +796,10 @@ private:
{
// COMPAT(danilalexeev): legacy RPC codecs
RequestCodec_ = RequestHeader_->has_request_codec()
- ? CheckedEnumCast<NCompression::ECodec>(RequestHeader_->request_codec())
+ ? FromProto<NCompression::ECodec>(RequestHeader_->request_codec())
: NCompression::ECodec::None;
ResponseCodec_ = RequestHeader_->has_response_codec()
- ? CheckedEnumCast<NCompression::ECodec>(RequestHeader_->response_codec())
+ ? FromProto<NCompression::ECodec>(RequestHeader_->response_codec())
: NCompression::ECodec::None;
Service_->IncrementActiveRequestCount();
@@ -895,8 +891,7 @@ private:
}
if (RuntimeInfo_->Descriptor.StreamingEnabled) {
- static const auto FinishedError = TError("Request finished");
- AbortStreamsUnlessClosed(Error_.IsOK() ? Error_ : FinishedError);
+ AbortStreamsUnlessClosed(Error_.IsOK() ? Error_ : TError("Request finished"));
}
DoSetComplete();
@@ -996,11 +991,20 @@ private:
{
const auto& authenticationIdentity = GetAuthenticationIdentity();
- TCodicilGuard codicilGuard(Format("RequestId: %v, Method: %v.%v, AuthenticationIdentity: %v",
- GetRequestId(),
- GetService(),
- GetMethod(),
- authenticationIdentity));
+ TCodicilGuard codicilGuard([&] (TCodicilFormatter* formatter) {
+ formatter->AppendString("RequestId: ");
+ formatter->AppendGuid(RequestId_);
+ formatter->AppendString(", Service: ");
+ formatter->AppendString(GetService());
+ formatter->AppendString(", Method: ");
+ formatter->AppendString(GetMethod());
+ formatter->AppendString(", User: ");
+ formatter->AppendString(authenticationIdentity.User);
+ if (!authenticationIdentity.UserTag.empty() && authenticationIdentity.UserTag != authenticationIdentity.User) {
+ formatter->AppendString(", UserTag: ");
+ formatter->AppendString(authenticationIdentity.UserTag);
+ }
+ });
TCurrentAuthenticationIdentityGuard identityGuard(&authenticationIdentity);
handler(this, descriptor.Options);
}
@@ -1110,9 +1114,9 @@ private:
{
TNullTraceContextGuard nullGuard;
- YT_LOG_DEBUG("Request logging suppressed (RequestId: %v)", GetRequestId());
+ YT_LOG_DEBUG("Request logging suppressed (RequestId: %v)", RequestId_);
}
- NLogging::TLogManager::Get()->SuppressRequest(GetRequestId());
+ NLogging::TLogManager::Get()->SuppressRequest(RequestId_);
}
void DoSetComplete()
@@ -1275,10 +1279,10 @@ private:
NProto::TStreamingPayloadHeader header;
ToProto(header.mutable_request_id(), RequestId_);
- ToProto(header.mutable_service(), GetService());
- ToProto(header.mutable_method(), GetMethod());
- if (GetRealmId()) {
- ToProto(header.mutable_realm_id(), GetRealmId());
+ ToProto(header.mutable_service(), ServiceName_);
+ ToProto(header.mutable_method(), MethodName_);
+ if (RealmId_) {
+ ToProto(header.mutable_realm_id(), RealmId_);
}
header.set_sequence_number(payload->SequenceNumber);
header.set_codec(ToProto(payload->Codec));
@@ -1317,10 +1321,10 @@ private:
NProto::TStreamingFeedbackHeader header;
ToProto(header.mutable_request_id(), RequestId_);
- header.set_service(GetService());
- header.set_method(GetMethod());
- if (GetRealmId()) {
- ToProto(header.mutable_realm_id(), GetRealmId());
+ header.set_service(ToProto(ServiceName_));
+ header.set_method(ToProto(MethodName_));
+ if (RealmId_) {
+ ToProto(header.mutable_realm_id(), RealmId_);
}
header.set_read_position(feedback.ReadPosition);
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index b12bc45ebe..7c43182860 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -336,9 +336,17 @@ protected:
const auto& underlyingContext = this->GetUnderlyingContext();
const auto& requestHeader = underlyingContext->GetRequestHeader();
- auto codecId = underlyingContext->GetResponseCodec();
- auto serializedBody = SerializeProtoToRefWithCompression(*Response_, codecId);
- underlyingContext->SetResponseBodySerializedWithCompression();
+ // COMPAT(danilalexeev): legacy RPC codecs.
+ NCompression::ECodec attachmentCodecId = NCompression::ECodec::None;
+ auto bodyCodecId = underlyingContext->GetResponseCodec();
+ TSharedRef serializedBody;
+ if (requestHeader.has_response_codec()) {
+ serializedBody = SerializeProtoToRefWithCompression(*Response_, bodyCodecId);
+ attachmentCodecId = bodyCodecId;
+ underlyingContext->SetResponseBodySerializedWithCompression();
+ } else {
+ serializedBody = SerializeProtoToRefWithEnvelope(*Response_, bodyCodecId);
+ }
if (requestHeader.has_response_format()) {
auto format = TryCheckedEnumCast<EMessageFormat>(requestHeader.response_format());
@@ -363,7 +371,7 @@ protected:
}
}
- auto responseAttachments = CompressAttachments(Response_->Attachments(), codecId);
+ auto responseAttachments = CompressAttachments(Response_->Attachments(), attachmentCodecId);
return TSerializedResponse{
.Body = std::move(serializedBody),
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp
index e71ffffc1a..f6477b361e 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.cpp
+++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp
@@ -18,6 +18,8 @@ namespace NYT::NRpc {
using namespace NConcurrency;
+using NYT::FromProto;
+
////////////////////////////////////////////////////////////////////////////////
YT_DEFINE_GLOBAL(std::unique_ptr<NThreading::TEvent>, Latch_);
@@ -140,7 +142,7 @@ public:
DECLARE_RPC_SERVICE_METHOD(NTestRpc, Compression)
{
- auto requestCodecId = CheckedEnumCast<NCompression::ECodec>(request->request_codec());
+ auto requestCodecId = FromProto<NCompression::ECodec>(request->request_codec());
auto serializedRequestBody = SerializeProtoToRefWithCompression(*request, requestCodecId);
const auto& compressedRequestBody = context->GetRequestBody();
EXPECT_TRUE(TRef::AreBitwiseEqual(serializedRequestBody, compressedRequestBody));
diff --git a/yt/yt/core/rpc/unittests/mock/service.h b/yt/yt/core/rpc/unittests/mock/service.h
index 72c13d7910..152e515794 100644
--- a/yt/yt/core/rpc/unittests/mock/service.h
+++ b/yt/yt/core/rpc/unittests/mock/service.h
@@ -123,13 +123,13 @@ public:
(const, override));
MOCK_METHOD(
- std::string,
+ const std::string&,
GetService,
(),
(const, override));
MOCK_METHOD(
- std::string,
+ const std::string&,
GetMethod,
(),
(const, override));
diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h
index 8bdf90aabd..44f618a261 100644
--- a/yt/yt/core/tracing/trace_context.h
+++ b/yt/yt/core/tracing/trace_context.h
@@ -278,7 +278,7 @@ TTraceContext* GetCurrentTraceContext();
//! Flushes the elapsed time of the current trace context (if any).
void FlushCurrentTraceContextElapsedTime();
-//!
+//! Returns a trace context from #storage (null if there is none).
TTraceContext* TryGetTraceContextFromPropagatingStorage(const NConcurrency::TPropagatingStorage& storage);
//! Creates a new trace context. If the current trace context exists, it becomes the parent of the
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index 1948e8d250..59362bed77 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -14,6 +14,7 @@ NO_LTO()
SRCS(
actions/cancelation_token.cpp
actions/cancelable_context.cpp
+ actions/codicil_guarded_invoker.cpp
actions/current_invoker.cpp
actions/future.cpp
actions/invoker_detail.cpp
@@ -118,6 +119,7 @@ SRCS(
misc/blob_output.cpp
misc/bloom_filter.cpp
misc/checksum.cpp
+ misc/codicil.cpp
misc/config.cpp
misc/coro_pipe.cpp
misc/crash_handler.cpp
diff --git a/yt/yt/core/yson/unittests/yson_writer_ut.cpp b/yt/yt/core/yson/unittests/yson_writer_ut.cpp
index 99414f0d94..118781fd44 100644
--- a/yt/yt/core/yson/unittests/yson_writer_ut.cpp
+++ b/yt/yt/core/yson/unittests/yson_writer_ut.cpp
@@ -6,6 +6,9 @@
#include <yt/yt/core/yson/parser.h>
#include <yt/yt/core/yson/stream.h>
+#include <yt/yt/core/ytree/ephemeral_node_factory.h>
+#include <yt/yt/core/ytree/tree_builder.h>
+
#include <util/string/escape.h>
namespace NYT::NYson {
@@ -457,5 +460,20 @@ TEST(TYsonFragmentWriterTest, NoFirstIndent)
////////////////////////////////////////////////////////////////////////////////
+TEST(TYsonTreeBuilderTest, MaxListSize)
+{
+ TString input =
+ "\"a1\" = {\n"
+ " \"key\" = 42;\n"
+ "};\n";
+
+ auto builder = NYTree::CreateBuilderFromFactory(NYTree::GetEphemeralNodeFactory(), /*treeSizeLimit*/ 0);
+ builder->BeginTree();
+ TStringStream stream(input);
+ EXPECT_THROW(ParseYson(TYsonInput(&stream), builder.get()), std::exception);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace
} // namespace NYT::NYson
diff --git a/yt/yt/core/ytree/ephemeral_node_factory.cpp b/yt/yt/core/ytree/ephemeral_node_factory.cpp
index 878837a093..789fa6ace4 100644
--- a/yt/yt/core/ytree/ephemeral_node_factory.cpp
+++ b/yt/yt/core/ytree/ephemeral_node_factory.cpp
@@ -451,7 +451,7 @@ public:
: ShouldHideAttributes_(shouldHideAttributes)
{ }
- virtual ~TEphemeralNodeFactory() override
+ ~TEphemeralNodeFactory() override
{
RollbackIfNeeded();
}
diff --git a/yt/yt/core/ytree/node_detail.cpp b/yt/yt/core/ytree/node_detail.cpp
index cf785433d1..5660cc43e7 100644
--- a/yt/yt/core/ytree/node_detail.cpp
+++ b/yt/yt/core/ytree/node_detail.cpp
@@ -559,7 +559,7 @@ void TListNodeMixin::SetChild(
void TSupportsSetSelfMixin::SetSelf(
TReqSet* request,
TRspSet* /*response*/,
- const TCtxSetPtr &context)
+ const TCtxSetPtr& context)
{
bool force = request->force();
context->SetRequestInfo("Force: %v", force);
diff --git a/yt/yt/core/ytree/ypath_client.cpp b/yt/yt/core/ytree/ypath_client.cpp
index 810557403a..5cb9d8055e 100644
--- a/yt/yt/core/ytree/ypath_client.cpp
+++ b/yt/yt/core/ytree/ypath_client.cpp
@@ -225,7 +225,7 @@ void TYPathResponse::Deserialize(const TSharedRefArray& message)
// COMPAT(danilalexeev): legacy RPC codecs
auto codecId = header.has_codec()
- ? std::make_optional(CheckedEnumCast<NCompression::ECodec>(header.codec()))
+ ? std::make_optional(FromProto<NCompression::ECodec>(header.codec()))
: std::nullopt;
if (!TryDeserializeBody(message[1], codecId)) {
diff --git a/yt/yt/core/ytree/ypath_detail.cpp b/yt/yt/core/ytree/ypath_detail.cpp
index 23a1610ca8..bcc24ea64b 100644
--- a/yt/yt/core/ytree/ypath_detail.cpp
+++ b/yt/yt/core/ytree/ypath_detail.cpp
@@ -1278,7 +1278,7 @@ class TNodeSetter
private: \
I##name##Node* const Node_; \
\
- virtual ENodeType GetExpectedType() override \
+ ENodeType GetExpectedType() override \
{ \
return ENodeType::name; \
}
diff --git a/yt/yt/core/ytree/ypath_detail.h b/yt/yt/core/ytree/ypath_detail.h
index 797fff24d1..290da3a1fc 100644
--- a/yt/yt/core/ytree/ypath_detail.h
+++ b/yt/yt/core/ytree/ypath_detail.h
@@ -154,11 +154,12 @@ protected:
DEFINE_RPC_SERVICE_METHOD(TSupports##method, method) \
{ \
NYPath::TTokenizer tokenizer(GetRequestTargetYPath(context->RequestHeader())); \
- if (tokenizer.Advance() == NYPath::ETokenType::EndOfStream) { \
+ tokenizer.Advance(); \
+ tokenizer.Skip(NYPath::ETokenType::Ampersand); \
+ if (tokenizer.GetType() == NYPath::ETokenType::EndOfStream) { \
method##Self(request, response, context); \
return; \
} \
- tokenizer.Skip(NYPath::ETokenType::Ampersand); \
if (tokenizer.GetType() != NYPath::ETokenType::Slash) { \
onPathError \
return; \
diff --git a/yt/yt/library/column_converters/string_column_converter.cpp b/yt/yt/library/column_converters/string_column_converter.cpp
index 60b4e28337..797f64f4ee 100644
--- a/yt/yt/library/column_converters/string_column_converter.cpp
+++ b/yt/yt/library/column_converters/string_column_converter.cpp
@@ -128,9 +128,7 @@ private:
auto offsets = GetDirectDenseOffsets();
// Save offsets as diff from expected.
- ui32 expectedLength;
- ui32 maxDiff;
- PrepareDiffFromExpected(&offsets, &expectedLength, &maxDiff);
+ auto [expectedLength, maxDiff] = PrepareDiffFromExpected(&offsets);
auto directDataSize = DirectBuffer_->GetSize();
auto directData = DirectBuffer_->Finish();
@@ -213,9 +211,7 @@ private:
auto idsRef = TSharedRef::MakeCopy<TConverterTag>(TRef(ids.data(), sizeof(ui32) * ids.size()));
// 2. Dictionary offsets.
- ui32 expectedLength;
- ui32 maxDiff;
- PrepareDiffFromExpected(&dictionaryOffsets, &expectedLength, &maxDiff);
+ auto [expectedLength, maxDiff] = PrepareDiffFromExpected(&dictionaryOffsets);
auto dictionaryOffsetsRef = TSharedRef::MakeCopy<TConverterTag>(TRef(dictionaryOffsets.data(), sizeof(ui32) * dictionaryOffsets.size()));
auto primaryColumn = std::make_shared<TBatchColumn>();
diff --git a/yt/yt/library/formats/yamr_parser_base.h b/yt/yt/library/formats/yamr_parser_base.h
index 56968d4908..66e5b306a8 100644
--- a/yt/yt/library/formats/yamr_parser_base.h
+++ b/yt/yt/library/formats/yamr_parser_base.h
@@ -28,7 +28,7 @@ class TYamrConsumerBase
{
public:
explicit TYamrConsumerBase(NYson::IYsonConsumer* consumer);
- virtual void SwitchTable(i64 tableIndex) override;
+ void SwitchTable(i64 tableIndex) override;
protected:
NYson::IYsonConsumer* Consumer;
@@ -53,8 +53,8 @@ public:
bool enableKeyEscaping,
bool enableValueEscaping);
- virtual void Read(TStringBuf data) override;
- virtual void Finish() override;
+ void Read(TStringBuf data) override;
+ void Finish() override;
private:
using EState = EYamrDelimitedBaseParserState;
@@ -123,8 +123,8 @@ public:
bool enableSubkey,
bool enableEom);
- virtual void Read(TStringBuf data) override;
- virtual void Finish() override;
+ void Read(TStringBuf data) override;
+ void Finish() override;
private:
using EState = EYamrLenvalBaseParserState;
diff --git a/yt/yt/library/formats/yson_map_to_unversioned_value.h b/yt/yt/library/formats/yson_map_to_unversioned_value.h
index 023a0600a3..656bf576d0 100644
--- a/yt/yt/library/formats/yson_map_to_unversioned_value.h
+++ b/yt/yt/library/formats/yson_map_to_unversioned_value.h
@@ -22,28 +22,28 @@ public:
NTableClient::IValueConsumer* valueConsumer);
void Reset();
- virtual void OnStringScalar(TStringBuf value) override;
- virtual void OnInt64Scalar(i64 value) override;
- virtual void OnUint64Scalar(ui64 value) override;
- virtual void OnDoubleScalar(double value) override;
- virtual void OnBooleanScalar(bool value) override;
- virtual void OnEntity() override;
- virtual void OnBeginList() override;
- virtual void OnListItem() override;
- virtual void OnBeginMap() override;
- virtual void OnKeyedItem(TStringBuf name) override;
- virtual void OnEndMap() override;
- virtual void OnBeginAttributes() override;
- virtual void OnEndList() override;
- virtual void OnEndAttributes() override;
+ void OnStringScalar(TStringBuf value) override;
+ void OnInt64Scalar(i64 value) override;
+ void OnUint64Scalar(ui64 value) override;
+ void OnDoubleScalar(double value) override;
+ void OnBooleanScalar(bool value) override;
+ void OnEntity() override;
+ void OnBeginList() override;
+ void OnListItem() override;
+ void OnBeginMap() override;
+ void OnKeyedItem(TStringBuf name) override;
+ void OnEndMap() override;
+ void OnBeginAttributes() override;
+ void OnEndList() override;
+ void OnEndAttributes() override;
private:
- virtual const NTableClient::TNameTablePtr& GetNameTable() const override;
- virtual bool GetAllowUnknownColumns() const override;
- virtual void OnBeginRow() override;
- virtual void OnValue(const NTableClient::TUnversionedValue& value) override;
- virtual void OnEndRow() override;
- virtual const NTableClient::TTableSchemaPtr& GetSchema() const override;
+ const NTableClient::TNameTablePtr& GetNameTable() const override;
+ bool GetAllowUnknownColumns() const override;
+ void OnBeginRow() override;
+ void OnValue(const NTableClient::TUnversionedValue& value) override;
+ void OnEndRow() override;
+ const NTableClient::TTableSchemaPtr& GetSchema() const override;
private:
NTableClient::IValueConsumer* const Consumer_;
diff --git a/yt/yt/library/oom/oom.h b/yt/yt/library/oom/oom.h
index 33a9206398..1fa01194dc 100644
--- a/yt/yt/library/oom/oom.h
+++ b/yt/yt/library/oom/oom.h
@@ -26,7 +26,7 @@ struct TTCMallocLimitHandlerOptions
// Files structure would have the following form:
// HeapDumpDirectory/<ActualName>_FilenameSuffix_Timestamp.ext.
- TString FilenameSuffix = "";
+ std::optional<TString> FilenameSuffix;
TDuration Timeout = TDuration::Minutes(5);
};
diff --git a/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp b/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp
index ca624aaea0..cad154d211 100644
--- a/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp
+++ b/yt/yt/library/oom/tcmalloc_memory_limit_handler.cpp
@@ -35,11 +35,15 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+namespace {
+
TString MakeIncompletePath(const TString& path)
{
return NYT::Format("%v_incomplete", path);
}
+} // namespace
+
////////////////////////////////////////////////////////////////////////////////
void CollectAndDumpMemoryProfile(const TString& memoryProfilePath, tcmalloc::ProfileType profileType)
@@ -205,7 +209,7 @@ private:
{
return NYT::MakeFormatterWrapper([this, &timestamp] (TStringBuilderBase* builder) {
if (Options_.FilenameSuffix) {
- builder->AppendFormat("%v_", Options_.FilenameSuffix);
+ builder->AppendFormat("%v_", *Options_.FilenameSuffix);
}
FormatValue(builder, timestamp, "v");
});
diff --git a/yt/yt/library/process/unittests/process_ut.cpp b/yt/yt/library/process/unittests/process_ut.cpp
index 01a1468e59..15f807561f 100644
--- a/yt/yt/library/process/unittests/process_ut.cpp
+++ b/yt/yt/library/process/unittests/process_ut.cpp
@@ -209,6 +209,9 @@ TEST(TProcessTest, KillFinished)
TEST(TProcessTest, KillZombie)
{
+ // TODO(arkady-e1ppa): This code is for debugging test failures purposes
+ // remove it when investigation is complete.
+ ::signal(SIGCHLD, SIG_DFL);
auto p = New<TSimpleProcess>("/bin/bash");
p->AddArgument("-c");
p->AddArgument("/bin/sleep 1; /bin/true");
diff --git a/yt/yt/library/profiling/solomon/cube.cpp b/yt/yt/library/profiling/solomon/cube.cpp
index 93b54496b2..52a106556f 100644
--- a/yt/yt/library/profiling/solomon/cube.cpp
+++ b/yt/yt/library/profiling/solomon/cube.cpp
@@ -435,7 +435,7 @@ int TCube<T>::ReadSensors(
};
if constexpr (std::is_same_v<T, i64> || std::is_same_v<T, TDuration>) {
- if (options.ConvertCountersToRateGauge) {
+ if (options.ConvertCountersToRateGauge || options.ConvertCountersToDeltaGauge) {
consumer->OnMetricBegin(NMonitoring::EMetricType::GAUGE);
} else {
consumer->OnMetricBegin(NMonitoring::EMetricType::RATE);
@@ -455,6 +455,12 @@ int TCube<T>::ReadSensors(
} else {
consumer->OnDouble(time, value.SecondsFloat() / options.RateDenominator);
}
+ } else if (options.ConvertCountersToDeltaGauge) {
+ if constexpr (std::is_same_v<T, i64>) {
+ consumer->OnDouble(time, value);
+ } else {
+ consumer->OnDouble(time, value.SecondsFloat());
+ }
} else {
// TODO(prime@): RATE is incompatible with windowed read.
if constexpr (std::is_same_v<T, i64>) {
diff --git a/yt/yt/library/profiling/solomon/cube.h b/yt/yt/library/profiling/solomon/cube.h
index 39c638fcc0..061ca24bc1 100644
--- a/yt/yt/library/profiling/solomon/cube.h
+++ b/yt/yt/library/profiling/solomon/cube.h
@@ -24,6 +24,7 @@ struct TReadOptions
std::function<bool(const std::string&)> SensorFilter;
bool ConvertCountersToRateGauge = false;
+ bool ConvertCountersToDeltaGauge = false;
bool RenameConvertedCounters = true;
double RateDenominator = 1.0;
bool EnableHistogramCompat = false;
diff --git a/yt/yt/library/profiling/solomon/exporter.cpp b/yt/yt/library/profiling/solomon/exporter.cpp
index 913fbcfe55..225f7682ff 100644
--- a/yt/yt/library/profiling/solomon/exporter.cpp
+++ b/yt/yt/library/profiling/solomon/exporter.cpp
@@ -79,6 +79,8 @@ void TSolomonExporterConfig::Register(TRegistrar registrar)
.Default(true);
registrar.Parameter("rename_converted_counters", &TThis::RenameConvertedCounters)
.Default(true);
+ registrar.Parameter("convert_counters_to_delta_gauge", &TThis::ConvertCountersToDeltaGauge)
+ .Default(false);
registrar.Parameter("export_summary", &TThis::ExportSummary)
.Default(false);
@@ -134,6 +136,12 @@ void TSolomonExporterConfig::Register(TRegistrar registrar)
});
registrar.Postprocessor([] (TThis* config) {
+ if (config->ConvertCountersToRateForSolomon && config->ConvertCountersToDeltaGauge) {
+ THROW_ERROR_EXCEPTION("\"convert_counters_to_rate_for_solomon\" and \"convert_counters_to_delta_gauge\" both set to true");
+ }
+ });
+
+ registrar.Postprocessor([] (TThis* config) {
for (const auto& [name, shard] : config->Shards) {
if (!shard->GridStep) {
continue;
@@ -780,6 +788,9 @@ void TSolomonExporter::DoHandleShard(
options.RateDenominator = readGridStep->SecondsFloat();
}
}
+ if (Config_->ConvertCountersToDeltaGauge && outputEncodingContext.IsSolomonPull) {
+ options.ConvertCountersToDeltaGauge = true;
+ }
options.EnableSolomonAggregationWorkaround = outputEncodingContext.IsSolomonPull;
options.Times = readWindow;
diff --git a/yt/yt/library/profiling/solomon/exporter.h b/yt/yt/library/profiling/solomon/exporter.h
index bd4aa8b7ea..46bb7f37cb 100644
--- a/yt/yt/library/profiling/solomon/exporter.h
+++ b/yt/yt/library/profiling/solomon/exporter.h
@@ -55,6 +55,7 @@ struct TSolomonExporterConfig
bool ConvertCountersToRateForSolomon;
bool RenameConvertedCounters;
+ bool ConvertCountersToDeltaGauge;
bool ExportSummary;
bool ExportSummaryAsMax;
diff --git a/yt/yt/library/ytprof/http/handler.cpp b/yt/yt/library/ytprof/http/handler.cpp
index 2dbc631957..7ad74d3171 100644
--- a/yt/yt/library/ytprof/http/handler.cpp
+++ b/yt/yt/library/ytprof/http/handler.cpp
@@ -180,7 +180,7 @@ public:
}
private:
- tcmalloc::ProfileType ProfileType_;
+ const tcmalloc::ProfileType ProfileType_;
};
class TTCMallocAllocationProfilerHandler
@@ -206,7 +206,7 @@ class TTCMallocStatHandler
: public IHttpHandler
{
public:
- void HandleRequest(const IRequestPtr& /* req */, const IResponseWriterPtr& rsp) override
+ void HandleRequest(const IRequestPtr& /*req*/, const IResponseWriterPtr& rsp) override
{
auto stat = tcmalloc::MallocExtension::GetStats();
rsp->SetStatus(EStatusCode::OK);
@@ -258,7 +258,7 @@ class TVersionHandler
: public IHttpHandler
{
public:
- void HandleRequest(const IRequestPtr& /* req */, const IResponseWriterPtr& rsp) override
+ void HandleRequest(const IRequestPtr& /*req*/, const IResponseWriterPtr& rsp) override
{
rsp->SetStatus(EStatusCode::OK);
WaitFor(rsp->WriteBody(TSharedRef::FromString(GetVersion())))
@@ -270,7 +270,7 @@ class TBuildIdHandler
: public IHttpHandler
{
public:
- void HandleRequest(const IRequestPtr& /* req */, const IResponseWriterPtr& rsp) override
+ void HandleRequest(const IRequestPtr& /*req*/, const IResponseWriterPtr& rsp) override
{
rsp->SetStatus(EStatusCode::OK);
WaitFor(rsp->WriteBody(TSharedRef::FromString(GetVersion())))
@@ -291,22 +291,28 @@ void Register(
const TString& prefix,
const TBuildInfo& buildInfo)
{
- handlers->Add(prefix + "/profile", New<TCpuProfilerHandler>(buildInfo));
+ handlers->Add(prefix + "/cpu/profile", New<TCpuProfilerHandler>(buildInfo));
- handlers->Add(prefix + "/lock", New<TSpinlockProfilerHandler>(buildInfo, false));
- handlers->Add(prefix + "/block", New<TSpinlockProfilerHandler>(buildInfo, true));
+ handlers->Add(prefix + "/spinlock/lock", New<TSpinlockProfilerHandler>(buildInfo, false));
+ handlers->Add(prefix + "/spinlock/block", New<TSpinlockProfilerHandler>(buildInfo, true));
- handlers->Add(prefix + "/heap", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kHeap));
- handlers->Add(prefix + "/peak", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kPeakHeap));
- handlers->Add(prefix + "/fragmentation", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kFragmentation));
- handlers->Add(prefix + "/allocations", New<TTCMallocAllocationProfilerHandler>(buildInfo));
-
- handlers->Add(prefix + "/tcmalloc", New<TTCMallocStatHandler>());
+ handlers->Add(prefix + "/tcmalloc/current", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kHeap));
+ handlers->Add(prefix + "/tcmalloc/peak", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kPeakHeap));
+ handlers->Add(prefix + "/tcmalloc/fragmentation", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kFragmentation));
+ handlers->Add(prefix + "/tcmalloc/allocation", New<TTCMallocAllocationProfilerHandler>(buildInfo));
+ handlers->Add(prefix + "/tcmalloc/stat", New<TTCMallocStatHandler>());
handlers->Add(prefix + "/binary", New<TBinaryHandler>());
-
+ handlers->Add(prefix + "/build_id", New<TBuildIdHandler>());
handlers->Add(prefix + "/version", New<TVersionHandler>());
+
+ // COMPAT(babenko): consider dropping these
+ handlers->Add(prefix + "/profile", New<TCpuProfilerHandler>(buildInfo));
handlers->Add(prefix + "/buildid", New<TBuildIdHandler>());
+ handlers->Add(prefix + "/heap", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kHeap));
+ handlers->Add(prefix + "/peak", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kPeakHeap));
+ handlers->Add(prefix + "/fragmentation", New<TTCMallocSnapshotProfilerHandler>(buildInfo, tcmalloc::ProfileType::kFragmentation));
+ handlers->Add(prefix + "/allocations", New<TTCMallocAllocationProfilerHandler>(buildInfo));
}
////////////////////////////////////////////////////////////////////////////////