aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-11-14 19:18:07 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-11-14 20:20:53 +0300
commit874ef51d3d3edfa25f5a505ec6ab50e172965d1e (patch)
tree620fb5e02063d23509d3aa3df2215c099ccde0b7 /yt
parente356b34d3b0399e2f170881af15c91e4db9e3d11 (diff)
downloadydb-874ef51d3d3edfa25f5a505ec6ab50e172965d1e.tar.gz
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp15
-rw-r--r--yt/yt/client/api/rpc_proxy/config.cpp3
-rw-r--r--yt/yt/client/api/rpc_proxy/config.h3
-rw-r--r--yt/yt/client/election/public.h3
-rw-r--r--yt/yt/client/hydra/public.h1
-rw-r--r--yt/yt/core/yson/protobuf_interop.cpp44
-rw-r--r--yt/yt/core/yson/protobuf_interop.h19
-rw-r--r--yt/yt/core/yson/protobuf_interop_options.h27
-rw-r--r--yt/yt/core/yson/unittests/protobuf_yson_ut.cpp50
-rw-r--r--yt/yt/library/CMakeLists.darwin-x86_64.txt2
-rw-r--r--yt/yt/library/CMakeLists.linux-aarch64.txt1
-rw-r--r--yt/yt/library/CMakeLists.linux-x86_64.txt1
-rw-r--r--yt/yt/library/CMakeLists.windows-x86_64.txt2
-rw-r--r--yt/yt/library/containers/CMakeLists.darwin-x86_64.txt30
-rw-r--r--yt/yt/library/containers/CMakeLists.linux-aarch64.txt32
-rw-r--r--yt/yt/library/containers/CMakeLists.linux-x86_64.txt32
-rw-r--r--yt/yt/library/containers/CMakeLists.txt17
-rw-r--r--yt/yt/library/containers/CMakeLists.windows-x86_64.txt27
-rw-r--r--yt/yt/library/containers/cgroup.cpp752
-rw-r--r--yt/yt/library/containers/cgroup.h290
-rw-r--r--yt/yt/library/containers/config.cpp64
-rw-r--r--yt/yt/library/containers/config.h64
-rw-r--r--yt/yt/library/containers/cri/config.cpp54
-rw-r--r--yt/yt/library/containers/cri/config.h70
-rw-r--r--yt/yt/library/containers/cri/cri_api.cpp33
-rw-r--r--yt/yt/library/containers/cri/cri_api.h99
-rw-r--r--yt/yt/library/containers/cri/cri_executor.cpp666
-rw-r--r--yt/yt/library/containers/cri/cri_executor.h207
-rw-r--r--yt/yt/library/containers/cri/private.h13
-rw-r--r--yt/yt/library/containers/cri/public.h17
-rw-r--r--yt/yt/library/containers/cri/ya.make22
-rw-r--r--yt/yt/library/containers/disk_manager/config.cpp61
-rw-r--r--yt/yt/library/containers/disk_manager/config.h79
-rw-r--r--yt/yt/library/containers/disk_manager/disk_info_provider.cpp64
-rw-r--r--yt/yt/library/containers/disk_manager/disk_info_provider.h38
-rw-r--r--yt/yt/library/containers/disk_manager/disk_manager_proxy.cpp49
-rw-r--r--yt/yt/library/containers/disk_manager/disk_manager_proxy.h38
-rw-r--r--yt/yt/library/containers/disk_manager/public.h48
-rw-r--r--yt/yt/library/containers/disk_manager/ya.make19
-rw-r--r--yt/yt/library/containers/instance.cpp812
-rw-r--r--yt/yt/library/containers/instance.h168
-rw-r--r--yt/yt/library/containers/instance_limits_tracker.cpp179
-rw-r--r--yt/yt/library/containers/instance_limits_tracker.h59
-rw-r--r--yt/yt/library/containers/porto_executor.cpp1079
-rw-r--r--yt/yt/library/containers/porto_executor.h142
-rw-r--r--yt/yt/library/containers/porto_health_checker.cpp69
-rw-r--r--yt/yt/library/containers/porto_health_checker.h52
-rw-r--r--yt/yt/library/containers/porto_resource_tracker.cpp711
-rw-r--r--yt/yt/library/containers/porto_resource_tracker.h158
-rw-r--r--yt/yt/library/containers/private.h13
-rw-r--r--yt/yt/library/containers/process.cpp154
-rw-r--r--yt/yt/library/containers/process.h46
-rw-r--r--yt/yt/library/containers/public.h163
-rw-r--r--yt/yt/library/containers/unittests/containers_ut.cpp133
-rw-r--r--yt/yt/library/containers/unittests/porto_resource_tracker_ut.cpp251
-rw-r--r--yt/yt/library/containers/unittests/process_ut.cpp302
-rw-r--r--yt/yt/library/containers/unittests/ya.make35
-rw-r--r--yt/yt/library/containers/ya.make37
-rw-r--r--yt/yt/library/process/CMakeLists.darwin-x86_64.txt26
-rw-r--r--yt/yt/library/process/CMakeLists.txt4
-rw-r--r--yt/yt/library/process/CMakeLists.windows-x86_64.txt23
-rw-r--r--yt/yt/library/program/CMakeLists.darwin-x86_64.txt1
-rw-r--r--yt/yt/library/program/CMakeLists.linux-aarch64.txt1
-rw-r--r--yt/yt/library/program/CMakeLists.linux-x86_64.txt1
-rw-r--r--yt/yt/library/program/CMakeLists.windows-x86_64.txt1
-rw-r--r--yt/yt/library/program/config.cpp4
-rw-r--r--yt/yt/library/program/config.h4
-rw-r--r--yt/yt/library/program/helpers.cpp11
-rw-r--r--yt/yt/library/program/ya.make1
69 files changed, 106 insertions, 7590 deletions
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index 812b6cd843..17d6fbdf8e 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -911,11 +911,16 @@ TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookup(
}
template<class TRequest>
-void FillRequestBySelectRowsOptionsBase(const TSelectRowsOptionsBase& options, TRequest request)
+void FillRequestBySelectRowsOptionsBase(
+ const TSelectRowsOptionsBase& options,
+ const std::optional<NYPath::TYPath>& defaultUdfRegistryPath,
+ TRequest request)
{
request->set_timestamp(options.Timestamp);
if (options.UdfRegistryPath) {
request->set_udf_registry_path(*options.UdfRegistryPath);
+ } else if (defaultUdfRegistryPath) {
+ request->set_udf_registry_path(*options.UdfRegistryPath);
}
}
@@ -929,11 +934,13 @@ TFuture<TSelectRowsResult> TClientBase::SelectRows(
req->SetResponseHeavy(true);
req->set_query(query);
- FillRequestBySelectRowsOptionsBase(options, req);
+ const auto& config = GetRpcProxyConnection()->GetConfig();
+
+ FillRequestBySelectRowsOptionsBase(options, config->UdfRegistryPath, req);
// TODO(ifsmirnov): retention timestamp in explain_query.
req->set_retention_timestamp(options.RetentionTimestamp);
// TODO(lukyan): Move to FillRequestBySelectRowsOptionsBase
- req->SetTimeout(options.Timeout.value_or(GetRpcProxyConnection()->GetConfig()->DefaultSelectRowsTimeout));
+ req->SetTimeout(options.Timeout.value_or(config->DefaultSelectRowsTimeout));
if (options.InputRowLimit) {
req->set_input_row_limit(*options.InputRowLimit);
@@ -981,7 +988,7 @@ TFuture<TYsonString> TClientBase::ExplainQuery(
auto req = proxy.ExplainQuery();
req->set_query(query);
- FillRequestBySelectRowsOptionsBase(options, req);
+ FillRequestBySelectRowsOptionsBase(options, GetRpcProxyConnection()->GetConfig()->UdfRegistryPath, req);
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspExplainQueryPtr& rsp) {
return TYsonString(rsp->value());
diff --git a/yt/yt/client/api/rpc_proxy/config.cpp b/yt/yt/client/api/rpc_proxy/config.cpp
index 7df4d7239f..ced3029902 100644
--- a/yt/yt/client/api/rpc_proxy/config.cpp
+++ b/yt/yt/client/api/rpc_proxy/config.cpp
@@ -107,6 +107,9 @@ void TConnectionConfig::Register(TRegistrar registrar)
registrar.Parameter("clock_cluster_tag", &TThis::ClockClusterTag)
.Default(NObjectClient::InvalidCellTag);
+ registrar.Parameter("udf_registry_path", &TThis::UdfRegistryPath)
+ .Optional();
+
registrar.Postprocessor([] (TThis* config) {
if (!config->ProxyEndpoints && !config->ClusterUrl && !config->ProxyAddresses && !config->ProxyUnixDomainSocket) {
THROW_ERROR_EXCEPTION("Either \"endpoints\" or \"cluster_url\" or \"proxy_addresses\" or \"proxy_unix_domain_socket\" must be specified");
diff --git a/yt/yt/client/api/rpc_proxy/config.h b/yt/yt/client/api/rpc_proxy/config.h
index 244f5cd1d4..3f3de07185 100644
--- a/yt/yt/client/api/rpc_proxy/config.h
+++ b/yt/yt/client/api/rpc_proxy/config.h
@@ -70,6 +70,9 @@ public:
NObjectClient::TCellTag ClockClusterTag;
+ //! Path in Cypress with UDFs.
+ std::optional<NYPath::TYPath> UdfRegistryPath;
+
REGISTER_YSON_STRUCT(TConnectionConfig);
static void Register(TRegistrar registrar);
diff --git a/yt/yt/client/election/public.h b/yt/yt/client/election/public.h
index b48fc712db..7b79682b48 100644
--- a/yt/yt/client/election/public.h
+++ b/yt/yt/client/election/public.h
@@ -12,8 +12,7 @@ namespace NYT::NElection {
using TEpochId = TGuid;
using TPeerPriority = std::pair<i64, i64>;
-using TPeerId = int;
-constexpr TPeerId InvalidPeerId = -1;
+constexpr int InvalidPeerId = -1;
using TCellId = TGuid;
extern const TCellId NullCellId;
diff --git a/yt/yt/client/hydra/public.h b/yt/yt/client/hydra/public.h
index 5f1e63206b..c2bdbf94cd 100644
--- a/yt/yt/client/hydra/public.h
+++ b/yt/yt/client/hydra/public.h
@@ -52,7 +52,6 @@ struct TElectionPriority;
using NElection::TCellId;
using NElection::NullCellId;
-using NElection::TPeerId;
using NElection::InvalidPeerId;
using NElection::TPeerPriority;
using NElection::TEpochId;
diff --git a/yt/yt/core/yson/protobuf_interop.cpp b/yt/yt/core/yson/protobuf_interop.cpp
index 6df2d4854d..6c14f8db3c 100644
--- a/yt/yt/core/yson/protobuf_interop.cpp
+++ b/yt/yt/core/yson/protobuf_interop.cpp
@@ -69,6 +69,12 @@ static constexpr int ProtobufMapValueFieldNumber = 2;
namespace {
+////////////////////////////////////////////////////////////////////////////////
+
+const NLogging::TLogger Logger("ProtobufInterop");
+
+////////////////////////////////////////////////////////////////////////////////
+
bool IsSignedIntegralType(FieldDescriptor::Type type)
{
switch (type) {
@@ -850,6 +856,30 @@ protected:
}
}
}
+
+ void ValidateString(TStringBuf data, EUtf8Check checkOption, TString fieldFullName)
+ {
+ if (checkOption == EUtf8Check::None || IsUtf(data)) {
+ return;
+ }
+ switch (checkOption) {
+ case EUtf8Check::None:
+ break;
+ case EUtf8Check::Log:
+ YT_LOG_WARNING("Field %v accepts only valid UTF-8 sequence, but got (%Qv)",
+ YPathStack_.GetHumanReadablePath(),
+ data);
+ break;
+ case EUtf8Check::Throw:
+ THROW_ERROR_EXCEPTION("Field %v accepts only valid UTF-8 sequence, but got (%Qv)",
+ YPathStack_.GetHumanReadablePath(),
+ data)
+ << TErrorAttribute("ypath", YPathStack_.GetPath())
+ << TErrorAttribute("proto_field", fieldFullName);
+ break;
+ }
+ }
+
};
////////////////////////////////////////////////////////////////////////////////
@@ -954,12 +984,7 @@ private:
const auto* field = FieldStack_.back().Field;
switch (field->GetType()) {
case FieldDescriptor::TYPE_STRING:
- if (Options_.CheckUtf8 && !IsUtf(value)) {
- THROW_ERROR_EXCEPTION("Field %v accepts only valid UTF-8 sequence",
- YPathStack_.GetHumanReadablePath())
- << TErrorAttribute("ypath", YPathStack_.GetPath())
- << TErrorAttribute("proto_field", field->GetFullName());
- }
+ ValidateString(value, Options_.CheckUtf8, field->GetFullName());
case FieldDescriptor::TYPE_BYTES:
BodyCodedStream_.WriteVarint64(value.length());
BodyCodedStream_.WriteRaw(value.begin(), static_cast<int>(value.length()));
@@ -2475,11 +2500,8 @@ private:
<< TErrorAttribute("proto_field", field->GetFullName());
}
TStringBuf data(PooledString_.data(), length);
- if (Options_.CheckUtf8 && field->GetType() == FieldDescriptor::TYPE_STRING && !IsUtf(data)) {
- THROW_ERROR_EXCEPTION("Field %v expected to contain valid UTF-8 sequence",
- YPathStack_.GetHumanReadablePath())
- << TErrorAttribute("ypath", YPathStack_.GetPath())
- << TErrorAttribute("proto_field", field->GetFullName());
+ if (field->GetType() == FieldDescriptor::TYPE_STRING) {
+ ValidateString(data, Options_.CheckUtf8, field->GetFullName());
}
ParseScalar([&] {
if (field->GetBytesFieldConverter()) {
diff --git a/yt/yt/core/yson/protobuf_interop.h b/yt/yt/core/yson/protobuf_interop.h
index 48992052eb..3591ef4dab 100644
--- a/yt/yt/core/yson/protobuf_interop.h
+++ b/yt/yt/core/yson/protobuf_interop.h
@@ -106,11 +106,6 @@ struct TProtobufElementResolveResult
TStringBuf TailPath;
};
-struct TResolveProtobufElementByYPathOptions
-{
- bool AllowUnknownYsonFields = false;
-};
-
//! Introspects a given #rootType and locates an element (represented
//! by TProtobufElement discriminated union) at a given #path.
//! Throws if some definite error occurs during resolve (i.e. a malformed
@@ -139,20 +134,6 @@ std::unique_ptr<IYsonConsumer> CreateProtobufWriter(
////////////////////////////////////////////////////////////////////////////////
-struct TProtobufParserOptions
-{
- //! If |true| then fields with numbers not found in protobuf metadata are
- //! silently skipped; otherwise an exception is thrown.
- bool SkipUnknownFields = false;
-
- //! If |true| then required fields not found in protobuf metadata are
- //! silently skipped; otherwise an exception is thrown.
- bool SkipRequiredFields = false;
-
- // Check if |string| fields contain actual UTF-8 strings.
- bool CheckUtf8 = false;
-};
-
//! Parses a byte sequence and translates it into IYsonConsumer calls.
/*!
* IMPORTANT! Due to performance reasons the implementation currently assumes
diff --git a/yt/yt/core/yson/protobuf_interop_options.h b/yt/yt/core/yson/protobuf_interop_options.h
index 254a1faf9f..62173ca268 100644
--- a/yt/yt/core/yson/protobuf_interop_options.h
+++ b/yt/yt/core/yson/protobuf_interop_options.h
@@ -11,6 +11,17 @@ namespace NYT::NYson {
////////////////////////////////////////////////////////////////////////////////
+struct TResolveProtobufElementByYPathOptions
+{
+ bool AllowUnknownYsonFields = false;
+};
+
+DEFINE_ENUM(EUtf8Check,
+ (None)
+ (Log)
+ (Throw)
+);
+
struct TProtobufWriterOptions
{
//! Keep: all unknown fields found during YSON parsing
@@ -33,7 +44,21 @@ struct TProtobufWriterOptions
bool SkipRequiredFields = false;
// Check if |string| fields contain actual UTF-8 strings.
- bool CheckUtf8 = false;
+ EUtf8Check CheckUtf8 = EUtf8Check::None;
+};
+
+struct TProtobufParserOptions
+{
+ //! If |true| then fields with numbers not found in protobuf metadata are
+ //! silently skipped; otherwise an exception is thrown.
+ bool SkipUnknownFields = false;
+
+ //! If |true| then required fields not found in protobuf metadata are
+ //! silently skipped; otherwise an exception is thrown.
+ bool SkipRequiredFields = false;
+
+ // Check if |string| fields contain actual UTF-8 strings.
+ EUtf8Check CheckUtf8 = EUtf8Check::None;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp
index 2719f939a2..111d9cd19e 100644
--- a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp
+++ b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp
@@ -1017,30 +1017,38 @@ TEST(TYsonToProtobufTest, Entities)
TEST(TYsonToProtobufTest, ValidUtf8StringCheck)
{
- TProtobufWriterOptions options{
- .CheckUtf8 = true,
- };
-
- TString invalidUtf8 = "\xc3\x28";
-
- auto check = [&] {
- TEST_PROLOGUE_WITH_OPTIONS(TMessage, options)
- .BeginMap()
- .Item("string_field").Value(invalidUtf8)
- .EndMap();
- };
+ for (auto option: {EUtf8Check::None, EUtf8Check::Log, EUtf8Check::Throw}) {
+ TProtobufWriterOptions options{
+ .CheckUtf8 = option,
+ };
- EXPECT_THROW_WITH_SUBSTRING(check(), "valid UTF-8");
+ TString invalidUtf8 = "\xc3\x28";
- NProto::TMessage message;
- message.set_string_field(invalidUtf8);
- TString newYsonString;
- TStringOutput newYsonOutputStream(newYsonString);
- TYsonWriter ysonWriter(&newYsonOutputStream, EYsonFormat::Pretty);
+ auto check = [&] {
+ TEST_PROLOGUE_WITH_OPTIONS(TMessage, options)
+ .BeginMap()
+ .Item("string_field").Value(invalidUtf8)
+ .EndMap();
+ };
+ if (option == EUtf8Check::Throw) {
+ EXPECT_THROW_WITH_SUBSTRING(check(), "valid UTF-8");
+ } else {
+ EXPECT_NO_THROW(check());
+ }
- EXPECT_THROW_WITH_SUBSTRING(
- WriteProtobufMessage(&ysonWriter, message, TProtobufParserOptions{.CheckUtf8 = true}),
- "valid UTF-8");
+ NProto::TMessage message;
+ message.set_string_field(invalidUtf8);
+ TString newYsonString;
+ TStringOutput newYsonOutputStream(newYsonString);
+ TYsonWriter ysonWriter(&newYsonOutputStream, EYsonFormat::Pretty);
+ if (option == EUtf8Check::Throw) {
+ EXPECT_THROW_WITH_SUBSTRING(
+ WriteProtobufMessage(&ysonWriter, message, TProtobufParserOptions{.CheckUtf8 = option}),
+ "valid UTF-8");
+ } else {
+ EXPECT_NO_THROW(WriteProtobufMessage(&ysonWriter, message, TProtobufParserOptions{.CheckUtf8 = option}));
+ }
+ }
}
TEST(TYsonToProtobufTest, CustomUnknownFieldsModeResolver)
diff --git a/yt/yt/library/CMakeLists.darwin-x86_64.txt b/yt/yt/library/CMakeLists.darwin-x86_64.txt
index 2debbb7626..a85455d5cb 100644
--- a/yt/yt/library/CMakeLists.darwin-x86_64.txt
+++ b/yt/yt/library/CMakeLists.darwin-x86_64.txt
@@ -7,12 +7,10 @@
add_subdirectory(auth)
-add_subdirectory(containers)
add_subdirectory(decimal)
add_subdirectory(erasure)
add_subdirectory(monitoring)
add_subdirectory(numeric)
-add_subdirectory(process)
add_subdirectory(profiling)
add_subdirectory(program)
add_subdirectory(quantile_digest)
diff --git a/yt/yt/library/CMakeLists.linux-aarch64.txt b/yt/yt/library/CMakeLists.linux-aarch64.txt
index 524ffcf525..07cc57b53c 100644
--- a/yt/yt/library/CMakeLists.linux-aarch64.txt
+++ b/yt/yt/library/CMakeLists.linux-aarch64.txt
@@ -8,7 +8,6 @@
add_subdirectory(auth)
add_subdirectory(backtrace_introspector)
-add_subdirectory(containers)
add_subdirectory(decimal)
add_subdirectory(erasure)
add_subdirectory(monitoring)
diff --git a/yt/yt/library/CMakeLists.linux-x86_64.txt b/yt/yt/library/CMakeLists.linux-x86_64.txt
index 524ffcf525..07cc57b53c 100644
--- a/yt/yt/library/CMakeLists.linux-x86_64.txt
+++ b/yt/yt/library/CMakeLists.linux-x86_64.txt
@@ -8,7 +8,6 @@
add_subdirectory(auth)
add_subdirectory(backtrace_introspector)
-add_subdirectory(containers)
add_subdirectory(decimal)
add_subdirectory(erasure)
add_subdirectory(monitoring)
diff --git a/yt/yt/library/CMakeLists.windows-x86_64.txt b/yt/yt/library/CMakeLists.windows-x86_64.txt
index 4502da4e61..eaf7bb9e34 100644
--- a/yt/yt/library/CMakeLists.windows-x86_64.txt
+++ b/yt/yt/library/CMakeLists.windows-x86_64.txt
@@ -6,9 +6,7 @@
# original buildsystem will not be accepted.
-add_subdirectory(containers)
add_subdirectory(monitoring)
-add_subdirectory(process)
add_subdirectory(profiling)
add_subdirectory(program)
add_subdirectory(syncmap)
diff --git a/yt/yt/library/containers/CMakeLists.darwin-x86_64.txt b/yt/yt/library/containers/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index faab79bbf6..0000000000
--- a/yt/yt/library/containers/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,30 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(yt-library-containers)
-target_compile_options(yt-library-containers PRIVATE
- -Wdeprecated-this-capture
-)
-target_link_libraries(yt-library-containers PUBLIC
- contrib-libs-cxxsupp
- yutil
- cpp-porto-proto
- yt-library-process
- yt-yt-core
-)
-target_sources(yt-library-containers PRIVATE
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp
-)
diff --git a/yt/yt/library/containers/CMakeLists.linux-aarch64.txt b/yt/yt/library/containers/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index d3ab3811e0..0000000000
--- a/yt/yt/library/containers/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,32 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(yt-library-containers)
-target_compile_options(yt-library-containers PRIVATE
- -Wdeprecated-this-capture
-)
-target_link_libraries(yt-library-containers PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- cpp-porto-proto
- yt-library-process
- yt-yt-core
- library-cpp-porto
-)
-target_sources(yt-library-containers PRIVATE
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp
-)
diff --git a/yt/yt/library/containers/CMakeLists.linux-x86_64.txt b/yt/yt/library/containers/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index d3ab3811e0..0000000000
--- a/yt/yt/library/containers/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,32 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(yt-library-containers)
-target_compile_options(yt-library-containers PRIVATE
- -Wdeprecated-this-capture
-)
-target_link_libraries(yt-library-containers PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- cpp-porto-proto
- yt-library-process
- yt-yt-core
- library-cpp-porto
-)
-target_sources(yt-library-containers PRIVATE
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp
-)
diff --git a/yt/yt/library/containers/CMakeLists.txt b/yt/yt/library/containers/CMakeLists.txt
deleted file mode 100644
index f8b31df0c1..0000000000
--- a/yt/yt/library/containers/CMakeLists.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
diff --git a/yt/yt/library/containers/CMakeLists.windows-x86_64.txt b/yt/yt/library/containers/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index 998e1690fa..0000000000
--- a/yt/yt/library/containers/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,27 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(yt-library-containers)
-target_link_libraries(yt-library-containers PUBLIC
- contrib-libs-cxxsupp
- yutil
- cpp-porto-proto
- yt-library-process
- yt-yt-core
-)
-target_sources(yt-library-containers PRIVATE
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp
-)
diff --git a/yt/yt/library/containers/cgroup.cpp b/yt/yt/library/containers/cgroup.cpp
deleted file mode 100644
index b43ab1e14b..0000000000
--- a/yt/yt/library/containers/cgroup.cpp
+++ /dev/null
@@ -1,752 +0,0 @@
-#include "cgroup.h"
-#include "private.h"
-
-#include <yt/yt/core/misc/fs.h>
-#include <yt/yt/core/misc/proc.h>
-
-#include <yt/yt/core/ytree/fluent.h>
-
-#include <util/string/split.h>
-#include <util/system/filemap.h>
-
-#include <util/system/yield.h>
-
-#ifdef _linux_
- #include <unistd.h>
- #include <sys/stat.h>
- #include <errno.h>
-#endif
-
-namespace NYT::NContainers {
-
-using namespace NYTree;
-
-////////////////////////////////////////////////////////////////////////////////
-
-static const auto& Logger = ContainersLogger;
-static const TString CGroupRootPath("/sys/fs/cgroup");
-#ifdef _linux_
-static const int ReadByAll = S_IRUSR | S_IRGRP | S_IROTH;
-static const int ReadExecuteByAll = ReadByAll | S_IXUSR | S_IXGRP | S_IXOTH;
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-
-namespace {
-
-TString GetParentFor(const TString& type)
-{
-#ifdef _linux_
- auto rawData = TUnbufferedFileInput("/proc/self/cgroup")
- .ReadAll();
- auto result = ParseProcessCGroups(rawData);
- return result[type];
-#else
- Y_UNUSED(type);
- return "_parent_";
-#endif
-}
-
-#ifdef _linux_
-
-std::vector<TString> ReadAllValues(const TString& fileName)
-{
- auto raw = TUnbufferedFileInput(fileName)
- .ReadAll();
-
- YT_LOG_DEBUG("File %v contains %Qv",
- fileName,
- raw);
-
- TVector<TString> values;
- StringSplitter(raw.data())
- .SplitBySet(" \n")
- .SkipEmpty()
- .Collect(&values);
- return values;
-}
-
-TDuration FromJiffies(ui64 jiffies)
-{
- static const auto TicksPerSecond = sysconf(_SC_CLK_TCK);
- return TDuration::MicroSeconds(1000 * 1000 * jiffies / TicksPerSecond);
-}
-
-#endif
-
-} // namespace
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TKillProcessGroupTool::operator()(const TString& processGroupPath) const
-{
- SafeSetUid(0);
- TNonOwningCGroup group(processGroupPath);
- group.Kill();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TNonOwningCGroup::TNonOwningCGroup(const TString& fullPath)
- : FullPath_(fullPath)
-{ }
-
-TNonOwningCGroup::TNonOwningCGroup(const TString& type, const TString& name)
- : FullPath_(NFS::CombinePaths({
- CGroupRootPath,
- type,
- GetParentFor(type),
- name
- }))
-{ }
-
-TNonOwningCGroup::TNonOwningCGroup(TNonOwningCGroup&& other)
- : FullPath_(std::move(other.FullPath_))
-{ }
-
-void TNonOwningCGroup::AddTask(int pid) const
-{
- YT_LOG_INFO(
- "Adding task to cgroup (Task: %v, Cgroup: %v)",
- pid,
- FullPath_);
- Append("tasks", ToString(pid));
-}
-
-void TNonOwningCGroup::AddCurrentTask() const
-{
- YT_VERIFY(!IsNull());
-#ifdef _linux_
- auto pid = getpid();
- AddTask(pid);
-#endif
-}
-
-TString TNonOwningCGroup::Get(const TString& name) const
-{
- YT_VERIFY(!IsNull());
- TString result;
-#ifdef _linux_
- const auto path = GetPath(name);
- result = TFileInput(path).ReadLine();
-#else
- Y_UNUSED(name);
-#endif
- return result;
-}
-
-void TNonOwningCGroup::Set(const TString& name, const TString& value) const
-{
- YT_VERIFY(!IsNull());
-#ifdef _linux_
- auto path = GetPath(name);
- TUnbufferedFileOutput output(TFile(path, EOpenModeFlag::WrOnly));
- output << value;
-#else
- Y_UNUSED(name);
- Y_UNUSED(value);
-#endif
-}
-
-void TNonOwningCGroup::Append(const TString& name, const TString& value) const
-{
- YT_VERIFY(!IsNull());
-#ifdef _linux_
- auto path = GetPath(name);
- TUnbufferedFileOutput output(TFile(path, EOpenModeFlag::ForAppend));
- output << value;
-#else
- Y_UNUSED(name);
- Y_UNUSED(value);
-#endif
-}
-
-bool TNonOwningCGroup::IsRoot() const
-{
- return FullPath_ == CGroupRootPath;
-}
-
-bool TNonOwningCGroup::IsNull() const
-{
- return FullPath_.empty();
-}
-
-bool TNonOwningCGroup::Exists() const
-{
- return NFS::Exists(FullPath_);
-}
-
-std::vector<int> TNonOwningCGroup::GetProcesses() const
-{
- std::vector<int> results;
- if (!IsNull()) {
-#ifdef _linux_
- auto values = ReadAllValues(GetPath("cgroup.procs"));
- for (const auto& value : values) {
- int pid = FromString<int>(value);
- results.push_back(pid);
- }
-#endif
- }
- return results;
-}
-
-std::vector<int> TNonOwningCGroup::GetTasks() const
-{
- std::vector<int> results;
- if (!IsNull()) {
-#ifdef _linux_
- auto values = ReadAllValues(GetPath("tasks"));
- for (const auto& value : values) {
- int pid = FromString<int>(value);
- results.push_back(pid);
- }
-#endif
- }
- return results;
-}
-
-const TString& TNonOwningCGroup::GetFullPath() const
-{
- return FullPath_;
-}
-
-std::vector<TNonOwningCGroup> TNonOwningCGroup::GetChildren() const
-{
- // We retry enumerating directories, since it may fail with weird diagnostics if
- // number of subcgroups changes.
- while (true) {
- try {
- std::vector<TNonOwningCGroup> result;
-
- if (IsNull()) {
- return result;
- }
-
- auto directories = NFS::EnumerateDirectories(FullPath_);
- for (const auto& directory : directories) {
- result.emplace_back(NFS::CombinePaths(FullPath_, directory));
- }
- return result;
- } catch (const std::exception& ex) {
- YT_LOG_WARNING(ex, "Failed to list subcgroups (Path: %v)", FullPath_);
- }
- }
-}
-
-void TNonOwningCGroup::EnsureExistence() const
-{
- YT_LOG_INFO("Creating cgroup (Cgroup: %v)", FullPath_);
-
- YT_VERIFY(!IsNull());
-
-#ifdef _linux_
- NFS::MakeDirRecursive(FullPath_, 0755);
-#endif
-}
-
-void TNonOwningCGroup::Lock() const
-{
- Traverse(
- BIND([] (const TNonOwningCGroup& group) { group.DoLock(); }),
- BIND([] (const TNonOwningCGroup& /*group*/) {}));
-}
-
-void TNonOwningCGroup::Unlock() const
-{
- Traverse(
- BIND([] (const TNonOwningCGroup& /*group*/) {}),
- BIND([] (const TNonOwningCGroup& group) { group.DoUnlock(); }));
-}
-
-void TNonOwningCGroup::Kill() const
-{
- YT_VERIFY(!IsRoot());
-
- Traverse(
- BIND([] (const TNonOwningCGroup& group) { group.DoKill(); }),
- BIND([] (const TNonOwningCGroup& /*group*/) {}));
-}
-
-void TNonOwningCGroup::RemoveAllSubcgroups() const
-{
- Traverse(
- BIND([] (const TNonOwningCGroup& group) {
- group.TryUnlock();
- }),
- BIND([this_ = this] (const TNonOwningCGroup& group) {
- if (this_ != &group) {
- group.DoRemove();
- }
- }));
-}
-
-void TNonOwningCGroup::RemoveRecursive() const
-{
- RemoveAllSubcgroups();
- DoRemove();
-}
-
-void TNonOwningCGroup::DoLock() const
-{
- YT_LOG_INFO("Locking cgroup (Cgroup: %v)", FullPath_);
-
-#ifdef _linux_
- if (!IsNull()) {
- int code = chmod(FullPath_.data(), ReadExecuteByAll);
- YT_VERIFY(code == 0);
-
- code = chmod(GetPath("tasks").data(), ReadByAll);
- YT_VERIFY(code == 0);
- }
-#endif
-}
-
-bool TNonOwningCGroup::TryUnlock() const
-{
- YT_LOG_INFO("Unlocking cgroup (Cgroup: %v)", FullPath_);
-
- if (!Exists()) {
- return true;
- }
-
- bool result = true;
-
-#ifdef _linux_
- if (!IsNull()) {
- int code = chmod(GetPath("tasks").data(), ReadByAll | S_IWUSR);
- if (code != 0) {
- result = false;
- }
-
- code = chmod(FullPath_.data(), ReadExecuteByAll | S_IWUSR);
- if (code != 0) {
- result = false;
- }
- }
-#endif
-
- return result;
-}
-
-void TNonOwningCGroup::DoUnlock() const
-{
- YT_VERIFY(TryUnlock());
-}
-
-void TNonOwningCGroup::DoKill() const
-{
- YT_LOG_DEBUG("Started killing processes in cgroup (Cgroup: %v)", FullPath_);
-
-#ifdef _linux_
- while (true) {
- auto pids = GetTasks();
- if (pids.empty())
- break;
-
- YT_LOG_DEBUG("Killing processes (Pids: %v)", pids);
-
- for (int pid : pids) {
- auto result = kill(pid, SIGKILL);
- if (result == -1) {
- YT_VERIFY(errno == ESRCH);
- }
- }
-
- ThreadYield();
- }
-#endif
-
- YT_LOG_DEBUG("Finished killing processes in cgroup (Cgroup: %v)", FullPath_);
-}
-
-void TNonOwningCGroup::DoRemove() const
-{
- if (NFS::Exists(FullPath_)) {
- NFS::Remove(FullPath_);
- }
-}
-
-void TNonOwningCGroup::Traverse(
- const TCallback<void(const TNonOwningCGroup&)>& preorderAction,
- const TCallback<void(const TNonOwningCGroup&)>& postorderAction) const
-{
- preorderAction(*this);
-
- for (const auto& child : GetChildren()) {
- child.Traverse(preorderAction, postorderAction);
- }
-
- postorderAction(*this);
-}
-
-TString TNonOwningCGroup::GetPath(const TString& filename) const
-{
- return NFS::CombinePaths(FullPath_, filename);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TCGroup::TCGroup(const TString& type, const TString& name)
- : TNonOwningCGroup(type, name)
-{ }
-
-TCGroup::TCGroup(TCGroup&& other)
- : TNonOwningCGroup(std::move(other))
- , Created_(other.Created_)
-{
- other.Created_ = false;
-}
-
-TCGroup::TCGroup(TNonOwningCGroup&& other)
- : TNonOwningCGroup(std::move(other))
- , Created_(false)
-{ }
-
-TCGroup::~TCGroup()
-{
- if (Created_) {
- Destroy();
- }
-}
-
-void TCGroup::Create()
-{
- EnsureExistence();
- Created_ = true;
-}
-
-void TCGroup::Destroy()
-{
- YT_LOG_INFO("Destroying cgroup (Cgroup: %v)", FullPath_);
- YT_VERIFY(Created_);
-
-#ifdef _linux_
- try {
- NFS::Remove(FullPath_);
- } catch (const std::exception& ex) {
- YT_LOG_FATAL(ex, "Failed to destroy cgroup (Cgroup: %v)", FullPath_);
- }
-#endif
- Created_ = false;
-}
-
-bool TCGroup::IsCreated() const
-{
- return Created_;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-const TString TCpuAccounting::Name = "cpuacct";
-
-TCpuAccounting::TStatistics& operator-=(TCpuAccounting::TStatistics& lhs, const TCpuAccounting::TStatistics& rhs)
-{
- #define XX(name) lhs.name = lhs.name.ValueOrThrow() - rhs.name.ValueOrThrow();
- XX(UserUsageTime)
- XX(SystemUsageTime)
- XX(WaitTime)
- XX(ThrottledTime)
- XX(ContextSwitchesDelta)
- XX(PeakThreadCount)
- #undef XX
- return lhs;
-}
-
-TCpuAccounting::TCpuAccounting(const TString& name)
- : TCGroup(Name, name)
-{ }
-
-TCpuAccounting::TCpuAccounting(TNonOwningCGroup&& nonOwningCGroup)
- : TCGroup(std::move(nonOwningCGroup))
-{ }
-
-TCpuAccounting::TStatistics TCpuAccounting::GetStatisticsRecursive() const
-{
- TCpuAccounting::TStatistics result;
-#ifdef _linux_
- try {
- auto path = NFS::CombinePaths(GetFullPath(), "cpuacct.stat");
- auto values = ReadAllValues(path);
- YT_VERIFY(values.size() == 4);
-
- TString type[2];
- ui64 jiffies[2];
-
- for (int i = 0; i < 2; ++i) {
- type[i] = values[2 * i];
- jiffies[i] = FromString<ui64>(values[2 * i + 1]);
- }
-
- for (int i = 0; i < 2; ++i) {
- if (type[i] == "user") {
- result.UserUsageTime = FromJiffies(jiffies[i]);
- } else if (type[i] == "system") {
- result.SystemUsageTime = FromJiffies(jiffies[i]);
- }
- }
- } catch (const std::exception& ex) {
- YT_LOG_FATAL(
- ex,
- "Failed to retrieve CPU statistics from cgroup (Cgroup: %v)",
- GetFullPath());
- }
-#endif
- return result;
-}
-
-TCpuAccounting::TStatistics TCpuAccounting::GetStatistics() const
-{
- auto statistics = GetStatisticsRecursive();
- for (auto& cgroup : GetChildren()) {
- auto cpuCGroup = TCpuAccounting(std::move(cgroup));
- statistics -= cpuCGroup.GetStatisticsRecursive();
- }
- return statistics;
-}
-
-
-////////////////////////////////////////////////////////////////////////////////
-
-const TString TCpu::Name = "cpu";
-
-static const int DefaultCpuShare = 1024;
-
-TCpu::TCpu(const TString& name)
- : TCGroup(Name, name)
-{ }
-
-void TCpu::SetShare(double share)
-{
- int cpuShare = static_cast<int>(share * DefaultCpuShare);
- Set("cpu.shares", ToString(cpuShare));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-const TString TBlockIO::Name = "blkio";
-
-TBlockIO::TBlockIO(const TString& name)
- : TCGroup(Name, name)
-{ }
-
-// For more information about format of data
-// read https://www.kernel.org/doc/Documentation/cgroups/blkio-controller.txt
-
-TBlockIO::TStatistics TBlockIO::GetStatistics() const
-{
- TBlockIO::TStatistics result;
-#ifdef _linux_
- auto bytesStats = GetDetailedStatistics("blkio.io_service_bytes");
- for (const auto& item : bytesStats) {
- if (item.Type == "Read") {
- result.IOReadByte = result.IOReadByte.ValueOrThrow() + item.Value;
- } else if (item.Type == "Write") {
- result.IOWriteByte = result.IOReadByte.ValueOrThrow() + item.Value;
- }
- }
-
- auto ioStats = GetDetailedStatistics("blkio.io_serviced");
- for (const auto& item : ioStats) {
- if (item.Type == "Read") {
- result.IOReadOps = result.IOReadOps.ValueOrThrow() + item.Value;
- result.IOOps = result.IOOps.ValueOrThrow() + item.Value;
- } else if (item.Type == "Write") {
- result.IOWriteOps = result.IOWriteOps.ValueOrThrow() + item.Value;
- result.IOOps = result.IOOps.ValueOrThrow() + item.Value;
- }
- }
-#endif
- return result;
-}
-
-std::vector<TBlockIO::TStatisticsItem> TBlockIO::GetIOServiceBytes() const
-{
- return GetDetailedStatistics("blkio.io_service_bytes");
-}
-
-std::vector<TBlockIO::TStatisticsItem> TBlockIO::GetIOServiced() const
-{
- return GetDetailedStatistics("blkio.io_serviced");
-}
-
-std::vector<TBlockIO::TStatisticsItem> TBlockIO::GetDetailedStatistics(const char* filename) const
-{
- std::vector<TBlockIO::TStatisticsItem> result;
-#ifdef _linux_
- try {
- auto path = NFS::CombinePaths(GetFullPath(), filename);
- auto values = ReadAllValues(path);
-
- int lineNumber = 0;
- while (3 * lineNumber + 2 < std::ssize(values)) {
- TStatisticsItem item;
- item.DeviceId = values[3 * lineNumber];
- item.Type = values[3 * lineNumber + 1];
- item.Value = FromString<ui64>(values[3 * lineNumber + 2]);
-
- {
- auto guard = Guard(SpinLock_);
- DeviceIds_.insert(item.DeviceId);
- }
-
- if (item.Type == "Read" || item.Type == "Write") {
- result.push_back(item);
-
- YT_LOG_DEBUG("IO operations serviced (OperationCount: %v, OperationType: %v, DeviceId: %v)",
- item.Value,
- item.Type,
- item.DeviceId);
- }
- ++lineNumber;
- }
- } catch (const std::exception& ex) {
- YT_LOG_FATAL(
- ex,
- "Failed to retrieve block IO statistics from cgroup (Cgroup: %v)",
- GetFullPath());
- }
-#else
- Y_UNUSED(filename);
-#endif
- return result;
-}
-
-void TBlockIO::ThrottleOperations(i64 operations) const
-{
- auto guard = Guard(SpinLock_);
- for (const auto& deviceId : DeviceIds_) {
- auto value = Format("%v %v", deviceId, operations);
- Append("blkio.throttle.read_iops_device", value);
- Append("blkio.throttle.write_iops_device", value);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-const TString TMemory::Name = "memory";
-
-TMemory::TMemory(const TString& name)
- : TCGroup(Name, name)
-{ }
-
-TMemory::TStatistics TMemory::GetStatistics() const
-{
- TMemory::TStatistics result;
-#ifdef _linux_
- try {
- auto values = ReadAllValues(GetPath("memory.stat"));
- int lineNumber = 0;
- while (2 * lineNumber + 1 < std::ssize(values)) {
- const auto& type = values[2 * lineNumber];
- const auto& unparsedValue = values[2 * lineNumber + 1];
- if (type == "rss") {
- result.Rss = FromString<ui64>(unparsedValue);
- }
- if (type == "mapped_file") {
- result.MappedFile = FromString<ui64>(unparsedValue);
- }
- if (type == "pgmajfault") {
- result.MajorPageFaults = FromString<ui64>(unparsedValue);
- }
- ++lineNumber;
- }
- } catch (const std::exception& ex) {
- YT_LOG_FATAL(
- ex,
- "Failed to retrieve memory statistics from cgroup (Cgroup: %v)",
- GetFullPath());
- }
-#endif
- return result;
-}
-
-i64 TMemory::GetMaxMemoryUsage() const
-{
- return FromString<i64>(Get("memory.max_usage_in_bytes"));
-}
-
-void TMemory::SetLimitInBytes(i64 bytes) const
-{
- Set("memory.limit_in_bytes", ToString(bytes));
-}
-
-void TMemory::ForceEmpty() const
-{
- Set("memory.force_empty", "0");
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-const TString TFreezer::Name = "freezer";
-
-TFreezer::TFreezer(const TString& name)
- : TCGroup(Name, name)
-{ }
-
-TString TFreezer::GetState() const
-{
- return Get("freezer.state");
-}
-
-void TFreezer::Freeze() const
-{
- Set("freezer.state", "FROZEN");
-}
-
-void TFreezer::Unfreeze() const
-{
- Set("freezer.state", "THAWED");
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-std::map<TString, TString> ParseProcessCGroups(const TString& str)
-{
- std::map<TString, TString> result;
-
- TVector<TString> values;
- StringSplitter(str.data()).SplitBySet(":\n").SkipEmpty().Collect(&values);
- for (size_t i = 0; i + 2 < values.size(); i += 3) {
- // Check format.
- FromString<int>(values[i]);
-
- const auto& subsystemsSet = values[i + 1];
- const auto& name = values[i + 2];
-
- TVector<TString> subsystems;
- StringSplitter(subsystemsSet.data()).Split(',').SkipEmpty().Collect(&subsystems);
- for (const auto& subsystem : subsystems) {
- if (!subsystem.StartsWith("name=")) {
- int start = 0;
- if (name.StartsWith("/")) {
- start = 1;
- }
- result[subsystem] = name.substr(start);
- }
- }
- }
-
- return result;
-}
-
-std::map<TString, TString> GetProcessCGroups(pid_t pid)
-{
- auto cgroupsPath = Format("/proc/%v/cgroup", pid);
- auto rawCgroups = TFileInput{cgroupsPath}.ReadAll();
- return ParseProcessCGroups(rawCgroups);
-}
-
-bool IsValidCGroupType(const TString& type)
-{
- return
- type == TCpuAccounting::Name ||
- type == TCpu::Name ||
- type == TBlockIO::Name ||
- type == TMemory::Name ||
- type == TFreezer::Name;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/cgroup.h b/yt/yt/library/containers/cgroup.h
deleted file mode 100644
index a61fbbddc3..0000000000
--- a/yt/yt/library/containers/cgroup.h
+++ /dev/null
@@ -1,290 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/actions/public.h>
-
-#include <yt/yt/core/ytree/yson_struct.h>
-#include <yt/yt/core/yson/public.h>
-
-#include <yt/yt/core/misc/property.h>
-
-#include <library/cpp/yt/threading/spin_lock.h>
-
-#include <vector>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-void RemoveAllSubcgroups(const TString& path);
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TKillProcessGroupTool
-{
- void operator()(const TString& processGroupPath) const;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TNonOwningCGroup
- : private TNonCopyable
-{
-public:
- DEFINE_BYREF_RO_PROPERTY(TString, FullPath);
-
-public:
- TNonOwningCGroup() = default;
- explicit TNonOwningCGroup(const TString& fullPath);
- TNonOwningCGroup(const TString& type, const TString& name);
- TNonOwningCGroup(TNonOwningCGroup&& other);
-
- void AddTask(int pid) const;
- void AddCurrentTask() const;
-
- bool IsRoot() const;
- bool IsNull() const;
- bool Exists() const;
-
- std::vector<int> GetProcesses() const;
- std::vector<int> GetTasks() const;
- const TString& GetFullPath() const;
-
- std::vector<TNonOwningCGroup> GetChildren() const;
-
- void EnsureExistence() const;
-
- void Lock() const;
- void Unlock() const;
-
- void Kill() const;
-
- void RemoveAllSubcgroups() const;
- void RemoveRecursive() const;
-
-protected:
- TString Get(const TString& name) const;
- void Set(const TString& name, const TString& value) const;
- void Append(const TString& name, const TString& value) const;
-
- void DoLock() const;
- void DoUnlock() const;
-
- bool TryUnlock() const;
-
- void DoKill() const;
-
- void DoRemove() const;
-
- void Traverse(
- const TCallback<void(const TNonOwningCGroup&)>& preorderAction,
- const TCallback<void(const TNonOwningCGroup&)>& postorderAction) const;
-
- TString GetPath(const TString& filename) const;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCGroup
- : public TNonOwningCGroup
-{
-protected:
- TCGroup(const TString& type, const TString& name);
- TCGroup(TNonOwningCGroup&& other);
- TCGroup(TCGroup&& other);
-
-public:
- ~TCGroup();
-
- void Create();
- void Destroy();
-
- bool IsCreated() const;
-
-private:
- bool Created_ = false;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCpuAccounting
- : public TCGroup
-{
-public:
- static const TString Name;
-
- struct TStatistics
- {
- TErrorOr<TDuration> TotalUsageTime;
- TErrorOr<TDuration> UserUsageTime;
- TErrorOr<TDuration> SystemUsageTime;
- TErrorOr<TDuration> WaitTime;
- TErrorOr<TDuration> ThrottledTime;
-
- TErrorOr<ui64> ThreadCount;
- TErrorOr<ui64> ContextSwitches;
- TErrorOr<ui64> ContextSwitchesDelta;
- TErrorOr<ui64> PeakThreadCount;
-
- TErrorOr<TDuration> LimitTime;
- TErrorOr<TDuration> GuaranteeTime;
- };
-
- explicit TCpuAccounting(const TString& name);
-
- TStatistics GetStatisticsRecursive() const;
- TStatistics GetStatistics() const;
-
-private:
- explicit TCpuAccounting(TNonOwningCGroup&& nonOwningCGroup);
-};
-
-void Serialize(const TCpuAccounting::TStatistics& statistics, NYson::IYsonConsumer* consumer);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCpu
- : public TCGroup
-{
-public:
- static const TString Name;
-
- explicit TCpu(const TString& name);
-
- void SetShare(double share);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TBlockIO
- : public TCGroup
-{
-public:
- static const TString Name;
-
- struct TStatistics
- {
- TErrorOr<ui64> IOReadByte;
- TErrorOr<ui64> IOWriteByte;
- TErrorOr<ui64> IOBytesLimit;
-
- TErrorOr<ui64> IOReadOps;
- TErrorOr<ui64> IOWriteOps;
- TErrorOr<ui64> IOOps;
- TErrorOr<ui64> IOOpsLimit;
-
- TErrorOr<TDuration> IOTotalTime;
- TErrorOr<TDuration> IOWaitTime;
- };
-
- struct TStatisticsItem
- {
- TString DeviceId;
- TString Type;
- ui64 Value = 0;
- };
-
- explicit TBlockIO(const TString& name);
-
- TStatistics GetStatistics() const;
- void ThrottleOperations(i64 iops) const;
-
-private:
- //! Guards device ids.
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
- //! Set of all seen device ids.
- mutable THashSet<TString> DeviceIds_;
-
- std::vector<TBlockIO::TStatisticsItem> GetDetailedStatistics(const char* filename) const;
-
- std::vector<TStatisticsItem> GetIOServiceBytes() const;
- std::vector<TStatisticsItem> GetIOServiced() const;
-};
-
-void Serialize(const TBlockIO::TStatistics& statistics, NYson::IYsonConsumer* consumer);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TMemory
- : public TCGroup
-{
-public:
- static const TString Name;
-
- struct TStatistics
- {
- TErrorOr<ui64> Rss;
- TErrorOr<ui64> MappedFile;
- TErrorOr<ui64> MinorPageFaults;
- TErrorOr<ui64> MajorPageFaults;
-
- TErrorOr<ui64> FileCacheUsage;
- TErrorOr<ui64> AnonUsage;
- TErrorOr<ui64> AnonLimit;
- TErrorOr<ui64> MemoryUsage;
- TErrorOr<ui64> MemoryGuarantee;
- TErrorOr<ui64> MemoryLimit;
- TErrorOr<ui64> MaxMemoryUsage;
-
- TErrorOr<ui64> OomKills;
- TErrorOr<ui64> OomKillsTotal;
- };
-
- explicit TMemory(const TString& name);
-
- TStatistics GetStatistics() const;
- i64 GetMaxMemoryUsage() const;
-
- void SetLimitInBytes(i64 bytes) const;
-
- void ForceEmpty() const;
-};
-
-void Serialize(const TMemory::TStatistics& statistics, NYson::IYsonConsumer* consumer);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TNetwork
-{
-public:
- struct TStatistics
- {
- TErrorOr<ui64> TxBytes;
- TErrorOr<ui64> TxPackets;
- TErrorOr<ui64> TxDrops;
- TErrorOr<ui64> TxLimit;
-
- TErrorOr<ui64> RxBytes;
- TErrorOr<ui64> RxPackets;
- TErrorOr<ui64> RxDrops;
- TErrorOr<ui64> RxLimit;
- };
-};
-
-void Serialize(const TNetwork::TStatistics& statistics, NYson::IYsonConsumer* consumer);
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TFreezer
- : public TCGroup
-{
-public:
- static const TString Name;
-
- explicit TFreezer(const TString& name);
-
- TString GetState() const;
- void Freeze() const;
- void Unfreeze() const;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-std::map<TString, TString> ParseProcessCGroups(const TString& str);
-std::map<TString, TString> GetProcessCGroups(pid_t pid);
-bool IsValidCGroupType(const TString& type);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/config.cpp b/yt/yt/library/containers/config.cpp
deleted file mode 100644
index 39e46f2372..0000000000
--- a/yt/yt/library/containers/config.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-#include "config.h"
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TPodSpecConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("cpu_to_vcpu_factor", &TThis::CpuToVCpuFactor)
- .Default();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-bool TCGroupConfig::IsCGroupSupported(const TString& cgroupType) const
-{
- auto it = std::find_if(
- SupportedCGroups.begin(),
- SupportedCGroups.end(),
- [&] (const TString& type) {
- return type == cgroupType;
- });
- return it != SupportedCGroups.end();
-}
-
-void TCGroupConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("supported_cgroups", &TThis::SupportedCGroups)
- .Default();
-
- registrar.Postprocessor([] (TThis* config) {
- for (const auto& type : config->SupportedCGroups) {
- if (!IsValidCGroupType(type)) {
- THROW_ERROR_EXCEPTION("Invalid cgroup type %Qv", type);
- }
- }
- });
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TPortoExecutorDynamicConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("retries_timeout", &TThis::RetriesTimeout)
- .Default(TDuration::Seconds(10));
- registrar.Parameter("poll_period", &TThis::PollPeriod)
- .Default(TDuration::MilliSeconds(100));
- registrar.Parameter("api_timeout", &TThis::ApiTimeout)
- .Default(TDuration::Minutes(5));
- registrar.Parameter("api_disk_timeout", &TThis::ApiDiskTimeout)
- .Default(TDuration::Minutes(30));
- registrar.Parameter("enable_network_isolation", &TThis::EnableNetworkIsolation)
- .Default(true);
- registrar.Parameter("enable_test_porto_failures", &TThis::EnableTestPortoFailures)
- .Default(false);
- registrar.Parameter("stub_error_code", &TThis::StubErrorCode)
- .Default(EPortoErrorCode::SocketError);
- registrar.Parameter("enable_test_porto_not_responding", &TThis::EnableTestPortoNotResponding)
- .Default(false);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/config.h b/yt/yt/library/containers/config.h
deleted file mode 100644
index 3639274cff..0000000000
--- a/yt/yt/library/containers/config.h
+++ /dev/null
@@ -1,64 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/ytree/yson_struct.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPodSpecConfig
- : public virtual NYTree::TYsonStruct
-{
-public:
- std::optional<double> CpuToVCpuFactor;
-
- REGISTER_YSON_STRUCT(TPodSpecConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TPodSpecConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCGroupConfig
- : public virtual NYTree::TYsonStruct
-{
-public:
- std::vector<TString> SupportedCGroups;
-
- bool IsCGroupSupported(const TString& cgroupType) const;
-
- REGISTER_YSON_STRUCT(TCGroupConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoExecutorDynamicConfig
- : public NYTree::TYsonStruct
-{
-public:
- TDuration RetriesTimeout;
- TDuration PollPeriod;
- TDuration ApiTimeout;
- TDuration ApiDiskTimeout;
- bool EnableNetworkIsolation;
- bool EnableTestPortoFailures;
- bool EnableTestPortoNotResponding;
-
- EPortoErrorCode StubErrorCode;
-
- REGISTER_YSON_STRUCT(TPortoExecutorDynamicConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TPortoExecutorDynamicConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/cri/config.cpp b/yt/yt/library/containers/cri/config.cpp
deleted file mode 100644
index 5572f4d980..0000000000
--- a/yt/yt/library/containers/cri/config.cpp
+++ /dev/null
@@ -1,54 +0,0 @@
-#include "config.h"
-#include "cri_api.h"
-
-namespace NYT::NContainers::NCri {
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TCriExecutorConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("runtime_endpoint", &TThis::RuntimeEndpoint)
- .Default(TString(DefaultCriEndpoint));
-
- registrar.Parameter("image_endpoint", &TThis::ImageEndpoint)
- .Default(TString(DefaultCriEndpoint));
-
- registrar.Parameter("namespace", &TThis::Namespace)
- .NonEmpty();
-
- registrar.Parameter("runtime_handler", &TThis::RuntimeHandler)
- .Optional();
-
- registrar.Parameter("base_cgroup", &TThis::BaseCgroup)
- .NonEmpty();
-
- registrar.Parameter("cpu_period", &TThis::CpuPeriod)
- .Default(TDuration::MilliSeconds(100));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TCriAuthConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("username", &TThis::Username)
- .Optional();
-
- registrar.Parameter("password", &TThis::Password)
- .Optional();
-
- registrar.Parameter("auth", &TThis::Auth)
- .Optional();
-
- registrar.Parameter("server_address", &TThis::ServerAddress)
- .Optional();
-
- registrar.Parameter("identity_token", &TThis::IdentityToken)
- .Optional();
-
- registrar.Parameter("registry_token", &TThis::RegistryToken)
- .Optional();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/config.h b/yt/yt/library/containers/cri/config.h
deleted file mode 100644
index 4ea33fd390..0000000000
--- a/yt/yt/library/containers/cri/config.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/rpc/config.h>
-
-namespace NYT::NContainers::NCri {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCriExecutorConfig
- : public NRpc::TRetryingChannelConfig
-{
-public:
- //! gRPC endpoint for CRI container runtime service.
- TString RuntimeEndpoint;
-
- //! gRPC endpoint for CRI image manager service.
- TString ImageEndpoint;
-
- //! CRI namespace where this executor operates.
- TString Namespace;
-
- //! Name of CRI runtime configuration to use.
- TString RuntimeHandler;
-
- //! Common parent cgroup for all pods.
- TString BaseCgroup;
-
- //! Cpu quota period for cpu limits.
- TDuration CpuPeriod;
-
- REGISTER_YSON_STRUCT(TCriExecutorConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TCriExecutorConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-// TODO(khlebnikov): split docker registry stuff into common "docker" library.
-
-//! TCriAuthConfig depicts docker registry authentification
-class TCriAuthConfig
- : public NYTree::TYsonStruct
-{
-public:
- TString Username;
-
- TString Password;
-
- TString Auth;
-
- TString ServerAddress;
-
- TString IdentityToken;
-
- TString RegistryToken;
-
- REGISTER_YSON_STRUCT(TCriAuthConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TCriAuthConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/cri_api.cpp b/yt/yt/library/containers/cri/cri_api.cpp
deleted file mode 100644
index 93457017ba..0000000000
--- a/yt/yt/library/containers/cri/cri_api.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-#include "cri_api.h"
-
-namespace NYT::NContainers::NCri {
-
-using namespace NRpc;
-
-////////////////////////////////////////////////////////////////////////////////
-
-TCriRuntimeApi::TCriRuntimeApi(IChannelPtr channel)
- : TProxyBase(std::move(channel), GetDescriptor())
-{ }
-
-const TServiceDescriptor& TCriRuntimeApi::GetDescriptor()
-{
- static const auto Descriptor = TServiceDescriptor(NProto::RuntimeService::service_full_name());
- return Descriptor;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TCriImageApi::TCriImageApi(IChannelPtr channel)
- : TProxyBase(std::move(channel), GetDescriptor())
-{ }
-
-const TServiceDescriptor& TCriImageApi::GetDescriptor()
-{
- static const auto Descriptor = TServiceDescriptor(NProto::ImageService::service_full_name());
- return Descriptor;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/cri_api.h b/yt/yt/library/containers/cri/cri_api.h
deleted file mode 100644
index 74fe9a64a0..0000000000
--- a/yt/yt/library/containers/cri/cri_api.h
+++ /dev/null
@@ -1,99 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/rpc/client.h>
-
-#include <k8s.io/cri-api/pkg/apis/runtime/v1/api.grpc.pb.h>
-
-namespace NYT::NContainers::NCri {
-
-////////////////////////////////////////////////////////////////////////////////
-
-namespace NProto = ::runtime::v1;
-
-//! Reasonable default for CRI gRPC socket address.
-constexpr TStringBuf DefaultCriEndpoint = "unix:///run/containerd/containerd.sock";
-
-//! RuntimeReady means the runtime is up and ready to accept basic containers.
-constexpr TStringBuf RuntimeReady = "RuntimeReady";
-
-//! NetworkReady means the runtime network is up and ready to accept containers which require network.
-constexpr TStringBuf NetworkReady = "NetworkReady";
-
-//! CRI uses cgroupfs notation for systemd slices, but each name must ends with ".slice".
-constexpr TStringBuf SystemdSliceSuffix = ".slice";
-
-////////////////////////////////////////////////////////////////////////////////
-
-//! CRI labels for pods and containers managed by YT
-constexpr TStringBuf YTPodNamespaceLabel = "tech.ytsaurus.pod.namespace";
-constexpr TStringBuf YTPodNameLabel = "tech.ytsaurus.pod.name";
-constexpr TStringBuf YTContainerNameLabel = "tech.ytsaurus.container.name";
-constexpr TStringBuf YTJobIdLabel = "tech.ytsaurus.job.id";
-
-////////////////////////////////////////////////////////////////////////////////
-
-#define DEFINE_CRI_API_METHOD(method, ...) \
- DEFINE_RPC_PROXY_METHOD_GENERIC(method, NProto::method##Request, NProto::method##Response, __VA_ARGS__)
-
-//! See https://github.com/kubernetes/cri-api
-class TCriRuntimeApi
- : public NRpc::TProxyBase
-{
-public:
- explicit TCriRuntimeApi(NRpc::IChannelPtr channel);
-
- static const NRpc::TServiceDescriptor& GetDescriptor();
-
- DEFINE_CRI_API_METHOD(Version);
- DEFINE_CRI_API_METHOD(RunPodSandbox);
- DEFINE_CRI_API_METHOD(StopPodSandbox);
- DEFINE_CRI_API_METHOD(RemovePodSandbox);
- DEFINE_CRI_API_METHOD(PodSandboxStatus);
- DEFINE_CRI_API_METHOD(ListPodSandbox);
- DEFINE_CRI_API_METHOD(CreateContainer);
- DEFINE_CRI_API_METHOD(StartContainer);
- DEFINE_CRI_API_METHOD(StopContainer);
- DEFINE_CRI_API_METHOD(RemoveContainer);
- DEFINE_CRI_API_METHOD(ListContainers);
- DEFINE_CRI_API_METHOD(ContainerStatus);
- DEFINE_CRI_API_METHOD(UpdateContainerResources);
- DEFINE_CRI_API_METHOD(ReopenContainerLog);
- DEFINE_CRI_API_METHOD(ExecSync);
- DEFINE_CRI_API_METHOD(Exec);
- DEFINE_CRI_API_METHOD(Attach);
- DEFINE_CRI_API_METHOD(PortForward);
- DEFINE_CRI_API_METHOD(ContainerStats);
- DEFINE_CRI_API_METHOD(ListContainerStats);
- DEFINE_CRI_API_METHOD(PodSandboxStats);
- DEFINE_CRI_API_METHOD(ListPodSandboxStats);
- DEFINE_CRI_API_METHOD(UpdateRuntimeConfig);
- DEFINE_CRI_API_METHOD(Status);
- DEFINE_CRI_API_METHOD(CheckpointContainer);
- DEFINE_CRI_API_METHOD(ListMetricDescriptors);
- DEFINE_CRI_API_METHOD(ListPodSandboxMetrics);
-
- // FIXME(khlebnikov): figure out streaming results
- // DEFINE_RPC_PROXY_METHOD_GENERIC(GetContainerEvents, NProto::GetEventsRequest, NProto::ContainerEventResponse,
- // .SetStreamingEnabled(true));
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCriImageApi
- : public NRpc::TProxyBase
-{
-public:
- explicit TCriImageApi(NRpc::IChannelPtr channel);
-
- static const NRpc::TServiceDescriptor& GetDescriptor();
-
- DEFINE_CRI_API_METHOD(ListImages);
- DEFINE_CRI_API_METHOD(ImageStatus);
- DEFINE_CRI_API_METHOD(PullImage);
- DEFINE_CRI_API_METHOD(RemoveImage);
- DEFINE_CRI_API_METHOD(ImageFsInfo);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/cri_executor.cpp b/yt/yt/library/containers/cri/cri_executor.cpp
deleted file mode 100644
index 428fd93165..0000000000
--- a/yt/yt/library/containers/cri/cri_executor.cpp
+++ /dev/null
@@ -1,666 +0,0 @@
-#include "cri_executor.h"
-#include "private.h"
-
-#include <yt/yt/core/actions/bind.h>
-
-#include <yt/yt/core/rpc/grpc/channel.h>
-
-#include <yt/yt/core/rpc/retrying_channel.h>
-
-#include <yt/yt/core/misc/error.h>
-#include <yt/yt/core/misc/proc.h>
-#include <yt/yt/core/misc/protobuf_helpers.h>
-
-#include <yt/yt/core/concurrency/periodic_executor.h>
-
-namespace NYT::NContainers::NCri {
-
-using namespace NRpc;
-using namespace NRpc::NGrpc;
-using namespace NConcurrency;
-
-////////////////////////////////////////////////////////////////////////////////
-
-void FormatValue(TStringBuilderBase* builder, const TCriDescriptor& descriptor, TStringBuf /*spec*/)
-{
- builder->AppendFormat("%v (%s)", descriptor.Id.substr(0, 12), descriptor.Name);
-}
-
-void FormatValue(TStringBuilderBase* builder, const TCriPodDescriptor& descriptor, TStringBuf /*spec*/)
-{
- builder->AppendFormat("%v (%s)", descriptor.Id.substr(0, 12), descriptor.Name);
-}
-
-void FormatValue(TStringBuilderBase* builder, const TCriImageDescriptor& descriptor, TStringBuf /*spec*/)
-{
- builder->AppendString(descriptor.Image);
-}
-
-static TError DecodeExitCode(int exitCode, const TString& reason)
-{
- if (exitCode == 0) {
- return TError();
- }
-
- // TODO(khkebnikov) map reason == "OOMKilled"
-
- // Common bash notation for signals: 128 + signal
- if (exitCode > 128) {
- int signalNumber = exitCode - 128;
- return TError(
- EProcessErrorCode::Signal,
- "Process terminated by signal %v",
- signalNumber)
- << TErrorAttribute("signal", signalNumber)
- << TErrorAttribute("reason", reason);
- }
-
- // TODO(khkebnikov) check these
- // 125 - container failed to run
- // 126 - non executable
- // 127 - command not found
- // 128 - invalid exit code
- // 255 - exit code out of range
-
- return TError(
- EProcessErrorCode::NonZeroExitCode,
- "Process exited with code %v",
- exitCode)
- << TErrorAttribute("exit_code", exitCode)
- << TErrorAttribute("reason", reason);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCriProcess
- : public TProcessBase
-{
-public:
- TCriProcess(
- const TString& path,
- ICriExecutorPtr executor,
- TCriContainerSpecPtr containerSpec,
- const TCriPodDescriptor& podDescriptor,
- TCriPodSpecPtr podSpec,
- TDuration pollPeriod = TDuration::MilliSeconds(100))
- : TProcessBase(path)
- , Executor_(std::move(executor))
- , ContainerSpec_(std::move(containerSpec))
- , PodDescriptor_(podDescriptor)
- , PodSpec_(std::move(podSpec))
- , PollPeriod_(pollPeriod)
- {
- // Just for symmetry with sibling classes.
- AddArgument(Path_);
- }
-
- void Kill(int /*signal*/) override
- {
- WaitFor(Executor_->StopContainer(ContainerDescriptor_))
- .ThrowOnError();
- }
-
- NNet::IConnectionWriterPtr GetStdInWriter() override
- {
- THROW_ERROR_EXCEPTION("Not implemented for CRI process");
- }
-
- NNet::IConnectionReaderPtr GetStdOutReader() override
- {
- THROW_ERROR_EXCEPTION("Not implemented for CRI process");
- }
-
- NNet::IConnectionReaderPtr GetStdErrReader() override
- {
- THROW_ERROR_EXCEPTION("Not implemented for CRI process");
- }
-
-private:
- const ICriExecutorPtr Executor_;
- const TCriContainerSpecPtr ContainerSpec_;
- const TCriPodDescriptor PodDescriptor_;
- const TCriPodSpecPtr PodSpec_;
- const TDuration PollPeriod_;
-
- TCriDescriptor ContainerDescriptor_;
-
- TPeriodicExecutorPtr AsyncWaitExecutor_;
-
- void DoSpawn() override
- {
- if (ContainerSpec_->Command.empty()) {
- ContainerSpec_->Command = {Path_};
- }
- ContainerSpec_->Arguments = std::vector<TString>(Args_.begin() + 1, Args_.end());
- ContainerSpec_->WorkingDirectory = WorkingDirectory_;
-
- ContainerSpec_->BindMounts.emplace_back(
- NCri::TCriBindMount {
- .ContainerPath = WorkingDirectory_,
- .HostPath = WorkingDirectory_,
- .ReadOnly = false,
- }
- );
-
- for (const auto& keyVal : Env_) {
- TStringBuf key, val;
- if (TStringBuf(keyVal).TrySplit('=', key, val)) {
- ContainerSpec_->Environment[key] = val;
- }
- }
-
- ContainerDescriptor_ = WaitFor(Executor_->CreateContainer(ContainerSpec_, PodDescriptor_, PodSpec_))
- .ValueOrThrow();
-
- YT_LOG_DEBUG("Spawning process (Command: %v, Container: %v)", ContainerSpec_->Command[0], ContainerDescriptor_);
- WaitFor(Executor_->StartContainer(ContainerDescriptor_))
- .ThrowOnError();
-
- // TODO(khkebnikov) replace polling with CRI event
- AsyncWaitExecutor_ = New<TPeriodicExecutor>(
- GetSyncInvoker(),
- BIND(&TCriProcess::PollContainerStatus, MakeStrong(this)),
- PollPeriod_);
-
- AsyncWaitExecutor_->Start();
- }
-
- void PollContainerStatus()
- {
- Executor_->GetContainerStatus(ContainerDescriptor_)
- .SubscribeUnique(BIND(&TCriProcess::OnContainerStatus, MakeStrong(this)));
- }
-
- void OnContainerStatus(TErrorOr<TCriRuntimeApi::TRspContainerStatusPtr>&& responseOrError)
- {
- auto response = responseOrError.ValueOrThrow();
- if (!response->has_status()) {
- return;
- }
- auto status = response->status();
- if (status.state() == NProto::CONTAINER_EXITED) {
- auto error = DecodeExitCode(status.exit_code(), status.reason());
- YT_LOG_DEBUG(error, "Process finished (Container: %v)", ContainerDescriptor_);
- YT_UNUSED_FUTURE(AsyncWaitExecutor_->Stop());
- FinishedPromise_.TrySet(error);
- }
- }
-};
-
-DEFINE_REFCOUNTED_TYPE(TCriProcess)
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TCriExecutor
- : public ICriExecutor
-{
-public:
- TCriExecutor(
- TCriExecutorConfigPtr config,
- IChannelFactoryPtr channelFactory)
- : Config_(std::move(config))
- , RuntimeApi_(CreateRetryingChannel(Config_, channelFactory->CreateChannel(Config_->RuntimeEndpoint)))
- , ImageApi_(CreateRetryingChannel(Config_, channelFactory->CreateChannel(Config_->ImageEndpoint)))
- { }
-
- TString GetPodCgroup(TString podName) const override
- {
- TStringBuilder cgroup;
- cgroup.AppendString(Config_->BaseCgroup);
- cgroup.AppendString("/");
- cgroup.AppendString(podName);
- if (Config_->BaseCgroup.EndsWith(SystemdSliceSuffix)) {
- cgroup.AppendString(SystemdSliceSuffix);
- }
- return cgroup.Flush();
- }
-
- TFuture<TCriRuntimeApi::TRspStatusPtr> GetRuntimeStatus(bool verbose = false) override
- {
- auto req = RuntimeApi_.Status();
- req->set_verbose(verbose);
- return req->Invoke();
- }
-
- TFuture<TCriRuntimeApi::TRspListPodSandboxPtr> ListPodSandbox(
- std::function<void(NProto::PodSandboxFilter&)> initFilter = nullptr) override
- {
- auto req = RuntimeApi_.ListPodSandbox();
-
- {
- auto* filter = req->mutable_filter();
-
- if (auto namespace_ = Config_->Namespace) {
- auto& labels = *filter->mutable_label_selector();
- labels[YTPodNamespaceLabel] = namespace_;
- }
-
- if (initFilter) {
- initFilter(*filter);
- }
- }
-
- return req->Invoke();
- }
-
- TFuture<TCriRuntimeApi::TRspListContainersPtr> ListContainers(
- std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) override
- {
- auto req = RuntimeApi_.ListContainers();
-
- {
- auto* filter = req->mutable_filter();
-
- if (auto namespace_ = Config_->Namespace) {
- auto& labels = *filter->mutable_label_selector();
- labels[YTPodNamespaceLabel] = namespace_;
- }
-
- if (initFilter) {
- initFilter(*filter);
- }
- }
-
- return req->Invoke();
- }
-
- TFuture<void> ForEachPodSandbox(
- const TCallback<void(const TCriPodDescriptor&, const NProto::PodSandbox&)>& callback,
- std::function<void(NProto::PodSandboxFilter&)> initFilter) override
- {
- return ListPodSandbox(initFilter).Apply(BIND([=] (const TCriRuntimeApi::TRspListPodSandboxPtr& rsp) {
- for (const auto& pod : rsp->items()) {
- TCriPodDescriptor descriptor{.Name=pod.metadata().name(), .Id=pod.id()};
- callback(descriptor, pod);
- }
- }));
- }
-
- TFuture<void> ForEachContainer(
- const TCallback<void(const TCriDescriptor&, const NProto::Container&)>& callback,
- std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) override
- {
- return ListContainers(initFilter).Apply(BIND([=] (const TCriRuntimeApi::TRspListContainersPtr& rsp) {
- for (const auto& ct : rsp->containers()) {
- TCriDescriptor descriptor{.Name=ct.metadata().name(), .Id=ct.id()};
- callback(descriptor, ct);
- }
- }));
- }
-
- TFuture<TCriRuntimeApi::TRspPodSandboxStatusPtr> GetPodSandboxStatus(
- const TCriPodDescriptor& podDescriptor, bool verbose = false) override
- {
- auto req = RuntimeApi_.PodSandboxStatus();
- req->set_pod_sandbox_id(podDescriptor.Id);
- req->set_verbose(verbose);
- return req->Invoke();
- }
-
- TFuture<TCriRuntimeApi::TRspContainerStatusPtr> GetContainerStatus(
- const TCriDescriptor& descriptor, bool verbose = false) override
- {
- auto req = RuntimeApi_.ContainerStatus();
- req->set_container_id(descriptor.Id);
- req->set_verbose(verbose);
- return req->Invoke();
- }
-
- TFuture<TCriPodDescriptor> RunPodSandbox(TCriPodSpecPtr podSpec) override
- {
- auto req = RuntimeApi_.RunPodSandbox();
-
- FillPodSandboxConfig(req->mutable_config(), *podSpec);
-
- if (Config_->RuntimeHandler) {
- req->set_runtime_handler(Config_->RuntimeHandler);
- }
-
- return req->Invoke().Apply(BIND([name = podSpec->Name] (const TCriRuntimeApi::TRspRunPodSandboxPtr& rsp) -> TCriPodDescriptor {
- return TCriPodDescriptor{.Name = name, .Id = rsp->pod_sandbox_id()};
- }));
- }
-
- TFuture<void> StopPodSandbox(const TCriPodDescriptor& podDescriptor) override
- {
- auto req = RuntimeApi_.StopPodSandbox();
- req->set_pod_sandbox_id(podDescriptor.Id);
- return req->Invoke().AsVoid();
- }
-
- TFuture<void> RemovePodSandbox(const TCriPodDescriptor& podDescriptor) override
- {
- auto req = RuntimeApi_.RemovePodSandbox();
- req->set_pod_sandbox_id(podDescriptor.Id);
- return req->Invoke().AsVoid();
- }
-
- TFuture<void> UpdatePodResources(
- const TCriPodDescriptor& /*pod*/,
- const TCriContainerResources& /*resources*/) override
- {
- return MakeFuture(TError("Not implemented"));
- }
-
- TFuture<TCriDescriptor> CreateContainer(
- TCriContainerSpecPtr ctSpec,
- const TCriPodDescriptor& podDescriptor,
- TCriPodSpecPtr podSpec) override
- {
- auto req = RuntimeApi_.CreateContainer();
- req->set_pod_sandbox_id(podDescriptor.Id);
-
- auto* config = req->mutable_config();
-
- {
- auto* metadata = config->mutable_metadata();
- metadata->set_name(ctSpec->Name);
- }
-
- {
- auto& labels = *config->mutable_labels();
-
- for (const auto& [key, val] : ctSpec->Labels) {
- labels[key] = val;
- }
-
- labels[YTPodNamespaceLabel] = Config_->Namespace;
- labels[YTPodNameLabel] = podSpec->Name;
- labels[YTContainerNameLabel] = ctSpec->Name;
- }
-
- FillImageSpec(config->mutable_image(), ctSpec->Image);
-
- for (const auto& mountSpec : ctSpec->BindMounts) {
- auto* mount = config->add_mounts();
- mount->set_container_path(mountSpec.ContainerPath);
- mount->set_host_path(mountSpec.HostPath);
- mount->set_readonly(mountSpec.ReadOnly);
- mount->set_propagation(NProto::PROPAGATION_PRIVATE);
- }
-
- {
- ToProto(config->mutable_command(), ctSpec->Command);
- ToProto(config->mutable_args(), ctSpec->Arguments);
-
- config->set_working_dir(ctSpec->WorkingDirectory);
-
- for (const auto& [key, val] : ctSpec->Environment) {
- auto* env = config->add_envs();
- env->set_key(key);
- env->set_value(val);
- }
- }
-
- {
- auto* linux = config->mutable_linux();
- FillLinuxContainerResources(linux->mutable_resources(), ctSpec->Resources);
-
- auto* security = linux->mutable_security_context();
-
- auto* namespaces = security->mutable_namespace_options();
- namespaces->set_network(NProto::NODE);
-
- security->set_readonly_rootfs(ctSpec->ReadOnlyRootFS);
-
- if (ctSpec->Credentials.Uid) {
- security->mutable_run_as_user()->set_value(*ctSpec->Credentials.Uid);
- }
- if (ctSpec->Credentials.Gid) {
- security->mutable_run_as_group()->set_value(*ctSpec->Credentials.Gid);
- }
- ToProto(security->mutable_supplemental_groups(), ctSpec->Credentials.Groups);
- }
-
- FillPodSandboxConfig(req->mutable_sandbox_config(), *podSpec);
-
- return req->Invoke().Apply(BIND([name = ctSpec->Name] (const TCriRuntimeApi::TRspCreateContainerPtr& rsp) -> TCriDescriptor {
- return TCriDescriptor{.Name = "", .Id = rsp->container_id()};
- }));
- }
-
- TFuture<void> StartContainer(const TCriDescriptor& descriptor) override
- {
- auto req = RuntimeApi_.StartContainer();
- req->set_container_id(descriptor.Id);
- return req->Invoke().AsVoid();
- }
-
- TFuture<void> StopContainer(const TCriDescriptor& descriptor, TDuration timeout) override
- {
- auto req = RuntimeApi_.StopContainer();
- req->set_container_id(descriptor.Id);
- req->set_timeout(timeout.Seconds());
- return req->Invoke().AsVoid();
- }
-
- TFuture<void> RemoveContainer(const TCriDescriptor& descriptor) override
- {
- auto req = RuntimeApi_.RemoveContainer();
- req->set_container_id(descriptor.Id);
- return req->Invoke().AsVoid();
- }
-
- TFuture<void> UpdateContainerResources(const TCriDescriptor& descriptor, const TCriContainerResources& resources) override
- {
- auto req = RuntimeApi_.UpdateContainerResources();
- req->set_container_id(descriptor.Id);
- FillLinuxContainerResources(req->mutable_linux(), resources);
- return req->Invoke().AsVoid();
- }
-
- void CleanNamespace() override
- {
- YT_VERIFY(Config_->Namespace);
- auto pods = WaitFor(ListPodSandbox())
- .ValueOrThrow();
-
- {
- std::vector<TFuture<void>> futures;
- futures.reserve(pods->items_size());
- for (const auto& pod : pods->items()) {
- TCriPodDescriptor podDescriptor{.Name = pod.metadata().name(), .Id = pod.id() };
- futures.push_back(StopPodSandbox(podDescriptor));
- }
- WaitFor(AllSucceeded(std::move(futures)))
- .ThrowOnError();
- }
-
- {
- std::vector<TFuture<void>> futures;
- futures.reserve(pods->items_size());
- for (const auto& pod : pods->items()) {
- TCriPodDescriptor podDescriptor{.Name = pod.metadata().name(), .Id = pod.id()};
- futures.push_back(RemovePodSandbox(podDescriptor));
- }
- WaitFor(AllSucceeded(std::move(futures)))
- .ThrowOnError();
- }
- }
-
- void CleanPodSandbox(const TCriPodDescriptor& podDescriptor) override
- {
- auto containers = WaitFor(ListContainers([=] (NProto::ContainerFilter& filter) {
- filter.set_pod_sandbox_id(podDescriptor.Id);
- }))
- .ValueOrThrow();
-
- {
- std::vector<TFuture<void>> futures;
- futures.reserve(containers->containers_size());
- for (const auto& ct : containers->containers()) {
- TCriDescriptor ctDescriptor{.Name = ct.metadata().name(), .Id = ct.id()};
- futures.push_back(StopContainer(ctDescriptor, TDuration::Zero()));
- }
- WaitFor(AllSucceeded(std::move(futures)))
- .ThrowOnError();
- }
-
- {
- std::vector<TFuture<void>> futures;
- futures.reserve(containers->containers_size());
- for (const auto& ct : containers->containers()) {
- TCriDescriptor ctDescriptor{.Name = ct.metadata().name(), .Id = ct.id()};
- futures.push_back(RemoveContainer(ctDescriptor));
- }
- WaitFor(AllSucceeded(std::move(futures)))
- .ThrowOnError();
- }
- }
-
- TFuture<TCriImageApi::TRspListImagesPtr> ListImages(
- std::function<void(NProto::ImageFilter&)> initFilter = nullptr) override
- {
- auto req = ImageApi_.ListImages();
- if (initFilter) {
- initFilter(*req->mutable_filter());
- }
- return req->Invoke();
- }
-
- TFuture<TCriImageApi::TRspImageStatusPtr> GetImageStatus(
- const TCriImageDescriptor& image,
- bool verbose = false) override
- {
- auto req = ImageApi_.ImageStatus();
- FillImageSpec(req->mutable_image(), image);
- req->set_verbose(verbose);
- return req->Invoke();
- }
-
- TFuture<TCriImageDescriptor> PullImage(
- const TCriImageDescriptor& image,
- bool always,
- TCriAuthConfigPtr authConfig,
- TCriPodSpecPtr podSpec) override
- {
- if (!always) {
- return GetImageStatus(image)
- .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TCriImageApi::TRspImageStatusPtr& imageStatus) {
- if (imageStatus->has_image()) {
- return MakeFuture(TCriImageDescriptor{.Image = imageStatus->image().id()});
- }
- return PullImage(image, /*always*/ true, authConfig, podSpec);
- }));
- }
-
- auto req = ImageApi_.PullImage();
- FillImageSpec(req->mutable_image(), image);
- if (authConfig) {
- FillAuthConfig(req->mutable_auth(), *authConfig);
- }
- if (podSpec) {
- FillPodSandboxConfig(req->mutable_sandbox_config(), *podSpec);
- }
- return req->Invoke().Apply(BIND([] (const TCriImageApi::TRspPullImagePtr& rsp) -> TCriImageDescriptor {
- return TCriImageDescriptor{.Image = rsp->image_ref()};
- }));
- }
-
- TFuture<void> RemoveImage(const TCriImageDescriptor& image) override
- {
- auto req = ImageApi_.RemoveImage();
- FillImageSpec(req->mutable_image(), image);
- return req->Invoke().AsVoid();
- }
-
- TProcessBasePtr CreateProcess(
- const TString& path,
- TCriContainerSpecPtr containerSpec,
- const TCriPodDescriptor& podDescriptor,
- TCriPodSpecPtr podSpec) override
- {
- return New<TCriProcess>(path, this, std::move(containerSpec), podDescriptor, std::move(podSpec));
- }
-
-private:
- const TCriExecutorConfigPtr Config_;
- TCriRuntimeApi RuntimeApi_;
- TCriImageApi ImageApi_;
-
- void FillLinuxContainerResources(NProto::LinuxContainerResources* resources, const TCriContainerResources& spec)
- {
- auto* unified = resources->mutable_unified();
-
- if (spec.CpuLimit) {
- i64 period = Config_->CpuPeriod.MicroSeconds();
- i64 quota = period * *spec.CpuLimit;
-
- resources->set_cpu_period(period);
- resources->set_cpu_quota(quota);
- }
-
- if (spec.MemoryLimit) {
- resources->set_memory_limit_in_bytes(*spec.MemoryLimit);
- }
-
- if (spec.MemoryRequest) {
- (*unified)["memory.low"] = ToString(*spec.MemoryRequest);
- }
- }
-
- void FillPodSandboxConfig(NProto::PodSandboxConfig* config, const TCriPodSpec& spec)
- {
- {
- auto* metadata = config->mutable_metadata();
- metadata->set_namespace_(Config_->Namespace);
- metadata->set_name(spec.Name);
- metadata->set_uid(spec.Name);
- }
-
- {
- auto& labels = *config->mutable_labels();
- labels[YTPodNamespaceLabel] = Config_->Namespace;
- labels[YTPodNameLabel] = spec.Name;
- }
-
- {
- auto* linux = config->mutable_linux();
- linux->set_cgroup_parent(GetPodCgroup(spec.Name));
-
- auto* security = linux->mutable_security_context();
- auto* namespaces = security->mutable_namespace_options();
- namespaces->set_network(NProto::NODE);
- }
- }
-
- void FillImageSpec(NProto::ImageSpec* spec, const TCriImageDescriptor& image)
- {
- spec->set_image(image.Image);
- }
-
- void FillAuthConfig(NProto::AuthConfig* auth, const TCriAuthConfig& authConfig)
- {
- if (!authConfig.Username.empty()) {
- auth->set_username(authConfig.Username);
- }
- if (!authConfig.Password.empty()) {
- auth->set_password(authConfig.Password);
- }
- if (!authConfig.Auth.empty()) {
- auth->set_auth(authConfig.Auth);
- }
- if (!authConfig.ServerAddress.empty()) {
- auth->set_server_address(authConfig.ServerAddress);
- }
- if (!authConfig.IdentityToken.empty()) {
- auth->set_identity_token(authConfig.IdentityToken);
- }
- if (!authConfig.RegistryToken.empty()) {
- auth->set_registry_token(authConfig.RegistryToken);
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-ICriExecutorPtr CreateCriExecutor(TCriExecutorConfigPtr config)
-{
- return New<TCriExecutor>(
- std::move(config),
- GetGrpcChannelFactory());
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/cri_executor.h b/yt/yt/library/containers/cri/cri_executor.h
deleted file mode 100644
index de9741721f..0000000000
--- a/yt/yt/library/containers/cri/cri_executor.h
+++ /dev/null
@@ -1,207 +0,0 @@
-#pragma once
-
-#include "public.h"
-#include "config.h"
-#include "cri_api.h"
-
-#include <yt/yt/library/process/process.h>
-
-#include <yt/yt/core/ytree/yson_struct.h>
-
-namespace NYT::NContainers::NCri {
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TCriDescriptor
-{
- TString Name;
- TString Id;
-};
-
-struct TCriPodDescriptor
-{
- TString Name;
- TString Id;
-};
-
-struct TCriImageDescriptor
-{
- TString Image;
-};
-
-void FormatValue(TStringBuilderBase* builder, const TCriDescriptor& descriptor, TStringBuf spec);
-void FormatValue(TStringBuilderBase* builder, const TCriPodDescriptor& descriptor, TStringBuf spec);
-void FormatValue(TStringBuilderBase* builder, const TCriImageDescriptor& descriptor, TStringBuf spec);
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TCriContainerResources
-{
- std::optional<double> CpuLimit;
- std::optional<double> CpuRequest;
- std::optional<i64> MemoryLimit;
- std::optional<i64> MemoryRequest;
-};
-
-struct TCriPodSpec
- : public TRefCounted
-{
- TString Name;
- TCriContainerResources Resources;
-};
-
-DEFINE_REFCOUNTED_TYPE(TCriPodSpec)
-
-struct TCriBindMount
-{
- TString ContainerPath;
- TString HostPath;
- bool ReadOnly;
-};
-
-struct TCriCredentials
-{
- std::optional<i64> Uid;
- std::optional<i64> Gid;
- std::vector<i64> Groups;
-};
-
-struct TCriContainerSpec
- : public TRefCounted
-{
- TString Name;
-
- THashMap<TString, TString> Labels;
-
- TCriImageDescriptor Image;
-
- bool ReadOnlyRootFS;
-
- std::vector<TCriBindMount> BindMounts;
-
- TCriCredentials Credentials;
-
- TCriContainerResources Resources;
-
- //! Command to execute (i.e., entrypoint for docker).
- std::vector<TString> Command;
-
- //! Arguments for the Command (i.e., command for docker).
- std::vector<TString> Arguments;
-
- //! Current working directory of the command.
- TString WorkingDirectory;
-
- //! Environment variable to set in the container.
- THashMap<TString, TString> Environment;
-};
-
-DEFINE_REFCOUNTED_TYPE(TCriContainerSpec)
-
-////////////////////////////////////////////////////////////////////////////////
-
-//! Wrapper around CRI gRPC API
-//!
-//! @see yt/yt/contrib/cri-api/k8s.io/cri-api/pkg/apis/runtime/v1/api.proto
-//! @see https://github.com/kubernetes/cri-api
-struct ICriExecutor
- : public TRefCounted
-{
- //! Returns status of the CRI runtime.
- //! @param verbose fill field "info" with runtime-specific debug.
- virtual TFuture<TCriRuntimeApi::TRspStatusPtr> GetRuntimeStatus(bool verbose = false) = 0;
-
- // PodSandbox
-
- virtual TString GetPodCgroup(TString podName) const = 0;
-
- virtual TFuture<TCriRuntimeApi::TRspListPodSandboxPtr> ListPodSandbox(
- std::function<void(NProto::PodSandboxFilter&)> initFilter = nullptr) = 0;
-
- virtual TFuture<TCriRuntimeApi::TRspListContainersPtr> ListContainers(
- std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) = 0;
-
- virtual TFuture<void> ForEachPodSandbox(
- const TCallback<void(const TCriPodDescriptor&, const NProto::PodSandbox&)>& callback,
- std::function<void(NProto::PodSandboxFilter&)> initFilter = nullptr) = 0;
-
- virtual TFuture<void> ForEachContainer(
- const TCallback<void(const TCriDescriptor&, const NProto::Container&)>& callback,
- std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) = 0;
-
- //! Returns status of the pod.
- //! @param verbose fill field "info" with runtime-specific debug.
- virtual TFuture<TCriRuntimeApi::TRspPodSandboxStatusPtr> GetPodSandboxStatus(
- const TCriPodDescriptor& pod, bool verbose = false) = 0;
-
- //! Returns status of the container.
- //! @param verbose fill "info" with runtime-specific debug information.
- virtual TFuture<TCriRuntimeApi::TRspContainerStatusPtr> GetContainerStatus(
- const TCriDescriptor& ct, bool verbose = false) = 0;
-
- virtual TFuture<TCriPodDescriptor> RunPodSandbox(TCriPodSpecPtr podSpec) = 0;
- virtual TFuture<void> StopPodSandbox(const TCriPodDescriptor& pod) = 0;
- virtual TFuture<void> RemovePodSandbox(const TCriPodDescriptor& pod) = 0;
- virtual TFuture<void> UpdatePodResources(
- const TCriPodDescriptor& pod,
- const TCriContainerResources& resources) = 0;
-
- //! Remove all pods and containers in namespace managed by executor.
- virtual void CleanNamespace() = 0;
-
- //! Remove all containers in one pod.
- virtual void CleanPodSandbox(const TCriPodDescriptor& pod) = 0;
-
- virtual TFuture<TCriDescriptor> CreateContainer(
- TCriContainerSpecPtr containerSpec,
- const TCriPodDescriptor& pod,
- TCriPodSpecPtr podSpec) = 0;
-
- virtual TFuture<void> StartContainer(const TCriDescriptor& ct) = 0;
-
- //! Stops container if it's running.
- //! @param timeout defines timeout for graceful stop, timeout=0 - kill instantly.
- virtual TFuture<void> StopContainer(
- const TCriDescriptor& ct,
- TDuration timeout = TDuration::Zero()) = 0;
-
- virtual TFuture<void> RemoveContainer(const TCriDescriptor& ct) = 0;
-
- virtual TFuture<void> UpdateContainerResources(
- const TCriDescriptor& ct,
- const TCriContainerResources& resources) = 0;
-
- virtual TFuture<TCriImageApi::TRspListImagesPtr> ListImages(
- std::function<void(NProto::ImageFilter&)> initFilter = nullptr) = 0;
-
- //! Returns status of the image.
- //! @param verbose fill field "info" with runtime-specific debug.
- virtual TFuture<TCriImageApi::TRspImageStatusPtr> GetImageStatus(
- const TCriImageDescriptor& image,
- bool verbose = false) = 0;
-
- virtual TFuture<TCriImageDescriptor> PullImage(
- const TCriImageDescriptor& image,
- bool always = false,
- TCriAuthConfigPtr authConfig = nullptr,
- TCriPodSpecPtr podSpec = nullptr) = 0;
-
- virtual TFuture<void> RemoveImage(const TCriImageDescriptor& image) = 0;
-
- // FIXME(khlebnikov): temporary compat
- virtual TProcessBasePtr CreateProcess(
- const TString& path,
- TCriContainerSpecPtr containerSpec,
- const TCriPodDescriptor& pod,
- TCriPodSpecPtr podSpec) = 0;
-};
-
-DEFINE_REFCOUNTED_TYPE(ICriExecutor)
-
-////////////////////////////////////////////////////////////////////////////////
-
-ICriExecutorPtr CreateCriExecutor(TCriExecutorConfigPtr config);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/private.h b/yt/yt/library/containers/cri/private.h
deleted file mode 100644
index 36fdf194f5..0000000000
--- a/yt/yt/library/containers/cri/private.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/logging/log.h>
-
-namespace NYT::NContainers::NCri {
-
-////////////////////////////////////////////////////////////////////////////////
-
-inline const NLogging::TLogger Logger("Cri");
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/public.h b/yt/yt/library/containers/cri/public.h
deleted file mode 100644
index a12ee86d57..0000000000
--- a/yt/yt/library/containers/cri/public.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/misc/intrusive_ptr.h>
-
-namespace NYT::NContainers::NCri {
-
-////////////////////////////////////////////////////////////////////////////////
-
-DECLARE_REFCOUNTED_STRUCT(TCriPodSpec)
-DECLARE_REFCOUNTED_STRUCT(TCriContainerSpec)
-DECLARE_REFCOUNTED_CLASS(TCriExecutorConfig)
-DECLARE_REFCOUNTED_CLASS(TCriAuthConfig)
-DECLARE_REFCOUNTED_STRUCT(ICriExecutor)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers::NCri
diff --git a/yt/yt/library/containers/cri/ya.make b/yt/yt/library/containers/cri/ya.make
deleted file mode 100644
index dc9dd15a0b..0000000000
--- a/yt/yt/library/containers/cri/ya.make
+++ /dev/null
@@ -1,22 +0,0 @@
-LIBRARY()
-
-INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
-
-PEERDIR(
- yt/yt/core
- yt/yt/core/rpc/grpc
- yt/yt/contrib/cri-api
-)
-
-SRCS(
- cri_api.cpp
- cri_executor.cpp
- config.cpp
-)
-
-ADDINCL(
- ONE_LEVEL
- yt/yt/contrib/cri-api
-)
-
-END()
diff --git a/yt/yt/library/containers/disk_manager/config.cpp b/yt/yt/library/containers/disk_manager/config.cpp
deleted file mode 100644
index 84484db630..0000000000
--- a/yt/yt/library/containers/disk_manager/config.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-#include "config.h"
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TMockedDiskConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("disk_id", &TThis::DiskId)
- .Default();
- registrar.Parameter("device_path", &TThis::DevicePath)
- .Default();
- registrar.Parameter("device_name", &TThis::DeviceName)
- .Default();
- registrar.Parameter("disk_model", &TThis::DiskModel)
- .Default();
- registrar.Parameter("partition_fs_labels", &TThis::PartitionFsLabels)
- .Default();
- registrar.Parameter("state", &TThis::State)
- .Default(EDiskState::Ok);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDiskInfoProviderConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("disk_ids", &TThis::DiskIds)
- .Default();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDiskManagerProxyConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("disk_manager_address", &TThis::DiskManagerAddress)
- .Default("unix:/run/yandex-diskmanager/yandex-diskmanager.sock");
- registrar.Parameter("disk_manager_service_name", &TThis::DiskManagerServiceName)
- .Default("diskman.DiskManager");
-
- registrar.Parameter("is_mock", &TThis::IsMock)
- .Default(false);
- registrar.Parameter("mock_disks", &TThis::MockDisks)
- .Default();
- registrar.Parameter("mock_yt_paths", &TThis::MockYtPaths)
- .Default();
-
- registrar.Parameter("request_timeout", &TThis::RequestTimeout)
- .Default(TDuration::Seconds(10));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TDiskManagerProxyDynamicConfig::Register(TRegistrar registrar)
-{
- registrar.Parameter("request_timeout", &TThis::RequestTimeout)
- .Default();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/disk_manager/config.h b/yt/yt/library/containers/disk_manager/config.h
deleted file mode 100644
index 4f01d378b9..0000000000
--- a/yt/yt/library/containers/disk_manager/config.h
+++ /dev/null
@@ -1,79 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/ytree/yson_struct.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TMockedDiskConfig
- : public NYTree::TYsonStruct
-{
- TString DiskId;
- TString DevicePath;
- TString DeviceName;
- TString DiskModel;
- std::vector<TString> PartitionFsLabels;
- EDiskState State;
-
- REGISTER_YSON_STRUCT(TMockedDiskConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TMockedDiskConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TDiskManagerProxyConfig
- : public NYTree::TYsonStruct
-{
- TString DiskManagerAddress;
- TString DiskManagerServiceName;
-
- bool IsMock;
- std::vector<TMockedDiskConfigPtr> MockDisks;
- std::vector<TString> MockYtPaths;
-
- TDuration RequestTimeout;
-
- REGISTER_YSON_STRUCT(TDiskManagerProxyConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TDiskManagerProxyConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TDiskInfoProviderConfig
- : public NYTree::TYsonStruct
-{
- std::vector<TString> DiskIds;
-
- REGISTER_YSON_STRUCT(TDiskInfoProviderConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TDiskInfoProviderConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TDiskManagerProxyDynamicConfig
- : public NYTree::TYsonStruct
-{
- std::optional<TDuration> RequestTimeout;
-
- REGISTER_YSON_STRUCT(TDiskManagerProxyDynamicConfig);
-
- static void Register(TRegistrar registrar);
-};
-
-DEFINE_REFCOUNTED_TYPE(TDiskManagerProxyDynamicConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/disk_manager/disk_info_provider.cpp b/yt/yt/library/containers/disk_manager/disk_info_provider.cpp
deleted file mode 100644
index 0ee3a5b6cb..0000000000
--- a/yt/yt/library/containers/disk_manager/disk_info_provider.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-#include "disk_info_provider.h"
-
-#include <yt/yt/library/containers/disk_manager/disk_manager_proxy.h>
-
-#include <yt/yt/core/actions/future.h>
-#include <yt/yt/core/actions/invoker_util.h>
-
-#include <yt/yt/core/concurrency/public.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-TDiskInfoProvider::TDiskInfoProvider(
- IDiskManagerProxyPtr diskManagerProxy,
- TDiskInfoProviderConfigPtr config)
- : DiskManagerProxy_(std::move(diskManagerProxy))
- , Config_(std::move(config))
-{ }
-
-const std::vector<TString>& TDiskInfoProvider::GetConfigDiskIds() const
-{
- return Config_->DiskIds;
-}
-
-TFuture<std::vector<TDiskInfo>> TDiskInfoProvider::GetYTDiskInfos()
-{
- auto diskInfosFuture = DiskManagerProxy_->GetDisks();
- auto ytDiskPathsFuture = DiskManagerProxy_->GetYtDiskMountPaths();
-
- // Merge two futures and filter disks placed in /yt.
- return diskInfosFuture.Apply(BIND([=] (const std::vector<TDiskInfo>& diskInfos) {
- return ytDiskPathsFuture.Apply(BIND([=] (const THashSet<TString>& diskPaths) {
- std::vector<TDiskInfo> disks;
-
- for (const auto& diskInfo : diskInfos) {
- for (const auto& partitionFsLabel : diskInfo.PartitionFsLabels) {
- if (diskPaths.contains(partitionFsLabel)) {
- disks.push_back(diskInfo);
- break;
- }
- }
- }
-
- return disks;
- }));
- }));
-}
-
-TFuture<void> TDiskInfoProvider::RecoverDisk(const TString& diskId)
-{
- return DiskManagerProxy_->RecoverDiskById(diskId, ERecoverPolicy::RecoverAuto);
-}
-
-TFuture<void> TDiskInfoProvider::FailDisk(
- const TString& diskId,
- const TString& reason)
-{
- return DiskManagerProxy_->FailDiskById(diskId, reason);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/disk_manager/disk_info_provider.h b/yt/yt/library/containers/disk_manager/disk_info_provider.h
deleted file mode 100644
index b8d686438d..0000000000
--- a/yt/yt/library/containers/disk_manager/disk_info_provider.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/actions/future.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TDiskInfoProvider
- : public TRefCounted
-{
-public:
- TDiskInfoProvider(
- IDiskManagerProxyPtr diskManagerProxy,
- TDiskInfoProviderConfigPtr config);
-
- const std::vector<TString>& GetConfigDiskIds() const;
-
- TFuture<std::vector<TDiskInfo>> GetYTDiskInfos();
-
- TFuture<void> RecoverDisk(const TString& diskId);
-
- TFuture<void> FailDisk(
- const TString& diskId,
- const TString& reason);
-
-private:
- const IDiskManagerProxyPtr DiskManagerProxy_;
- const TDiskInfoProviderConfigPtr Config_;
-};
-
-DEFINE_REFCOUNTED_TYPE(TDiskInfoProvider)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/disk_manager/disk_manager_proxy.cpp b/yt/yt/library/containers/disk_manager/disk_manager_proxy.cpp
deleted file mode 100644
index 961723c51f..0000000000
--- a/yt/yt/library/containers/disk_manager/disk_manager_proxy.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-#include "disk_manager_proxy.h"
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TDiskManagerProxyMock
- : public IDiskManagerProxy
-{
- virtual TFuture<THashSet<TString>> GetYtDiskMountPaths()
- {
- THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration");
- }
-
- virtual TFuture<std::vector<TDiskInfo>> GetDisks()
- {
- THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration");
- }
-
- virtual TFuture<void> RecoverDiskById(const TString& /*diskId*/, ERecoverPolicy /*recoverPolicy*/)
- {
- THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration");
- }
-
- virtual TFuture<void> FailDiskById(const TString& /*diskId*/, const TString& /*reason*/)
- {
- THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration");
- }
-
- virtual void OnDynamicConfigChanged(const TDiskManagerProxyDynamicConfigPtr& /*newConfig*/)
- {
- // Do nothing
- }
-};
-
-DEFINE_REFCOUNTED_TYPE(TDiskManagerProxyMock)
-
-////////////////////////////////////////////////////////////////////////////////
-
-Y_WEAK IDiskManagerProxyPtr CreateDiskManagerProxy(TDiskManagerProxyConfigPtr /*config*/)
-{
- // This implementation is used when disk_manager_proxy_impl.cpp is not linked.
-
- return New<TDiskManagerProxyMock>();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/disk_manager/disk_manager_proxy.h b/yt/yt/library/containers/disk_manager/disk_manager_proxy.h
deleted file mode 100644
index d2da5c1873..0000000000
--- a/yt/yt/library/containers/disk_manager/disk_manager_proxy.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/library/containers/disk_manager/config.h>
-
-#include <yt/yt/core/misc/atomic_object.h>
-
-#include <yt/yt/core/rpc/client.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct IDiskManagerProxy
- : public virtual TRefCounted
-{
- virtual TFuture<THashSet<TString>> GetYtDiskMountPaths() = 0;
-
- virtual TFuture<std::vector<TDiskInfo>> GetDisks() = 0;
-
- virtual TFuture<void> RecoverDiskById(const TString& diskId, ERecoverPolicy recoverPolicy) = 0;
-
- virtual TFuture<void> FailDiskById(const TString& diskId, const TString& reason) = 0;
-
- virtual void OnDynamicConfigChanged(const TDiskManagerProxyDynamicConfigPtr& newConfig) = 0;
-
-};
-
-DEFINE_REFCOUNTED_TYPE(IDiskManagerProxy)
-
-////////////////////////////////////////////////////////////////////////////////
-
-IDiskManagerProxyPtr CreateDiskManagerProxy(TDiskManagerProxyConfigPtr config);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/disk_manager/public.h b/yt/yt/library/containers/disk_manager/public.h
deleted file mode 100644
index 8a812638f3..0000000000
--- a/yt/yt/library/containers/disk_manager/public.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/misc/public.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-DEFINE_ENUM(EDiskState,
- ((Unknown) (0))
- ((Ok) (1))
- ((Failed) (2))
- ((RecoverWait) (3))
-);
-
-// 1. Remount all disk volumes to it's default state
-// 2. Recreate disk layout, all data on disk will be lost
-// 3. Replace phisical disk
-DEFINE_ENUM(ERecoverPolicy,
- ((RecoverAuto) (0))
- ((RecoverMount) (1))
- ((RecoverLayout) (2))
- ((RecoverDisk) (3))
-);
-
-struct TDiskInfo
-{
- TString DiskId;
- TString DevicePath;
- TString DeviceName;
- TString DiskModel;
- THashSet<TString> PartitionFsLabels;
- EDiskState State;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-DECLARE_REFCOUNTED_STRUCT(TMockedDiskConfig)
-DECLARE_REFCOUNTED_STRUCT(TDiskManagerProxyConfig)
-DECLARE_REFCOUNTED_STRUCT(TDiskManagerProxyDynamicConfig)
-DECLARE_REFCOUNTED_STRUCT(TDiskInfoProviderConfig)
-
-DECLARE_REFCOUNTED_STRUCT(IDiskManagerProxy)
-DECLARE_REFCOUNTED_CLASS(TDiskInfoProvider)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/disk_manager/ya.make b/yt/yt/library/containers/disk_manager/ya.make
deleted file mode 100644
index dcb260cf38..0000000000
--- a/yt/yt/library/containers/disk_manager/ya.make
+++ /dev/null
@@ -1,19 +0,0 @@
-LIBRARY()
-
-INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
-
-PEERDIR(
- yt/yt/core
-)
-
-SRCS(
- config.cpp
- disk_info_provider.cpp
- disk_manager_proxy.cpp
-)
-
-IF (NOT OPENSOURCE)
- INCLUDE(ya_non_opensource.inc)
-ENDIF()
-
-END()
diff --git a/yt/yt/library/containers/instance.cpp b/yt/yt/library/containers/instance.cpp
deleted file mode 100644
index 0a56987e1b..0000000000
--- a/yt/yt/library/containers/instance.cpp
+++ /dev/null
@@ -1,812 +0,0 @@
-#ifdef __linux__
-
-#include "instance.h"
-
-#include "porto_executor.h"
-#include "private.h"
-
-#include <yt/yt/library/containers/cgroup.h>
-#include <yt/yt/library/containers/config.h>
-
-#include <yt/yt/core/concurrency/scheduler.h>
-
-#include <yt/yt/core/logging/log.h>
-
-#include <yt/yt/core/misc/collection_helpers.h>
-#include <yt/yt/core/misc/error.h>
-#include <yt/yt/core/misc/fs.h>
-#include <yt/yt/core/misc/proc.h>
-
-#include <library/cpp/porto/libporto.hpp>
-
-#include <util/stream/file.h>
-
-#include <util/string/cast.h>
-#include <util/string/split.h>
-
-#include <util/system/env.h>
-
-#include <initializer_list>
-#include <string>
-
-namespace NYT::NContainers {
-
-using namespace NConcurrency;
-using namespace NNet;
-
-////////////////////////////////////////////////////////////////////////////////
-
-namespace NDetail {
-
-// Porto passes command string to wordexp, where quota (') symbol
-// is delimiter. So we must replace it with concatenation ('"'"').
-TString EscapeForWordexp(const char* in)
-{
- TString buffer;
- while (*in) {
- if (*in == '\'') {
- buffer.append(R"('"'"')");
- } else {
- buffer.append(*in);
- }
- in++;
- }
- return buffer;
-}
-
-i64 Extract(
- const TString& input,
- const TString& pattern,
- const TString& terminator = "\n")
-{
- auto start = input.find(pattern) + pattern.length();
- auto end = input.find(terminator, start);
- return std::stol(input.substr(start, (end == input.npos) ? end : end - start));
-}
-
-i64 ExtractSum(
- const TString& input,
- const TString& pattern,
- const TString& delimiter,
- const TString& terminator = "\n")
-{
- i64 sum = 0;
- TString::size_type pos = 0;
- while (pos < input.length()) {
- pos = input.find(pattern, pos);
- if (pos == input.npos) {
- break;
- }
- pos += pattern.length();
-
- pos = input.find(delimiter, pos);
- if (pos == input.npos) {
- break;
- }
-
- pos++;
- auto end = input.find(terminator, pos);
- sum += std::stol(input.substr(pos, (end == input.npos) ? end : end - pos));
- }
- return sum;
-}
-
-using TPortoStatRule = std::pair<TString, std::function<i64(const TString& input)>>;
-
-static const std::function<i64(const TString&)> LongExtractor = [] (const TString& in) {
- return std::stol(in);
-};
-
-static const std::function<i64(const TString&)> CoreNsPerSecondExtractor = [] (const TString& in) {
- int pos = in.find("c", 0);
- return (std::stod(in.substr(0, pos))) * 1'000'000'000;
-};
-
-static const std::function<i64(const TString&)> GetIOStatExtractor(const TString& rwMode = "")
-{
- return [rwMode] (const TString& in) {
- return ExtractSum(in, "hw", rwMode + ":", ";");
- };
-}
-
-static const std::function<i64(const TString&)> GetStatByKeyExtractor(const TString& statKey)
-{
- return [statKey] (const TString& in) {
- return Extract(in, statKey);
- };
-}
-
-const THashMap<EStatField, TPortoStatRule> PortoStatRules = {
- {EStatField::CpuUsage, {"cpu_usage", LongExtractor}},
- {EStatField::CpuSystemUsage, {"cpu_usage_system", LongExtractor}},
- {EStatField::CpuWait, {"cpu_wait", LongExtractor}},
- {EStatField::CpuThrottled, {"cpu_throttled", LongExtractor}},
- {EStatField::ThreadCount, {"thread_count", LongExtractor}},
- {EStatField::CpuLimit, {"cpu_limit_bound", CoreNsPerSecondExtractor}},
- {EStatField::CpuGuarantee, {"cpu_guarantee_bound", CoreNsPerSecondExtractor}},
- {EStatField::Rss, {"memory.stat", GetStatByKeyExtractor("total_rss")}},
- {EStatField::MappedFile, {"memory.stat", GetStatByKeyExtractor("total_mapped_file")}},
- {EStatField::MinorPageFaults, {"minor_faults", LongExtractor}},
- {EStatField::MajorPageFaults, {"major_faults", LongExtractor}},
- {EStatField::FileCacheUsage, {"cache_usage", LongExtractor}},
- {EStatField::AnonMemoryUsage, {"anon_usage", LongExtractor}},
- {EStatField::AnonMemoryLimit, {"anon_limit_total", LongExtractor}},
- {EStatField::MemoryUsage, {"memory_usage", LongExtractor}},
- {EStatField::MemoryGuarantee, {"memory_guarantee", LongExtractor}},
- {EStatField::MemoryLimit, {"memory_limit_total", LongExtractor}},
- {EStatField::MaxMemoryUsage, {"memory.max_usage_in_bytes", LongExtractor}},
- {EStatField::OomKills, {"oom_kills", LongExtractor}},
- {EStatField::OomKillsTotal, {"oom_kills_total", LongExtractor}},
-
- {EStatField::IOReadByte, {"io_read", GetIOStatExtractor()}},
- {EStatField::IOWriteByte, {"io_write", GetIOStatExtractor()}},
- {EStatField::IOBytesLimit, {"io_limit", GetIOStatExtractor()}},
- {EStatField::IOReadOps, {"io_read_ops", GetIOStatExtractor()}},
- {EStatField::IOWriteOps, {"io_write_ops", GetIOStatExtractor()}},
- {EStatField::IOOps, {"io_ops", GetIOStatExtractor()}},
- {EStatField::IOOpsLimit, {"io_ops_limit", GetIOStatExtractor()}},
- {EStatField::IOTotalTime, {"io_time", GetIOStatExtractor()}},
- {EStatField::IOWaitTime, {"io_wait", GetIOStatExtractor()}},
-
- {EStatField::NetTxBytes, {"net_tx_bytes[veth]", LongExtractor}},
- {EStatField::NetTxPackets, {"net_tx_packets[veth]", LongExtractor}},
- {EStatField::NetTxDrops, {"net_tx_drops[veth]", LongExtractor}},
- {EStatField::NetTxLimit, {"net_limit[veth]", LongExtractor}},
- {EStatField::NetRxBytes, {"net_rx_bytes[veth]", LongExtractor}},
- {EStatField::NetRxPackets, {"net_rx_packets[veth]", LongExtractor}},
- {EStatField::NetRxDrops, {"net_rx_drops[veth]", LongExtractor}},
- {EStatField::NetRxLimit, {"net_rx_limit[veth]", LongExtractor}},
-};
-
-std::optional<TString> GetParentName(const TString& name)
-{
- if (name.empty()) {
- return std::nullopt;
- }
-
- auto slashPosition = name.rfind('/');
- if (slashPosition == TString::npos) {
- return "";
- }
-
- return name.substr(0, slashPosition);
-}
-
-std::optional<TString> GetRootName(const TString& name)
-{
- if (name.empty()) {
- return std::nullopt;
- }
-
- if (name == "/") {
- return name;
- }
-
- auto slashPosition = name.find('/');
- if (slashPosition == TString::npos) {
- return name;
- }
-
- return name.substr(0, slashPosition);
-}
-
-} // namespace NDetail
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoInstanceLauncher
- : public IInstanceLauncher
-{
-public:
- TPortoInstanceLauncher(const TString& name, IPortoExecutorPtr executor)
- : Executor_(std::move(executor))
- , Logger(ContainersLogger.WithTag("Container: %v", name))
- {
- Spec_.Name = name;
- Spec_.CGroupControllers = {
- "freezer",
- "cpu",
- "cpuacct",
- "net_cls",
- "blkio",
- "devices",
- "pids"
- };
- }
-
- const TString& GetName() const override
- {
- return Spec_.Name;
- }
-
- bool HasRoot() const override
- {
- return static_cast<bool>(Spec_.RootFS);
- }
-
- void SetStdIn(const TString& inputPath) override
- {
- Spec_.StdinPath = inputPath;
- }
-
- void SetStdOut(const TString& outPath) override
- {
- Spec_.StdoutPath = outPath;
- }
-
- void SetStdErr(const TString& errorPath) override
- {
- Spec_.StderrPath = errorPath;
- }
-
- void SetCwd(const TString& pwd) override
- {
- Spec_.CurrentWorkingDirectory = pwd;
- }
-
- void SetCoreDumpHandler(const std::optional<TString>& handler) override
- {
- if (handler) {
- Spec_.CoreCommand = *handler;
- Spec_.EnableCoreDumps = true;
- } else {
- Spec_.EnableCoreDumps = false;
- }
- }
-
- void SetRoot(const TRootFS& rootFS) override
- {
- Spec_.RootFS = rootFS;
- }
-
- void SetThreadLimit(i64 threadLimit) override
- {
- Spec_.ThreadLimit = threadLimit;
- }
-
- void SetDevices(const std::vector<TDevice>& devices) override
- {
- Spec_.Devices = devices;
- }
-
- void SetEnablePorto(EEnablePorto enablePorto) override
- {
- Spec_.EnablePorto = enablePorto;
- }
-
- void SetIsolate(bool isolate) override
- {
- Spec_.Isolate = isolate;
- }
-
- void EnableMemoryTracking() override
- {
- Spec_.CGroupControllers.push_back("memory");
- }
-
- void SetGroup(int groupId) override
- {
- Spec_.GroupId = groupId;
- }
-
- void SetUser(const TString& user) override
- {
- Spec_.User = user;
- }
-
- void SetIPAddresses(const std::vector<NNet::TIP6Address>& addresses, bool enableNat64) override
- {
- Spec_.IPAddresses = addresses;
- Spec_.EnableNat64 = enableNat64;
- Spec_.DisableNetwork = false;
- }
-
- void DisableNetwork() override
- {
- Spec_.DisableNetwork = true;
- Spec_.IPAddresses.clear();
- Spec_.EnableNat64 = false;
- }
-
- void SetHostName(const TString& hostName) override
- {
- Spec_.HostName = hostName;
- }
-
- TFuture<IInstancePtr> Launch(
- const TString& path,
- const std::vector<TString>& args,
- const THashMap<TString, TString>& env) override
- {
- TStringBuilder commandBuilder;
- auto append = [&] (const auto& value) {
- commandBuilder.AppendString("'");
- commandBuilder.AppendString(NDetail::EscapeForWordexp(value.c_str()));
- commandBuilder.AppendString("' ");
- };
-
- append(path);
- for (const auto& arg : args) {
- append(arg);
- }
-
- Spec_.Command = commandBuilder.Flush();
- YT_LOG_DEBUG("Executing Porto container (Name: %v, Command: %v)",
- Spec_.Name,
- Spec_.Command);
-
- Spec_.Env = env;
-
- auto onContainerCreated = [this, this_ = MakeStrong(this)] (const TError& error) -> IInstancePtr {
- if (!error.IsOK()) {
- THROW_ERROR_EXCEPTION(EErrorCode::FailedToStartContainer, "Unable to start container")
- << error;
- }
-
- return GetPortoInstance(Executor_, Spec_.Name);
- };
-
- return Executor_->CreateContainer(Spec_, /* start */ true)
- .Apply(BIND(onContainerCreated));
- }
-
-private:
- IPortoExecutorPtr Executor_;
- TRunnableContainerSpec Spec_;
- const NLogging::TLogger Logger;
-};
-
-IInstanceLauncherPtr CreatePortoInstanceLauncher(const TString& name, IPortoExecutorPtr executor)
-{
- return New<TPortoInstanceLauncher>(name, executor);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoInstance
- : public IInstance
-{
-public:
- static IInstancePtr GetSelf(IPortoExecutorPtr executor)
- {
- return New<TPortoInstance>(GetSelfContainerName(executor), executor);
- }
-
- static IInstancePtr GetInstance(IPortoExecutorPtr executor, const TString& name)
- {
- return New<TPortoInstance>(name, executor);
- }
-
- void Kill(int signal) override
- {
- auto error = WaitFor(Executor_->KillContainer(Name_, signal));
- // Killing already finished process is not an error.
- if (error.FindMatching(EPortoErrorCode::InvalidState)) {
- return;
- }
- if (!error.IsOK()) {
- THROW_ERROR_EXCEPTION("Failed to send signal to Porto instance")
- << TErrorAttribute("signal", signal)
- << TErrorAttribute("container", Name_)
- << error;
- }
- }
-
- void Destroy() override
- {
- WaitFor(Executor_->DestroyContainer(Name_))
- .ThrowOnError();
- Destroyed_ = true;
- }
-
- void Stop() override
- {
- WaitFor(Executor_->StopContainer(Name_))
- .ThrowOnError();
- }
-
- TErrorOr<ui64> CalculateCpuUserUsage(
- TErrorOr<ui64>& cpuUsage,
- TErrorOr<ui64>& cpuSystemUsage) const
- {
- if (cpuUsage.IsOK() && cpuSystemUsage.IsOK()) {
- return cpuUsage.Value() > cpuSystemUsage.Value() ? cpuUsage.Value() - cpuSystemUsage.Value() : 0;
- } else if (cpuUsage.IsOK()) {
- return TError("Missing property %Qlv in Porto response", EStatField::CpuSystemUsage)
- << TErrorAttribute("container", Name_);
- } else {
- return TError("Missing property %Qlv in Porto response", EStatField::CpuUsage)
- << TErrorAttribute("container", Name_);
- }
- }
-
- TResourceUsage GetResourceUsage(
- const std::vector<EStatField>& fields) const override
- {
- std::vector<TString> properties;
- properties.push_back("absolute_name");
-
- bool userTimeRequested = false;
- bool contextSwitchesRequested = false;
- for (auto field : fields) {
- if (auto it = NDetail::PortoStatRules.find(field)) {
- const auto& rule = it->second;
- properties.push_back(rule.first);
- } else if (field == EStatField::ContextSwitchesDelta || field == EStatField::ContextSwitches) {
- contextSwitchesRequested = true;
- } else if (field == EStatField::CpuUserUsage) {
- userTimeRequested = true;
- } else {
- THROW_ERROR_EXCEPTION("Unknown resource field %Qlv requested", field)
- << TErrorAttribute("container", Name_);
- }
- }
-
- auto propertyMap = WaitFor(Executor_->GetContainerProperties(Name_, properties))
- .ValueOrThrow();
-
- TResourceUsage result;
-
- for (auto field : fields) {
- auto ruleIt = NDetail::PortoStatRules.find(field);
- if (ruleIt == NDetail::PortoStatRules.end()) {
- continue;
- }
-
- const auto& [property, callback] = ruleIt->second;
- auto& record = result[field];
- if (auto responseIt = propertyMap.find(property); responseIt != propertyMap.end()) {
- const auto& valueOrError = responseIt->second;
- if (valueOrError.IsOK()) {
- const auto& value = valueOrError.Value();
-
- try {
- record = callback(value);
- } catch (const std::exception& ex) {
- record = TError("Error parsing Porto property %Qlv", field)
- << TErrorAttribute("container", Name_)
- << TErrorAttribute("property_value", value)
- << ex;
- }
- } else {
- record = TError("Error getting Porto property %Qlv", field)
- << TErrorAttribute("container", Name_)
- << valueOrError;
- }
- } else {
- record = TError("Missing property %Qlv in Porto response", field)
- << TErrorAttribute("container", Name_);
- }
- }
-
- // We should maintain context switch information even if this field
- // is not requested since metrics of individual containers can go up and down.
- auto subcontainers = WaitFor(Executor_->ListSubcontainers(Name_, /*includeRoot*/ true))
- .ValueOrThrow();
-
- auto metricMap = WaitFor(Executor_->GetContainerMetrics(subcontainers, "ctxsw"))
- .ValueOrThrow();
-
- // TODO(don-dron): remove diff calculation from GetResourceUsage, because GetResourceUsage must return only snapshot stat.
- {
- auto guard = Guard(ContextSwitchMapLock_);
-
- for (const auto& [container, newValue] : metricMap) {
- auto& prevValue = ContextSwitchMap_[container];
- TotalContextSwitches_ += std::max<i64>(0LL, newValue - prevValue);
- prevValue = newValue;
- }
-
- if (contextSwitchesRequested) {
- result[EStatField::ContextSwitchesDelta] = TotalContextSwitches_;
- }
- }
-
- if (contextSwitchesRequested) {
- ui64 totalContextSwitches = 0;
-
- for (const auto& [container, newValue] : metricMap) {
- totalContextSwitches += std::max<ui64>(0UL, newValue);
- }
-
- result[EStatField::ContextSwitches] = totalContextSwitches;
- }
-
- if (userTimeRequested) {
- result[EStatField::CpuUserUsage] = CalculateCpuUserUsage(
- result[EStatField::CpuUsage],
- result[EStatField::CpuSystemUsage]);
- }
-
- return result;
- }
-
- TResourceLimits GetResourceLimits() const override
- {
- std::vector<TString> properties;
- static TString memoryLimitProperty = "memory_limit_total";
- static TString cpuLimitProperty = "cpu_limit_bound";
- static TString cpuGuaranteeProperty = "cpu_guarantee_bound";
- properties.push_back(memoryLimitProperty);
- properties.push_back(cpuLimitProperty);
- properties.push_back(cpuGuaranteeProperty);
-
- auto responseOrError = WaitFor(Executor_->GetContainerProperties(Name_, properties));
- THROW_ERROR_EXCEPTION_IF_FAILED(responseOrError, "Failed to get Porto container resource limits");
-
- const auto& response = responseOrError.Value();
-
- const auto& memoryLimitRsp = response.at(memoryLimitProperty);
- THROW_ERROR_EXCEPTION_IF_FAILED(memoryLimitRsp, "Failed to get memory limit from Porto");
-
- i64 memoryLimit;
- if (!TryFromString<i64>(memoryLimitRsp.Value(), memoryLimit)) {
- THROW_ERROR_EXCEPTION("Failed to parse memory limit value from Porto")
- << TErrorAttribute(memoryLimitProperty, memoryLimitRsp.Value());
- }
-
- const auto& cpuLimitRsp = response.at(cpuLimitProperty);
- THROW_ERROR_EXCEPTION_IF_FAILED(cpuLimitRsp, "Failed to get CPU limit from Porto");
-
- double cpuLimit;
- YT_VERIFY(cpuLimitRsp.Value().EndsWith('c'));
- auto cpuLimitValue = TStringBuf(cpuLimitRsp.Value().begin(), cpuLimitRsp.Value().size() - 1);
- if (!TryFromString<double>(cpuLimitValue, cpuLimit)) {
- THROW_ERROR_EXCEPTION("Failed to parse CPU limit value from Porto")
- << TErrorAttribute(cpuLimitProperty, cpuLimitRsp.Value());
- }
-
- const auto& cpuGuaranteeRsp = response.at(cpuGuaranteeProperty);
- THROW_ERROR_EXCEPTION_IF_FAILED(cpuGuaranteeRsp, "Failed to get CPU guarantee from Porto");
-
- double cpuGuarantee;
- if (!cpuGuaranteeRsp.Value()) {
- // XXX(ignat): hack for missing response from Porto.
- cpuGuarantee = 0.0;
- } else {
- YT_VERIFY(cpuGuaranteeRsp.Value().EndsWith('c'));
- auto cpuGuaranteeValue = TStringBuf(cpuGuaranteeRsp.Value().begin(), cpuGuaranteeRsp.Value().size() - 1);
- if (!TryFromString<double>(cpuGuaranteeValue, cpuGuarantee)) {
- THROW_ERROR_EXCEPTION("Failed to parse CPU guarantee value from Porto")
- << TErrorAttribute(cpuGuaranteeProperty, cpuGuaranteeRsp.Value());
- }
- }
-
- return TResourceLimits{
- .CpuLimit = cpuLimit,
- .CpuGuarantee = cpuGuarantee,
- .Memory = memoryLimit,
- };
- }
-
- void SetCpuGuarantee(double cores) override
- {
- SetProperty("cpu_guarantee", ToString(cores) + "c");
- }
-
- void SetCpuLimit(double cores) override
- {
- SetProperty("cpu_limit", ToString(cores) + "c");
- }
-
- void SetCpuWeight(double weight) override
- {
- SetProperty("cpu_weight", weight);
- }
-
- void SetMemoryGuarantee(i64 memoryGuarantee) override
- {
- SetProperty("memory_guarantee", memoryGuarantee);
- }
-
- void SetIOWeight(double weight) override
- {
- SetProperty("io_weight", weight);
- }
-
- void SetIOThrottle(i64 operations) override
- {
- SetProperty("io_ops_limit", operations);
- }
-
- TString GetStderr() const override
- {
- return *WaitFor(Executor_->GetContainerProperty(Name_, "stderr"))
- .ValueOrThrow();
- }
-
- TString GetName() const override
- {
- return Name_;
- }
-
- std::optional<TString> GetParentName() const override
- {
- return NDetail::GetParentName(Name_);
- }
-
- std::optional<TString> GetRootName() const override
- {
- return NDetail::GetRootName(Name_);
- }
-
- pid_t GetPid() const override
- {
- auto pid = *WaitFor(Executor_->GetContainerProperty(Name_, "root_pid"))
- .ValueOrThrow();
- return std::stoi(pid);
- }
-
- i64 GetMajorPageFaultCount() const override
- {
- auto faults = WaitFor(Executor_->GetContainerProperty(Name_, "major_faults"))
- .ValueOrThrow();
- return faults
- ? std::stoll(*faults)
- : 0;
- }
-
- double GetCpuGuarantee() const override
- {
- auto result = WaitFor(Executor_->GetContainerProperty(Name_, "cpu_guarantee"))
- .ValueOrThrow();
- return result
- ? std::stod(*result)
- : 0;
- }
-
- std::vector<pid_t> GetPids() const override
- {
- auto getPidCgroup = [&] (const TString& cgroups) {
- for (TStringBuf cgroup : StringSplitter(cgroups).SplitByString("; ")) {
- if (cgroup.StartsWith("pids:")) {
- auto startPosition = cgroup.find('/');
- YT_VERIFY(startPosition != TString::npos);
- return cgroup.substr(startPosition);
- }
- }
- THROW_ERROR_EXCEPTION("Pids cgroup not found for container %Qv", GetName())
- << TErrorAttribute("cgroups", cgroups);
- };
-
- auto cgroups = *WaitFor(Executor_->GetContainerProperty(Name_, "cgroups"))
- .ValueOrThrow();
- // Porto returns full cgroup name, with mount prefix, such as "/sys/fs/cgroup/pids".
- auto instanceCgroup = getPidCgroup(cgroups);
-
- std::vector<pid_t> pids;
- for (auto pid : ListPids()) {
- std::map<TString, TString> cgroups;
- try {
- cgroups = GetProcessCGroups(pid);
- } catch (const std::exception& ex) {
- YT_LOG_DEBUG(ex, "Failed to get CGroups for process (Pid: %v)", pid);
- continue;
- }
-
- // Pid cgroups are returned in short form.
- auto processPidCgroup = cgroups["pids"];
- if (!processPidCgroup.empty() && instanceCgroup.EndsWith(processPidCgroup)) {
- pids.push_back(pid);
- }
- }
-
- return pids;
- }
-
- TFuture<void> Wait() override
- {
- return Executor_->PollContainer(Name_)
- .Apply(BIND([] (int status) {
- StatusToError(status)
- .ThrowOnError();
- }));
- }
-
-private:
- const TString Name_;
- const IPortoExecutorPtr Executor_;
- const NLogging::TLogger Logger;
-
- bool Destroyed_ = false;
-
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, ContextSwitchMapLock_);
- mutable i64 TotalContextSwitches_ = 0;
- mutable THashMap<TString, i64> ContextSwitchMap_;
-
- TPortoInstance(TString name, IPortoExecutorPtr executor)
- : Name_(std::move(name))
- , Executor_(std::move(executor))
- , Logger(ContainersLogger.WithTag("Container: %v", Name_))
- { }
-
- void SetProperty(const TString& key, const TString& value)
- {
- WaitFor(Executor_->SetContainerProperty(Name_, key, value))
- .ThrowOnError();
- }
-
- void SetProperty(const TString& key, i64 value)
- {
- SetProperty(key, ToString(value));
- }
-
- void SetProperty(const TString& key, double value)
- {
- SetProperty(key, ToString(value));
- }
-
- DECLARE_NEW_FRIEND()
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-TString GetSelfContainerName(const IPortoExecutorPtr& executor)
-{
- try {
- auto properties = WaitFor(executor->GetContainerProperties(
- "self",
- std::vector<TString>{"absolute_name", "absolute_namespace"}))
- .ValueOrThrow();
-
- auto absoluteName = properties.at("absolute_name")
- .ValueOrThrow();
- auto absoluteNamespace = properties.at("absolute_namespace")
- .ValueOrThrow();
-
- if (absoluteName == "/") {
- return absoluteName;
- }
-
- if (absoluteName.length() < absoluteNamespace.length()) {
- YT_VERIFY(absoluteName + "/" == absoluteNamespace);
- return "";
- } else {
- YT_VERIFY(absoluteName.StartsWith(absoluteNamespace));
- return absoluteName.substr(absoluteNamespace.length());
- }
- } catch (const std::exception& ex) {
- THROW_ERROR_EXCEPTION("Failed to get name for container \"self\"")
- << ex;
- }
-}
-
-IInstancePtr GetSelfPortoInstance(IPortoExecutorPtr executor)
-{
- return TPortoInstance::GetSelf(executor);
-}
-
-IInstancePtr GetPortoInstance(IPortoExecutorPtr executor, const TString& name)
-{
- return TPortoInstance::GetInstance(executor, name);
-}
-
-IInstancePtr GetRootPortoInstance(IPortoExecutorPtr executor)
-{
- auto self = GetSelfPortoInstance(executor);
- return TPortoInstance::GetInstance(executor, *self->GetRootName());
-}
-
-double GetSelfPortoInstanceVCpuFactor()
-{
- auto config = New<TPortoExecutorDynamicConfig>();
- auto executorPtr = CreatePortoExecutor(config, "");
- auto currentContainer = GetSelfPortoInstance(executorPtr);
- double cpuLimit = currentContainer->GetResourceLimits().CpuLimit;
- if (cpuLimit <= 0) {
- THROW_ERROR_EXCEPTION("Cpu limit must be greater than 0");
- }
-
- // DEPLOY_VCPU_LIMIT stores value in millicores
- if (TString vcpuLimitStr = GetEnv("DEPLOY_VCPU_LIMIT"); !vcpuLimitStr.Empty()) {
- double vcpuLimit = FromString<double>(vcpuLimitStr) / 1000.0;
- return vcpuLimit / cpuLimit;
- }
- THROW_ERROR_EXCEPTION("Failed to get vcpu limit from env variable");
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
-
-#endif
diff --git a/yt/yt/library/containers/instance.h b/yt/yt/library/containers/instance.h
deleted file mode 100644
index ff6e0b3ce1..0000000000
--- a/yt/yt/library/containers/instance.h
+++ /dev/null
@@ -1,168 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/actions/future.h>
-
-#include <yt/yt/core/net/address.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-using TResourceUsage = THashMap<EStatField, TErrorOr<ui64>>;
-
-const std::vector<EStatField> InstanceStatFields{
- EStatField::CpuUsage,
- EStatField::CpuUserUsage,
- EStatField::CpuSystemUsage,
- EStatField::CpuWait,
- EStatField::CpuThrottled,
- EStatField::ContextSwitches,
- EStatField::ContextSwitchesDelta,
- EStatField::ThreadCount,
- EStatField::CpuLimit,
- EStatField::CpuGuarantee,
-
- EStatField::Rss,
- EStatField::MappedFile,
- EStatField::MajorPageFaults,
- EStatField::MinorPageFaults,
- EStatField::FileCacheUsage,
- EStatField::AnonMemoryUsage,
- EStatField::AnonMemoryLimit,
- EStatField::MemoryUsage,
- EStatField::MemoryGuarantee,
- EStatField::MemoryLimit,
- EStatField::MaxMemoryUsage,
- EStatField::OomKills,
- EStatField::OomKillsTotal,
-
- EStatField::IOReadByte,
- EStatField::IOWriteByte,
- EStatField::IOBytesLimit,
- EStatField::IOReadOps,
- EStatField::IOWriteOps,
- EStatField::IOOps,
- EStatField::IOOpsLimit,
- EStatField::IOTotalTime,
- EStatField::IOWaitTime,
-
- EStatField::NetTxBytes,
- EStatField::NetTxPackets,
- EStatField::NetTxDrops,
- EStatField::NetTxLimit,
- EStatField::NetRxBytes,
- EStatField::NetRxPackets,
- EStatField::NetRxDrops,
- EStatField::NetRxLimit,
-};
-
-struct TResourceLimits
-{
- double CpuLimit;
- double CpuGuarantee;
- i64 Memory;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct IInstanceLauncher
- : public TRefCounted
-{
- virtual bool HasRoot() const = 0;
- virtual const TString& GetName() const = 0;
-
- virtual void SetStdIn(const TString& inputPath) = 0;
- virtual void SetStdOut(const TString& outPath) = 0;
- virtual void SetStdErr(const TString& errorPath) = 0;
- virtual void SetCwd(const TString& pwd) = 0;
-
- // Null core dump handler implies disabled core dumps.
- virtual void SetCoreDumpHandler(const std::optional<TString>& handler) = 0;
- virtual void SetRoot(const TRootFS& rootFS) = 0;
-
- virtual void SetThreadLimit(i64 threadLimit) = 0;
- virtual void SetDevices(const std::vector<TDevice>& devices) = 0;
-
- virtual void SetEnablePorto(EEnablePorto enablePorto) = 0;
- virtual void SetIsolate(bool isolate) = 0;
- virtual void EnableMemoryTracking() = 0;
- virtual void SetGroup(int groupId) = 0;
- virtual void SetUser(const TString& user) = 0;
- virtual void SetIPAddresses(
- const std::vector<NNet::TIP6Address>& addresses,
- bool enableNat64 = false) = 0;
- virtual void DisableNetwork() = 0;
- virtual void SetHostName(const TString& hostName) = 0;
-
- virtual TFuture<IInstancePtr> Launch(
- const TString& path,
- const std::vector<TString>& args,
- const THashMap<TString, TString>& env) = 0;
-};
-
-DEFINE_REFCOUNTED_TYPE(IInstanceLauncher)
-
-#ifdef _linux_
-IInstanceLauncherPtr CreatePortoInstanceLauncher(const TString& name, IPortoExecutorPtr executor);
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct IInstance
- : public TRefCounted
-{
- virtual void Kill(int signal) = 0;
- virtual void Stop() = 0;
- virtual void Destroy() = 0;
-
- virtual TResourceUsage GetResourceUsage(
- const std::vector<EStatField>& fields = InstanceStatFields) const = 0;
- virtual TResourceLimits GetResourceLimits() const = 0;
- virtual void SetCpuGuarantee(double cores) = 0;
- virtual void SetCpuLimit(double cores) = 0;
- virtual void SetCpuWeight(double weight) = 0;
- virtual void SetIOWeight(double weight) = 0;
- virtual void SetIOThrottle(i64 operations) = 0;
- virtual void SetMemoryGuarantee(i64 memoryGuarantee) = 0;
-
- virtual TString GetStderr() const = 0;
-
- virtual TString GetName() const = 0;
- virtual std::optional<TString> GetParentName() const = 0;
- virtual std::optional<TString> GetRootName() const = 0;
-
- //! Returns externally visible pid of the root process inside container.
- //! Throws if container is not running.
- virtual pid_t GetPid() const = 0;
- //! Returns the list of externally visible pids of processes running inside container.
- virtual std::vector<pid_t> GetPids() const = 0;
-
- virtual i64 GetMajorPageFaultCount() const = 0;
- virtual double GetCpuGuarantee() const = 0;
-
- //! Future is set when container reaches terminal state (stopped or dead).
- //! Resulting error is OK iff container exited with code 0.
- virtual TFuture<void> Wait() = 0;
-};
-
-DEFINE_REFCOUNTED_TYPE(IInstance)
-
-////////////////////////////////////////////////////////////////////////////////
-
-#ifdef _linux_
-TString GetSelfContainerName(const IPortoExecutorPtr& executor);
-
-IInstancePtr GetSelfPortoInstance(IPortoExecutorPtr executor);
-IInstancePtr GetRootPortoInstance(IPortoExecutorPtr executor);
-IInstancePtr GetPortoInstance(IPortoExecutorPtr executor, const TString& name);
-
-//! Works only in Yandex.Deploy pod environment where env DEPLOY_VCPU_LIMIT is set.
-//! Throws if this env is absent.
-double GetSelfPortoInstanceVCpuFactor();
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/instance_limits_tracker.cpp b/yt/yt/library/containers/instance_limits_tracker.cpp
deleted file mode 100644
index 55ef7d2d67..0000000000
--- a/yt/yt/library/containers/instance_limits_tracker.cpp
+++ /dev/null
@@ -1,179 +0,0 @@
-#include "public.h"
-#include "instance_limits_tracker.h"
-#include "instance.h"
-#include "porto_resource_tracker.h"
-#include "private.h"
-
-#include <yt/yt/core/concurrency/periodic_executor.h>
-
-#include <yt/yt/core/ytree/fluent.h>
-#include <yt/yt/core/ytree/ypath_service.h>
-
-namespace NYT::NContainers {
-
-using namespace NYTree;
-
-////////////////////////////////////////////////////////////////////////////////
-
-static const auto& Logger = ContainersLogger;
-
-////////////////////////////////////////////////////////////////////////////////
-
-TInstanceLimitsTracker::TInstanceLimitsTracker(
- IInstancePtr instance,
- IInstancePtr root,
- IInvokerPtr invoker,
- TDuration updatePeriod)
- : Invoker_(std::move(invoker))
- , Executor_(New<NConcurrency::TPeriodicExecutor>(
- Invoker_,
- BIND(&TInstanceLimitsTracker::DoUpdateLimits, MakeWeak(this)),
- updatePeriod))
-{
-#ifdef _linux_
- SelfTracker_ = New<TPortoResourceTracker>(std::move(instance), updatePeriod / 2);
- RootTracker_ = New<TPortoResourceTracker>(std::move(root), updatePeriod / 2);
-#else
- Y_UNUSED(instance);
- Y_UNUSED(root);
-#endif
-}
-
-void TInstanceLimitsTracker::Start()
-{
- if (!Running_) {
- Executor_->Start();
- Running_ = true;
- YT_LOG_INFO("Instance limits tracker started");
- }
-}
-
-void TInstanceLimitsTracker::Stop()
-{
- if (Running_) {
- YT_UNUSED_FUTURE(Executor_->Stop());
- Running_ = false;
- YT_LOG_INFO("Instance limits tracker stopped");
- }
-}
-
-void TInstanceLimitsTracker::DoUpdateLimits()
-{
- VERIFY_INVOKER_AFFINITY(Invoker_);
-
-#ifdef _linux_
- YT_LOG_DEBUG("Checking for instance limits update");
-
- auto setIfOk = [] (auto* destination, const auto& valueOrError, const TString& fieldName, bool alert = true) {
- if (valueOrError.IsOK()) {
- *destination = valueOrError.Value();
- } else {
- YT_LOG_ALERT_IF(alert, valueOrError, "Failed to get container property (Field: %v)",
- fieldName);
-
- YT_LOG_DEBUG(valueOrError, "Failed to get container property (Field: %v)",
- fieldName);
- }
- };
-
- try {
- auto memoryStatistics = SelfTracker_->GetMemoryStatistics();
- auto netStatistics = RootTracker_->GetNetworkStatistics();
- auto cpuStatistics = SelfTracker_->GetCpuStatistics();
-
- setIfOk(&MemoryUsage_, memoryStatistics.Rss, "MemoryRss");
-
- TDuration cpuGuarantee;
- TDuration cpuLimit;
-
- if (cpuStatistics.GuaranteeTime.IsOK()) {
- setIfOk(&cpuGuarantee, cpuStatistics.GuaranteeTime, "CpuGuarantee");
- } else {
- // XXX(don-dron, ignat): do nothing, see NContainers::TPortoInstance::GetResourceLimits, hack for missing response from Porto.
- }
-
- setIfOk(&cpuLimit, cpuStatistics.LimitTime, "CpuLimit");
-
- if (CpuGuarantee_ != cpuGuarantee) {
- YT_LOG_INFO("Instance CPU guarantee updated (OldCpuGuarantee: %v, NewCpuGuarantee: %v)",
- CpuGuarantee_,
- cpuGuarantee);
- CpuGuarantee_ = cpuGuarantee;
- // NB: We do not fire LimitsUpdated since this value used only for diagnostics.
- }
-
- TInstanceLimits limits;
- limits.Cpu = cpuLimit.SecondsFloat();
-
- if (memoryStatistics.AnonLimit.IsOK() && memoryStatistics.MemoryLimit.IsOK()) {
- i64 anonLimit = memoryStatistics.AnonLimit.Value();
- i64 memoryLimit = memoryStatistics.MemoryLimit.Value();
-
- if (anonLimit > 0 && memoryLimit > 0) {
- limits.Memory = std::min(anonLimit, memoryLimit);
- } else if (anonLimit > 0) {
- limits.Memory = anonLimit;
- } else {
- limits.Memory = memoryLimit;
- }
- } else {
- setIfOk(&limits.Memory, memoryStatistics.MemoryLimit, "MemoryLimit");
- }
-
- static constexpr bool DontFireAlertOnError = {};
- setIfOk(&limits.NetTx, netStatistics.TxLimit, "NetTxLimit", DontFireAlertOnError);
- setIfOk(&limits.NetRx, netStatistics.RxLimit, "NetRxLimit", DontFireAlertOnError);
-
- if (InstanceLimits_ != limits) {
- YT_LOG_INFO("Instance limits updated (OldLimits: %v, NewLimits: %v)",
- InstanceLimits_,
- limits);
- InstanceLimits_ = limits;
- LimitsUpdated_.Fire(limits);
- }
- } catch (const std::exception& ex) {
- YT_LOG_WARNING(ex, "Failed to get instance limits");
- }
-#endif
-}
-
-IYPathServicePtr TInstanceLimitsTracker::GetOrchidService()
-{
- return IYPathService::FromProducer(BIND(&TInstanceLimitsTracker::DoBuildOrchid, MakeStrong(this)))
- ->Via(Invoker_);
-}
-
-void TInstanceLimitsTracker::DoBuildOrchid(NYson::IYsonConsumer* consumer) const
-{
- NYTree::BuildYsonFluently(consumer)
- .BeginMap()
- .DoIf(static_cast<bool>(InstanceLimits_), [&] (auto fluent) {
- fluent.Item("cpu_limit").Value(InstanceLimits_->Cpu);
- })
- .DoIf(static_cast<bool>(CpuGuarantee_), [&] (auto fluent) {
- fluent.Item("cpu_guarantee").Value(*CpuGuarantee_);
- })
- .DoIf(static_cast<bool>(InstanceLimits_), [&] (auto fluent) {
- fluent.Item("memory_limit").Value(InstanceLimits_->Memory);
- })
- .DoIf(static_cast<bool>(MemoryUsage_), [&] (auto fluent) {
- fluent.Item("memory_usage").Value(*MemoryUsage_);
- })
- .EndMap();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void FormatValue(TStringBuilderBase* builder, const TInstanceLimits& limits, TStringBuf /*format*/)
-{
- builder->AppendFormat(
- "{Cpu: %v, Memory: %v, NetTx: %v, NetRx: %v}",
- limits.Cpu,
- limits.Memory,
- limits.NetTx,
- limits.NetRx);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/instance_limits_tracker.h b/yt/yt/library/containers/instance_limits_tracker.h
deleted file mode 100644
index e652fff446..0000000000
--- a/yt/yt/library/containers/instance_limits_tracker.h
+++ /dev/null
@@ -1,59 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/core/actions/signal.h>
-
-#include <yt/yt/core/concurrency/public.h>
-
-#include <yt/yt/core/yson/public.h>
-
-#include <yt/yt/core/ytree/public.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TInstanceLimitsTracker
- : public TRefCounted
-{
-public:
- //! Raises when container limits change.
- DEFINE_SIGNAL(void(const TInstanceLimits&), LimitsUpdated);
-
-public:
- TInstanceLimitsTracker(
- IInstancePtr instance,
- IInstancePtr root,
- IInvokerPtr invoker,
- TDuration updatePeriod);
-
- void Start();
- void Stop();
-
- NYTree::IYPathServicePtr GetOrchidService();
-
-private:
- void DoUpdateLimits();
- void DoBuildOrchid(NYson::IYsonConsumer* consumer) const;
-
- TPortoResourceTrackerPtr SelfTracker_;
- TPortoResourceTrackerPtr RootTracker_;
- const IInvokerPtr Invoker_;
- const NConcurrency::TPeriodicExecutorPtr Executor_;
-
- std::optional<TDuration> CpuGuarantee_;
- std::optional<TInstanceLimits> InstanceLimits_;
- std::optional<i64> MemoryUsage_;
- bool Running_ = false;
-};
-
-DEFINE_REFCOUNTED_TYPE(TInstanceLimitsTracker)
-
-////////////////////////////////////////////////////////////////////////////////
-
-void FormatValue(TStringBuilderBase* builder, const TInstanceLimits& limits, TStringBuf format);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/porto_executor.cpp b/yt/yt/library/containers/porto_executor.cpp
deleted file mode 100644
index a6a44fd20f..0000000000
--- a/yt/yt/library/containers/porto_executor.cpp
+++ /dev/null
@@ -1,1079 +0,0 @@
-#include "porto_executor.h"
-#include "config.h"
-
-#include "private.h"
-
-#include <yt/yt/core/concurrency/action_queue.h>
-#include <yt/yt/core/concurrency/periodic_executor.h>
-#include <yt/yt/core/concurrency/scheduler.h>
-
-#include <yt/yt/core/logging/log.h>
-
-#include <yt/yt/core/misc/fs.h>
-
-#include <yt/yt/core/profiling/timing.h>
-
-#include <yt/yt/core/ytree/convert.h>
-
-#include <library/cpp/porto/proto/rpc.pb.h>
-
-#include <library/cpp/yt/memory/atomic_intrusive_ptr.h>
-
-#include <string>
-
-namespace NYT::NContainers {
-
-using namespace NConcurrency;
-using Porto::EError;
-
-////////////////////////////////////////////////////////////////////////////////
-
-#ifdef _linux_
-
-static const NLogging::TLogger& Logger = ContainersLogger;
-static constexpr auto RetryInterval = TDuration::MilliSeconds(100);
-
-////////////////////////////////////////////////////////////////////////////////
-
-TString PortoErrorCodeFormatter(int code)
-{
- return TEnumTraits<EPortoErrorCode>::ToString(static_cast<EPortoErrorCode>(code));
-}
-
-YT_DEFINE_ERROR_CODE_RANGE(12000, 13999, "NYT::NContainers::EPortoErrorCode", PortoErrorCodeFormatter);
-
-////////////////////////////////////////////////////////////////////////////////
-
-EPortoErrorCode ConvertPortoErrorCode(EError portoError)
-{
- return static_cast<EPortoErrorCode>(PortoErrorCodeBase + portoError);
-}
-
-bool IsRetriableErrorCode(EPortoErrorCode error, bool idempotent)
-{
- return
- error == EPortoErrorCode::Unknown ||
- // TODO(babenko): it's not obvious that we can always retry SocketError
- // but this is how it has used to work for a while.
- error == EPortoErrorCode::SocketError ||
- error == EPortoErrorCode::SocketTimeout && idempotent;
-}
-
-THashMap<TString, TErrorOr<TString>> ParsePortoGetResponse(
- const Porto::TGetResponse_TContainerGetListResponse& response)
-{
- THashMap<TString, TErrorOr<TString>> result;
- for (const auto& property : response.keyval()) {
- if (property.error() == EError::Success) {
- result[property.variable()] = property.value();
- } else {
- result[property.variable()] = TError(ConvertPortoErrorCode(property.error()), property.errormsg())
- << TErrorAttribute("porto_error", ConvertPortoErrorCode(property.error()));
- }
- }
- return result;
-}
-
-THashMap<TString, TErrorOr<TString>> ParseSinglePortoGetResponse(
- const TString& name,
- const Porto::TGetResponse& getResponse)
-{
- for (const auto& container : getResponse.list()) {
- if (container.name() == name) {
- return ParsePortoGetResponse(container);
- }
- }
- THROW_ERROR_EXCEPTION("Unable to get properties from Porto")
- << TErrorAttribute("container", name);
-}
-
-THashMap<TString, THashMap<TString, TErrorOr<TString>>> ParseMultiplePortoGetResponse(
- const Porto::TGetResponse& getResponse)
-{
- THashMap<TString, THashMap<TString, TErrorOr<TString>>> result;
- for (const auto& container : getResponse.list()) {
- result[container.name()] = ParsePortoGetResponse(container);
- }
- return result;
-}
-
-TString FormatEnablePorto(EEnablePorto value)
-{
- switch (value) {
- case EEnablePorto::None: return "none";
- case EEnablePorto::Isolate: return "isolate";
- case EEnablePorto::Full: return "full";
- default: YT_ABORT();
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoExecutor
- : public IPortoExecutor
-{
-public:
- TPortoExecutor(
- TPortoExecutorDynamicConfigPtr config,
- const TString& threadNameSuffix,
- const NProfiling::TProfiler& profiler)
- : Config_(std::move(config))
- , Queue_(New<TActionQueue>(Format("Porto:%v", threadNameSuffix)))
- , Profiler_(profiler)
- , PollExecutor_(New<TPeriodicExecutor>(
- Queue_->GetInvoker(),
- BIND(&TPortoExecutor::DoPoll, MakeWeak(this)),
- Config_->PollPeriod))
- {
- DynamicConfig_.Store(New<TPortoExecutorDynamicConfig>());
-
- Api_->SetTimeout(Config_->ApiTimeout.Seconds());
- Api_->SetDiskTimeout(Config_->ApiDiskTimeout.Seconds());
-
- PollExecutor_->Start();
- }
-
- void SubscribeFailed(const TCallback<void (const TError&)>& callback) override
- {
- Failed_.Subscribe(callback);
- }
-
- void UnsubscribeFailed(const TCallback<void (const TError&)>& callback) override
- {
- Failed_.Unsubscribe(callback);
- }
-
- void OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig) override
- {
- DynamicConfig_.Store(newConfig);
- }
-
-private:
- template <class T, class... TArgs1, class... TArgs2>
- auto ExecutePortoApiAction(
- T(TPortoExecutor::*Method)(TArgs1...),
- const TString& command,
- TArgs2&&... args)
- {
- YT_LOG_DEBUG("Enqueue Porto API action (Command: %v)", command);
- return BIND(Method, MakeStrong(this), std::forward<TArgs2>(args)...)
- .AsyncVia(Queue_->GetInvoker())
- .Run();
- };
-
-public:
- TFuture<void> CreateContainer(const TString& container) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoCreateContainer,
- "CreateContainer",
- container);
- }
-
- TFuture<void> CreateContainer(const TRunnableContainerSpec& containerSpec, bool start) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoCreateContainerFromSpec,
- "CreateContainerFromSpec",
- containerSpec,
- start);
- }
-
- TFuture<std::optional<TString>> GetContainerProperty(
- const TString& container,
- const TString& property) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoGetContainerProperty,
- "GetContainerProperty",
- container,
- property);
- }
-
- TFuture<THashMap<TString, TErrorOr<TString>>> GetContainerProperties(
- const TString& container,
- const std::vector<TString>& properties) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoGetContainerProperties,
- "GetContainerProperty",
- container,
- properties);
- }
-
- TFuture<THashMap<TString, THashMap<TString, TErrorOr<TString>>>> GetContainerProperties(
- const std::vector<TString>& containers,
- const std::vector<TString>& properties) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoGetContainerMultipleProperties,
- "GetContainerProperty",
- containers,
- properties);
- }
-
- TFuture<THashMap<TString, i64>> GetContainerMetrics(
- const std::vector<TString>& containers,
- const TString& metric) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoGetContainerMetrics,
- "GetContainerMetrics",
- containers,
- metric);
- }
-
- TFuture<void> SetContainerProperty(
- const TString& container,
- const TString& property,
- const TString& value) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoSetContainerProperty,
- "SetContainerProperty",
- container,
- property,
- value);
- }
-
- TFuture<void> DestroyContainer(const TString& container) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoDestroyContainer,
- "DestroyContainer",
- container);
- }
-
- TFuture<void> StopContainer(const TString& container) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoStopContainer,
- "StopContainer",
- container);
- }
-
- TFuture<void> StartContainer(const TString& container) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoStartContainer,
- "StartContainer",
- container);
- }
-
- TFuture<TString> ConvertPath(const TString& path, const TString& container) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoConvertPath,
- "ConvertPath",
- path,
- container);
- }
-
- TFuture<void> KillContainer(const TString& container, int signal) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoKillContainer,
- "KillContainer",
- container,
- signal);
- }
-
- TFuture<std::vector<TString>> ListSubcontainers(
- const TString& rootContainer,
- bool includeRoot) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoListSubcontainers,
- "ListSubcontainers",
- rootContainer,
- includeRoot);
- }
-
- TFuture<int> PollContainer(const TString& container) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoPollContainer,
- "PollContainer",
- container);
- }
-
- TFuture<int> WaitContainer(const TString& container) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoWaitContainer,
- "WaitContainer",
- container);
- }
-
- // This method allocates Porto "resources", so it should be uncancellable.
- TFuture<TString> CreateVolume(
- const TString& path,
- const THashMap<TString, TString>& properties) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoCreateVolume,
- "CreateVolume",
- path,
- properties)
- .ToUncancelable();
- }
-
- // This method allocates Porto "resources", so it should be uncancellable.
- TFuture<void> LinkVolume(
- const TString& path,
- const TString& name) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoLinkVolume,
- "LinkVolume",
- path,
- name)
- .ToUncancelable();
- }
-
- // This method deallocates Porto "resources", so it should be uncancellable.
- TFuture<void> UnlinkVolume(
- const TString& path,
- const TString& name) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoUnlinkVolume,
- "UnlinkVolume",
- path,
- name)
- .ToUncancelable();
- }
-
- TFuture<std::vector<TString>> ListVolumePaths() override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoListVolumePaths,
- "ListVolumePaths");
- }
-
- // This method allocates Porto "resources", so it should be uncancellable.
- TFuture<void> ImportLayer(const TString& archivePath, const TString& layerId, const TString& place) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoImportLayer,
- "ImportLayer",
- archivePath,
- layerId,
- place)
- .ToUncancelable();
- }
-
- // This method deallocates Porto "resources", so it should be uncancellable.
- TFuture<void> RemoveLayer(const TString& layerId, const TString& place, bool async) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoRemoveLayer,
- "RemoveLayer",
- layerId,
- place,
- async)
- .ToUncancelable();
- }
-
- TFuture<std::vector<TString>> ListLayers(const TString& place) override
- {
- return ExecutePortoApiAction(
- &TPortoExecutor::DoListLayers,
- "ListLayers",
- place);
- }
-
- IInvokerPtr GetInvoker() const override
- {
- return Queue_->GetInvoker();
- }
-
-private:
- const TPortoExecutorDynamicConfigPtr Config_;
- const TActionQueuePtr Queue_;
- const NProfiling::TProfiler Profiler_;
- const std::unique_ptr<Porto::TPortoApi> Api_ = std::make_unique<Porto::TPortoApi>();
- const TPeriodicExecutorPtr PollExecutor_;
- TAtomicIntrusivePtr<TPortoExecutorDynamicConfig> DynamicConfig_;
-
- std::vector<TString> Containers_;
- THashMap<TString, TPromise<int>> ContainerMap_;
- TSingleShotCallbackList<void(const TError&)> Failed_;
-
- struct TCommandEntry
- {
- explicit TCommandEntry(const NProfiling::TProfiler& registry)
- : TimeGauge(registry.Timer("/command_time"))
- , RetryCounter(registry.Counter("/command_retries"))
- , SuccessCounter(registry.Counter("/command_successes"))
- , FailureCounter(registry.Counter("/command_failures"))
- { }
-
- NProfiling::TEventTimer TimeGauge;
- NProfiling::TCounter RetryCounter;
- NProfiling::TCounter SuccessCounter;
- NProfiling::TCounter FailureCounter;
- };
-
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, CommandLock_);
- THashMap<TString, TCommandEntry> CommandToEntry_;
-
- static const std::vector<TString> ContainerRequestVars_;
-
- bool IsTestPortoFailureEnabled() const
- {
- auto config = DynamicConfig_.Acquire();
- return config->EnableTestPortoFailures;
- }
-
- bool IsTestPortoTimeout() const
- {
- auto config = DynamicConfig_.Acquire();
- return config->EnableTestPortoNotResponding;
- }
-
- EPortoErrorCode GetFailedStubError() const
- {
- auto config = DynamicConfig_.Acquire();
- return config->StubErrorCode;
- }
-
- static TError CreatePortoError(EPortoErrorCode errorCode, const TString& message)
- {
- return TError(errorCode, "Porto API error")
- << TErrorAttribute("original_porto_error_code", static_cast<int>(errorCode) - PortoErrorCodeBase)
- << TErrorAttribute("porto_error_message", message);
- }
-
- THashMap<TString, TErrorOr<TString>> DoGetContainerProperties(
- const TString& container,
- const std::vector<TString>& properties)
- {
- auto response = DoRequestContainerProperties({container}, properties);
- return ParseSinglePortoGetResponse(container, response);
- }
-
- THashMap<TString, THashMap<TString, TErrorOr<TString>>> DoGetContainerMultipleProperties(
- const std::vector<TString>& containers,
- const std::vector<TString>& properties)
- {
- auto response = DoRequestContainerProperties(containers, properties);
- return ParseMultiplePortoGetResponse(response);
- }
-
- std::optional<TString> DoGetContainerProperty(
- const TString& container,
- const TString& property)
- {
- auto response = DoRequestContainerProperties({container}, {property});
- auto parsedResponse = ParseSinglePortoGetResponse(container, response);
- auto it = parsedResponse.find(property);
- if (it == parsedResponse.end()) {
- return std::nullopt;
- } else {
- return it->second.ValueOrThrow();
- }
- }
-
- void DoCreateContainer(const TString& container)
- {
- ExecuteApiCall(
- [&] { return Api_->Create(container); },
- "Create",
- /*idempotent*/ false);
- }
-
- void DoCreateContainerFromSpec(const TRunnableContainerSpec& spec, bool start)
- {
- Porto::TContainerSpec portoSpec;
-
- // Required properties.
- portoSpec.set_name(spec.Name);
- portoSpec.set_command(spec.Command);
-
- portoSpec.set_enable_porto(FormatEnablePorto(spec.EnablePorto));
- portoSpec.set_isolate(spec.Isolate);
-
- if (spec.StdinPath) {
- portoSpec.set_stdin_path(*spec.StdinPath);
- }
- if (spec.StdoutPath) {
- portoSpec.set_stdout_path(*spec.StdoutPath);
- }
- if (spec.StderrPath) {
- portoSpec.set_stderr_path(*spec.StderrPath);
- }
-
- if (spec.CurrentWorkingDirectory) {
- portoSpec.set_cwd(*spec.CurrentWorkingDirectory);
- }
-
- if (spec.CoreCommand) {
- portoSpec.set_core_command(*spec.CoreCommand);
- }
- if (spec.User) {
- portoSpec.set_user(*spec.User);
- }
-
- // Useful for jobs, where we operate with numeric group ids.
- if (spec.GroupId) {
- portoSpec.set_group(ToString(*spec.GroupId));
- }
-
- if (spec.ThreadLimit) {
- portoSpec.set_thread_limit(*spec.ThreadLimit);
- }
-
- if (spec.HostName) {
- // To get a reasonable and unique host name inside container.
- portoSpec.set_hostname(*spec.HostName);
- if (!spec.IPAddresses.empty()) {
- const auto& address = spec.IPAddresses[0];
- auto etcHosts = Format("%v %v\n", address, *spec.HostName);
- // To be able to resolve hostname into IP inside container.
- portoSpec.set_etc_hosts(etcHosts);
- }
- }
-
- if (spec.DisableNetwork) {
- auto* netConfig = portoSpec.mutable_net()->add_cfg();
- netConfig->set_opt("none");
- } else if (!spec.IPAddresses.empty() && Config_->EnableNetworkIsolation) {
- // This label is intended for HBF-agent: YT-12512.
- auto* label = portoSpec.mutable_labels()->add_map();
- label->set_key("HBF.ignore_address");
- label->set_val("1");
-
- auto* netConfig = portoSpec.mutable_net()->add_cfg();
- netConfig->set_opt("L3");
- netConfig->add_arg("veth0");
-
- for (const auto& address : spec.IPAddresses) {
- auto* ipConfig = portoSpec.mutable_ip()->add_cfg();
- ipConfig->set_dev("veth0");
- ipConfig->set_ip(ToString(address));
- }
-
- if (spec.EnableNat64) {
- // Behave like nanny does.
- portoSpec.set_resolv_conf("nameserver fd64::1;nameserver 2a02:6b8:0:3400::5005;options attempts:1 timeout:1");
- }
- }
-
- for (const auto& [key, value] : spec.Labels) {
- auto* map = portoSpec.mutable_labels()->add_map();
- map->set_key(key);
- map->set_val(value);
- }
-
- for (const auto& [name, value] : spec.Env) {
- auto* var = portoSpec.mutable_env()->add_var();
- var->set_name(name);
- var->set_value(value);
- }
-
- for (const auto& controller : spec.CGroupControllers) {
- portoSpec.mutable_controllers()->add_controller(controller);
- }
-
- for (const auto& device : spec.Devices) {
- auto* portoDevice = portoSpec.mutable_devices()->add_device();
- portoDevice->set_device(device.DeviceName);
- portoDevice->set_access(device.Enabled ? "rw" : "-");
- }
-
- auto addBind = [&] (const TBind& bind) {
- auto* portoBind = portoSpec.mutable_bind()->add_bind();
- portoBind->set_target(bind.TargetPath);
- portoBind->set_source(bind.SourcePath);
- portoBind->add_flag(bind.ReadOnly ? "ro" : "rw");
- };
-
- if (spec.RootFS) {
- portoSpec.set_root_readonly(spec.RootFS->IsRootReadOnly);
- portoSpec.set_root(spec.RootFS->RootPath);
-
- for (const auto& bind : spec.RootFS->Binds) {
- addBind(bind);
- }
- }
-
- {
- auto* ulimit = portoSpec.mutable_ulimit()->add_ulimit();
- ulimit->set_type("core");
- if (spec.EnableCoreDumps) {
- ulimit->set_unlimited(true);
- } else {
- ulimit->set_hard(0);
- ulimit->set_soft(0);
- }
- }
-
- // Set some universal defaults.
- portoSpec.set_oom_is_fatal(false);
-
- ExecuteApiCall(
- [&] { return Api_->CreateFromSpec(portoSpec, {}, start); },
- "CreateFromSpec",
- /*idempotent*/ false);
- }
-
- void DoSetContainerProperty(const TString& container, const TString& property, const TString& value)
- {
- ExecuteApiCall(
- [&] { return Api_->SetProperty(container, property, value); },
- "SetProperty",
- /*idempotent*/ true);
- }
-
- void DoDestroyContainer(const TString& container)
- {
- try {
- ExecuteApiCall(
- [&] { return Api_->Destroy(container); },
- "Destroy",
- /*idempotent*/ true);
- } catch (const TErrorException& ex) {
- if (!ex.Error().FindMatching(EPortoErrorCode::ContainerDoesNotExist)) {
- throw;
- }
- }
- }
-
- void DoStopContainer(const TString& container)
- {
- ExecuteApiCall(
- [&] { return Api_->Stop(container); },
- "Stop",
- /*idempotent*/ true);
- }
-
- void DoStartContainer(const TString& container)
- {
- ExecuteApiCall(
- [&] { return Api_->Start(container); },
- "Start",
- /*idempotent*/ false);
- }
-
- TString DoConvertPath(const TString& path, const TString& container)
- {
- TString result;
- ExecuteApiCall(
- [&] { return Api_->ConvertPath(path, container, "self", result); },
- "ConvertPath",
- /*idempotent*/ true);
- return result;
- }
-
- void DoKillContainer(const TString& container, int signal)
- {
- ExecuteApiCall(
- [&] { return Api_->Kill(container, signal); },
- "Kill",
- /*idempotent*/ false);
- }
-
- std::vector<TString> DoListSubcontainers(const TString& rootContainer, bool includeRoot)
- {
- Porto::TListContainersRequest req;
- auto filter = req.add_filters();
- filter->set_name(rootContainer + "/*");
- if (includeRoot) {
- auto rootFilter = req.add_filters();
- rootFilter->set_name(rootContainer);
- }
- auto fieldOptions = req.mutable_field_options();
- fieldOptions->add_properties("absolute_name");
- TVector<Porto::TContainer> containers;
- ExecuteApiCall(
- [&] { return Api_->ListContainersBy(req, containers); },
- "ListContainersBy",
- /*idempotent*/ true);
-
- std::vector<TString> containerNames;
- containerNames.reserve(containers.size());
- for (const auto& container : containers) {
- const auto& absoluteName = container.status().absolute_name();
- if (!absoluteName.empty()) {
- containerNames.push_back(absoluteName);
- }
- }
- return containerNames;
- }
-
- TFuture<int> DoWaitContainer(const TString& container)
- {
- auto result = NewPromise<int>();
- auto waitCallback = [=, this, this_ = MakeStrong(this)] (const Porto::TWaitResponse& rsp) {
- return OnContainerTerminated(rsp, result);
- };
-
- ExecuteApiCall(
- [&] { return Api_->AsyncWait({container}, {}, waitCallback); },
- "AsyncWait",
- /*idempotent*/ false);
-
- return result.ToFuture().ToImmediatelyCancelable();
- }
-
- void OnContainerTerminated(const Porto::TWaitResponse& portoWaitResponse, TPromise<int> result)
- {
- const auto& container = portoWaitResponse.name();
- const auto& state = portoWaitResponse.state();
- if (state != "dead" && state != "stopped") {
- result.TrySet(TError("Container finished with unexpected state")
- << TErrorAttribute("container_name", container)
- << TErrorAttribute("container_state", state));
- return;
- }
-
- // TODO(max42): switch to Subscribe.
- YT_UNUSED_FUTURE(GetContainerProperty(container, "exit_status").Apply(BIND(
- [=] (const TErrorOr<std::optional<TString>>& errorOrExitCode) {
- if (!errorOrExitCode.IsOK()) {
- result.TrySet(TError("Container finished, but exit status is unknown")
- << errorOrExitCode);
- return;
- }
-
- const auto& optionalExitCode = errorOrExitCode.Value();
- if (!optionalExitCode) {
- result.TrySet(TError("Container finished, but exit status is unknown")
- << TErrorAttribute("container_name", container)
- << TErrorAttribute("container_state", state));
- return;
- }
-
- try {
- int exitStatus = FromString<int>(*optionalExitCode);
- result.TrySet(exitStatus);
- } catch (const std::exception& ex) {
- auto error = TError("Failed to parse Porto exit status")
- << TErrorAttribute("container_name", container)
- << TErrorAttribute("exit_status", optionalExitCode.value());
- error.MutableInnerErrors()->push_back(TError(ex));
- result.TrySet(error);
- }
- })));
- }
-
- TFuture<int> DoPollContainer(const TString& container)
- {
- auto [it, inserted] = ContainerMap_.insert({container, NewPromise<int>()});
- if (!inserted) {
- YT_LOG_WARNING("Container already added for polling (Container: %v)",
- container);
- } else {
- Containers_.push_back(container);
- }
- return it->second.ToFuture();
- }
-
- Porto::TGetResponse DoRequestContainerProperties(
- const std::vector<TString>& containers,
- const std::vector<TString>& vars)
- {
- TVector<TString> containers_(containers.begin(), containers.end());
- TVector<TString> vars_(vars.begin(), vars.end());
-
- const Porto::TGetResponse* getResponse;
-
- ExecuteApiCall(
- [&] {
- getResponse = Api_->Get(containers_, vars_);
- return getResponse ? EError::Success : EError::Unknown;
- },
- "Get",
- /*idempotent*/ true);
-
- YT_VERIFY(getResponse);
- return *getResponse;
- }
-
- THashMap<TString, i64> DoGetContainerMetrics(
- const std::vector<TString>& containers,
- const TString& metric)
- {
- TVector<TString> containers_(containers.begin(), containers.end());
-
- TMap<TString, uint64_t> result;
-
- ExecuteApiCall(
- [&] { return Api_->GetProcMetric(containers_, metric, result); },
- "GetProcMetric",
- /*idempotent*/ true);
-
- return {result.begin(), result.end()};
- }
-
- void DoPoll()
- {
- try {
- if (Containers_.empty()) {
- return;
- }
-
- auto getResponse = DoRequestContainerProperties(Containers_, ContainerRequestVars_);
-
- if (getResponse.list().empty()) {
- return;
- }
-
- auto getProperty = [] (
- const Porto::TGetResponse::TContainerGetListResponse& container,
- const TString& name) -> Porto::TGetResponse::TContainerGetValueResponse
- {
- for (const auto& property : container.keyval()) {
- if (property.variable() == name) {
- return property;
- }
- }
-
- return {};
- };
-
- for (const auto& container : getResponse.list()) {
- auto state = getProperty(container, "state");
- if (state.error() == EError::ContainerDoesNotExist) {
- HandleResult(container.name(), state);
- } else if (state.value() == "dead" || state.value() == "stopped") {
- HandleResult(container.name(), getProperty(container, "exit_status"));
- }
- //TODO(dcherednik): other states
- }
- } catch (const std::exception& ex) {
- YT_LOG_ERROR(ex, "Fatal exception occurred while polling Porto");
- Failed_.Fire(TError(ex));
- }
- }
-
- TString DoCreateVolume(
- const TString& path,
- const THashMap<TString, TString>& properties)
- {
- auto volume = path;
- TMap<TString, TString> propertyMap(properties.begin(), properties.end());
- ExecuteApiCall(
- [&] { return Api_->CreateVolume(volume, propertyMap); },
- "CreateVolume",
- /*idempotent*/ false);
- return volume;
- }
-
- void DoLinkVolume(const TString& path, const TString& container)
- {
- ExecuteApiCall(
- [&] { return Api_->LinkVolume(path, container); },
- "LinkVolume",
- /*idempotent*/ false);
- }
-
- void DoUnlinkVolume(const TString& path, const TString& container)
- {
- ExecuteApiCall(
- [&] { return Api_->UnlinkVolume(path, container); },
- "UnlinkVolume",
- /*idempotent*/ false);
- }
-
- std::vector<TString> DoListVolumePaths()
- {
- TVector<TString> volumes;
- ExecuteApiCall(
- [&] { return Api_->ListVolumes(volumes); },
- "ListVolume",
- /*idempotent*/ true);
- return {volumes.begin(), volumes.end()};
- }
-
- void DoImportLayer(const TString& archivePath, const TString& layerId, const TString& place)
- {
- ExecuteApiCall(
- [&] { return Api_->ImportLayer(layerId, archivePath, false, place); },
- "ImportLayer",
- /*idempotent*/ false);
- }
-
- void DoRemoveLayer(const TString& layerId, const TString& place, bool async)
- {
- ExecuteApiCall(
- [&] { return Api_->RemoveLayer(layerId, place, async); },
- "RemoveLayer",
- /*idempotent*/ false);
- }
-
- std::vector<TString> DoListLayers(const TString& place)
- {
- TVector<TString> layers;
- ExecuteApiCall(
- [&] { return Api_->ListLayers(layers, place); },
- "ListLayers",
- /*idempotent*/ true);
- return {layers.begin(), layers.end()};
- }
-
- TCommandEntry* GetCommandEntry(const TString& command)
- {
- auto guard = Guard(CommandLock_);
- if (auto it = CommandToEntry_.find(command)) {
- return &it->second;
- }
- return &CommandToEntry_.emplace(command, TCommandEntry(Profiler_.WithTag("command", command))).first->second;
- }
-
- void ExecuteApiCall(
- std::function<EError()> callback,
- const TString& command,
- bool idempotent)
- {
- YT_LOG_DEBUG("Porto API call started (Command: %v)", command);
-
- if (IsTestPortoTimeout()) {
- YT_LOG_DEBUG("Testing Porto timeout (Command: %v)", command);
-
- auto config = DynamicConfig_.Acquire();
- TDelayedExecutor::WaitForDuration(config->ApiTimeout);
-
- THROW_ERROR CreatePortoError(GetFailedStubError(), "Porto timeout");
- }
-
- if (IsTestPortoFailureEnabled()) {
- YT_LOG_DEBUG("Testing Porto failure (Command: %v)", command);
- THROW_ERROR CreatePortoError(GetFailedStubError(), "Porto stub error");
- }
-
- auto* entry = GetCommandEntry(command);
- auto startTime = NProfiling::GetInstant();
- while (true) {
- EError error;
-
- {
- NProfiling::TWallTimer timer;
- error = callback();
- entry->TimeGauge.Record(timer.GetElapsedTime());
- }
-
- if (error == EError::Success) {
- entry->SuccessCounter.Increment();
- break;
- }
-
- entry->FailureCounter.Increment();
- HandleApiError(command, startTime, idempotent);
-
- YT_LOG_DEBUG("Sleeping and retrying Porto API call (Command: %v)", command);
- entry->RetryCounter.Increment();
-
- TDelayedExecutor::WaitForDuration(RetryInterval);
- }
-
- YT_LOG_DEBUG("Porto API call completed (Command: %v)", command);
- }
-
- void HandleApiError(
- const TString& command,
- TInstant startTime,
- bool idempotent)
- {
- TString errorMessage;
- auto error = ConvertPortoErrorCode(Api_->GetLastError(errorMessage));
-
- // These errors are typical during job cleanup: we might try to kill a container that is already stopped.
- bool debug = (error == EPortoErrorCode::ContainerDoesNotExist || error == EPortoErrorCode::InvalidState);
- YT_LOG_EVENT(
- Logger,
- debug ? NLogging::ELogLevel::Debug : NLogging::ELogLevel::Error,
- "Porto API call error (Error: %v, Command: %v, Message: %v)",
- error,
- command,
- errorMessage);
-
- if (!IsRetriableErrorCode(error, idempotent) || NProfiling::GetInstant() - startTime > Config_->RetriesTimeout) {
- THROW_ERROR CreatePortoError(error, errorMessage);
- }
- }
-
- void HandleResult(const TString& container, const Porto::TGetResponse::TContainerGetValueResponse& rsp)
- {
- auto portoErrorCode = ConvertPortoErrorCode(rsp.error());
- auto it = ContainerMap_.find(container);
- if (it == ContainerMap_.end()) {
- YT_LOG_ERROR("Got an unexpected container "
- "(Container: %v, ResponseError: %v, ErrorMessage: %v, Value: %v)",
- container,
- portoErrorCode,
- rsp.errormsg(),
- rsp.value());
- return;
- } else {
- if (portoErrorCode != EPortoErrorCode::Success) {
- YT_LOG_ERROR("Container finished with Porto API error "
- "(Container: %v, ResponseError: %v, ErrorMessage: %v, Value: %v)",
- container,
- portoErrorCode,
- rsp.errormsg(),
- rsp.value());
- it->second.Set(CreatePortoError(portoErrorCode, rsp.errormsg()));
- } else {
- try {
- int exitStatus = std::stoi(rsp.value());
- YT_LOG_DEBUG("Container finished with exit code (Container: %v, ExitCode: %v)",
- container,
- exitStatus);
-
- it->second.Set(exitStatus);
- } catch (const std::exception& ex) {
- it->second.Set(TError("Failed to parse Porto exit status") << ex);
- }
- }
- }
- RemoveFromPoller(container);
- }
-
- void RemoveFromPoller(const TString& container)
- {
- ContainerMap_.erase(container);
-
- Containers_.clear();
- for (const auto& [name, pid] : ContainerMap_) {
- Containers_.push_back(name);
- }
- }
-};
-
-const std::vector<TString> TPortoExecutor::ContainerRequestVars_ = {
- "state",
- "exit_status"
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-IPortoExecutorPtr CreatePortoExecutor(
- TPortoExecutorDynamicConfigPtr config,
- const TString& threadNameSuffix,
- const NProfiling::TProfiler& profiler)
-{
- return New<TPortoExecutor>(
- std::move(config),
- threadNameSuffix,
- profiler);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-#else
-
-IPortoExecutorPtr CreatePortoExecutor(
- TPortoExecutorDynamicConfigPtr /* config */,
- const TString& /* threadNameSuffix */,
- const NProfiling::TProfiler& /* profiler */)
-{
- THROW_ERROR_EXCEPTION("Porto executor is not available on this platform");
-}
-
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/porto_executor.h b/yt/yt/library/containers/porto_executor.h
deleted file mode 100644
index d629ab6275..0000000000
--- a/yt/yt/library/containers/porto_executor.h
+++ /dev/null
@@ -1,142 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/library/profiling/sensor.h>
-
-#include <yt/yt/core/actions/future.h>
-#include <yt/yt/core/actions/signal.h>
-
-#include <yt/yt/core/net/address.h>
-
-#include <library/cpp/porto/libporto.hpp>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TVolumeId
-{
- TString Path;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TRunnableContainerSpec
-{
- TString Name;
- TString Command;
-
- EEnablePorto EnablePorto = EEnablePorto::None;
- bool Isolate = true;
-
- std::optional<TString> StdinPath;
- std::optional<TString> StdoutPath;
- std::optional<TString> StderrPath;
- std::optional<TString> CurrentWorkingDirectory;
- std::optional<TString> CoreCommand;
- std::optional<TString> User;
- std::optional<int> GroupId;
-
- bool EnableCoreDumps = true;
-
- std::optional<i64> ThreadLimit;
-
- std::optional<TString> HostName;
- std::vector<NYT::NNet::TIP6Address> IPAddresses;
- bool EnableNat64 = false;
- bool DisableNetwork = false;
-
- THashMap<TString, TString> Labels;
- THashMap<TString, TString> Env;
- std::vector<TString> CGroupControllers;
- std::vector<TDevice> Devices;
- std::optional<TRootFS> RootFS;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct IPortoExecutor
- : public TRefCounted
-{
- virtual void OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig) = 0;
-
- virtual TFuture<void> CreateContainer(const TString& container) = 0;
-
- virtual TFuture<void> CreateContainer(const TRunnableContainerSpec& containerSpec, bool start) = 0;
-
- virtual TFuture<void> SetContainerProperty(
- const TString& container,
- const TString& property,
- const TString& value) = 0;
-
- virtual TFuture<std::optional<TString>> GetContainerProperty(
- const TString& container,
- const TString& property) = 0;
-
- virtual TFuture<THashMap<TString, TErrorOr<TString>>> GetContainerProperties(
- const TString& container,
- const std::vector<TString>& properties) = 0;
- virtual TFuture<THashMap<TString, THashMap<TString, TErrorOr<TString>>>> GetContainerProperties(
- const std::vector<TString>& containers,
- const std::vector<TString>& properties) = 0;
-
- virtual TFuture<THashMap<TString, i64>> GetContainerMetrics(
- const std::vector<TString>& containers,
- const TString& metric) = 0;
- virtual TFuture<void> DestroyContainer(const TString& container) = 0;
- virtual TFuture<void> StopContainer(const TString& container) = 0;
- virtual TFuture<void> StartContainer(const TString& container) = 0;
- virtual TFuture<void> KillContainer(const TString& container, int signal) = 0;
-
- virtual TFuture<TString> ConvertPath(const TString& path, const TString& container) = 0;
-
- // Returns absolute names of immediate children only.
- virtual TFuture<std::vector<TString>> ListSubcontainers(
- const TString& rootContainer,
- bool includeRoot) = 0;
- // Starts polling a given container, returns future with exit code of finished process.
- virtual TFuture<int> PollContainer(const TString& container) = 0;
-
- // Returns future with exit code of finished process.
- // NB: temporarily broken, see https://st.yandex-team.ru/PORTO-846 for details.
- virtual TFuture<int> WaitContainer(const TString& container) = 0;
-
- virtual TFuture<TString> CreateVolume(
- const TString& path,
- const THashMap<TString, TString>& properties) = 0;
- virtual TFuture<void> LinkVolume(
- const TString& path,
- const TString& name) = 0;
- virtual TFuture<void> UnlinkVolume(
- const TString& path,
- const TString& name) = 0;
- virtual TFuture<std::vector<TString>> ListVolumePaths() = 0;
-
- virtual TFuture<void> ImportLayer(
- const TString& archivePath,
- const TString& layerId,
- const TString& place) = 0;
- virtual TFuture<void> RemoveLayer(
- const TString& layerId,
- const TString& place,
- bool async) = 0;
- virtual TFuture<std::vector<TString>> ListLayers(const TString& place) = 0;
-
- virtual IInvokerPtr GetInvoker() const = 0;
-
- DECLARE_INTERFACE_SIGNAL(void(const TError&), Failed);
-};
-
-DEFINE_REFCOUNTED_TYPE(IPortoExecutor)
-
-////////////////////////////////////////////////////////////////////////////////
-
-IPortoExecutorPtr CreatePortoExecutor(
- TPortoExecutorDynamicConfigPtr config,
- const TString& threadNameSuffix,
- const NProfiling::TProfiler& profiler = {});
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/porto_health_checker.cpp b/yt/yt/library/containers/porto_health_checker.cpp
deleted file mode 100644
index 5a5d358441..0000000000
--- a/yt/yt/library/containers/porto_health_checker.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-
-#include "porto_health_checker.h"
-
-#include "porto_executor.h"
-#include "private.h"
-#include "config.h"
-
-#include <yt/yt/core/actions/future.h>
-
-#include <yt/yt/core/misc/fs.h>
-
-#include <util/random/random.h>
-
-namespace NYT::NContainers {
-
-using namespace NConcurrency;
-using namespace NLogging;
-using namespace NProfiling;
-
-////////////////////////////////////////////////////////////////////////////////
-
-TPortoHealthChecker::TPortoHealthChecker(
- TPortoExecutorDynamicConfigPtr config,
- IInvokerPtr invoker,
- TLogger logger)
- : Config_(std::move(config))
- , Logger(std::move(logger))
- , CheckInvoker_(std::move(invoker))
- , Executor_(CreatePortoExecutor(
- Config_,
- "porto_check"))
-{ }
-
-void TPortoHealthChecker::Start()
-{
- YT_LOG_DEBUG("Porto health checker started");
-
- PeriodicExecutor_ = New<TPeriodicExecutor>(
- CheckInvoker_,
- BIND(&TPortoHealthChecker::OnCheck, MakeWeak(this)),
- Config_->RetriesTimeout);
- PeriodicExecutor_->Start();
-}
-
-void TPortoHealthChecker::OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig)
-{
- YT_LOG_DEBUG(
- "Porto health checker dynamic config changed (EnableTestPortoFailures: %v, StubErrorCode: %v)",
- Config_->EnableTestPortoFailures,
- Config_->StubErrorCode);
-
- Executor_->OnDynamicConfigChanged(newConfig);
-}
-
-void TPortoHealthChecker::OnCheck()
-{
- YT_LOG_DEBUG("Run Porto health check");
-
- auto result = WaitFor(Executor_->ListVolumePaths().AsVoid());
- if (result.IsOK()) {
- Success_.Fire();
- } else {
- Failed_.Fire(result);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/porto_health_checker.h b/yt/yt/library/containers/porto_health_checker.h
deleted file mode 100644
index f0fb8f0908..0000000000
--- a/yt/yt/library/containers/porto_health_checker.h
+++ /dev/null
@@ -1,52 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/library/profiling/sensor.h>
-
-#include <yt/yt/core/actions/signal.h>
-
-#include <yt/yt/core/concurrency/periodic_executor.h>
-
-#include <yt/yt/core/logging/log.h>
-
-#include <yt/yt/core/misc/error.h>
-
-#include <atomic>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoHealthChecker
- : public TRefCounted
-{
-public:
- TPortoHealthChecker(
- TPortoExecutorDynamicConfigPtr config,
- IInvokerPtr invoker,
- NLogging::TLogger logger);
-
- void Start();
-
- void OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig);
-
- DEFINE_SIGNAL(void(), Success);
-
- DEFINE_SIGNAL(void(const TError&), Failed);
-
-private:
- const TPortoExecutorDynamicConfigPtr Config_;
- const NLogging::TLogger Logger;
- const IInvokerPtr CheckInvoker_;
- const IPortoExecutorPtr Executor_;
- NConcurrency::TPeriodicExecutorPtr PeriodicExecutor_;
-
- void OnCheck();
-};
-
-DEFINE_REFCOUNTED_TYPE(TPortoHealthChecker)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/porto_resource_tracker.cpp b/yt/yt/library/containers/porto_resource_tracker.cpp
deleted file mode 100644
index c1fe48d6af..0000000000
--- a/yt/yt/library/containers/porto_resource_tracker.cpp
+++ /dev/null
@@ -1,711 +0,0 @@
-#include "porto_resource_tracker.h"
-#include "private.h"
-
-#include <yt/yt/core/logging/log.h>
-
-#include <yt/yt/core/misc/error.h>
-
-#include <yt/yt/core/net/address.h>
-
-#include <yt/yt/core/ytree/public.h>
-
-#include <yt/yt/library/process/process.h>
-
-#include <yt/yt/library/containers/cgroup.h>
-#include <yt/yt/library/containers/config.h>
-#include <yt/yt/library/containers/instance.h>
-#include <yt/yt/library/containers/porto_executor.h>
-#include <yt/yt/library/containers/public.h>
-
-namespace NYT::NContainers {
-
-using namespace NProfiling;
-
-static const auto& Logger = ContainersLogger;
-
-#ifdef _linux_
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TPortoProfilers
- : public TRefCounted
-{
- TPortoResourceProfilerPtr DaemonProfiler;
- TPortoResourceProfilerPtr ContainerProfiler;
-
- TPortoProfilers(
- TPortoResourceProfilerPtr daemonProfiler,
- TPortoResourceProfilerPtr containerProfiler)
- : DaemonProfiler(std::move(daemonProfiler))
- , ContainerProfiler(std::move(containerProfiler))
- { }
-};
-
-DEFINE_REFCOUNTED_TYPE(TPortoProfilers)
-
-////////////////////////////////////////////////////////////////////////////////
-
-static TErrorOr<ui64> GetFieldOrError(
- const TResourceUsage& usage,
- EStatField field)
-{
- auto it = usage.find(field);
- if (it == usage.end()) {
- return TError("Resource usage is missing %Qlv field", field);
- }
- const auto& errorOrValue = it->second;
- if (errorOrValue.FindMatching(EPortoErrorCode::NotSupported)) {
- return TError("Property %Qlv not supported in Porto response", field);
- }
- return errorOrValue;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TPortoResourceTracker::TPortoResourceTracker(
- IInstancePtr instance,
- TDuration updatePeriod,
- bool isDeltaTracker,
- bool isForceUpdate)
- : Instance_(std::move(instance))
- , UpdatePeriod_(updatePeriod)
- , IsDeltaTracker_(isDeltaTracker)
- , IsForceUpdate_(isForceUpdate)
-{
- ResourceUsage_ = {
- {EStatField::IOReadByte, 0},
- {EStatField::IOWriteByte, 0},
- {EStatField::IOBytesLimit, 0},
- {EStatField::IOReadOps, 0},
- {EStatField::IOWriteOps, 0},
- {EStatField::IOOps, 0},
- {EStatField::IOOpsLimit, 0},
- {EStatField::IOTotalTime, 0},
- {EStatField::IOWaitTime, 0}
- };
- ResourceUsageDelta_ = ResourceUsage_;
-}
-
-static TErrorOr<TDuration> ExtractDuration(TErrorOr<ui64> timeNs)
-{
- if (timeNs.IsOK()) {
- return TErrorOr<TDuration>(TDuration::MicroSeconds(timeNs.Value() / 1000));
- } else {
- return TError(timeNs);
- }
-}
-
-TCpuStatistics TPortoResourceTracker::ExtractCpuStatistics(const TResourceUsage& resourceUsage) const
-{
- // NB: Job proxy uses last sample of CPU statistics but we are interested in
- // peak thread count value.
- auto currentThreadCountPeak = GetFieldOrError(resourceUsage, EStatField::ThreadCount);
-
- PeakThreadCount_ = currentThreadCountPeak.IsOK() && PeakThreadCount_.IsOK()
- ? std::max<ui64>(
- PeakThreadCount_.Value(),
- currentThreadCountPeak.Value())
- : currentThreadCountPeak.IsOK() ? currentThreadCountPeak : PeakThreadCount_;
-
- auto totalTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuUsage);
- auto systemTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuSystemUsage);
- auto userTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuUserUsage);
- auto waitTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuWait);
- auto throttledNs = GetFieldOrError(resourceUsage, EStatField::CpuThrottled);
- auto limitTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuLimit);
- auto guaranteeTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuGuarantee);
-
- return TCpuStatistics{
- .TotalUsageTime = ExtractDuration(totalTimeNs),
- .UserUsageTime = ExtractDuration(userTimeNs),
- .SystemUsageTime = ExtractDuration(systemTimeNs),
- .WaitTime = ExtractDuration(waitTimeNs),
- .ThrottledTime = ExtractDuration(throttledNs),
- .ThreadCount = GetFieldOrError(resourceUsage, EStatField::ThreadCount),
- .ContextSwitches = GetFieldOrError(resourceUsage, EStatField::ContextSwitches),
- .ContextSwitchesDelta = GetFieldOrError(resourceUsage, EStatField::ContextSwitchesDelta),
- .PeakThreadCount = PeakThreadCount_,
- .LimitTime = ExtractDuration(limitTimeNs),
- .GuaranteeTime = ExtractDuration(guaranteeTimeNs),
- };
-}
-
-TMemoryStatistics TPortoResourceTracker::ExtractMemoryStatistics(const TResourceUsage& resourceUsage) const
-{
- return TMemoryStatistics{
- .Rss = GetFieldOrError(resourceUsage, EStatField::Rss),
- .MappedFile = GetFieldOrError(resourceUsage, EStatField::MappedFile),
- .MinorPageFaults = GetFieldOrError(resourceUsage, EStatField::MinorPageFaults),
- .MajorPageFaults = GetFieldOrError(resourceUsage, EStatField::MajorPageFaults),
- .FileCacheUsage = GetFieldOrError(resourceUsage, EStatField::FileCacheUsage),
- .AnonUsage = GetFieldOrError(resourceUsage, EStatField::AnonMemoryUsage),
- .AnonLimit = GetFieldOrError(resourceUsage, EStatField::AnonMemoryLimit),
- .MemoryUsage = GetFieldOrError(resourceUsage, EStatField::MemoryUsage),
- .MemoryGuarantee = GetFieldOrError(resourceUsage, EStatField::MemoryGuarantee),
- .MemoryLimit = GetFieldOrError(resourceUsage, EStatField::MemoryLimit),
- .MaxMemoryUsage = GetFieldOrError(resourceUsage, EStatField::MaxMemoryUsage),
- .OomKills = GetFieldOrError(resourceUsage, EStatField::OomKills),
- .OomKillsTotal = GetFieldOrError(resourceUsage, EStatField::OomKillsTotal)
- };
-}
-
-TBlockIOStatistics TPortoResourceTracker::ExtractBlockIOStatistics(const TResourceUsage& resourceUsage) const
-{
- auto totalTimeNs = GetFieldOrError(resourceUsage, EStatField::IOTotalTime);
- auto waitTimeNs = GetFieldOrError(resourceUsage, EStatField::IOWaitTime);
-
- return TBlockIOStatistics{
- .IOReadByte = GetFieldOrError(resourceUsage, EStatField::IOReadByte),
- .IOWriteByte = GetFieldOrError(resourceUsage, EStatField::IOWriteByte),
- .IOBytesLimit = GetFieldOrError(resourceUsage, EStatField::IOBytesLimit),
- .IOReadOps = GetFieldOrError(resourceUsage, EStatField::IOReadOps),
- .IOWriteOps = GetFieldOrError(resourceUsage, EStatField::IOWriteOps),
- .IOOps = GetFieldOrError(resourceUsage, EStatField::IOOps),
- .IOOpsLimit = GetFieldOrError(resourceUsage, EStatField::IOOpsLimit),
- .IOTotalTime = ExtractDuration(totalTimeNs),
- .IOWaitTime = ExtractDuration(waitTimeNs)
- };
-}
-
-TNetworkStatistics TPortoResourceTracker::ExtractNetworkStatistics(const TResourceUsage& resourceUsage) const
-{
- return TNetworkStatistics{
- .TxBytes = GetFieldOrError(resourceUsage, EStatField::NetTxBytes),
- .TxPackets = GetFieldOrError(resourceUsage, EStatField::NetTxPackets),
- .TxDrops = GetFieldOrError(resourceUsage, EStatField::NetTxDrops),
- .TxLimit = GetFieldOrError(resourceUsage, EStatField::NetTxLimit),
-
- .RxBytes = GetFieldOrError(resourceUsage, EStatField::NetRxBytes),
- .RxPackets = GetFieldOrError(resourceUsage, EStatField::NetRxPackets),
- .RxDrops = GetFieldOrError(resourceUsage, EStatField::NetRxDrops),
- .RxLimit = GetFieldOrError(resourceUsage, EStatField::NetRxLimit),
- };
-}
-
-TTotalStatistics TPortoResourceTracker::ExtractTotalStatistics(const TResourceUsage& resourceUsage) const
-{
- return TTotalStatistics{
- .CpuStatistics = ExtractCpuStatistics(resourceUsage),
- .MemoryStatistics = ExtractMemoryStatistics(resourceUsage),
- .BlockIOStatistics = ExtractBlockIOStatistics(resourceUsage),
- .NetworkStatistics = ExtractNetworkStatistics(resourceUsage),
- };
-}
-
-TCpuStatistics TPortoResourceTracker::GetCpuStatistics() const
-{
- return GetStatistics(
- CachedCpuStatistics_,
- "CPU",
- [&] (TResourceUsage& resourceUsage) {
- return ExtractCpuStatistics(resourceUsage);
- });
-}
-
-TMemoryStatistics TPortoResourceTracker::GetMemoryStatistics() const
-{
- return GetStatistics(
- CachedMemoryStatistics_,
- "memory",
- [&] (TResourceUsage& resourceUsage) {
- return ExtractMemoryStatistics(resourceUsage);
- });
-}
-
-TBlockIOStatistics TPortoResourceTracker::GetBlockIOStatistics() const
-{
- return GetStatistics(
- CachedBlockIOStatistics_,
- "block IO",
- [&] (TResourceUsage& resourceUsage) {
- return ExtractBlockIOStatistics(resourceUsage);
- });
-}
-
-TNetworkStatistics TPortoResourceTracker::GetNetworkStatistics() const
-{
- return GetStatistics(
- CachedNetworkStatistics_,
- "network",
- [&] (TResourceUsage& resourceUsage) {
- return ExtractNetworkStatistics(resourceUsage);
- });
-}
-
-TTotalStatistics TPortoResourceTracker::GetTotalStatistics() const
-{
- return GetStatistics(
- CachedTotalStatistics_,
- "total",
- [&] (TResourceUsage& resourceUsage) {
- return ExtractTotalStatistics(resourceUsage);
- });
-}
-
-template <class T, class F>
-T TPortoResourceTracker::GetStatistics(
- std::optional<T>& cachedStatistics,
- const TString& statisticsKind,
- F extractor) const
-{
- UpdateResourceUsageStatisticsIfExpired();
-
- auto guard = Guard(SpinLock_);
- try {
- auto newStatistics = extractor(IsDeltaTracker_ ? ResourceUsageDelta_ : ResourceUsage_);
- cachedStatistics = newStatistics;
- return newStatistics;
- } catch (const std::exception& ex) {
- if (!cachedStatistics) {
- THROW_ERROR_EXCEPTION("Unable to get %v statistics", statisticsKind)
- << ex;
- }
- YT_LOG_WARNING(ex, "Unable to get %v statistics; using the last one", statisticsKind);
- return *cachedStatistics;
- }
-}
-
-bool TPortoResourceTracker::AreResourceUsageStatisticsExpired() const
-{
- return TInstant::Now() - LastUpdateTime_.load() > UpdatePeriod_;
-}
-
-TInstant TPortoResourceTracker::GetLastUpdateTime() const
-{
- return LastUpdateTime_.load();
-}
-
-void TPortoResourceTracker::UpdateResourceUsageStatisticsIfExpired() const
-{
- if (IsForceUpdate_ || AreResourceUsageStatisticsExpired()) {
- DoUpdateResourceUsage();
- }
-}
-
-TErrorOr<ui64> TPortoResourceTracker::CalculateCounterDelta(
- const TErrorOr<ui64>& oldValue,
- const TErrorOr<ui64>& newValue) const
-{
- if (oldValue.IsOK() && newValue.IsOK()) {
- return newValue.Value() - oldValue.Value();
- } else if (newValue.IsOK()) {
- // It is better to return an error than an incorrect value.
- return oldValue;
- } else {
- return newValue;
- }
-}
-
-static bool IsCumulativeStatistics(EStatField statistic)
-{
- return
- statistic == EStatField::CpuUsage ||
- statistic == EStatField::CpuUserUsage ||
- statistic == EStatField::CpuSystemUsage ||
- statistic == EStatField::CpuWait ||
- statistic == EStatField::CpuThrottled ||
-
- statistic == EStatField::ContextSwitches ||
-
- statistic == EStatField::MinorPageFaults ||
- statistic == EStatField::MajorPageFaults ||
-
- statistic == EStatField::IOReadByte ||
- statistic == EStatField::IOWriteByte ||
- statistic == EStatField::IOReadOps ||
- statistic == EStatField::IOWriteOps ||
- statistic == EStatField::IOOps ||
- statistic == EStatField::IOTotalTime ||
- statistic == EStatField::IOWaitTime ||
-
- statistic == EStatField::NetTxBytes ||
- statistic == EStatField::NetTxPackets ||
- statistic == EStatField::NetTxDrops ||
- statistic == EStatField::NetRxBytes ||
- statistic == EStatField::NetRxPackets ||
- statistic == EStatField::NetRxDrops;
-}
-
-void TPortoResourceTracker::ReCalculateResourceUsage(const TResourceUsage& newResourceUsage) const
-{
- auto guard = Guard(SpinLock_);
-
- TResourceUsage resourceUsage;
- TResourceUsage resourceUsageDelta;
-
- for (const auto& stat : InstanceStatFields) {
- TErrorOr<ui64> oldValue;
- TErrorOr<ui64> newValue;
-
- if (auto newValueIt = newResourceUsage.find(stat); newValueIt.IsEnd()) {
- newValue = TError("Missing property %Qlv in Porto response", stat)
- << TErrorAttribute("container", Instance_->GetName());
- } else {
- newValue = newValueIt->second;
- }
-
- if (auto oldValueIt = ResourceUsage_.find(stat); oldValueIt.IsEnd()) {
- oldValue = newValue;
- } else {
- oldValue = oldValueIt->second;
- }
-
- if (newValue.IsOK()) {
- resourceUsage[stat] = newValue;
- } else {
- resourceUsage[stat] = oldValue;
- }
-
- if (IsCumulativeStatistics(stat)) {
- resourceUsageDelta[stat] = CalculateCounterDelta(oldValue, newValue);
- } else {
- if (newValue.IsOK()) {
- resourceUsageDelta[stat] = newValue;
- } else {
- resourceUsageDelta[stat] = oldValue;
- }
- }
- }
-
- ResourceUsage_ = resourceUsage;
- ResourceUsageDelta_ = resourceUsageDelta;
- LastUpdateTime_.store(TInstant::Now());
-}
-
-void TPortoResourceTracker::DoUpdateResourceUsage() const
-{
- try {
- ReCalculateResourceUsage(Instance_->GetResourceUsage());
- } catch (const std::exception& ex) {
- YT_LOG_ERROR(
- ex,
- "Couldn't get metrics from Porto");
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TPortoResourceProfiler::TPortoResourceProfiler(
- TPortoResourceTrackerPtr tracker,
- TPodSpecConfigPtr podSpec,
- const TProfiler& profiler)
- : ResourceTracker_(std::move(tracker))
- , PodSpec_(std::move(podSpec))
-{
- profiler.AddProducer("", MakeStrong(this));
-}
-
-static void WriteGaugeIfOk(
- ISensorWriter* writer,
- const TString& path,
- TErrorOr<ui64> valueOrError)
-{
- if (valueOrError.IsOK()) {
- i64 value = static_cast<i64>(valueOrError.Value());
-
- if (value >= 0) {
- writer->AddGauge(path, value);
- }
- }
-}
-
-static void WriteCumulativeGaugeIfOk(
- ISensorWriter* writer,
- const TString& path,
- TErrorOr<ui64> valueOrError,
- i64 timeDeltaUsec)
-{
- if (valueOrError.IsOK()) {
- i64 value = static_cast<i64>(valueOrError.Value());
-
- if (value >= 0) {
- writer->AddGauge(path,
- 1.0 * value * ResourceUsageUpdatePeriod.MicroSeconds() / timeDeltaUsec);
- }
- }
-}
-
-void TPortoResourceProfiler::WriteCpuMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec)
-{
- {
- if (totalStatistics.CpuStatistics.UserUsageTime.IsOK()) {
- i64 userUsageTimeUs = totalStatistics.CpuStatistics.UserUsageTime.Value().MicroSeconds();
- double userUsagePercent = std::max<double>(0.0, 100. * userUsageTimeUs / timeDeltaUsec);
- writer->AddGauge("/cpu/user", userUsagePercent);
- }
-
- if (totalStatistics.CpuStatistics.SystemUsageTime.IsOK()) {
- i64 systemUsageTimeUs = totalStatistics.CpuStatistics.SystemUsageTime.Value().MicroSeconds();
- double systemUsagePercent = std::max<double>(0.0, 100. * systemUsageTimeUs / timeDeltaUsec);
- writer->AddGauge("/cpu/system", systemUsagePercent);
- }
-
- if (totalStatistics.CpuStatistics.WaitTime.IsOK()) {
- i64 waitTimeUs = totalStatistics.CpuStatistics.WaitTime.Value().MicroSeconds();
- double waitPercent = std::max<double>(0.0, 100. * waitTimeUs / timeDeltaUsec);
- writer->AddGauge("/cpu/wait", waitPercent);
- }
-
- if (totalStatistics.CpuStatistics.ThrottledTime.IsOK()) {
- i64 throttledTimeUs = totalStatistics.CpuStatistics.ThrottledTime.Value().MicroSeconds();
- double throttledPercent = std::max<double>(0.0, 100. * throttledTimeUs / timeDeltaUsec);
- writer->AddGauge("/cpu/throttled", throttledPercent);
- }
-
- if (totalStatistics.CpuStatistics.TotalUsageTime.IsOK()) {
- i64 totalUsageTimeUs = totalStatistics.CpuStatistics.TotalUsageTime.Value().MicroSeconds();
- double totalUsagePercent = std::max<double>(0.0, 100. * totalUsageTimeUs / timeDeltaUsec);
- writer->AddGauge("/cpu/total", totalUsagePercent);
- }
-
- if (totalStatistics.CpuStatistics.GuaranteeTime.IsOK()) {
- i64 guaranteeTimeUs = totalStatistics.CpuStatistics.GuaranteeTime.Value().MicroSeconds();
- double guaranteePercent = std::max<double>(0.0, (100. * guaranteeTimeUs) / (1'000'000L));
- writer->AddGauge("/cpu/guarantee", guaranteePercent);
- }
-
- if (totalStatistics.CpuStatistics.LimitTime.IsOK()) {
- i64 limitTimeUs = totalStatistics.CpuStatistics.LimitTime.Value().MicroSeconds();
- double limitPercent = std::max<double>(0.0, (100. * limitTimeUs) / (1'000'000L));
- writer->AddGauge("/cpu/limit", limitPercent);
- }
- }
-
- if (PodSpec_->CpuToVCpuFactor) {
- auto factor = *PodSpec_->CpuToVCpuFactor;
-
- writer->AddGauge("/cpu_to_vcpu_factor", factor);
-
- if (totalStatistics.CpuStatistics.UserUsageTime.IsOK()) {
- i64 userUsageTimeUs = totalStatistics.CpuStatistics.UserUsageTime.Value().MicroSeconds();
- double userUsagePercent = std::max<double>(0.0, 100. * userUsageTimeUs * factor / timeDeltaUsec);
- writer->AddGauge("/vcpu/user", userUsagePercent);
- }
-
- if (totalStatistics.CpuStatistics.SystemUsageTime.IsOK()) {
- i64 systemUsageTimeUs = totalStatistics.CpuStatistics.SystemUsageTime.Value().MicroSeconds();
- double systemUsagePercent = std::max<double>(0.0, 100. * systemUsageTimeUs * factor / timeDeltaUsec);
- writer->AddGauge("/vcpu/system", systemUsagePercent);
- }
-
- if (totalStatistics.CpuStatistics.WaitTime.IsOK()) {
- i64 waitTimeUs = totalStatistics.CpuStatistics.WaitTime.Value().MicroSeconds();
- double waitPercent = std::max<double>(0.0, 100. * waitTimeUs * factor / timeDeltaUsec);
- writer->AddGauge("/vcpu/wait", waitPercent);
- }
-
- if (totalStatistics.CpuStatistics.ThrottledTime.IsOK()) {
- i64 throttledTimeUs = totalStatistics.CpuStatistics.ThrottledTime.Value().MicroSeconds();
- double throttledPercent = std::max<double>(0.0, 100. * throttledTimeUs * factor / timeDeltaUsec);
- writer->AddGauge("/vcpu/throttled", throttledPercent);
- }
-
- if (totalStatistics.CpuStatistics.TotalUsageTime.IsOK()) {
- i64 totalUsageTimeUs = totalStatistics.CpuStatistics.TotalUsageTime.Value().MicroSeconds();
- double totalUsagePercent = std::max<double>(0.0, 100. * totalUsageTimeUs * factor / timeDeltaUsec);
- writer->AddGauge("/vcpu/total", totalUsagePercent);
- }
-
- if (totalStatistics.CpuStatistics.GuaranteeTime.IsOK()) {
- i64 guaranteeTimeUs = totalStatistics.CpuStatistics.GuaranteeTime.Value().MicroSeconds();
- double guaranteePercent = std::max<double>(0.0, 100. * guaranteeTimeUs * factor / 1'000'000L);
- writer->AddGauge("/vcpu/guarantee", guaranteePercent);
- }
-
- if (totalStatistics.CpuStatistics.LimitTime.IsOK()) {
- i64 limitTimeUs = totalStatistics.CpuStatistics.LimitTime.Value().MicroSeconds();
- double limitPercent = std::max<double>(0.0, 100. * limitTimeUs * factor / 1'000'000L);
- writer->AddGauge("/vcpu/limit", limitPercent);
- }
- }
-
- WriteGaugeIfOk(writer, "/cpu/thread_count", totalStatistics.CpuStatistics.ThreadCount);
- WriteGaugeIfOk(writer, "/cpu/context_switches", totalStatistics.CpuStatistics.ContextSwitches);
-}
-
-void TPortoResourceProfiler::WriteMemoryMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec)
-{
- WriteCumulativeGaugeIfOk(writer,
- "/memory/minor_page_faults",
- totalStatistics.MemoryStatistics.MinorPageFaults,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(writer,
- "/memory/major_page_faults",
- totalStatistics.MemoryStatistics.MajorPageFaults,
- timeDeltaUsec);
-
- WriteGaugeIfOk(writer, "/memory/oom_kills", totalStatistics.MemoryStatistics.OomKills);
- WriteGaugeIfOk(writer, "/memory/oom_kills_total", totalStatistics.MemoryStatistics.OomKillsTotal);
-
- WriteGaugeIfOk(writer, "/memory/file_cache_usage", totalStatistics.MemoryStatistics.FileCacheUsage);
- WriteGaugeIfOk(writer, "/memory/anon_usage", totalStatistics.MemoryStatistics.AnonUsage);
- WriteGaugeIfOk(writer, "/memory/anon_limit", totalStatistics.MemoryStatistics.AnonLimit);
- WriteGaugeIfOk(writer, "/memory/memory_usage", totalStatistics.MemoryStatistics.MemoryUsage);
- WriteGaugeIfOk(writer, "/memory/memory_guarantee", totalStatistics.MemoryStatistics.MemoryGuarantee);
- WriteGaugeIfOk(writer, "/memory/memory_limit", totalStatistics.MemoryStatistics.MemoryLimit);
-}
-
-void TPortoResourceProfiler::WriteBlockingIOMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec)
-{
- WriteCumulativeGaugeIfOk(writer,
- "/io/read_bytes",
- totalStatistics.BlockIOStatistics.IOReadByte,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(writer,
- "/io/write_bytes",
- totalStatistics.BlockIOStatistics.IOWriteByte,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(writer,
- "/io/read_ops",
- totalStatistics.BlockIOStatistics.IOReadOps,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(writer,
- "/io/write_ops",
- totalStatistics.BlockIOStatistics.IOWriteOps,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(writer,
- "/io/ops",
- totalStatistics.BlockIOStatistics.IOOps,
- timeDeltaUsec);
-
- WriteGaugeIfOk(writer,
- "/io/bytes_limit",
- totalStatistics.BlockIOStatistics.IOBytesLimit);
- WriteGaugeIfOk(writer,
- "/io/ops_limit",
- totalStatistics.BlockIOStatistics.IOOpsLimit);
-
- if (totalStatistics.BlockIOStatistics.IOTotalTime.IsOK()) {
- i64 totalTimeUs = totalStatistics.BlockIOStatistics.IOTotalTime.Value().MicroSeconds();
- double totalPercent = std::max<double>(0.0, 100. * totalTimeUs / timeDeltaUsec);
- writer->AddGauge("/io/total", totalPercent);
- }
-
- if (totalStatistics.BlockIOStatistics.IOWaitTime.IsOK()) {
- i64 waitTimeUs = totalStatistics.BlockIOStatistics.IOWaitTime.Value().MicroSeconds();
- double waitPercent = std::max<double>(0.0, 100. * waitTimeUs / timeDeltaUsec);
- writer->AddGauge("/io/wait", waitPercent);
- }
-}
-
-void TPortoResourceProfiler::WriteNetworkMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec)
-{
- WriteCumulativeGaugeIfOk(
- writer,
- "/network/rx_bytes",
- totalStatistics.NetworkStatistics.RxBytes,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(
- writer,
- "/network/rx_drops",
- totalStatistics.NetworkStatistics.RxDrops,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(
- writer,
- "/network/rx_packets",
- totalStatistics.NetworkStatistics.RxPackets,
- timeDeltaUsec);
- WriteGaugeIfOk(
- writer,
- "/network/rx_limit",
- totalStatistics.NetworkStatistics.RxLimit);
-
- WriteCumulativeGaugeIfOk(
- writer,
- "/network/tx_bytes",
- totalStatistics.NetworkStatistics.TxBytes,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(
- writer,
- "/network/tx_drops",
- totalStatistics.NetworkStatistics.TxDrops,
- timeDeltaUsec);
- WriteCumulativeGaugeIfOk(
- writer,
- "/network/tx_packets",
- totalStatistics.NetworkStatistics.TxPackets,
- timeDeltaUsec);
- WriteGaugeIfOk(
- writer,
- "/network/tx_limit",
- totalStatistics.NetworkStatistics.TxLimit);
-}
-
-void TPortoResourceProfiler::CollectSensors(ISensorWriter* writer)
-{
- i64 lastUpdate = ResourceTracker_->GetLastUpdateTime().MicroSeconds();
-
- auto totalStatistics = ResourceTracker_->GetTotalStatistics();
- i64 timeDeltaUsec = TInstant::Now().MicroSeconds() - lastUpdate;
-
- WriteCpuMetrics(writer, totalStatistics, timeDeltaUsec);
- WriteMemoryMetrics(writer, totalStatistics, timeDeltaUsec);
- WriteBlockingIOMetrics(writer, totalStatistics, timeDeltaUsec);
- WriteNetworkMetrics(writer, totalStatistics, timeDeltaUsec);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TPortoResourceProfilerPtr CreatePortoProfilerWithTags(
- const IInstancePtr& instance,
- const TString containerCategory,
- const TPodSpecConfigPtr& podSpec)
-{
- auto portoResourceTracker = New<TPortoResourceTracker>(
- instance,
- ResourceUsageUpdatePeriod,
- true,
- true);
-
- return New<TPortoResourceProfiler>(
- portoResourceTracker,
- podSpec,
- TProfiler("/porto")
- .WithTag("container_category", containerCategory));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-#endif
-
-#ifdef __linux__
-void EnablePortoResourceTracker(const TPodSpecConfigPtr& podSpec)
-{
- BIND([=] {
- auto executor = CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "porto-tracker");
-
- executor->SubscribeFailed(BIND([=] (const TError& error) {
- YT_LOG_ERROR(error, "Fatal error during Porto polling");
- }));
-
- LeakyRefCountedSingleton<TPortoProfilers>(
- CreatePortoProfilerWithTags(GetSelfPortoInstance(executor), "daemon", podSpec),
- CreatePortoProfilerWithTags(GetRootPortoInstance(executor), "pod", podSpec));
- }).AsyncVia(GetCurrentInvoker())
- .Run()
- .Subscribe(BIND([] (const TError& error) {
- YT_LOG_ERROR_IF(!error.IsOK(), error, "Failed to enable Porto profiler");
- }));
-}
-#else
-void EnablePortoResourceTracker(const TPodSpecConfigPtr& /*podSpec*/)
-{
- YT_LOG_WARNING("Porto resource tracker not supported");
-}
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/porto_resource_tracker.h b/yt/yt/library/containers/porto_resource_tracker.h
deleted file mode 100644
index 8a0f781949..0000000000
--- a/yt/yt/library/containers/porto_resource_tracker.h
+++ /dev/null
@@ -1,158 +0,0 @@
-#pragma once
-
-#include <yt/yt/library/containers/instance.h>
-#include <yt/yt/library/containers/public.h>
-
-#include <yt/yt/library/containers/cgroup.h>
-
-#include <yt/yt/core/misc/singleton.h>
-#include <yt/yt/core/net/address.h>
-#include <yt/yt/core/ytree/public.h>
-
-#include <yt/yt/library/process/process.h>
-#include <yt/yt/library/profiling/producer.h>
-
-namespace NYT::NContainers {
-
-using namespace NProfiling;
-
-////////////////////////////////////////////////////////////////////////////////
-
-static constexpr auto ResourceUsageUpdatePeriod = TDuration::MilliSeconds(1000);
-
-////////////////////////////////////////////////////////////////////////////////
-
-using TCpuStatistics = TCpuAccounting::TStatistics;
-using TBlockIOStatistics = TBlockIO::TStatistics;
-using TMemoryStatistics = TMemory::TStatistics;
-using TNetworkStatistics = TNetwork::TStatistics;
-
-struct TTotalStatistics
-{
-public:
- TCpuStatistics CpuStatistics;
- TMemoryStatistics MemoryStatistics;
- TBlockIOStatistics BlockIOStatistics;
- TNetworkStatistics NetworkStatistics;
-};
-
-#ifdef _linux_
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoResourceTracker
- : public TRefCounted
-{
-public:
- TPortoResourceTracker(
- IInstancePtr instance,
- TDuration updatePeriod,
- bool isDeltaTracker = false,
- bool isForceUpdate = false);
-
- TCpuStatistics GetCpuStatistics() const;
-
- TBlockIOStatistics GetBlockIOStatistics() const;
-
- TMemoryStatistics GetMemoryStatistics() const;
-
- TNetworkStatistics GetNetworkStatistics() const;
-
- TTotalStatistics GetTotalStatistics() const;
-
- bool AreResourceUsageStatisticsExpired() const;
-
- TInstant GetLastUpdateTime() const;
-
-private:
- const IInstancePtr Instance_;
- const TDuration UpdatePeriod_;
- const bool IsDeltaTracker_;
- const bool IsForceUpdate_;
-
- mutable std::atomic<TInstant> LastUpdateTime_ = {};
-
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
- mutable TResourceUsage ResourceUsage_;
- mutable TResourceUsage ResourceUsageDelta_;
-
- mutable std::optional<TCpuStatistics> CachedCpuStatistics_;
- mutable std::optional<TMemoryStatistics> CachedMemoryStatistics_;
- mutable std::optional<TBlockIOStatistics> CachedBlockIOStatistics_;
- mutable std::optional<TNetworkStatistics> CachedNetworkStatistics_;
- mutable std::optional<TTotalStatistics> CachedTotalStatistics_;
- mutable TErrorOr<ui64> PeakThreadCount_ = 0;
-
- template <class T, class F>
- T GetStatistics(
- std::optional<T>& cachedStatistics,
- const TString& statisticsKind,
- F extractor) const;
-
- TCpuStatistics ExtractCpuStatistics(const TResourceUsage& resourceUsage) const;
- TMemoryStatistics ExtractMemoryStatistics(const TResourceUsage& resourceUsage) const;
- TBlockIOStatistics ExtractBlockIOStatistics(const TResourceUsage& resourceUsage) const;
- TNetworkStatistics ExtractNetworkStatistics(const TResourceUsage& resourceUsage) const;
- TTotalStatistics ExtractTotalStatistics(const TResourceUsage& resourceUsage) const;
-
- TErrorOr<ui64> CalculateCounterDelta(
- const TErrorOr<ui64>& oldValue,
- const TErrorOr<ui64>& newValue) const;
-
- void ReCalculateResourceUsage(const TResourceUsage& newResourceUsage) const;
-
- void UpdateResourceUsageStatisticsIfExpired() const;
-
- void DoUpdateResourceUsage() const;
-};
-
-DEFINE_REFCOUNTED_TYPE(TPortoResourceTracker)
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoResourceProfiler
- : public ISensorProducer
-{
-public:
- TPortoResourceProfiler(
- TPortoResourceTrackerPtr tracker,
- TPodSpecConfigPtr podSpec,
- const TProfiler& profiler = TProfiler{"/porto"});
-
- void CollectSensors(ISensorWriter* writer) override;
-
-private:
- const TPortoResourceTrackerPtr ResourceTracker_;
- const TPodSpecConfigPtr PodSpec_;
-
- void WriteCpuMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec);
-
- void WriteMemoryMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec);
-
- void WriteBlockingIOMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec);
-
- void WriteNetworkMetrics(
- ISensorWriter* writer,
- TTotalStatistics& totalStatistics,
- i64 timeDeltaUsec);
-};
-
-DECLARE_REFCOUNTED_TYPE(TPortoResourceProfiler)
-DEFINE_REFCOUNTED_TYPE(TPortoResourceProfiler)
-
-////////////////////////////////////////////////////////////////////////////////
-
-#endif
-
-void EnablePortoResourceTracker(const TPodSpecConfigPtr& podSpec);
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/private.h b/yt/yt/library/containers/private.h
deleted file mode 100644
index 62682cb364..0000000000
--- a/yt/yt/library/containers/private.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/logging/log.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-inline const NLogging::TLogger ContainersLogger("Containers");
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/process.cpp b/yt/yt/library/containers/process.cpp
deleted file mode 100644
index ad1c8d35dc..0000000000
--- a/yt/yt/library/containers/process.cpp
+++ /dev/null
@@ -1,154 +0,0 @@
-#ifdef __linux__
-
-#include "process.h"
-
-#include <yt/yt/library/containers/instance.h>
-
-#include <yt/yt/core/misc/proc.h>
-#include <yt/yt/core/misc/fs.h>
-
-namespace NYT::NContainers {
-
-using namespace NPipes;
-using namespace NNet;
-using namespace NConcurrency;
-
-////////////////////////////////////////////////////////////////////////////////
-
-static inline const NLogging::TLogger Logger("Process");
-
-static constexpr pid_t InvalidProcessId = -1;
-
-////////////////////////////////////////////////////////////////////////////////
-
-TPortoProcess::TPortoProcess(
- const TString& path,
- IInstanceLauncherPtr containerLauncher,
- bool copyEnv)
- : TProcessBase(path)
- , ContainerLauncher_(std::move(containerLauncher))
-{
- AddArgument(NFS::GetFileName(path));
- if (copyEnv) {
- for (char** envIt = environ; *envIt; ++envIt) {
- Env_.push_back(Capture(*envIt));
- }
- }
-}
-
-void TPortoProcess::Kill(int signal)
-{
- if (auto instance = GetInstance()) {
- instance->Kill(signal);
- }
-}
-
-void TPortoProcess::DoSpawn()
-{
- YT_VERIFY(ProcessId_ == InvalidProcessId && !Finished_);
- YT_VERIFY(!GetInstance());
- YT_VERIFY(!Started_);
- YT_VERIFY(!Args_.empty());
-
- if (!WorkingDirectory_.empty()) {
- ContainerLauncher_->SetCwd(WorkingDirectory_);
- }
-
- Started_ = true;
-
- try {
- // TPortoProcess doesn't support running processes inside rootFS.
- YT_VERIFY(!ContainerLauncher_->HasRoot());
- std::vector<TString> args(Args_.begin() + 1, Args_.end());
- auto instance = WaitFor(ContainerLauncher_->Launch(ResolvedPath_, args, DecomposeEnv()))
- .ValueOrThrow();
- ContainerInstance_.Store(instance);
- FinishedPromise_.SetFrom(instance->Wait());
-
- try {
- ProcessId_ = instance->GetPid();
- } catch (const std::exception& ex) {
- // This could happen if Porto container has already died or pid namespace of
- // parent container is not a parent of pid namespace of child container.
- // It's not a problem, since for Porto process pid is used for logging purposes only.
- YT_LOG_DEBUG(ex, "Failed to get pid of root process (Container: %v)",
- instance->GetName());
- }
-
- YT_LOG_DEBUG("Process inside Porto spawned successfully (Path: %v, ExternalPid: %v, Container: %v)",
- ResolvedPath_,
- ProcessId_,
- instance->GetName());
-
- FinishedPromise_.ToFuture().Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TError& exitStatus) {
- Finished_ = true;
- if (exitStatus.IsOK()) {
- YT_LOG_DEBUG("Process inside Porto exited gracefully (ExternalPid: %v, Container: %v)",
- ProcessId_,
- instance->GetName());
- } else {
- YT_LOG_DEBUG(exitStatus, "Process inside Porto exited with an error (ExternalPid: %v, Container: %v)",
- ProcessId_,
- instance->GetName());
- }
- }));
- } catch (const std::exception& ex) {
- Finished_ = true;
- THROW_ERROR_EXCEPTION("Failed to start child process inside Porto")
- << TErrorAttribute("path", ResolvedPath_)
- << TErrorAttribute("container", ContainerLauncher_->GetName())
- << ex;
- }
-}
-
-IInstancePtr TPortoProcess::GetInstance()
-{
- return ContainerInstance_.Acquire();
-}
-
-THashMap<TString, TString> TPortoProcess::DecomposeEnv() const
-{
- THashMap<TString, TString> result;
- for (const auto& env : Env_) {
- TStringBuf name, value;
- TStringBuf(env).TrySplit('=', name, value);
- result[name] = value;
- }
- return result;
-}
-
-static TString CreateStdIONamedPipePath()
-{
- const TString name = ToString(TGuid::Create());
- return NFS::GetRealPath(NFS::CombinePaths("/tmp", name));
-}
-
-IConnectionWriterPtr TPortoProcess::GetStdInWriter()
-{
- auto pipe = TNamedPipe::Create(CreateStdIONamedPipePath());
- ContainerLauncher_->SetStdIn(pipe->GetPath());
- NamedPipes_.push_back(pipe);
- return pipe->CreateAsyncWriter();
-}
-
-IConnectionReaderPtr TPortoProcess::GetStdOutReader()
-{
- auto pipe = TNamedPipe::Create(CreateStdIONamedPipePath());
- ContainerLauncher_->SetStdOut(pipe->GetPath());
- NamedPipes_.push_back(pipe);
- return pipe->CreateAsyncReader();
-}
-
-IConnectionReaderPtr TPortoProcess::GetStdErrReader()
-{
- auto pipe = TNamedPipe::Create(CreateStdIONamedPipePath());
- ContainerLauncher_->SetStdErr(pipe->GetPath());
- NamedPipes_.push_back(pipe);
- return pipe->CreateAsyncReader();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
-
-#endif
diff --git a/yt/yt/library/containers/process.h b/yt/yt/library/containers/process.h
deleted file mode 100644
index 75255165d8..0000000000
--- a/yt/yt/library/containers/process.h
+++ /dev/null
@@ -1,46 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/library/process/process.h>
-
-#include <library/cpp/yt/memory/atomic_intrusive_ptr.h>
-
-#include <library/cpp/porto/libporto.hpp>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-// NB(psushin): this class is deprecated and only used to run job proxy.
-// ToDo(psushin): kill me.
-class TPortoProcess
- : public TProcessBase
-{
-public:
- TPortoProcess(
- const TString& path,
- NContainers::IInstanceLauncherPtr containerLauncher,
- bool copyEnv = true);
- void Kill(int signal) override;
- NNet::IConnectionWriterPtr GetStdInWriter() override;
- NNet::IConnectionReaderPtr GetStdOutReader() override;
- NNet::IConnectionReaderPtr GetStdErrReader() override;
-
- NContainers::IInstancePtr GetInstance();
-
-private:
- const NContainers::IInstanceLauncherPtr ContainerLauncher_;
-
- TAtomicIntrusivePtr<NContainers::IInstance> ContainerInstance_;
- std::vector<NPipes::TNamedPipePtr> NamedPipes_;
-
- void DoSpawn() override;
- THashMap<TString, TString> DecomposeEnv() const;
-};
-
-DEFINE_REFCOUNTED_TYPE(TPortoProcess)
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/public.h b/yt/yt/library/containers/public.h
deleted file mode 100644
index d8e3cf3491..0000000000
--- a/yt/yt/library/containers/public.h
+++ /dev/null
@@ -1,163 +0,0 @@
-#pragma once
-
-#include <yt/yt/core/misc/public.h>
-
-#include <library/cpp/porto/proto/rpc.pb.h>
-#include <library/cpp/yt/misc/enum.h>
-
-namespace NYT::NContainers {
-
-////////////////////////////////////////////////////////////////////////////////
-
-const int PortoErrorCodeBase = 12000;
-
-DEFINE_ENUM(EPortoErrorCode,
- ((Success) ((PortoErrorCodeBase + Porto::EError::Success)))
- ((Unknown) ((PortoErrorCodeBase + Porto::EError::Unknown)))
- ((InvalidMethod) ((PortoErrorCodeBase + Porto::EError::InvalidMethod)))
- ((ContainerAlreadyExists) ((PortoErrorCodeBase + Porto::EError::ContainerAlreadyExists)))
- ((ContainerDoesNotExist) ((PortoErrorCodeBase + Porto::EError::ContainerDoesNotExist)))
- ((InvalidProperty) ((PortoErrorCodeBase + Porto::EError::InvalidProperty)))
- ((InvalidData) ((PortoErrorCodeBase + Porto::EError::InvalidData)))
- ((InvalidValue) ((PortoErrorCodeBase + Porto::EError::InvalidValue)))
- ((InvalidState) ((PortoErrorCodeBase + Porto::EError::InvalidState)))
- ((NotSupported) ((PortoErrorCodeBase + Porto::EError::NotSupported)))
- ((ResourceNotAvailable) ((PortoErrorCodeBase + Porto::EError::ResourceNotAvailable)))
- ((Permission) ((PortoErrorCodeBase + Porto::EError::Permission)))
- ((VolumeAlreadyExists) ((PortoErrorCodeBase + Porto::EError::VolumeAlreadyExists)))
- ((VolumeNotFound) ((PortoErrorCodeBase + Porto::EError::VolumeNotFound)))
- ((NoSpace) ((PortoErrorCodeBase + Porto::EError::NoSpace)))
- ((Busy) ((PortoErrorCodeBase + Porto::EError::Busy)))
- ((VolumeAlreadyLinked) ((PortoErrorCodeBase + Porto::EError::VolumeAlreadyLinked)))
- ((VolumeNotLinked) ((PortoErrorCodeBase + Porto::EError::VolumeNotLinked)))
- ((LayerAlreadyExists) ((PortoErrorCodeBase + Porto::EError::LayerAlreadyExists)))
- ((LayerNotFound) ((PortoErrorCodeBase + Porto::EError::LayerNotFound)))
- ((NoValue) ((PortoErrorCodeBase + Porto::EError::NoValue)))
- ((VolumeNotReady) ((PortoErrorCodeBase + Porto::EError::VolumeNotReady)))
- ((InvalidCommand) ((PortoErrorCodeBase + Porto::EError::InvalidCommand)))
- ((LostError) ((PortoErrorCodeBase + Porto::EError::LostError)))
- ((DeviceNotFound) ((PortoErrorCodeBase + Porto::EError::DeviceNotFound)))
- ((InvalidPath) ((PortoErrorCodeBase + Porto::EError::InvalidPath)))
- ((InvalidNetworkAddress) ((PortoErrorCodeBase + Porto::EError::InvalidNetworkAddress)))
- ((PortoFrozen) ((PortoErrorCodeBase + Porto::EError::PortoFrozen)))
- ((LabelNotFound) ((PortoErrorCodeBase + Porto::EError::LabelNotFound)))
- ((InvalidLabel) ((PortoErrorCodeBase + Porto::EError::InvalidLabel)))
- ((NotFound) ((PortoErrorCodeBase + Porto::EError::NotFound)))
- ((SocketError) ((PortoErrorCodeBase + Porto::EError::SocketError)))
- ((SocketUnavailable) ((PortoErrorCodeBase + Porto::EError::SocketUnavailable)))
- ((SocketTimeout) ((PortoErrorCodeBase + Porto::EError::SocketTimeout)))
- ((Taint) ((PortoErrorCodeBase + Porto::EError::Taint)))
- ((Queued) ((PortoErrorCodeBase + Porto::EError::Queued)))
-);
-
-////////////////////////////////////////////////////////////////////////////////
-
-YT_DEFINE_ERROR_ENUM(
- ((FailedToStartContainer) (14000))
-);
-
-DEFINE_ENUM(EStatField,
- // CPU
- (CpuUsage)
- (CpuUserUsage)
- (CpuSystemUsage)
- (CpuWait)
- (CpuThrottled)
- (ContextSwitches)
- (ContextSwitchesDelta)
- (ThreadCount)
- (CpuLimit)
- (CpuGuarantee)
-
- // Memory
- (Rss)
- (MappedFile)
- (MajorPageFaults)
- (MinorPageFaults)
- (FileCacheUsage)
- (AnonMemoryUsage)
- (AnonMemoryLimit)
- (MemoryUsage)
- (MemoryGuarantee)
- (MemoryLimit)
- (MaxMemoryUsage)
- (OomKills)
- (OomKillsTotal)
-
- // IO
- (IOReadByte)
- (IOWriteByte)
- (IOBytesLimit)
- (IOReadOps)
- (IOWriteOps)
- (IOOps)
- (IOOpsLimit)
- (IOTotalTime)
- (IOWaitTime)
-
- // Network
- (NetTxBytes)
- (NetTxPackets)
- (NetTxDrops)
- (NetTxLimit)
- (NetRxBytes)
- (NetRxPackets)
- (NetRxDrops)
- (NetRxLimit)
-);
-
-DEFINE_ENUM(EEnablePorto,
- (None)
- (Isolate)
- (Full)
-);
-
-struct TBind
-{
- TString SourcePath;
- TString TargetPath;
- bool ReadOnly;
-};
-
-struct TRootFS
-{
- TString RootPath;
- bool IsRootReadOnly;
- std::vector<TBind> Binds;
-};
-
-struct TDevice
-{
- TString DeviceName;
- bool Enabled;
-};
-
-struct TInstanceLimits
-{
- double Cpu = 0;
- i64 Memory = 0;
- std::optional<i64> NetTx;
- std::optional<i64> NetRx;
-
- bool operator==(const TInstanceLimits&) const = default;
-};
-
-DECLARE_REFCOUNTED_STRUCT(IContainerManager)
-DECLARE_REFCOUNTED_STRUCT(IInstanceLauncher)
-DECLARE_REFCOUNTED_STRUCT(IInstance)
-DECLARE_REFCOUNTED_STRUCT(IPortoExecutor)
-
-DECLARE_REFCOUNTED_CLASS(TPortoHealthChecker)
-DECLARE_REFCOUNTED_CLASS(TInstanceLimitsTracker)
-DECLARE_REFCOUNTED_CLASS(TPortoProcess)
-DECLARE_REFCOUNTED_CLASS(TPortoResourceTracker)
-DECLARE_REFCOUNTED_CLASS(TPortoExecutorDynamicConfig)
-DECLARE_REFCOUNTED_CLASS(TPodSpecConfig)
-
-////////////////////////////////////////////////////////////////////////////////
-
-bool IsValidCGroupType(const TString& type);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/unittests/containers_ut.cpp b/yt/yt/library/containers/unittests/containers_ut.cpp
deleted file mode 100644
index 4f1c10a435..0000000000
--- a/yt/yt/library/containers/unittests/containers_ut.cpp
+++ /dev/null
@@ -1,133 +0,0 @@
-#include <yt/yt/core/test_framework/framework.h>
-
-#ifdef _linux_
-
-#include <yt/yt/library/containers/config.h>
-#include <yt/yt/library/containers/porto_executor.h>
-#include <yt/yt/library/containers/instance.h>
-
-#include <util/system/platform.h>
-#include <util/system/env.h>
-
-namespace NYT::NContainers {
-namespace {
-
-using namespace NConcurrency;
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TContainersTest
- : public ::testing::Test
-{
- void SetUp() override
- {
- if (GetEnv("SKIP_PORTO_TESTS") != "") {
- GTEST_SKIP();
- }
- }
-};
-
-static TString GetUniqueName()
-{
- return "yt_ut_" + ToString(TGuid::Create());
-}
-
-IPortoExecutorPtr CreatePortoExecutor()
-{
- return CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "default");
-}
-
-TEST_F(TContainersTest, ListSubcontainers)
-{
- auto executor = CreatePortoExecutor();
- auto name = GetUniqueName();
-
- WaitFor(executor->CreateContainer(name))
- .ThrowOnError();
-
- auto absoluteName = *WaitFor(executor->GetContainerProperty(name, "absolute_name"))
- .ValueOrThrow();
-
- auto nestedName = absoluteName + "/nested";
- WaitFor(executor->CreateContainer(nestedName))
- .ThrowOnError();
-
- auto withRoot = WaitFor(executor->ListSubcontainers(name, true))
- .ValueOrThrow();
- EXPECT_EQ(std::vector<TString>({absoluteName, nestedName}), withRoot);
-
- auto withoutRoot = WaitFor(executor->ListSubcontainers(name, false))
- .ValueOrThrow();
- EXPECT_EQ(std::vector<TString>({nestedName}), withoutRoot);
-
- WaitFor(executor->DestroyContainer(absoluteName))
- .ThrowOnError();
-}
-
-// See https://st.yandex-team.ru/PORTO-846.
-TEST_F(TContainersTest, DISABLED_WaitContainer)
-{
- auto executor = CreatePortoExecutor();
- auto name = GetUniqueName();
-
- WaitFor(executor->CreateContainer(name))
- .ThrowOnError();
-
- WaitFor(executor->SetContainerProperty(name, "command", "sleep 10"))
- .ThrowOnError();
-
- WaitFor(executor->StartContainer(name))
- .ThrowOnError();
-
- auto exitCode = WaitFor(executor->WaitContainer(name))
- .ValueOrThrow();
-
- EXPECT_EQ(0, exitCode);
-
- WaitFor(executor->DestroyContainer(name))
- .ThrowOnError();
-}
-
-TEST_F(TContainersTest, CreateFromSpec)
-{
- auto executor = CreatePortoExecutor();
- auto name = GetUniqueName();
-
- auto spec = TRunnableContainerSpec {
- .Name = name,
- .Command = "sleep 2",
- };
-
- WaitFor(executor->CreateContainer(spec, /*start*/ true))
- .ThrowOnError();
-
- auto exitCode = WaitFor(executor->PollContainer(name))
- .ValueOrThrow();
-
- EXPECT_EQ(0, exitCode);
-
- WaitFor(executor->DestroyContainer(name))
- .ThrowOnError();
-}
-
-TEST_F(TContainersTest, ListPids)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
-
- auto instance = WaitFor(launcher->Launch("sleep", {"5"}, {}))
- .ValueOrThrow();
-
- auto pids = instance->GetPids();
- EXPECT_LT(0u, pids.size());
-
- instance->Destroy();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-} // namespace NYT::NContainers
-
-#endif
diff --git a/yt/yt/library/containers/unittests/porto_resource_tracker_ut.cpp b/yt/yt/library/containers/unittests/porto_resource_tracker_ut.cpp
deleted file mode 100644
index 04d169ba4e..0000000000
--- a/yt/yt/library/containers/unittests/porto_resource_tracker_ut.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-#include <yt/yt/core/test_framework/framework.h>
-
-#include <yt/yt/core/ytree/convert.h>
-
-#include <util/system/fs.h>
-#include <util/system/tempfile.h>
-
-#include <yt/yt/library/profiling/producer.h>
-#include <yt/yt/library/containers/config.h>
-#include <yt/yt/library/containers/porto_executor.h>
-#include <yt/yt/library/containers/porto_resource_tracker.h>
-#include <yt/yt/library/containers/instance.h>
-
-#include <util/system/platform.h>
-#include <util/system/env.h>
-
-namespace NYT::NContainers {
-namespace {
-
-using namespace NConcurrency;
-
-////////////////////////////////////////////////////////////////////////////////
-
-static constexpr auto TestUpdatePeriod = TDuration::MilliSeconds(10);
-
-class TPortoTrackerTest
- : public ::testing::Test
-{
-public:
- IPortoExecutorPtr Executor;
-
- void SetUp() override
- {
- if (GetEnv("SKIP_PORTO_TESTS") != "") {
- GTEST_SKIP();
- }
-
- Executor = CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "default");
- }
-};
-
-TString GetUniqueName()
-{
- return "yt_porto_ut_" + ToString(TGuid::Create());
-}
-
-TPortoResourceTrackerPtr CreateSumPortoTracker(IPortoExecutorPtr Executor, const TString& name)
-{
- return New<TPortoResourceTracker>(
- GetPortoInstance(Executor, name),
- TestUpdatePeriod,
- false);
-}
-
-TPortoResourceProfilerPtr CreateDeltaPortoProfiler(IPortoExecutorPtr executor, const TString& name)
-{
- auto instance = GetPortoInstance(executor, name);
- auto portoResourceTracker = New<TPortoResourceTracker>(
- instance,
- ResourceUsageUpdatePeriod,
- true,
- true
- );
-
- // Init metrics for delta tracker.
- portoResourceTracker->GetTotalStatistics();
-
- return LeakyRefCountedSingleton<TPortoResourceProfiler>(
- portoResourceTracker,
- New<TPodSpecConfig>(),
- TProfiler("/porto")
- .WithTag("porto_name", instance->GetName())
- .WithTag("container_category", "yt_daemon"));
-}
-
-void AssertGauges(const std::vector<std::tuple<TString, TTagList, double>>& gauges) {
- THashSet<TString> sensors{
- "/cpu/user",
- "/cpu/total",
- "/cpu/system",
- "/cpu/wait",
- "/cpu/throttled",
- "/cpu/guarantee",
- "/cpu/limit",
- "/cpu/thread_count",
- "/cpu/context_switches",
-
- "/memory/minor_page_faults",
- "/memory/major_page_faults",
- "/memory/file_cache_usage",
- "/memory/anon_usage",
- "/memory/anon_limit",
- "/memory/memory_usage",
- "/memory/memory_guarantee",
- "/memory/memory_limit",
-
- "/io/read_bytes",
- "/io/write_bytes",
- "/io/bytes_limit",
-
- "/io/read_ops",
- "/io/write_ops",
- "/io/ops",
- "/io/ops_limit",
- "/io/total",
-
- "/network/rx_bytes",
- "/network/rx_drops",
- "/network/rx_packets",
- "/network/rx_limit",
- "/network/tx_bytes",
- "/network/tx_drops",
- "/network/tx_packets",
- "/network/tx_limit"
- };
-
- THashSet<TString> mayBeEmpty{
- "/cpu/wait",
- "/cpu/throttled",
- "/cpu/guarantee",
- "/cpu/context_switches",
- "/memory/major_page_faults",
- "/memory/memory_guarantee",
- "/io/ops_limit",
- "/io/read_ops",
- "/io/write_ops",
- "/io/wait",
- "/io/bytes_limit",
- "/network/rx_bytes",
- "/network/rx_drops",
- "/network/rx_packets",
- "/network/rx_limit",
- "/network/tx_bytes",
- "/network/tx_drops",
- "/network/tx_packets",
- "/network/tx_limit"
- };
-
- for (const auto& [name, tags, value] : gauges) {
- EXPECT_TRUE(value >= 0 && sensors.find(name) || mayBeEmpty.find(name));
- }
-}
-
-TEST_F(TPortoTrackerTest, ValidateSummaryPortoTracker)
-{
- auto name = GetUniqueName();
-
- WaitFor(Executor->CreateContainer(
- TRunnableContainerSpec {
- .Name = name,
- .Command = "sleep .1",
- }, true))
- .ThrowOnError();
-
- auto tracker = CreateSumPortoTracker(Executor, name);
-
- auto firstStatistics = tracker->GetTotalStatistics();
-
- WaitFor(Executor->StopContainer(name))
- .ThrowOnError();
- WaitFor(Executor->SetContainerProperty(
- name,
- "command",
- "find /"))
- .ThrowOnError();
- WaitFor(Executor->StartContainer(name))
- .ThrowOnError();
- Sleep(TDuration::MilliSeconds(500));
-
- auto secondStatistics = tracker->GetTotalStatistics();
-
- WaitFor(Executor->DestroyContainer(name))
- .ThrowOnError();
-}
-
-TEST_F(TPortoTrackerTest, ValidateDeltaPortoTracker)
-{
- auto name = GetUniqueName();
-
- auto spec = TRunnableContainerSpec {
- .Name = name,
- .Command = "sleep .1",
- };
-
- WaitFor(Executor->CreateContainer(spec, true))
- .ThrowOnError();
-
- auto profiler = CreateDeltaPortoProfiler(Executor, name);
-
- WaitFor(Executor->StopContainer(name))
- .ThrowOnError();
- WaitFor(Executor->SetContainerProperty(
- name,
- "command",
- "find /"))
- .ThrowOnError();
- WaitFor(Executor->StartContainer(name))
- .ThrowOnError();
-
- Sleep(TDuration::MilliSeconds(500));
-
- auto buffer = New<TSensorBuffer>();
- profiler->CollectSensors(buffer.Get());
- AssertGauges(buffer->GetGauges());
-
- WaitFor(Executor->DestroyContainer(name))
- .ThrowOnError();
-}
-
-TEST_F(TPortoTrackerTest, ValidateDeltaRootPortoTracker)
-{
- auto name = GetUniqueName();
-
- auto spec = TRunnableContainerSpec {
- .Name = name,
- .Command = "sleep .1",
- };
-
- WaitFor(Executor->CreateContainer(spec, true))
- .ThrowOnError();
-
- auto profiler = CreateDeltaPortoProfiler(
- Executor,
- GetPortoInstance(
- Executor,
- *GetPortoInstance(Executor, name)->GetRootName())->GetName());
-
- WaitFor(Executor->StopContainer(name))
- .ThrowOnError();
- WaitFor(Executor->SetContainerProperty(
- name,
- "command",
- "find /"))
- .ThrowOnError();
- WaitFor(Executor->StartContainer(name))
- .ThrowOnError();
-
- Sleep(TDuration::MilliSeconds(500));
-
- auto buffer = New<TSensorBuffer>();
- profiler->CollectSensors(buffer.Get());
- AssertGauges(buffer->GetGauges());
-
- WaitFor(Executor->DestroyContainer(name))
- .ThrowOnError();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-} // namespace NYT::NContainers
diff --git a/yt/yt/library/containers/unittests/process_ut.cpp b/yt/yt/library/containers/unittests/process_ut.cpp
deleted file mode 100644
index b9c0d844f4..0000000000
--- a/yt/yt/library/containers/unittests/process_ut.cpp
+++ /dev/null
@@ -1,302 +0,0 @@
-#include <yt/yt/core/test_framework/framework.h>
-
-#ifdef _linux_
-
-#include <yt/yt/core/actions/bind.h>
-
-#include <yt/yt/core/concurrency/action_queue.h>
-#include <yt/yt/core/concurrency/delayed_executor.h>
-#include <yt/yt/core/concurrency/scheduler.h>
-
-#include <yt/yt/core/misc/guid.h>
-#include <yt/yt/core/misc/proc.h>
-
-#include <yt/yt/core/net/connection.h>
-
-#include <yt/yt/library/containers/process.h>
-
-#include <yt/yt/library/containers/config.h>
-#include <yt/yt/library/containers/porto_executor.h>
-#include <yt/yt/library/containers/instance.h>
-
-#include <util/system/platform.h>
-#include <util/system/env.h>
-
-namespace NYT::NContainers {
-namespace {
-
-using namespace NConcurrency;
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPortoProcessTest
- : public ::testing::Test
-{
- void SetUp() override
- {
- if (GetEnv("SKIP_PORTO_TESTS") != "") {
- GTEST_SKIP();
- }
- }
-};
-
-static TString GetUniqueName()
-{
- return "yt_ut_" + ToString(TGuid::Create());
-}
-
-IPortoExecutorPtr CreatePortoExecutor()
-{
- return CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "default");
-}
-
-TEST_F(TPortoProcessTest, Basic)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/ls", launcher, true);
- TFuture<void> finished;
- ASSERT_NO_THROW(finished = p->Spawn());
- ASSERT_TRUE(p->IsStarted());
- auto error = WaitFor(finished);
- EXPECT_TRUE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, RunFromPathEnv)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("ls", launcher, true);
- TFuture<void> finished;
- ASSERT_NO_THROW(finished = p->Spawn());
- ASSERT_TRUE(p->IsStarted());
- auto error = WaitFor(finished);
- EXPECT_TRUE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, MultiBasic)
-{
- auto portoExecutor = CreatePortoExecutor();
- auto l1 = CreatePortoInstanceLauncher(GetUniqueName(), portoExecutor);
- auto l2 = CreatePortoInstanceLauncher(GetUniqueName(), portoExecutor);
- auto p1 = New<TPortoProcess>("/bin/ls", l1, true);
- auto p2 = New<TPortoProcess>("/bin/ls", l2, true);
- TFuture<void> f1;
- TFuture<void> f2;
- ASSERT_NO_THROW(f1 = p1->Spawn());
- ASSERT_NO_THROW(f2 = p2->Spawn());
- auto error = WaitFor((AllSucceeded(std::vector<TFuture<void>>{f1, f2})));
- EXPECT_TRUE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p1->IsFinished());
- EXPECT_TRUE(p2->IsFinished());
- p1->GetInstance()->Destroy();
- p2->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, InvalidPath)
-{
- auto portoExecutor = CreatePortoExecutor();
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- portoExecutor);
- auto p = New<TPortoProcess>("/some/bad/path/binary", launcher, true);
- TFuture<void> finished;
- ASSERT_NO_THROW(finished = p->Spawn());
- ASSERT_FALSE(p->IsStarted());
- auto error = WaitFor(finished);
- EXPECT_FALSE(p->IsFinished());
- EXPECT_FALSE(error.IsOK());
- WaitFor(portoExecutor->DestroyContainer(launcher->GetName()))
- .ThrowOnError();
-}
-
-TEST_F(TPortoProcessTest, StdOut)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/date", launcher, true);
-
- auto outStream = p->GetStdOutReader();
- TFuture<void> finished;
- ASSERT_NO_THROW(finished = p->Spawn());
- ASSERT_TRUE(p->IsStarted());
- auto error = WaitFor(finished);
- EXPECT_TRUE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p->IsFinished());
-
- auto buffer = TSharedMutableRef::Allocate(4_KB, {.InitializeStorage = false});
- auto future = outStream->Read(buffer);
- TErrorOr<size_t> result = WaitFor(future);
- size_t sz = result.ValueOrThrow();
- EXPECT_TRUE(sz > 0);
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, GetCommandLine)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/bash", launcher, true);
- EXPECT_EQ("/bin/bash", p->GetCommandLine());
- p->AddArgument("-c");
- EXPECT_EQ("/bin/bash -c", p->GetCommandLine());
- p->AddArgument("exit 0");
- EXPECT_EQ("/bin/bash -c \"exit 0\"", p->GetCommandLine());
-}
-
-TEST_F(TPortoProcessTest, ProcessReturnCode0)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/bash", launcher, true);
- p->AddArgument("-c");
- p->AddArgument("exit 0");
-
- TFuture<void> finished;
- ASSERT_NO_THROW(finished = p->Spawn());
- ASSERT_TRUE(p->IsStarted());
- auto error = WaitFor(finished);
- EXPECT_TRUE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, ProcessReturnCode123)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/bash", launcher, true);
- p->AddArgument("-c");
- p->AddArgument("exit 123");
-
- TFuture<void> finished;
- ASSERT_NO_THROW(finished = p->Spawn());
- ASSERT_TRUE(p->IsStarted());
- auto error = WaitFor(finished);
- EXPECT_EQ(EProcessErrorCode::NonZeroExitCode, error.GetCode());
- EXPECT_EQ(123, error.Attributes().Get<int>("exit_code"));
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, Params1)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/bash", launcher, true);
- p->AddArgument("-c");
- p->AddArgument("if test 3 -gt 1; then exit 7; fi");
-
- auto error = WaitFor(p->Spawn());
- EXPECT_FALSE(error.IsOK());
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, Params2)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/bash", launcher, true);
- p->AddArgument("-c");
- p->AddArgument("if test 1 -gt 3; then exit 7; fi");
-
- auto error = WaitFor(p->Spawn());
- EXPECT_TRUE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, InheritEnvironment)
-{
- const char* name = "SPAWN_TEST_ENV_VAR";
- const char* value = "42";
- setenv(name, value, 1);
-
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/bash", launcher, true);
- p->AddArgument("-c");
- p->AddArgument("if test $SPAWN_TEST_ENV_VAR = 42; then exit 7; fi");
-
- auto error = WaitFor(p->Spawn());
- EXPECT_FALSE(error.IsOK());
- EXPECT_TRUE(p->IsFinished());
-
- unsetenv(name);
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, Kill)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/sleep", launcher, true);
- p->AddArgument("5");
-
- auto finished = p->Spawn();
-
- NConcurrency::TDelayedExecutor::Submit(
- BIND([&] () {
- p->Kill(SIGKILL);
- }),
- TDuration::MilliSeconds(100));
-
- auto error = WaitFor(finished);
- EXPECT_FALSE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, KillFinished)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/bash", launcher, true);
- p->AddArgument("-c");
- p->AddArgument("true");
-
- auto finished = p->Spawn();
-
- auto error = WaitFor(finished);
- EXPECT_TRUE(error.IsOK());
-
- p->Kill(SIGKILL);
- p->GetInstance()->Destroy();
-}
-
-TEST_F(TPortoProcessTest, PollDuration)
-{
- auto launcher = CreatePortoInstanceLauncher(
- GetUniqueName(),
- CreatePortoExecutor());
- auto p = New<TPortoProcess>("/bin/sleep", launcher, true);
- p->AddArgument("1");
-
- auto error = WaitFor(p->Spawn());
- EXPECT_TRUE(error.IsOK()) << ToString(error);
- EXPECT_TRUE(p->IsFinished());
- p->GetInstance()->Destroy();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-} // namespace NYT::NContainers
-
-#endif
diff --git a/yt/yt/library/containers/unittests/ya.make b/yt/yt/library/containers/unittests/ya.make
deleted file mode 100644
index 42984e2dc7..0000000000
--- a/yt/yt/library/containers/unittests/ya.make
+++ /dev/null
@@ -1,35 +0,0 @@
-GTEST(unittester-containers)
-
-INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
-
-ALLOCATOR(TCMALLOC)
-
-IF (AUTOCHECK)
- ENV(SKIP_PORTO_TESTS=1)
-ENDIF()
-
-IF (DISTBUILD) # TODO(prime@): this is always on
- ENV(SKIP_PORTO_TESTS=1)
-ENDIF()
-
-SRCS(
- containers_ut.cpp
- process_ut.cpp
-)
-
-IF(OS_LINUX)
- SRCS(
- porto_resource_tracker_ut.cpp
- )
-ENDIF()
-
-INCLUDE(${ARCADIA_ROOT}/yt/opensource_tests.inc)
-
-PEERDIR(
- yt/yt/build
- yt/yt/library/containers
-)
-
-SIZE(MEDIUM)
-
-END()
diff --git a/yt/yt/library/containers/ya.make b/yt/yt/library/containers/ya.make
deleted file mode 100644
index 499b8d9da8..0000000000
--- a/yt/yt/library/containers/ya.make
+++ /dev/null
@@ -1,37 +0,0 @@
-LIBRARY()
-
-INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
-
-SRCS(
- cgroup.cpp
- config.cpp
- instance.cpp
- instance_limits_tracker.cpp
- process.cpp
- porto_executor.cpp
- porto_resource_tracker.cpp
- porto_health_checker.cpp
-)
-
-PEERDIR(
- library/cpp/porto/proto
- yt/yt/library/process
- yt/yt/core
-)
-
-IF(OS_LINUX)
- PEERDIR(
- library/cpp/porto
- )
-ENDIF()
-
-END()
-
-RECURSE(
- disk_manager
- cri
-)
-
-RECURSE_FOR_TESTS(
- unittests
-)
diff --git a/yt/yt/library/process/CMakeLists.darwin-x86_64.txt b/yt/yt/library/process/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index b66c679390..0000000000
--- a/yt/yt/library/process/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,26 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(yt-library-process)
-target_compile_options(yt-library-process PRIVATE
- -Wdeprecated-this-capture
-)
-target_link_libraries(yt-library-process PUBLIC
- contrib-libs-cxxsupp
- yutil
- yt-yt-core
- contrib-libs-re2
-)
-target_sources(yt-library-process PRIVATE
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/io_dispatcher.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pipe.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/process.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pty.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/subprocess.cpp
-)
diff --git a/yt/yt/library/process/CMakeLists.txt b/yt/yt/library/process/CMakeLists.txt
index f8b31df0c1..4d48dcdee6 100644
--- a/yt/yt/library/process/CMakeLists.txt
+++ b/yt/yt/library/process/CMakeLists.txt
@@ -8,10 +8,6 @@
if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
include(CMakeLists.linux-x86_64.txt)
endif()
diff --git a/yt/yt/library/process/CMakeLists.windows-x86_64.txt b/yt/yt/library/process/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index 3637ee7dae..0000000000
--- a/yt/yt/library/process/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(yt-library-process)
-target_link_libraries(yt-library-process PUBLIC
- contrib-libs-cxxsupp
- yutil
- yt-yt-core
- contrib-libs-re2
-)
-target_sources(yt-library-process PRIVATE
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/io_dispatcher.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pipe.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/process.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pty.cpp
- ${CMAKE_SOURCE_DIR}/yt/yt/library/process/subprocess.cpp
-)
diff --git a/yt/yt/library/program/CMakeLists.darwin-x86_64.txt b/yt/yt/library/program/CMakeLists.darwin-x86_64.txt
index fc3e796e79..e4ea489293 100644
--- a/yt/yt/library/program/CMakeLists.darwin-x86_64.txt
+++ b/yt/yt/library/program/CMakeLists.darwin-x86_64.txt
@@ -17,7 +17,6 @@ target_link_libraries(yt-library-program PUBLIC
yt-yt-core
core-service_discovery-yp
yt-library-monitoring
- yt-library-containers
library-profiling-solomon
library-profiling-tcmalloc
library-profiling-perf
diff --git a/yt/yt/library/program/CMakeLists.linux-aarch64.txt b/yt/yt/library/program/CMakeLists.linux-aarch64.txt
index be31ba10cf..7c15fb8b7d 100644
--- a/yt/yt/library/program/CMakeLists.linux-aarch64.txt
+++ b/yt/yt/library/program/CMakeLists.linux-aarch64.txt
@@ -18,7 +18,6 @@ target_link_libraries(yt-library-program PUBLIC
yt-yt-core
core-service_discovery-yp
yt-library-monitoring
- yt-library-containers
library-profiling-solomon
library-profiling-tcmalloc
library-profiling-perf
diff --git a/yt/yt/library/program/CMakeLists.linux-x86_64.txt b/yt/yt/library/program/CMakeLists.linux-x86_64.txt
index be31ba10cf..7c15fb8b7d 100644
--- a/yt/yt/library/program/CMakeLists.linux-x86_64.txt
+++ b/yt/yt/library/program/CMakeLists.linux-x86_64.txt
@@ -18,7 +18,6 @@ target_link_libraries(yt-library-program PUBLIC
yt-yt-core
core-service_discovery-yp
yt-library-monitoring
- yt-library-containers
library-profiling-solomon
library-profiling-tcmalloc
library-profiling-perf
diff --git a/yt/yt/library/program/CMakeLists.windows-x86_64.txt b/yt/yt/library/program/CMakeLists.windows-x86_64.txt
index 1f2aea4bd0..7f37407e9d 100644
--- a/yt/yt/library/program/CMakeLists.windows-x86_64.txt
+++ b/yt/yt/library/program/CMakeLists.windows-x86_64.txt
@@ -14,7 +14,6 @@ target_link_libraries(yt-library-program PUBLIC
yt-yt-core
core-service_discovery-yp
yt-library-monitoring
- yt-library-containers
library-profiling-solomon
library-profiling-tcmalloc
library-profiling-perf
diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp
index 0705fb48fc..ccc7bb1f2f 100644
--- a/yt/yt/library/program/config.cpp
+++ b/yt/yt/library/program/config.cpp
@@ -103,12 +103,8 @@ void TSingletonsConfig::Register(TRegistrar registrar)
.Default(true);
registrar.Parameter("enable_resource_tracker", &TThis::EnableResourceTracker)
.Default(true);
- registrar.Parameter("enable_porto_resource_tracker", &TThis::EnablePortoResourceTracker)
- .Default(false);
registrar.Parameter("resource_tracker_vcpu_factor", &TThis::ResourceTrackerVCpuFactor)
.Optional();
- registrar.Parameter("pod_spec", &TThis::PodSpec)
- .DefaultNew();
registrar.Parameter("heap_profiler", &TThis::HeapProfiler)
.DefaultNew();
diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h
index 7d92939f1c..88f649dd8e 100644
--- a/yt/yt/library/program/config.h
+++ b/yt/yt/library/program/config.h
@@ -22,8 +22,6 @@
#include <yt/yt/library/profiling/solomon/exporter.h>
-#include <yt/yt/library/containers/config.h>
-
#include <yt/yt/library/tracing/jaeger/tracer.h>
#include <library/cpp/yt/stockpile/stockpile.h>
@@ -149,9 +147,7 @@ public:
TStockpileConfigPtr Stockpile;
bool EnableRefCountedTrackerProfiling;
bool EnableResourceTracker;
- bool EnablePortoResourceTracker;
std::optional<double> ResourceTrackerVCpuFactor;
- NContainers::TPodSpecConfigPtr PodSpec;
THeapProfilerConfigPtr HeapProfiler;
REGISTER_YSON_STRUCT(TSingletonsConfig);
diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp
index 5c7ff29db1..fc4a84d828 100644
--- a/yt/yt/library/program/helpers.cpp
+++ b/yt/yt/library/program/helpers.cpp
@@ -16,19 +16,14 @@
#include <yt/yt/library/profiling/resource_tracker/resource_tracker.h>
-#include <yt/yt/library/containers/config.h>
-#include <yt/yt/library/containers/porto_resource_tracker.h>
-
#include <yt/yt/core/logging/log_manager.h>
#include <yt/yt/core/concurrency/execution_stack.h>
#include <yt/yt/core/concurrency/periodic_executor.h>
-#include <yt/yt/core/concurrency/private.h>
#include <tcmalloc/malloc_extension.h>
#include <yt/yt/core/net/address.h>
-#include <yt/yt/core/net/local_address.h>
#include <yt/yt/core/rpc/dispatcher.h>
#include <yt/yt/core/rpc/grpc/dispatcher.h>
@@ -37,8 +32,6 @@
#include <yt/yt/core/threading/spin_wait_slow_path_logger.h>
-#include <library/cpp/yt/threading/spin_wait_hook.h>
-
#include <library/cpp/yt/memory/atomic_intrusive_ptr.h>
#include <util/string/split.h>
@@ -236,10 +229,6 @@ void ConfigureSingletonsImpl(const TConfig& config)
NProfiling::SetVCpuFactor(config->ResourceTrackerVCpuFactor.value());
}
}
-
- if (config->EnablePortoResourceTracker) {
- NContainers::EnablePortoResourceTracker(config->PodSpec);
- }
}
void ConfigureSingletons(const TSingletonsConfigPtr& config)
diff --git a/yt/yt/library/program/ya.make b/yt/yt/library/program/ya.make
index 5742ce9287..1d044b5a8d 100644
--- a/yt/yt/library/program/ya.make
+++ b/yt/yt/library/program/ya.make
@@ -16,7 +16,6 @@ PEERDIR(
yt/yt/core
yt/yt/core/service_discovery/yp
yt/yt/library/monitoring
- yt/yt/library/containers
yt/yt/library/profiling/solomon
yt/yt/library/profiling/tcmalloc
yt/yt/library/profiling/perf