diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-14 19:18:07 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-14 20:20:53 +0300 |
commit | 874ef51d3d3edfa25f5a505ec6ab50e172965d1e (patch) | |
tree | 620fb5e02063d23509d3aa3df2215c099ccde0b7 /yt | |
parent | e356b34d3b0399e2f170881af15c91e4db9e3d11 (diff) | |
download | ydb-874ef51d3d3edfa25f5a505ec6ab50e172965d1e.tar.gz |
Intermediate changes
Diffstat (limited to 'yt')
69 files changed, 106 insertions, 7590 deletions
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index 812b6cd843..17d6fbdf8e 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -911,11 +911,16 @@ TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookup( } template<class TRequest> -void FillRequestBySelectRowsOptionsBase(const TSelectRowsOptionsBase& options, TRequest request) +void FillRequestBySelectRowsOptionsBase( + const TSelectRowsOptionsBase& options, + const std::optional<NYPath::TYPath>& defaultUdfRegistryPath, + TRequest request) { request->set_timestamp(options.Timestamp); if (options.UdfRegistryPath) { request->set_udf_registry_path(*options.UdfRegistryPath); + } else if (defaultUdfRegistryPath) { + request->set_udf_registry_path(*options.UdfRegistryPath); } } @@ -929,11 +934,13 @@ TFuture<TSelectRowsResult> TClientBase::SelectRows( req->SetResponseHeavy(true); req->set_query(query); - FillRequestBySelectRowsOptionsBase(options, req); + const auto& config = GetRpcProxyConnection()->GetConfig(); + + FillRequestBySelectRowsOptionsBase(options, config->UdfRegistryPath, req); // TODO(ifsmirnov): retention timestamp in explain_query. req->set_retention_timestamp(options.RetentionTimestamp); // TODO(lukyan): Move to FillRequestBySelectRowsOptionsBase - req->SetTimeout(options.Timeout.value_or(GetRpcProxyConnection()->GetConfig()->DefaultSelectRowsTimeout)); + req->SetTimeout(options.Timeout.value_or(config->DefaultSelectRowsTimeout)); if (options.InputRowLimit) { req->set_input_row_limit(*options.InputRowLimit); @@ -981,7 +988,7 @@ TFuture<TYsonString> TClientBase::ExplainQuery( auto req = proxy.ExplainQuery(); req->set_query(query); - FillRequestBySelectRowsOptionsBase(options, req); + FillRequestBySelectRowsOptionsBase(options, GetRpcProxyConnection()->GetConfig()->UdfRegistryPath, req); return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspExplainQueryPtr& rsp) { return TYsonString(rsp->value()); diff --git a/yt/yt/client/api/rpc_proxy/config.cpp b/yt/yt/client/api/rpc_proxy/config.cpp index 7df4d7239f..ced3029902 100644 --- a/yt/yt/client/api/rpc_proxy/config.cpp +++ b/yt/yt/client/api/rpc_proxy/config.cpp @@ -107,6 +107,9 @@ void TConnectionConfig::Register(TRegistrar registrar) registrar.Parameter("clock_cluster_tag", &TThis::ClockClusterTag) .Default(NObjectClient::InvalidCellTag); + registrar.Parameter("udf_registry_path", &TThis::UdfRegistryPath) + .Optional(); + registrar.Postprocessor([] (TThis* config) { if (!config->ProxyEndpoints && !config->ClusterUrl && !config->ProxyAddresses && !config->ProxyUnixDomainSocket) { THROW_ERROR_EXCEPTION("Either \"endpoints\" or \"cluster_url\" or \"proxy_addresses\" or \"proxy_unix_domain_socket\" must be specified"); diff --git a/yt/yt/client/api/rpc_proxy/config.h b/yt/yt/client/api/rpc_proxy/config.h index 244f5cd1d4..3f3de07185 100644 --- a/yt/yt/client/api/rpc_proxy/config.h +++ b/yt/yt/client/api/rpc_proxy/config.h @@ -70,6 +70,9 @@ public: NObjectClient::TCellTag ClockClusterTag; + //! Path in Cypress with UDFs. + std::optional<NYPath::TYPath> UdfRegistryPath; + REGISTER_YSON_STRUCT(TConnectionConfig); static void Register(TRegistrar registrar); diff --git a/yt/yt/client/election/public.h b/yt/yt/client/election/public.h index b48fc712db..7b79682b48 100644 --- a/yt/yt/client/election/public.h +++ b/yt/yt/client/election/public.h @@ -12,8 +12,7 @@ namespace NYT::NElection { using TEpochId = TGuid; using TPeerPriority = std::pair<i64, i64>; -using TPeerId = int; -constexpr TPeerId InvalidPeerId = -1; +constexpr int InvalidPeerId = -1; using TCellId = TGuid; extern const TCellId NullCellId; diff --git a/yt/yt/client/hydra/public.h b/yt/yt/client/hydra/public.h index 5f1e63206b..c2bdbf94cd 100644 --- a/yt/yt/client/hydra/public.h +++ b/yt/yt/client/hydra/public.h @@ -52,7 +52,6 @@ struct TElectionPriority; using NElection::TCellId; using NElection::NullCellId; -using NElection::TPeerId; using NElection::InvalidPeerId; using NElection::TPeerPriority; using NElection::TEpochId; diff --git a/yt/yt/core/yson/protobuf_interop.cpp b/yt/yt/core/yson/protobuf_interop.cpp index 6df2d4854d..6c14f8db3c 100644 --- a/yt/yt/core/yson/protobuf_interop.cpp +++ b/yt/yt/core/yson/protobuf_interop.cpp @@ -69,6 +69,12 @@ static constexpr int ProtobufMapValueFieldNumber = 2; namespace { +//////////////////////////////////////////////////////////////////////////////// + +const NLogging::TLogger Logger("ProtobufInterop"); + +//////////////////////////////////////////////////////////////////////////////// + bool IsSignedIntegralType(FieldDescriptor::Type type) { switch (type) { @@ -850,6 +856,30 @@ protected: } } } + + void ValidateString(TStringBuf data, EUtf8Check checkOption, TString fieldFullName) + { + if (checkOption == EUtf8Check::None || IsUtf(data)) { + return; + } + switch (checkOption) { + case EUtf8Check::None: + break; + case EUtf8Check::Log: + YT_LOG_WARNING("Field %v accepts only valid UTF-8 sequence, but got (%Qv)", + YPathStack_.GetHumanReadablePath(), + data); + break; + case EUtf8Check::Throw: + THROW_ERROR_EXCEPTION("Field %v accepts only valid UTF-8 sequence, but got (%Qv)", + YPathStack_.GetHumanReadablePath(), + data) + << TErrorAttribute("ypath", YPathStack_.GetPath()) + << TErrorAttribute("proto_field", fieldFullName); + break; + } + } + }; //////////////////////////////////////////////////////////////////////////////// @@ -954,12 +984,7 @@ private: const auto* field = FieldStack_.back().Field; switch (field->GetType()) { case FieldDescriptor::TYPE_STRING: - if (Options_.CheckUtf8 && !IsUtf(value)) { - THROW_ERROR_EXCEPTION("Field %v accepts only valid UTF-8 sequence", - YPathStack_.GetHumanReadablePath()) - << TErrorAttribute("ypath", YPathStack_.GetPath()) - << TErrorAttribute("proto_field", field->GetFullName()); - } + ValidateString(value, Options_.CheckUtf8, field->GetFullName()); case FieldDescriptor::TYPE_BYTES: BodyCodedStream_.WriteVarint64(value.length()); BodyCodedStream_.WriteRaw(value.begin(), static_cast<int>(value.length())); @@ -2475,11 +2500,8 @@ private: << TErrorAttribute("proto_field", field->GetFullName()); } TStringBuf data(PooledString_.data(), length); - if (Options_.CheckUtf8 && field->GetType() == FieldDescriptor::TYPE_STRING && !IsUtf(data)) { - THROW_ERROR_EXCEPTION("Field %v expected to contain valid UTF-8 sequence", - YPathStack_.GetHumanReadablePath()) - << TErrorAttribute("ypath", YPathStack_.GetPath()) - << TErrorAttribute("proto_field", field->GetFullName()); + if (field->GetType() == FieldDescriptor::TYPE_STRING) { + ValidateString(data, Options_.CheckUtf8, field->GetFullName()); } ParseScalar([&] { if (field->GetBytesFieldConverter()) { diff --git a/yt/yt/core/yson/protobuf_interop.h b/yt/yt/core/yson/protobuf_interop.h index 48992052eb..3591ef4dab 100644 --- a/yt/yt/core/yson/protobuf_interop.h +++ b/yt/yt/core/yson/protobuf_interop.h @@ -106,11 +106,6 @@ struct TProtobufElementResolveResult TStringBuf TailPath; }; -struct TResolveProtobufElementByYPathOptions -{ - bool AllowUnknownYsonFields = false; -}; - //! Introspects a given #rootType and locates an element (represented //! by TProtobufElement discriminated union) at a given #path. //! Throws if some definite error occurs during resolve (i.e. a malformed @@ -139,20 +134,6 @@ std::unique_ptr<IYsonConsumer> CreateProtobufWriter( //////////////////////////////////////////////////////////////////////////////// -struct TProtobufParserOptions -{ - //! If |true| then fields with numbers not found in protobuf metadata are - //! silently skipped; otherwise an exception is thrown. - bool SkipUnknownFields = false; - - //! If |true| then required fields not found in protobuf metadata are - //! silently skipped; otherwise an exception is thrown. - bool SkipRequiredFields = false; - - // Check if |string| fields contain actual UTF-8 strings. - bool CheckUtf8 = false; -}; - //! Parses a byte sequence and translates it into IYsonConsumer calls. /*! * IMPORTANT! Due to performance reasons the implementation currently assumes diff --git a/yt/yt/core/yson/protobuf_interop_options.h b/yt/yt/core/yson/protobuf_interop_options.h index 254a1faf9f..62173ca268 100644 --- a/yt/yt/core/yson/protobuf_interop_options.h +++ b/yt/yt/core/yson/protobuf_interop_options.h @@ -11,6 +11,17 @@ namespace NYT::NYson { //////////////////////////////////////////////////////////////////////////////// +struct TResolveProtobufElementByYPathOptions +{ + bool AllowUnknownYsonFields = false; +}; + +DEFINE_ENUM(EUtf8Check, + (None) + (Log) + (Throw) +); + struct TProtobufWriterOptions { //! Keep: all unknown fields found during YSON parsing @@ -33,7 +44,21 @@ struct TProtobufWriterOptions bool SkipRequiredFields = false; // Check if |string| fields contain actual UTF-8 strings. - bool CheckUtf8 = false; + EUtf8Check CheckUtf8 = EUtf8Check::None; +}; + +struct TProtobufParserOptions +{ + //! If |true| then fields with numbers not found in protobuf metadata are + //! silently skipped; otherwise an exception is thrown. + bool SkipUnknownFields = false; + + //! If |true| then required fields not found in protobuf metadata are + //! silently skipped; otherwise an exception is thrown. + bool SkipRequiredFields = false; + + // Check if |string| fields contain actual UTF-8 strings. + EUtf8Check CheckUtf8 = EUtf8Check::None; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp index 2719f939a2..111d9cd19e 100644 --- a/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp +++ b/yt/yt/core/yson/unittests/protobuf_yson_ut.cpp @@ -1017,30 +1017,38 @@ TEST(TYsonToProtobufTest, Entities) TEST(TYsonToProtobufTest, ValidUtf8StringCheck) { - TProtobufWriterOptions options{ - .CheckUtf8 = true, - }; - - TString invalidUtf8 = "\xc3\x28"; - - auto check = [&] { - TEST_PROLOGUE_WITH_OPTIONS(TMessage, options) - .BeginMap() - .Item("string_field").Value(invalidUtf8) - .EndMap(); - }; + for (auto option: {EUtf8Check::None, EUtf8Check::Log, EUtf8Check::Throw}) { + TProtobufWriterOptions options{ + .CheckUtf8 = option, + }; - EXPECT_THROW_WITH_SUBSTRING(check(), "valid UTF-8"); + TString invalidUtf8 = "\xc3\x28"; - NProto::TMessage message; - message.set_string_field(invalidUtf8); - TString newYsonString; - TStringOutput newYsonOutputStream(newYsonString); - TYsonWriter ysonWriter(&newYsonOutputStream, EYsonFormat::Pretty); + auto check = [&] { + TEST_PROLOGUE_WITH_OPTIONS(TMessage, options) + .BeginMap() + .Item("string_field").Value(invalidUtf8) + .EndMap(); + }; + if (option == EUtf8Check::Throw) { + EXPECT_THROW_WITH_SUBSTRING(check(), "valid UTF-8"); + } else { + EXPECT_NO_THROW(check()); + } - EXPECT_THROW_WITH_SUBSTRING( - WriteProtobufMessage(&ysonWriter, message, TProtobufParserOptions{.CheckUtf8 = true}), - "valid UTF-8"); + NProto::TMessage message; + message.set_string_field(invalidUtf8); + TString newYsonString; + TStringOutput newYsonOutputStream(newYsonString); + TYsonWriter ysonWriter(&newYsonOutputStream, EYsonFormat::Pretty); + if (option == EUtf8Check::Throw) { + EXPECT_THROW_WITH_SUBSTRING( + WriteProtobufMessage(&ysonWriter, message, TProtobufParserOptions{.CheckUtf8 = option}), + "valid UTF-8"); + } else { + EXPECT_NO_THROW(WriteProtobufMessage(&ysonWriter, message, TProtobufParserOptions{.CheckUtf8 = option})); + } + } } TEST(TYsonToProtobufTest, CustomUnknownFieldsModeResolver) diff --git a/yt/yt/library/CMakeLists.darwin-x86_64.txt b/yt/yt/library/CMakeLists.darwin-x86_64.txt index 2debbb7626..a85455d5cb 100644 --- a/yt/yt/library/CMakeLists.darwin-x86_64.txt +++ b/yt/yt/library/CMakeLists.darwin-x86_64.txt @@ -7,12 +7,10 @@ add_subdirectory(auth) -add_subdirectory(containers) add_subdirectory(decimal) add_subdirectory(erasure) add_subdirectory(monitoring) add_subdirectory(numeric) -add_subdirectory(process) add_subdirectory(profiling) add_subdirectory(program) add_subdirectory(quantile_digest) diff --git a/yt/yt/library/CMakeLists.linux-aarch64.txt b/yt/yt/library/CMakeLists.linux-aarch64.txt index 524ffcf525..07cc57b53c 100644 --- a/yt/yt/library/CMakeLists.linux-aarch64.txt +++ b/yt/yt/library/CMakeLists.linux-aarch64.txt @@ -8,7 +8,6 @@ add_subdirectory(auth) add_subdirectory(backtrace_introspector) -add_subdirectory(containers) add_subdirectory(decimal) add_subdirectory(erasure) add_subdirectory(monitoring) diff --git a/yt/yt/library/CMakeLists.linux-x86_64.txt b/yt/yt/library/CMakeLists.linux-x86_64.txt index 524ffcf525..07cc57b53c 100644 --- a/yt/yt/library/CMakeLists.linux-x86_64.txt +++ b/yt/yt/library/CMakeLists.linux-x86_64.txt @@ -8,7 +8,6 @@ add_subdirectory(auth) add_subdirectory(backtrace_introspector) -add_subdirectory(containers) add_subdirectory(decimal) add_subdirectory(erasure) add_subdirectory(monitoring) diff --git a/yt/yt/library/CMakeLists.windows-x86_64.txt b/yt/yt/library/CMakeLists.windows-x86_64.txt index 4502da4e61..eaf7bb9e34 100644 --- a/yt/yt/library/CMakeLists.windows-x86_64.txt +++ b/yt/yt/library/CMakeLists.windows-x86_64.txt @@ -6,9 +6,7 @@ # original buildsystem will not be accepted. -add_subdirectory(containers) add_subdirectory(monitoring) -add_subdirectory(process) add_subdirectory(profiling) add_subdirectory(program) add_subdirectory(syncmap) diff --git a/yt/yt/library/containers/CMakeLists.darwin-x86_64.txt b/yt/yt/library/containers/CMakeLists.darwin-x86_64.txt deleted file mode 100644 index faab79bbf6..0000000000 --- a/yt/yt/library/containers/CMakeLists.darwin-x86_64.txt +++ /dev/null @@ -1,30 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(yt-library-containers) -target_compile_options(yt-library-containers PRIVATE - -Wdeprecated-this-capture -) -target_link_libraries(yt-library-containers PUBLIC - contrib-libs-cxxsupp - yutil - cpp-porto-proto - yt-library-process - yt-yt-core -) -target_sources(yt-library-containers PRIVATE - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp -) diff --git a/yt/yt/library/containers/CMakeLists.linux-aarch64.txt b/yt/yt/library/containers/CMakeLists.linux-aarch64.txt deleted file mode 100644 index d3ab3811e0..0000000000 --- a/yt/yt/library/containers/CMakeLists.linux-aarch64.txt +++ /dev/null @@ -1,32 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(yt-library-containers) -target_compile_options(yt-library-containers PRIVATE - -Wdeprecated-this-capture -) -target_link_libraries(yt-library-containers PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - cpp-porto-proto - yt-library-process - yt-yt-core - library-cpp-porto -) -target_sources(yt-library-containers PRIVATE - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp -) diff --git a/yt/yt/library/containers/CMakeLists.linux-x86_64.txt b/yt/yt/library/containers/CMakeLists.linux-x86_64.txt deleted file mode 100644 index d3ab3811e0..0000000000 --- a/yt/yt/library/containers/CMakeLists.linux-x86_64.txt +++ /dev/null @@ -1,32 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(yt-library-containers) -target_compile_options(yt-library-containers PRIVATE - -Wdeprecated-this-capture -) -target_link_libraries(yt-library-containers PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - cpp-porto-proto - yt-library-process - yt-yt-core - library-cpp-porto -) -target_sources(yt-library-containers PRIVATE - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp -) diff --git a/yt/yt/library/containers/CMakeLists.txt b/yt/yt/library/containers/CMakeLists.txt deleted file mode 100644 index f8b31df0c1..0000000000 --- a/yt/yt/library/containers/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-x86_64.txt) -endif() diff --git a/yt/yt/library/containers/CMakeLists.windows-x86_64.txt b/yt/yt/library/containers/CMakeLists.windows-x86_64.txt deleted file mode 100644 index 998e1690fa..0000000000 --- a/yt/yt/library/containers/CMakeLists.windows-x86_64.txt +++ /dev/null @@ -1,27 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(yt-library-containers) -target_link_libraries(yt-library-containers PUBLIC - contrib-libs-cxxsupp - yutil - cpp-porto-proto - yt-library-process - yt-yt-core -) -target_sources(yt-library-containers PRIVATE - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/cgroup.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/config.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/instance_limits_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/process.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_executor.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_resource_tracker.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/containers/porto_health_checker.cpp -) diff --git a/yt/yt/library/containers/cgroup.cpp b/yt/yt/library/containers/cgroup.cpp deleted file mode 100644 index b43ab1e14b..0000000000 --- a/yt/yt/library/containers/cgroup.cpp +++ /dev/null @@ -1,752 +0,0 @@ -#include "cgroup.h" -#include "private.h" - -#include <yt/yt/core/misc/fs.h> -#include <yt/yt/core/misc/proc.h> - -#include <yt/yt/core/ytree/fluent.h> - -#include <util/string/split.h> -#include <util/system/filemap.h> - -#include <util/system/yield.h> - -#ifdef _linux_ - #include <unistd.h> - #include <sys/stat.h> - #include <errno.h> -#endif - -namespace NYT::NContainers { - -using namespace NYTree; - -//////////////////////////////////////////////////////////////////////////////// - -static const auto& Logger = ContainersLogger; -static const TString CGroupRootPath("/sys/fs/cgroup"); -#ifdef _linux_ -static const int ReadByAll = S_IRUSR | S_IRGRP | S_IROTH; -static const int ReadExecuteByAll = ReadByAll | S_IXUSR | S_IXGRP | S_IXOTH; -#endif - -//////////////////////////////////////////////////////////////////////////////// - -namespace { - -TString GetParentFor(const TString& type) -{ -#ifdef _linux_ - auto rawData = TUnbufferedFileInput("/proc/self/cgroup") - .ReadAll(); - auto result = ParseProcessCGroups(rawData); - return result[type]; -#else - Y_UNUSED(type); - return "_parent_"; -#endif -} - -#ifdef _linux_ - -std::vector<TString> ReadAllValues(const TString& fileName) -{ - auto raw = TUnbufferedFileInput(fileName) - .ReadAll(); - - YT_LOG_DEBUG("File %v contains %Qv", - fileName, - raw); - - TVector<TString> values; - StringSplitter(raw.data()) - .SplitBySet(" \n") - .SkipEmpty() - .Collect(&values); - return values; -} - -TDuration FromJiffies(ui64 jiffies) -{ - static const auto TicksPerSecond = sysconf(_SC_CLK_TCK); - return TDuration::MicroSeconds(1000 * 1000 * jiffies / TicksPerSecond); -} - -#endif - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - -void TKillProcessGroupTool::operator()(const TString& processGroupPath) const -{ - SafeSetUid(0); - TNonOwningCGroup group(processGroupPath); - group.Kill(); -} - -//////////////////////////////////////////////////////////////////////////////// - -TNonOwningCGroup::TNonOwningCGroup(const TString& fullPath) - : FullPath_(fullPath) -{ } - -TNonOwningCGroup::TNonOwningCGroup(const TString& type, const TString& name) - : FullPath_(NFS::CombinePaths({ - CGroupRootPath, - type, - GetParentFor(type), - name - })) -{ } - -TNonOwningCGroup::TNonOwningCGroup(TNonOwningCGroup&& other) - : FullPath_(std::move(other.FullPath_)) -{ } - -void TNonOwningCGroup::AddTask(int pid) const -{ - YT_LOG_INFO( - "Adding task to cgroup (Task: %v, Cgroup: %v)", - pid, - FullPath_); - Append("tasks", ToString(pid)); -} - -void TNonOwningCGroup::AddCurrentTask() const -{ - YT_VERIFY(!IsNull()); -#ifdef _linux_ - auto pid = getpid(); - AddTask(pid); -#endif -} - -TString TNonOwningCGroup::Get(const TString& name) const -{ - YT_VERIFY(!IsNull()); - TString result; -#ifdef _linux_ - const auto path = GetPath(name); - result = TFileInput(path).ReadLine(); -#else - Y_UNUSED(name); -#endif - return result; -} - -void TNonOwningCGroup::Set(const TString& name, const TString& value) const -{ - YT_VERIFY(!IsNull()); -#ifdef _linux_ - auto path = GetPath(name); - TUnbufferedFileOutput output(TFile(path, EOpenModeFlag::WrOnly)); - output << value; -#else - Y_UNUSED(name); - Y_UNUSED(value); -#endif -} - -void TNonOwningCGroup::Append(const TString& name, const TString& value) const -{ - YT_VERIFY(!IsNull()); -#ifdef _linux_ - auto path = GetPath(name); - TUnbufferedFileOutput output(TFile(path, EOpenModeFlag::ForAppend)); - output << value; -#else - Y_UNUSED(name); - Y_UNUSED(value); -#endif -} - -bool TNonOwningCGroup::IsRoot() const -{ - return FullPath_ == CGroupRootPath; -} - -bool TNonOwningCGroup::IsNull() const -{ - return FullPath_.empty(); -} - -bool TNonOwningCGroup::Exists() const -{ - return NFS::Exists(FullPath_); -} - -std::vector<int> TNonOwningCGroup::GetProcesses() const -{ - std::vector<int> results; - if (!IsNull()) { -#ifdef _linux_ - auto values = ReadAllValues(GetPath("cgroup.procs")); - for (const auto& value : values) { - int pid = FromString<int>(value); - results.push_back(pid); - } -#endif - } - return results; -} - -std::vector<int> TNonOwningCGroup::GetTasks() const -{ - std::vector<int> results; - if (!IsNull()) { -#ifdef _linux_ - auto values = ReadAllValues(GetPath("tasks")); - for (const auto& value : values) { - int pid = FromString<int>(value); - results.push_back(pid); - } -#endif - } - return results; -} - -const TString& TNonOwningCGroup::GetFullPath() const -{ - return FullPath_; -} - -std::vector<TNonOwningCGroup> TNonOwningCGroup::GetChildren() const -{ - // We retry enumerating directories, since it may fail with weird diagnostics if - // number of subcgroups changes. - while (true) { - try { - std::vector<TNonOwningCGroup> result; - - if (IsNull()) { - return result; - } - - auto directories = NFS::EnumerateDirectories(FullPath_); - for (const auto& directory : directories) { - result.emplace_back(NFS::CombinePaths(FullPath_, directory)); - } - return result; - } catch (const std::exception& ex) { - YT_LOG_WARNING(ex, "Failed to list subcgroups (Path: %v)", FullPath_); - } - } -} - -void TNonOwningCGroup::EnsureExistence() const -{ - YT_LOG_INFO("Creating cgroup (Cgroup: %v)", FullPath_); - - YT_VERIFY(!IsNull()); - -#ifdef _linux_ - NFS::MakeDirRecursive(FullPath_, 0755); -#endif -} - -void TNonOwningCGroup::Lock() const -{ - Traverse( - BIND([] (const TNonOwningCGroup& group) { group.DoLock(); }), - BIND([] (const TNonOwningCGroup& /*group*/) {})); -} - -void TNonOwningCGroup::Unlock() const -{ - Traverse( - BIND([] (const TNonOwningCGroup& /*group*/) {}), - BIND([] (const TNonOwningCGroup& group) { group.DoUnlock(); })); -} - -void TNonOwningCGroup::Kill() const -{ - YT_VERIFY(!IsRoot()); - - Traverse( - BIND([] (const TNonOwningCGroup& group) { group.DoKill(); }), - BIND([] (const TNonOwningCGroup& /*group*/) {})); -} - -void TNonOwningCGroup::RemoveAllSubcgroups() const -{ - Traverse( - BIND([] (const TNonOwningCGroup& group) { - group.TryUnlock(); - }), - BIND([this_ = this] (const TNonOwningCGroup& group) { - if (this_ != &group) { - group.DoRemove(); - } - })); -} - -void TNonOwningCGroup::RemoveRecursive() const -{ - RemoveAllSubcgroups(); - DoRemove(); -} - -void TNonOwningCGroup::DoLock() const -{ - YT_LOG_INFO("Locking cgroup (Cgroup: %v)", FullPath_); - -#ifdef _linux_ - if (!IsNull()) { - int code = chmod(FullPath_.data(), ReadExecuteByAll); - YT_VERIFY(code == 0); - - code = chmod(GetPath("tasks").data(), ReadByAll); - YT_VERIFY(code == 0); - } -#endif -} - -bool TNonOwningCGroup::TryUnlock() const -{ - YT_LOG_INFO("Unlocking cgroup (Cgroup: %v)", FullPath_); - - if (!Exists()) { - return true; - } - - bool result = true; - -#ifdef _linux_ - if (!IsNull()) { - int code = chmod(GetPath("tasks").data(), ReadByAll | S_IWUSR); - if (code != 0) { - result = false; - } - - code = chmod(FullPath_.data(), ReadExecuteByAll | S_IWUSR); - if (code != 0) { - result = false; - } - } -#endif - - return result; -} - -void TNonOwningCGroup::DoUnlock() const -{ - YT_VERIFY(TryUnlock()); -} - -void TNonOwningCGroup::DoKill() const -{ - YT_LOG_DEBUG("Started killing processes in cgroup (Cgroup: %v)", FullPath_); - -#ifdef _linux_ - while (true) { - auto pids = GetTasks(); - if (pids.empty()) - break; - - YT_LOG_DEBUG("Killing processes (Pids: %v)", pids); - - for (int pid : pids) { - auto result = kill(pid, SIGKILL); - if (result == -1) { - YT_VERIFY(errno == ESRCH); - } - } - - ThreadYield(); - } -#endif - - YT_LOG_DEBUG("Finished killing processes in cgroup (Cgroup: %v)", FullPath_); -} - -void TNonOwningCGroup::DoRemove() const -{ - if (NFS::Exists(FullPath_)) { - NFS::Remove(FullPath_); - } -} - -void TNonOwningCGroup::Traverse( - const TCallback<void(const TNonOwningCGroup&)>& preorderAction, - const TCallback<void(const TNonOwningCGroup&)>& postorderAction) const -{ - preorderAction(*this); - - for (const auto& child : GetChildren()) { - child.Traverse(preorderAction, postorderAction); - } - - postorderAction(*this); -} - -TString TNonOwningCGroup::GetPath(const TString& filename) const -{ - return NFS::CombinePaths(FullPath_, filename); -} - -//////////////////////////////////////////////////////////////////////////////// - -TCGroup::TCGroup(const TString& type, const TString& name) - : TNonOwningCGroup(type, name) -{ } - -TCGroup::TCGroup(TCGroup&& other) - : TNonOwningCGroup(std::move(other)) - , Created_(other.Created_) -{ - other.Created_ = false; -} - -TCGroup::TCGroup(TNonOwningCGroup&& other) - : TNonOwningCGroup(std::move(other)) - , Created_(false) -{ } - -TCGroup::~TCGroup() -{ - if (Created_) { - Destroy(); - } -} - -void TCGroup::Create() -{ - EnsureExistence(); - Created_ = true; -} - -void TCGroup::Destroy() -{ - YT_LOG_INFO("Destroying cgroup (Cgroup: %v)", FullPath_); - YT_VERIFY(Created_); - -#ifdef _linux_ - try { - NFS::Remove(FullPath_); - } catch (const std::exception& ex) { - YT_LOG_FATAL(ex, "Failed to destroy cgroup (Cgroup: %v)", FullPath_); - } -#endif - Created_ = false; -} - -bool TCGroup::IsCreated() const -{ - return Created_; -} - -//////////////////////////////////////////////////////////////////////////////// - -const TString TCpuAccounting::Name = "cpuacct"; - -TCpuAccounting::TStatistics& operator-=(TCpuAccounting::TStatistics& lhs, const TCpuAccounting::TStatistics& rhs) -{ - #define XX(name) lhs.name = lhs.name.ValueOrThrow() - rhs.name.ValueOrThrow(); - XX(UserUsageTime) - XX(SystemUsageTime) - XX(WaitTime) - XX(ThrottledTime) - XX(ContextSwitchesDelta) - XX(PeakThreadCount) - #undef XX - return lhs; -} - -TCpuAccounting::TCpuAccounting(const TString& name) - : TCGroup(Name, name) -{ } - -TCpuAccounting::TCpuAccounting(TNonOwningCGroup&& nonOwningCGroup) - : TCGroup(std::move(nonOwningCGroup)) -{ } - -TCpuAccounting::TStatistics TCpuAccounting::GetStatisticsRecursive() const -{ - TCpuAccounting::TStatistics result; -#ifdef _linux_ - try { - auto path = NFS::CombinePaths(GetFullPath(), "cpuacct.stat"); - auto values = ReadAllValues(path); - YT_VERIFY(values.size() == 4); - - TString type[2]; - ui64 jiffies[2]; - - for (int i = 0; i < 2; ++i) { - type[i] = values[2 * i]; - jiffies[i] = FromString<ui64>(values[2 * i + 1]); - } - - for (int i = 0; i < 2; ++i) { - if (type[i] == "user") { - result.UserUsageTime = FromJiffies(jiffies[i]); - } else if (type[i] == "system") { - result.SystemUsageTime = FromJiffies(jiffies[i]); - } - } - } catch (const std::exception& ex) { - YT_LOG_FATAL( - ex, - "Failed to retrieve CPU statistics from cgroup (Cgroup: %v)", - GetFullPath()); - } -#endif - return result; -} - -TCpuAccounting::TStatistics TCpuAccounting::GetStatistics() const -{ - auto statistics = GetStatisticsRecursive(); - for (auto& cgroup : GetChildren()) { - auto cpuCGroup = TCpuAccounting(std::move(cgroup)); - statistics -= cpuCGroup.GetStatisticsRecursive(); - } - return statistics; -} - - -//////////////////////////////////////////////////////////////////////////////// - -const TString TCpu::Name = "cpu"; - -static const int DefaultCpuShare = 1024; - -TCpu::TCpu(const TString& name) - : TCGroup(Name, name) -{ } - -void TCpu::SetShare(double share) -{ - int cpuShare = static_cast<int>(share * DefaultCpuShare); - Set("cpu.shares", ToString(cpuShare)); -} - -//////////////////////////////////////////////////////////////////////////////// - -const TString TBlockIO::Name = "blkio"; - -TBlockIO::TBlockIO(const TString& name) - : TCGroup(Name, name) -{ } - -// For more information about format of data -// read https://www.kernel.org/doc/Documentation/cgroups/blkio-controller.txt - -TBlockIO::TStatistics TBlockIO::GetStatistics() const -{ - TBlockIO::TStatistics result; -#ifdef _linux_ - auto bytesStats = GetDetailedStatistics("blkio.io_service_bytes"); - for (const auto& item : bytesStats) { - if (item.Type == "Read") { - result.IOReadByte = result.IOReadByte.ValueOrThrow() + item.Value; - } else if (item.Type == "Write") { - result.IOWriteByte = result.IOReadByte.ValueOrThrow() + item.Value; - } - } - - auto ioStats = GetDetailedStatistics("blkio.io_serviced"); - for (const auto& item : ioStats) { - if (item.Type == "Read") { - result.IOReadOps = result.IOReadOps.ValueOrThrow() + item.Value; - result.IOOps = result.IOOps.ValueOrThrow() + item.Value; - } else if (item.Type == "Write") { - result.IOWriteOps = result.IOWriteOps.ValueOrThrow() + item.Value; - result.IOOps = result.IOOps.ValueOrThrow() + item.Value; - } - } -#endif - return result; -} - -std::vector<TBlockIO::TStatisticsItem> TBlockIO::GetIOServiceBytes() const -{ - return GetDetailedStatistics("blkio.io_service_bytes"); -} - -std::vector<TBlockIO::TStatisticsItem> TBlockIO::GetIOServiced() const -{ - return GetDetailedStatistics("blkio.io_serviced"); -} - -std::vector<TBlockIO::TStatisticsItem> TBlockIO::GetDetailedStatistics(const char* filename) const -{ - std::vector<TBlockIO::TStatisticsItem> result; -#ifdef _linux_ - try { - auto path = NFS::CombinePaths(GetFullPath(), filename); - auto values = ReadAllValues(path); - - int lineNumber = 0; - while (3 * lineNumber + 2 < std::ssize(values)) { - TStatisticsItem item; - item.DeviceId = values[3 * lineNumber]; - item.Type = values[3 * lineNumber + 1]; - item.Value = FromString<ui64>(values[3 * lineNumber + 2]); - - { - auto guard = Guard(SpinLock_); - DeviceIds_.insert(item.DeviceId); - } - - if (item.Type == "Read" || item.Type == "Write") { - result.push_back(item); - - YT_LOG_DEBUG("IO operations serviced (OperationCount: %v, OperationType: %v, DeviceId: %v)", - item.Value, - item.Type, - item.DeviceId); - } - ++lineNumber; - } - } catch (const std::exception& ex) { - YT_LOG_FATAL( - ex, - "Failed to retrieve block IO statistics from cgroup (Cgroup: %v)", - GetFullPath()); - } -#else - Y_UNUSED(filename); -#endif - return result; -} - -void TBlockIO::ThrottleOperations(i64 operations) const -{ - auto guard = Guard(SpinLock_); - for (const auto& deviceId : DeviceIds_) { - auto value = Format("%v %v", deviceId, operations); - Append("blkio.throttle.read_iops_device", value); - Append("blkio.throttle.write_iops_device", value); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -const TString TMemory::Name = "memory"; - -TMemory::TMemory(const TString& name) - : TCGroup(Name, name) -{ } - -TMemory::TStatistics TMemory::GetStatistics() const -{ - TMemory::TStatistics result; -#ifdef _linux_ - try { - auto values = ReadAllValues(GetPath("memory.stat")); - int lineNumber = 0; - while (2 * lineNumber + 1 < std::ssize(values)) { - const auto& type = values[2 * lineNumber]; - const auto& unparsedValue = values[2 * lineNumber + 1]; - if (type == "rss") { - result.Rss = FromString<ui64>(unparsedValue); - } - if (type == "mapped_file") { - result.MappedFile = FromString<ui64>(unparsedValue); - } - if (type == "pgmajfault") { - result.MajorPageFaults = FromString<ui64>(unparsedValue); - } - ++lineNumber; - } - } catch (const std::exception& ex) { - YT_LOG_FATAL( - ex, - "Failed to retrieve memory statistics from cgroup (Cgroup: %v)", - GetFullPath()); - } -#endif - return result; -} - -i64 TMemory::GetMaxMemoryUsage() const -{ - return FromString<i64>(Get("memory.max_usage_in_bytes")); -} - -void TMemory::SetLimitInBytes(i64 bytes) const -{ - Set("memory.limit_in_bytes", ToString(bytes)); -} - -void TMemory::ForceEmpty() const -{ - Set("memory.force_empty", "0"); -} - -//////////////////////////////////////////////////////////////////////////////// - -const TString TFreezer::Name = "freezer"; - -TFreezer::TFreezer(const TString& name) - : TCGroup(Name, name) -{ } - -TString TFreezer::GetState() const -{ - return Get("freezer.state"); -} - -void TFreezer::Freeze() const -{ - Set("freezer.state", "FROZEN"); -} - -void TFreezer::Unfreeze() const -{ - Set("freezer.state", "THAWED"); -} - -//////////////////////////////////////////////////////////////////////////////// - -std::map<TString, TString> ParseProcessCGroups(const TString& str) -{ - std::map<TString, TString> result; - - TVector<TString> values; - StringSplitter(str.data()).SplitBySet(":\n").SkipEmpty().Collect(&values); - for (size_t i = 0; i + 2 < values.size(); i += 3) { - // Check format. - FromString<int>(values[i]); - - const auto& subsystemsSet = values[i + 1]; - const auto& name = values[i + 2]; - - TVector<TString> subsystems; - StringSplitter(subsystemsSet.data()).Split(',').SkipEmpty().Collect(&subsystems); - for (const auto& subsystem : subsystems) { - if (!subsystem.StartsWith("name=")) { - int start = 0; - if (name.StartsWith("/")) { - start = 1; - } - result[subsystem] = name.substr(start); - } - } - } - - return result; -} - -std::map<TString, TString> GetProcessCGroups(pid_t pid) -{ - auto cgroupsPath = Format("/proc/%v/cgroup", pid); - auto rawCgroups = TFileInput{cgroupsPath}.ReadAll(); - return ParseProcessCGroups(rawCgroups); -} - -bool IsValidCGroupType(const TString& type) -{ - return - type == TCpuAccounting::Name || - type == TCpu::Name || - type == TBlockIO::Name || - type == TMemory::Name || - type == TFreezer::Name; -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/cgroup.h b/yt/yt/library/containers/cgroup.h deleted file mode 100644 index a61fbbddc3..0000000000 --- a/yt/yt/library/containers/cgroup.h +++ /dev/null @@ -1,290 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/actions/public.h> - -#include <yt/yt/core/ytree/yson_struct.h> -#include <yt/yt/core/yson/public.h> - -#include <yt/yt/core/misc/property.h> - -#include <library/cpp/yt/threading/spin_lock.h> - -#include <vector> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -void RemoveAllSubcgroups(const TString& path); - -//////////////////////////////////////////////////////////////////////////////// - -struct TKillProcessGroupTool -{ - void operator()(const TString& processGroupPath) const; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TNonOwningCGroup - : private TNonCopyable -{ -public: - DEFINE_BYREF_RO_PROPERTY(TString, FullPath); - -public: - TNonOwningCGroup() = default; - explicit TNonOwningCGroup(const TString& fullPath); - TNonOwningCGroup(const TString& type, const TString& name); - TNonOwningCGroup(TNonOwningCGroup&& other); - - void AddTask(int pid) const; - void AddCurrentTask() const; - - bool IsRoot() const; - bool IsNull() const; - bool Exists() const; - - std::vector<int> GetProcesses() const; - std::vector<int> GetTasks() const; - const TString& GetFullPath() const; - - std::vector<TNonOwningCGroup> GetChildren() const; - - void EnsureExistence() const; - - void Lock() const; - void Unlock() const; - - void Kill() const; - - void RemoveAllSubcgroups() const; - void RemoveRecursive() const; - -protected: - TString Get(const TString& name) const; - void Set(const TString& name, const TString& value) const; - void Append(const TString& name, const TString& value) const; - - void DoLock() const; - void DoUnlock() const; - - bool TryUnlock() const; - - void DoKill() const; - - void DoRemove() const; - - void Traverse( - const TCallback<void(const TNonOwningCGroup&)>& preorderAction, - const TCallback<void(const TNonOwningCGroup&)>& postorderAction) const; - - TString GetPath(const TString& filename) const; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TCGroup - : public TNonOwningCGroup -{ -protected: - TCGroup(const TString& type, const TString& name); - TCGroup(TNonOwningCGroup&& other); - TCGroup(TCGroup&& other); - -public: - ~TCGroup(); - - void Create(); - void Destroy(); - - bool IsCreated() const; - -private: - bool Created_ = false; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TCpuAccounting - : public TCGroup -{ -public: - static const TString Name; - - struct TStatistics - { - TErrorOr<TDuration> TotalUsageTime; - TErrorOr<TDuration> UserUsageTime; - TErrorOr<TDuration> SystemUsageTime; - TErrorOr<TDuration> WaitTime; - TErrorOr<TDuration> ThrottledTime; - - TErrorOr<ui64> ThreadCount; - TErrorOr<ui64> ContextSwitches; - TErrorOr<ui64> ContextSwitchesDelta; - TErrorOr<ui64> PeakThreadCount; - - TErrorOr<TDuration> LimitTime; - TErrorOr<TDuration> GuaranteeTime; - }; - - explicit TCpuAccounting(const TString& name); - - TStatistics GetStatisticsRecursive() const; - TStatistics GetStatistics() const; - -private: - explicit TCpuAccounting(TNonOwningCGroup&& nonOwningCGroup); -}; - -void Serialize(const TCpuAccounting::TStatistics& statistics, NYson::IYsonConsumer* consumer); - -//////////////////////////////////////////////////////////////////////////////// - -class TCpu - : public TCGroup -{ -public: - static const TString Name; - - explicit TCpu(const TString& name); - - void SetShare(double share); -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TBlockIO - : public TCGroup -{ -public: - static const TString Name; - - struct TStatistics - { - TErrorOr<ui64> IOReadByte; - TErrorOr<ui64> IOWriteByte; - TErrorOr<ui64> IOBytesLimit; - - TErrorOr<ui64> IOReadOps; - TErrorOr<ui64> IOWriteOps; - TErrorOr<ui64> IOOps; - TErrorOr<ui64> IOOpsLimit; - - TErrorOr<TDuration> IOTotalTime; - TErrorOr<TDuration> IOWaitTime; - }; - - struct TStatisticsItem - { - TString DeviceId; - TString Type; - ui64 Value = 0; - }; - - explicit TBlockIO(const TString& name); - - TStatistics GetStatistics() const; - void ThrottleOperations(i64 iops) const; - -private: - //! Guards device ids. - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); - //! Set of all seen device ids. - mutable THashSet<TString> DeviceIds_; - - std::vector<TBlockIO::TStatisticsItem> GetDetailedStatistics(const char* filename) const; - - std::vector<TStatisticsItem> GetIOServiceBytes() const; - std::vector<TStatisticsItem> GetIOServiced() const; -}; - -void Serialize(const TBlockIO::TStatistics& statistics, NYson::IYsonConsumer* consumer); - -//////////////////////////////////////////////////////////////////////////////// - -class TMemory - : public TCGroup -{ -public: - static const TString Name; - - struct TStatistics - { - TErrorOr<ui64> Rss; - TErrorOr<ui64> MappedFile; - TErrorOr<ui64> MinorPageFaults; - TErrorOr<ui64> MajorPageFaults; - - TErrorOr<ui64> FileCacheUsage; - TErrorOr<ui64> AnonUsage; - TErrorOr<ui64> AnonLimit; - TErrorOr<ui64> MemoryUsage; - TErrorOr<ui64> MemoryGuarantee; - TErrorOr<ui64> MemoryLimit; - TErrorOr<ui64> MaxMemoryUsage; - - TErrorOr<ui64> OomKills; - TErrorOr<ui64> OomKillsTotal; - }; - - explicit TMemory(const TString& name); - - TStatistics GetStatistics() const; - i64 GetMaxMemoryUsage() const; - - void SetLimitInBytes(i64 bytes) const; - - void ForceEmpty() const; -}; - -void Serialize(const TMemory::TStatistics& statistics, NYson::IYsonConsumer* consumer); - -//////////////////////////////////////////////////////////////////////////////// - -class TNetwork -{ -public: - struct TStatistics - { - TErrorOr<ui64> TxBytes; - TErrorOr<ui64> TxPackets; - TErrorOr<ui64> TxDrops; - TErrorOr<ui64> TxLimit; - - TErrorOr<ui64> RxBytes; - TErrorOr<ui64> RxPackets; - TErrorOr<ui64> RxDrops; - TErrorOr<ui64> RxLimit; - }; -}; - -void Serialize(const TNetwork::TStatistics& statistics, NYson::IYsonConsumer* consumer); - -//////////////////////////////////////////////////////////////////////////////// - -class TFreezer - : public TCGroup -{ -public: - static const TString Name; - - explicit TFreezer(const TString& name); - - TString GetState() const; - void Freeze() const; - void Unfreeze() const; -}; - -//////////////////////////////////////////////////////////////////////////////// - -std::map<TString, TString> ParseProcessCGroups(const TString& str); -std::map<TString, TString> GetProcessCGroups(pid_t pid); -bool IsValidCGroupType(const TString& type); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/config.cpp b/yt/yt/library/containers/config.cpp deleted file mode 100644 index 39e46f2372..0000000000 --- a/yt/yt/library/containers/config.cpp +++ /dev/null @@ -1,64 +0,0 @@ -#include "config.h" - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -void TPodSpecConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("cpu_to_vcpu_factor", &TThis::CpuToVCpuFactor) - .Default(); -} - -//////////////////////////////////////////////////////////////////////////////// - -bool TCGroupConfig::IsCGroupSupported(const TString& cgroupType) const -{ - auto it = std::find_if( - SupportedCGroups.begin(), - SupportedCGroups.end(), - [&] (const TString& type) { - return type == cgroupType; - }); - return it != SupportedCGroups.end(); -} - -void TCGroupConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("supported_cgroups", &TThis::SupportedCGroups) - .Default(); - - registrar.Postprocessor([] (TThis* config) { - for (const auto& type : config->SupportedCGroups) { - if (!IsValidCGroupType(type)) { - THROW_ERROR_EXCEPTION("Invalid cgroup type %Qv", type); - } - } - }); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TPortoExecutorDynamicConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("retries_timeout", &TThis::RetriesTimeout) - .Default(TDuration::Seconds(10)); - registrar.Parameter("poll_period", &TThis::PollPeriod) - .Default(TDuration::MilliSeconds(100)); - registrar.Parameter("api_timeout", &TThis::ApiTimeout) - .Default(TDuration::Minutes(5)); - registrar.Parameter("api_disk_timeout", &TThis::ApiDiskTimeout) - .Default(TDuration::Minutes(30)); - registrar.Parameter("enable_network_isolation", &TThis::EnableNetworkIsolation) - .Default(true); - registrar.Parameter("enable_test_porto_failures", &TThis::EnableTestPortoFailures) - .Default(false); - registrar.Parameter("stub_error_code", &TThis::StubErrorCode) - .Default(EPortoErrorCode::SocketError); - registrar.Parameter("enable_test_porto_not_responding", &TThis::EnableTestPortoNotResponding) - .Default(false); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/config.h b/yt/yt/library/containers/config.h deleted file mode 100644 index 3639274cff..0000000000 --- a/yt/yt/library/containers/config.h +++ /dev/null @@ -1,64 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/ytree/yson_struct.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -class TPodSpecConfig - : public virtual NYTree::TYsonStruct -{ -public: - std::optional<double> CpuToVCpuFactor; - - REGISTER_YSON_STRUCT(TPodSpecConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TPodSpecConfig) - -//////////////////////////////////////////////////////////////////////////////// - -class TCGroupConfig - : public virtual NYTree::TYsonStruct -{ -public: - std::vector<TString> SupportedCGroups; - - bool IsCGroupSupported(const TString& cgroupType) const; - - REGISTER_YSON_STRUCT(TCGroupConfig); - - static void Register(TRegistrar registrar); -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoExecutorDynamicConfig - : public NYTree::TYsonStruct -{ -public: - TDuration RetriesTimeout; - TDuration PollPeriod; - TDuration ApiTimeout; - TDuration ApiDiskTimeout; - bool EnableNetworkIsolation; - bool EnableTestPortoFailures; - bool EnableTestPortoNotResponding; - - EPortoErrorCode StubErrorCode; - - REGISTER_YSON_STRUCT(TPortoExecutorDynamicConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TPortoExecutorDynamicConfig) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/cri/config.cpp b/yt/yt/library/containers/cri/config.cpp deleted file mode 100644 index 5572f4d980..0000000000 --- a/yt/yt/library/containers/cri/config.cpp +++ /dev/null @@ -1,54 +0,0 @@ -#include "config.h" -#include "cri_api.h" - -namespace NYT::NContainers::NCri { - -//////////////////////////////////////////////////////////////////////////////// - -void TCriExecutorConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("runtime_endpoint", &TThis::RuntimeEndpoint) - .Default(TString(DefaultCriEndpoint)); - - registrar.Parameter("image_endpoint", &TThis::ImageEndpoint) - .Default(TString(DefaultCriEndpoint)); - - registrar.Parameter("namespace", &TThis::Namespace) - .NonEmpty(); - - registrar.Parameter("runtime_handler", &TThis::RuntimeHandler) - .Optional(); - - registrar.Parameter("base_cgroup", &TThis::BaseCgroup) - .NonEmpty(); - - registrar.Parameter("cpu_period", &TThis::CpuPeriod) - .Default(TDuration::MilliSeconds(100)); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TCriAuthConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("username", &TThis::Username) - .Optional(); - - registrar.Parameter("password", &TThis::Password) - .Optional(); - - registrar.Parameter("auth", &TThis::Auth) - .Optional(); - - registrar.Parameter("server_address", &TThis::ServerAddress) - .Optional(); - - registrar.Parameter("identity_token", &TThis::IdentityToken) - .Optional(); - - registrar.Parameter("registry_token", &TThis::RegistryToken) - .Optional(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/config.h b/yt/yt/library/containers/cri/config.h deleted file mode 100644 index 4ea33fd390..0000000000 --- a/yt/yt/library/containers/cri/config.h +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/rpc/config.h> - -namespace NYT::NContainers::NCri { - -//////////////////////////////////////////////////////////////////////////////// - -class TCriExecutorConfig - : public NRpc::TRetryingChannelConfig -{ -public: - //! gRPC endpoint for CRI container runtime service. - TString RuntimeEndpoint; - - //! gRPC endpoint for CRI image manager service. - TString ImageEndpoint; - - //! CRI namespace where this executor operates. - TString Namespace; - - //! Name of CRI runtime configuration to use. - TString RuntimeHandler; - - //! Common parent cgroup for all pods. - TString BaseCgroup; - - //! Cpu quota period for cpu limits. - TDuration CpuPeriod; - - REGISTER_YSON_STRUCT(TCriExecutorConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TCriExecutorConfig) - -//////////////////////////////////////////////////////////////////////////////// - -// TODO(khlebnikov): split docker registry stuff into common "docker" library. - -//! TCriAuthConfig depicts docker registry authentification -class TCriAuthConfig - : public NYTree::TYsonStruct -{ -public: - TString Username; - - TString Password; - - TString Auth; - - TString ServerAddress; - - TString IdentityToken; - - TString RegistryToken; - - REGISTER_YSON_STRUCT(TCriAuthConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TCriAuthConfig) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/cri_api.cpp b/yt/yt/library/containers/cri/cri_api.cpp deleted file mode 100644 index 93457017ba..0000000000 --- a/yt/yt/library/containers/cri/cri_api.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include "cri_api.h" - -namespace NYT::NContainers::NCri { - -using namespace NRpc; - -//////////////////////////////////////////////////////////////////////////////// - -TCriRuntimeApi::TCriRuntimeApi(IChannelPtr channel) - : TProxyBase(std::move(channel), GetDescriptor()) -{ } - -const TServiceDescriptor& TCriRuntimeApi::GetDescriptor() -{ - static const auto Descriptor = TServiceDescriptor(NProto::RuntimeService::service_full_name()); - return Descriptor; -} - -//////////////////////////////////////////////////////////////////////////////// - -TCriImageApi::TCriImageApi(IChannelPtr channel) - : TProxyBase(std::move(channel), GetDescriptor()) -{ } - -const TServiceDescriptor& TCriImageApi::GetDescriptor() -{ - static const auto Descriptor = TServiceDescriptor(NProto::ImageService::service_full_name()); - return Descriptor; -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/cri_api.h b/yt/yt/library/containers/cri/cri_api.h deleted file mode 100644 index 74fe9a64a0..0000000000 --- a/yt/yt/library/containers/cri/cri_api.h +++ /dev/null @@ -1,99 +0,0 @@ -#pragma once - -#include <yt/yt/core/rpc/client.h> - -#include <k8s.io/cri-api/pkg/apis/runtime/v1/api.grpc.pb.h> - -namespace NYT::NContainers::NCri { - -//////////////////////////////////////////////////////////////////////////////// - -namespace NProto = ::runtime::v1; - -//! Reasonable default for CRI gRPC socket address. -constexpr TStringBuf DefaultCriEndpoint = "unix:///run/containerd/containerd.sock"; - -//! RuntimeReady means the runtime is up and ready to accept basic containers. -constexpr TStringBuf RuntimeReady = "RuntimeReady"; - -//! NetworkReady means the runtime network is up and ready to accept containers which require network. -constexpr TStringBuf NetworkReady = "NetworkReady"; - -//! CRI uses cgroupfs notation for systemd slices, but each name must ends with ".slice". -constexpr TStringBuf SystemdSliceSuffix = ".slice"; - -//////////////////////////////////////////////////////////////////////////////// - -//! CRI labels for pods and containers managed by YT -constexpr TStringBuf YTPodNamespaceLabel = "tech.ytsaurus.pod.namespace"; -constexpr TStringBuf YTPodNameLabel = "tech.ytsaurus.pod.name"; -constexpr TStringBuf YTContainerNameLabel = "tech.ytsaurus.container.name"; -constexpr TStringBuf YTJobIdLabel = "tech.ytsaurus.job.id"; - -//////////////////////////////////////////////////////////////////////////////// - -#define DEFINE_CRI_API_METHOD(method, ...) \ - DEFINE_RPC_PROXY_METHOD_GENERIC(method, NProto::method##Request, NProto::method##Response, __VA_ARGS__) - -//! See https://github.com/kubernetes/cri-api -class TCriRuntimeApi - : public NRpc::TProxyBase -{ -public: - explicit TCriRuntimeApi(NRpc::IChannelPtr channel); - - static const NRpc::TServiceDescriptor& GetDescriptor(); - - DEFINE_CRI_API_METHOD(Version); - DEFINE_CRI_API_METHOD(RunPodSandbox); - DEFINE_CRI_API_METHOD(StopPodSandbox); - DEFINE_CRI_API_METHOD(RemovePodSandbox); - DEFINE_CRI_API_METHOD(PodSandboxStatus); - DEFINE_CRI_API_METHOD(ListPodSandbox); - DEFINE_CRI_API_METHOD(CreateContainer); - DEFINE_CRI_API_METHOD(StartContainer); - DEFINE_CRI_API_METHOD(StopContainer); - DEFINE_CRI_API_METHOD(RemoveContainer); - DEFINE_CRI_API_METHOD(ListContainers); - DEFINE_CRI_API_METHOD(ContainerStatus); - DEFINE_CRI_API_METHOD(UpdateContainerResources); - DEFINE_CRI_API_METHOD(ReopenContainerLog); - DEFINE_CRI_API_METHOD(ExecSync); - DEFINE_CRI_API_METHOD(Exec); - DEFINE_CRI_API_METHOD(Attach); - DEFINE_CRI_API_METHOD(PortForward); - DEFINE_CRI_API_METHOD(ContainerStats); - DEFINE_CRI_API_METHOD(ListContainerStats); - DEFINE_CRI_API_METHOD(PodSandboxStats); - DEFINE_CRI_API_METHOD(ListPodSandboxStats); - DEFINE_CRI_API_METHOD(UpdateRuntimeConfig); - DEFINE_CRI_API_METHOD(Status); - DEFINE_CRI_API_METHOD(CheckpointContainer); - DEFINE_CRI_API_METHOD(ListMetricDescriptors); - DEFINE_CRI_API_METHOD(ListPodSandboxMetrics); - - // FIXME(khlebnikov): figure out streaming results - // DEFINE_RPC_PROXY_METHOD_GENERIC(GetContainerEvents, NProto::GetEventsRequest, NProto::ContainerEventResponse, - // .SetStreamingEnabled(true)); -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TCriImageApi - : public NRpc::TProxyBase -{ -public: - explicit TCriImageApi(NRpc::IChannelPtr channel); - - static const NRpc::TServiceDescriptor& GetDescriptor(); - - DEFINE_CRI_API_METHOD(ListImages); - DEFINE_CRI_API_METHOD(ImageStatus); - DEFINE_CRI_API_METHOD(PullImage); - DEFINE_CRI_API_METHOD(RemoveImage); - DEFINE_CRI_API_METHOD(ImageFsInfo); -}; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/cri_executor.cpp b/yt/yt/library/containers/cri/cri_executor.cpp deleted file mode 100644 index 428fd93165..0000000000 --- a/yt/yt/library/containers/cri/cri_executor.cpp +++ /dev/null @@ -1,666 +0,0 @@ -#include "cri_executor.h" -#include "private.h" - -#include <yt/yt/core/actions/bind.h> - -#include <yt/yt/core/rpc/grpc/channel.h> - -#include <yt/yt/core/rpc/retrying_channel.h> - -#include <yt/yt/core/misc/error.h> -#include <yt/yt/core/misc/proc.h> -#include <yt/yt/core/misc/protobuf_helpers.h> - -#include <yt/yt/core/concurrency/periodic_executor.h> - -namespace NYT::NContainers::NCri { - -using namespace NRpc; -using namespace NRpc::NGrpc; -using namespace NConcurrency; - -//////////////////////////////////////////////////////////////////////////////// - -void FormatValue(TStringBuilderBase* builder, const TCriDescriptor& descriptor, TStringBuf /*spec*/) -{ - builder->AppendFormat("%v (%s)", descriptor.Id.substr(0, 12), descriptor.Name); -} - -void FormatValue(TStringBuilderBase* builder, const TCriPodDescriptor& descriptor, TStringBuf /*spec*/) -{ - builder->AppendFormat("%v (%s)", descriptor.Id.substr(0, 12), descriptor.Name); -} - -void FormatValue(TStringBuilderBase* builder, const TCriImageDescriptor& descriptor, TStringBuf /*spec*/) -{ - builder->AppendString(descriptor.Image); -} - -static TError DecodeExitCode(int exitCode, const TString& reason) -{ - if (exitCode == 0) { - return TError(); - } - - // TODO(khkebnikov) map reason == "OOMKilled" - - // Common bash notation for signals: 128 + signal - if (exitCode > 128) { - int signalNumber = exitCode - 128; - return TError( - EProcessErrorCode::Signal, - "Process terminated by signal %v", - signalNumber) - << TErrorAttribute("signal", signalNumber) - << TErrorAttribute("reason", reason); - } - - // TODO(khkebnikov) check these - // 125 - container failed to run - // 126 - non executable - // 127 - command not found - // 128 - invalid exit code - // 255 - exit code out of range - - return TError( - EProcessErrorCode::NonZeroExitCode, - "Process exited with code %v", - exitCode) - << TErrorAttribute("exit_code", exitCode) - << TErrorAttribute("reason", reason); -} - -//////////////////////////////////////////////////////////////////////////////// - -class TCriProcess - : public TProcessBase -{ -public: - TCriProcess( - const TString& path, - ICriExecutorPtr executor, - TCriContainerSpecPtr containerSpec, - const TCriPodDescriptor& podDescriptor, - TCriPodSpecPtr podSpec, - TDuration pollPeriod = TDuration::MilliSeconds(100)) - : TProcessBase(path) - , Executor_(std::move(executor)) - , ContainerSpec_(std::move(containerSpec)) - , PodDescriptor_(podDescriptor) - , PodSpec_(std::move(podSpec)) - , PollPeriod_(pollPeriod) - { - // Just for symmetry with sibling classes. - AddArgument(Path_); - } - - void Kill(int /*signal*/) override - { - WaitFor(Executor_->StopContainer(ContainerDescriptor_)) - .ThrowOnError(); - } - - NNet::IConnectionWriterPtr GetStdInWriter() override - { - THROW_ERROR_EXCEPTION("Not implemented for CRI process"); - } - - NNet::IConnectionReaderPtr GetStdOutReader() override - { - THROW_ERROR_EXCEPTION("Not implemented for CRI process"); - } - - NNet::IConnectionReaderPtr GetStdErrReader() override - { - THROW_ERROR_EXCEPTION("Not implemented for CRI process"); - } - -private: - const ICriExecutorPtr Executor_; - const TCriContainerSpecPtr ContainerSpec_; - const TCriPodDescriptor PodDescriptor_; - const TCriPodSpecPtr PodSpec_; - const TDuration PollPeriod_; - - TCriDescriptor ContainerDescriptor_; - - TPeriodicExecutorPtr AsyncWaitExecutor_; - - void DoSpawn() override - { - if (ContainerSpec_->Command.empty()) { - ContainerSpec_->Command = {Path_}; - } - ContainerSpec_->Arguments = std::vector<TString>(Args_.begin() + 1, Args_.end()); - ContainerSpec_->WorkingDirectory = WorkingDirectory_; - - ContainerSpec_->BindMounts.emplace_back( - NCri::TCriBindMount { - .ContainerPath = WorkingDirectory_, - .HostPath = WorkingDirectory_, - .ReadOnly = false, - } - ); - - for (const auto& keyVal : Env_) { - TStringBuf key, val; - if (TStringBuf(keyVal).TrySplit('=', key, val)) { - ContainerSpec_->Environment[key] = val; - } - } - - ContainerDescriptor_ = WaitFor(Executor_->CreateContainer(ContainerSpec_, PodDescriptor_, PodSpec_)) - .ValueOrThrow(); - - YT_LOG_DEBUG("Spawning process (Command: %v, Container: %v)", ContainerSpec_->Command[0], ContainerDescriptor_); - WaitFor(Executor_->StartContainer(ContainerDescriptor_)) - .ThrowOnError(); - - // TODO(khkebnikov) replace polling with CRI event - AsyncWaitExecutor_ = New<TPeriodicExecutor>( - GetSyncInvoker(), - BIND(&TCriProcess::PollContainerStatus, MakeStrong(this)), - PollPeriod_); - - AsyncWaitExecutor_->Start(); - } - - void PollContainerStatus() - { - Executor_->GetContainerStatus(ContainerDescriptor_) - .SubscribeUnique(BIND(&TCriProcess::OnContainerStatus, MakeStrong(this))); - } - - void OnContainerStatus(TErrorOr<TCriRuntimeApi::TRspContainerStatusPtr>&& responseOrError) - { - auto response = responseOrError.ValueOrThrow(); - if (!response->has_status()) { - return; - } - auto status = response->status(); - if (status.state() == NProto::CONTAINER_EXITED) { - auto error = DecodeExitCode(status.exit_code(), status.reason()); - YT_LOG_DEBUG(error, "Process finished (Container: %v)", ContainerDescriptor_); - YT_UNUSED_FUTURE(AsyncWaitExecutor_->Stop()); - FinishedPromise_.TrySet(error); - } - } -}; - -DEFINE_REFCOUNTED_TYPE(TCriProcess) - -//////////////////////////////////////////////////////////////////////////////// - -class TCriExecutor - : public ICriExecutor -{ -public: - TCriExecutor( - TCriExecutorConfigPtr config, - IChannelFactoryPtr channelFactory) - : Config_(std::move(config)) - , RuntimeApi_(CreateRetryingChannel(Config_, channelFactory->CreateChannel(Config_->RuntimeEndpoint))) - , ImageApi_(CreateRetryingChannel(Config_, channelFactory->CreateChannel(Config_->ImageEndpoint))) - { } - - TString GetPodCgroup(TString podName) const override - { - TStringBuilder cgroup; - cgroup.AppendString(Config_->BaseCgroup); - cgroup.AppendString("/"); - cgroup.AppendString(podName); - if (Config_->BaseCgroup.EndsWith(SystemdSliceSuffix)) { - cgroup.AppendString(SystemdSliceSuffix); - } - return cgroup.Flush(); - } - - TFuture<TCriRuntimeApi::TRspStatusPtr> GetRuntimeStatus(bool verbose = false) override - { - auto req = RuntimeApi_.Status(); - req->set_verbose(verbose); - return req->Invoke(); - } - - TFuture<TCriRuntimeApi::TRspListPodSandboxPtr> ListPodSandbox( - std::function<void(NProto::PodSandboxFilter&)> initFilter = nullptr) override - { - auto req = RuntimeApi_.ListPodSandbox(); - - { - auto* filter = req->mutable_filter(); - - if (auto namespace_ = Config_->Namespace) { - auto& labels = *filter->mutable_label_selector(); - labels[YTPodNamespaceLabel] = namespace_; - } - - if (initFilter) { - initFilter(*filter); - } - } - - return req->Invoke(); - } - - TFuture<TCriRuntimeApi::TRspListContainersPtr> ListContainers( - std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) override - { - auto req = RuntimeApi_.ListContainers(); - - { - auto* filter = req->mutable_filter(); - - if (auto namespace_ = Config_->Namespace) { - auto& labels = *filter->mutable_label_selector(); - labels[YTPodNamespaceLabel] = namespace_; - } - - if (initFilter) { - initFilter(*filter); - } - } - - return req->Invoke(); - } - - TFuture<void> ForEachPodSandbox( - const TCallback<void(const TCriPodDescriptor&, const NProto::PodSandbox&)>& callback, - std::function<void(NProto::PodSandboxFilter&)> initFilter) override - { - return ListPodSandbox(initFilter).Apply(BIND([=] (const TCriRuntimeApi::TRspListPodSandboxPtr& rsp) { - for (const auto& pod : rsp->items()) { - TCriPodDescriptor descriptor{.Name=pod.metadata().name(), .Id=pod.id()}; - callback(descriptor, pod); - } - })); - } - - TFuture<void> ForEachContainer( - const TCallback<void(const TCriDescriptor&, const NProto::Container&)>& callback, - std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) override - { - return ListContainers(initFilter).Apply(BIND([=] (const TCriRuntimeApi::TRspListContainersPtr& rsp) { - for (const auto& ct : rsp->containers()) { - TCriDescriptor descriptor{.Name=ct.metadata().name(), .Id=ct.id()}; - callback(descriptor, ct); - } - })); - } - - TFuture<TCriRuntimeApi::TRspPodSandboxStatusPtr> GetPodSandboxStatus( - const TCriPodDescriptor& podDescriptor, bool verbose = false) override - { - auto req = RuntimeApi_.PodSandboxStatus(); - req->set_pod_sandbox_id(podDescriptor.Id); - req->set_verbose(verbose); - return req->Invoke(); - } - - TFuture<TCriRuntimeApi::TRspContainerStatusPtr> GetContainerStatus( - const TCriDescriptor& descriptor, bool verbose = false) override - { - auto req = RuntimeApi_.ContainerStatus(); - req->set_container_id(descriptor.Id); - req->set_verbose(verbose); - return req->Invoke(); - } - - TFuture<TCriPodDescriptor> RunPodSandbox(TCriPodSpecPtr podSpec) override - { - auto req = RuntimeApi_.RunPodSandbox(); - - FillPodSandboxConfig(req->mutable_config(), *podSpec); - - if (Config_->RuntimeHandler) { - req->set_runtime_handler(Config_->RuntimeHandler); - } - - return req->Invoke().Apply(BIND([name = podSpec->Name] (const TCriRuntimeApi::TRspRunPodSandboxPtr& rsp) -> TCriPodDescriptor { - return TCriPodDescriptor{.Name = name, .Id = rsp->pod_sandbox_id()}; - })); - } - - TFuture<void> StopPodSandbox(const TCriPodDescriptor& podDescriptor) override - { - auto req = RuntimeApi_.StopPodSandbox(); - req->set_pod_sandbox_id(podDescriptor.Id); - return req->Invoke().AsVoid(); - } - - TFuture<void> RemovePodSandbox(const TCriPodDescriptor& podDescriptor) override - { - auto req = RuntimeApi_.RemovePodSandbox(); - req->set_pod_sandbox_id(podDescriptor.Id); - return req->Invoke().AsVoid(); - } - - TFuture<void> UpdatePodResources( - const TCriPodDescriptor& /*pod*/, - const TCriContainerResources& /*resources*/) override - { - return MakeFuture(TError("Not implemented")); - } - - TFuture<TCriDescriptor> CreateContainer( - TCriContainerSpecPtr ctSpec, - const TCriPodDescriptor& podDescriptor, - TCriPodSpecPtr podSpec) override - { - auto req = RuntimeApi_.CreateContainer(); - req->set_pod_sandbox_id(podDescriptor.Id); - - auto* config = req->mutable_config(); - - { - auto* metadata = config->mutable_metadata(); - metadata->set_name(ctSpec->Name); - } - - { - auto& labels = *config->mutable_labels(); - - for (const auto& [key, val] : ctSpec->Labels) { - labels[key] = val; - } - - labels[YTPodNamespaceLabel] = Config_->Namespace; - labels[YTPodNameLabel] = podSpec->Name; - labels[YTContainerNameLabel] = ctSpec->Name; - } - - FillImageSpec(config->mutable_image(), ctSpec->Image); - - for (const auto& mountSpec : ctSpec->BindMounts) { - auto* mount = config->add_mounts(); - mount->set_container_path(mountSpec.ContainerPath); - mount->set_host_path(mountSpec.HostPath); - mount->set_readonly(mountSpec.ReadOnly); - mount->set_propagation(NProto::PROPAGATION_PRIVATE); - } - - { - ToProto(config->mutable_command(), ctSpec->Command); - ToProto(config->mutable_args(), ctSpec->Arguments); - - config->set_working_dir(ctSpec->WorkingDirectory); - - for (const auto& [key, val] : ctSpec->Environment) { - auto* env = config->add_envs(); - env->set_key(key); - env->set_value(val); - } - } - - { - auto* linux = config->mutable_linux(); - FillLinuxContainerResources(linux->mutable_resources(), ctSpec->Resources); - - auto* security = linux->mutable_security_context(); - - auto* namespaces = security->mutable_namespace_options(); - namespaces->set_network(NProto::NODE); - - security->set_readonly_rootfs(ctSpec->ReadOnlyRootFS); - - if (ctSpec->Credentials.Uid) { - security->mutable_run_as_user()->set_value(*ctSpec->Credentials.Uid); - } - if (ctSpec->Credentials.Gid) { - security->mutable_run_as_group()->set_value(*ctSpec->Credentials.Gid); - } - ToProto(security->mutable_supplemental_groups(), ctSpec->Credentials.Groups); - } - - FillPodSandboxConfig(req->mutable_sandbox_config(), *podSpec); - - return req->Invoke().Apply(BIND([name = ctSpec->Name] (const TCriRuntimeApi::TRspCreateContainerPtr& rsp) -> TCriDescriptor { - return TCriDescriptor{.Name = "", .Id = rsp->container_id()}; - })); - } - - TFuture<void> StartContainer(const TCriDescriptor& descriptor) override - { - auto req = RuntimeApi_.StartContainer(); - req->set_container_id(descriptor.Id); - return req->Invoke().AsVoid(); - } - - TFuture<void> StopContainer(const TCriDescriptor& descriptor, TDuration timeout) override - { - auto req = RuntimeApi_.StopContainer(); - req->set_container_id(descriptor.Id); - req->set_timeout(timeout.Seconds()); - return req->Invoke().AsVoid(); - } - - TFuture<void> RemoveContainer(const TCriDescriptor& descriptor) override - { - auto req = RuntimeApi_.RemoveContainer(); - req->set_container_id(descriptor.Id); - return req->Invoke().AsVoid(); - } - - TFuture<void> UpdateContainerResources(const TCriDescriptor& descriptor, const TCriContainerResources& resources) override - { - auto req = RuntimeApi_.UpdateContainerResources(); - req->set_container_id(descriptor.Id); - FillLinuxContainerResources(req->mutable_linux(), resources); - return req->Invoke().AsVoid(); - } - - void CleanNamespace() override - { - YT_VERIFY(Config_->Namespace); - auto pods = WaitFor(ListPodSandbox()) - .ValueOrThrow(); - - { - std::vector<TFuture<void>> futures; - futures.reserve(pods->items_size()); - for (const auto& pod : pods->items()) { - TCriPodDescriptor podDescriptor{.Name = pod.metadata().name(), .Id = pod.id() }; - futures.push_back(StopPodSandbox(podDescriptor)); - } - WaitFor(AllSucceeded(std::move(futures))) - .ThrowOnError(); - } - - { - std::vector<TFuture<void>> futures; - futures.reserve(pods->items_size()); - for (const auto& pod : pods->items()) { - TCriPodDescriptor podDescriptor{.Name = pod.metadata().name(), .Id = pod.id()}; - futures.push_back(RemovePodSandbox(podDescriptor)); - } - WaitFor(AllSucceeded(std::move(futures))) - .ThrowOnError(); - } - } - - void CleanPodSandbox(const TCriPodDescriptor& podDescriptor) override - { - auto containers = WaitFor(ListContainers([=] (NProto::ContainerFilter& filter) { - filter.set_pod_sandbox_id(podDescriptor.Id); - })) - .ValueOrThrow(); - - { - std::vector<TFuture<void>> futures; - futures.reserve(containers->containers_size()); - for (const auto& ct : containers->containers()) { - TCriDescriptor ctDescriptor{.Name = ct.metadata().name(), .Id = ct.id()}; - futures.push_back(StopContainer(ctDescriptor, TDuration::Zero())); - } - WaitFor(AllSucceeded(std::move(futures))) - .ThrowOnError(); - } - - { - std::vector<TFuture<void>> futures; - futures.reserve(containers->containers_size()); - for (const auto& ct : containers->containers()) { - TCriDescriptor ctDescriptor{.Name = ct.metadata().name(), .Id = ct.id()}; - futures.push_back(RemoveContainer(ctDescriptor)); - } - WaitFor(AllSucceeded(std::move(futures))) - .ThrowOnError(); - } - } - - TFuture<TCriImageApi::TRspListImagesPtr> ListImages( - std::function<void(NProto::ImageFilter&)> initFilter = nullptr) override - { - auto req = ImageApi_.ListImages(); - if (initFilter) { - initFilter(*req->mutable_filter()); - } - return req->Invoke(); - } - - TFuture<TCriImageApi::TRspImageStatusPtr> GetImageStatus( - const TCriImageDescriptor& image, - bool verbose = false) override - { - auto req = ImageApi_.ImageStatus(); - FillImageSpec(req->mutable_image(), image); - req->set_verbose(verbose); - return req->Invoke(); - } - - TFuture<TCriImageDescriptor> PullImage( - const TCriImageDescriptor& image, - bool always, - TCriAuthConfigPtr authConfig, - TCriPodSpecPtr podSpec) override - { - if (!always) { - return GetImageStatus(image) - .Apply(BIND([=, this, this_ = MakeStrong(this)] (const TCriImageApi::TRspImageStatusPtr& imageStatus) { - if (imageStatus->has_image()) { - return MakeFuture(TCriImageDescriptor{.Image = imageStatus->image().id()}); - } - return PullImage(image, /*always*/ true, authConfig, podSpec); - })); - } - - auto req = ImageApi_.PullImage(); - FillImageSpec(req->mutable_image(), image); - if (authConfig) { - FillAuthConfig(req->mutable_auth(), *authConfig); - } - if (podSpec) { - FillPodSandboxConfig(req->mutable_sandbox_config(), *podSpec); - } - return req->Invoke().Apply(BIND([] (const TCriImageApi::TRspPullImagePtr& rsp) -> TCriImageDescriptor { - return TCriImageDescriptor{.Image = rsp->image_ref()}; - })); - } - - TFuture<void> RemoveImage(const TCriImageDescriptor& image) override - { - auto req = ImageApi_.RemoveImage(); - FillImageSpec(req->mutable_image(), image); - return req->Invoke().AsVoid(); - } - - TProcessBasePtr CreateProcess( - const TString& path, - TCriContainerSpecPtr containerSpec, - const TCriPodDescriptor& podDescriptor, - TCriPodSpecPtr podSpec) override - { - return New<TCriProcess>(path, this, std::move(containerSpec), podDescriptor, std::move(podSpec)); - } - -private: - const TCriExecutorConfigPtr Config_; - TCriRuntimeApi RuntimeApi_; - TCriImageApi ImageApi_; - - void FillLinuxContainerResources(NProto::LinuxContainerResources* resources, const TCriContainerResources& spec) - { - auto* unified = resources->mutable_unified(); - - if (spec.CpuLimit) { - i64 period = Config_->CpuPeriod.MicroSeconds(); - i64 quota = period * *spec.CpuLimit; - - resources->set_cpu_period(period); - resources->set_cpu_quota(quota); - } - - if (spec.MemoryLimit) { - resources->set_memory_limit_in_bytes(*spec.MemoryLimit); - } - - if (spec.MemoryRequest) { - (*unified)["memory.low"] = ToString(*spec.MemoryRequest); - } - } - - void FillPodSandboxConfig(NProto::PodSandboxConfig* config, const TCriPodSpec& spec) - { - { - auto* metadata = config->mutable_metadata(); - metadata->set_namespace_(Config_->Namespace); - metadata->set_name(spec.Name); - metadata->set_uid(spec.Name); - } - - { - auto& labels = *config->mutable_labels(); - labels[YTPodNamespaceLabel] = Config_->Namespace; - labels[YTPodNameLabel] = spec.Name; - } - - { - auto* linux = config->mutable_linux(); - linux->set_cgroup_parent(GetPodCgroup(spec.Name)); - - auto* security = linux->mutable_security_context(); - auto* namespaces = security->mutable_namespace_options(); - namespaces->set_network(NProto::NODE); - } - } - - void FillImageSpec(NProto::ImageSpec* spec, const TCriImageDescriptor& image) - { - spec->set_image(image.Image); - } - - void FillAuthConfig(NProto::AuthConfig* auth, const TCriAuthConfig& authConfig) - { - if (!authConfig.Username.empty()) { - auth->set_username(authConfig.Username); - } - if (!authConfig.Password.empty()) { - auth->set_password(authConfig.Password); - } - if (!authConfig.Auth.empty()) { - auth->set_auth(authConfig.Auth); - } - if (!authConfig.ServerAddress.empty()) { - auth->set_server_address(authConfig.ServerAddress); - } - if (!authConfig.IdentityToken.empty()) { - auth->set_identity_token(authConfig.IdentityToken); - } - if (!authConfig.RegistryToken.empty()) { - auth->set_registry_token(authConfig.RegistryToken); - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -ICriExecutorPtr CreateCriExecutor(TCriExecutorConfigPtr config) -{ - return New<TCriExecutor>( - std::move(config), - GetGrpcChannelFactory()); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/cri_executor.h b/yt/yt/library/containers/cri/cri_executor.h deleted file mode 100644 index de9741721f..0000000000 --- a/yt/yt/library/containers/cri/cri_executor.h +++ /dev/null @@ -1,207 +0,0 @@ -#pragma once - -#include "public.h" -#include "config.h" -#include "cri_api.h" - -#include <yt/yt/library/process/process.h> - -#include <yt/yt/core/ytree/yson_struct.h> - -namespace NYT::NContainers::NCri { - -//////////////////////////////////////////////////////////////////////////////// - -struct TCriDescriptor -{ - TString Name; - TString Id; -}; - -struct TCriPodDescriptor -{ - TString Name; - TString Id; -}; - -struct TCriImageDescriptor -{ - TString Image; -}; - -void FormatValue(TStringBuilderBase* builder, const TCriDescriptor& descriptor, TStringBuf spec); -void FormatValue(TStringBuilderBase* builder, const TCriPodDescriptor& descriptor, TStringBuf spec); -void FormatValue(TStringBuilderBase* builder, const TCriImageDescriptor& descriptor, TStringBuf spec); - -//////////////////////////////////////////////////////////////////////////////// - -struct TCriContainerResources -{ - std::optional<double> CpuLimit; - std::optional<double> CpuRequest; - std::optional<i64> MemoryLimit; - std::optional<i64> MemoryRequest; -}; - -struct TCriPodSpec - : public TRefCounted -{ - TString Name; - TCriContainerResources Resources; -}; - -DEFINE_REFCOUNTED_TYPE(TCriPodSpec) - -struct TCriBindMount -{ - TString ContainerPath; - TString HostPath; - bool ReadOnly; -}; - -struct TCriCredentials -{ - std::optional<i64> Uid; - std::optional<i64> Gid; - std::vector<i64> Groups; -}; - -struct TCriContainerSpec - : public TRefCounted -{ - TString Name; - - THashMap<TString, TString> Labels; - - TCriImageDescriptor Image; - - bool ReadOnlyRootFS; - - std::vector<TCriBindMount> BindMounts; - - TCriCredentials Credentials; - - TCriContainerResources Resources; - - //! Command to execute (i.e., entrypoint for docker). - std::vector<TString> Command; - - //! Arguments for the Command (i.e., command for docker). - std::vector<TString> Arguments; - - //! Current working directory of the command. - TString WorkingDirectory; - - //! Environment variable to set in the container. - THashMap<TString, TString> Environment; -}; - -DEFINE_REFCOUNTED_TYPE(TCriContainerSpec) - -//////////////////////////////////////////////////////////////////////////////// - -//! Wrapper around CRI gRPC API -//! -//! @see yt/yt/contrib/cri-api/k8s.io/cri-api/pkg/apis/runtime/v1/api.proto -//! @see https://github.com/kubernetes/cri-api -struct ICriExecutor - : public TRefCounted -{ - //! Returns status of the CRI runtime. - //! @param verbose fill field "info" with runtime-specific debug. - virtual TFuture<TCriRuntimeApi::TRspStatusPtr> GetRuntimeStatus(bool verbose = false) = 0; - - // PodSandbox - - virtual TString GetPodCgroup(TString podName) const = 0; - - virtual TFuture<TCriRuntimeApi::TRspListPodSandboxPtr> ListPodSandbox( - std::function<void(NProto::PodSandboxFilter&)> initFilter = nullptr) = 0; - - virtual TFuture<TCriRuntimeApi::TRspListContainersPtr> ListContainers( - std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) = 0; - - virtual TFuture<void> ForEachPodSandbox( - const TCallback<void(const TCriPodDescriptor&, const NProto::PodSandbox&)>& callback, - std::function<void(NProto::PodSandboxFilter&)> initFilter = nullptr) = 0; - - virtual TFuture<void> ForEachContainer( - const TCallback<void(const TCriDescriptor&, const NProto::Container&)>& callback, - std::function<void(NProto::ContainerFilter&)> initFilter = nullptr) = 0; - - //! Returns status of the pod. - //! @param verbose fill field "info" with runtime-specific debug. - virtual TFuture<TCriRuntimeApi::TRspPodSandboxStatusPtr> GetPodSandboxStatus( - const TCriPodDescriptor& pod, bool verbose = false) = 0; - - //! Returns status of the container. - //! @param verbose fill "info" with runtime-specific debug information. - virtual TFuture<TCriRuntimeApi::TRspContainerStatusPtr> GetContainerStatus( - const TCriDescriptor& ct, bool verbose = false) = 0; - - virtual TFuture<TCriPodDescriptor> RunPodSandbox(TCriPodSpecPtr podSpec) = 0; - virtual TFuture<void> StopPodSandbox(const TCriPodDescriptor& pod) = 0; - virtual TFuture<void> RemovePodSandbox(const TCriPodDescriptor& pod) = 0; - virtual TFuture<void> UpdatePodResources( - const TCriPodDescriptor& pod, - const TCriContainerResources& resources) = 0; - - //! Remove all pods and containers in namespace managed by executor. - virtual void CleanNamespace() = 0; - - //! Remove all containers in one pod. - virtual void CleanPodSandbox(const TCriPodDescriptor& pod) = 0; - - virtual TFuture<TCriDescriptor> CreateContainer( - TCriContainerSpecPtr containerSpec, - const TCriPodDescriptor& pod, - TCriPodSpecPtr podSpec) = 0; - - virtual TFuture<void> StartContainer(const TCriDescriptor& ct) = 0; - - //! Stops container if it's running. - //! @param timeout defines timeout for graceful stop, timeout=0 - kill instantly. - virtual TFuture<void> StopContainer( - const TCriDescriptor& ct, - TDuration timeout = TDuration::Zero()) = 0; - - virtual TFuture<void> RemoveContainer(const TCriDescriptor& ct) = 0; - - virtual TFuture<void> UpdateContainerResources( - const TCriDescriptor& ct, - const TCriContainerResources& resources) = 0; - - virtual TFuture<TCriImageApi::TRspListImagesPtr> ListImages( - std::function<void(NProto::ImageFilter&)> initFilter = nullptr) = 0; - - //! Returns status of the image. - //! @param verbose fill field "info" with runtime-specific debug. - virtual TFuture<TCriImageApi::TRspImageStatusPtr> GetImageStatus( - const TCriImageDescriptor& image, - bool verbose = false) = 0; - - virtual TFuture<TCriImageDescriptor> PullImage( - const TCriImageDescriptor& image, - bool always = false, - TCriAuthConfigPtr authConfig = nullptr, - TCriPodSpecPtr podSpec = nullptr) = 0; - - virtual TFuture<void> RemoveImage(const TCriImageDescriptor& image) = 0; - - // FIXME(khlebnikov): temporary compat - virtual TProcessBasePtr CreateProcess( - const TString& path, - TCriContainerSpecPtr containerSpec, - const TCriPodDescriptor& pod, - TCriPodSpecPtr podSpec) = 0; -}; - -DEFINE_REFCOUNTED_TYPE(ICriExecutor) - -//////////////////////////////////////////////////////////////////////////////// - -ICriExecutorPtr CreateCriExecutor(TCriExecutorConfigPtr config); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/private.h b/yt/yt/library/containers/cri/private.h deleted file mode 100644 index 36fdf194f5..0000000000 --- a/yt/yt/library/containers/cri/private.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include <yt/yt/core/logging/log.h> - -namespace NYT::NContainers::NCri { - -//////////////////////////////////////////////////////////////////////////////// - -inline const NLogging::TLogger Logger("Cri"); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/public.h b/yt/yt/library/containers/cri/public.h deleted file mode 100644 index a12ee86d57..0000000000 --- a/yt/yt/library/containers/cri/public.h +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include <yt/yt/core/misc/intrusive_ptr.h> - -namespace NYT::NContainers::NCri { - -//////////////////////////////////////////////////////////////////////////////// - -DECLARE_REFCOUNTED_STRUCT(TCriPodSpec) -DECLARE_REFCOUNTED_STRUCT(TCriContainerSpec) -DECLARE_REFCOUNTED_CLASS(TCriExecutorConfig) -DECLARE_REFCOUNTED_CLASS(TCriAuthConfig) -DECLARE_REFCOUNTED_STRUCT(ICriExecutor) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers::NCri diff --git a/yt/yt/library/containers/cri/ya.make b/yt/yt/library/containers/cri/ya.make deleted file mode 100644 index dc9dd15a0b..0000000000 --- a/yt/yt/library/containers/cri/ya.make +++ /dev/null @@ -1,22 +0,0 @@ -LIBRARY() - -INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) - -PEERDIR( - yt/yt/core - yt/yt/core/rpc/grpc - yt/yt/contrib/cri-api -) - -SRCS( - cri_api.cpp - cri_executor.cpp - config.cpp -) - -ADDINCL( - ONE_LEVEL - yt/yt/contrib/cri-api -) - -END() diff --git a/yt/yt/library/containers/disk_manager/config.cpp b/yt/yt/library/containers/disk_manager/config.cpp deleted file mode 100644 index 84484db630..0000000000 --- a/yt/yt/library/containers/disk_manager/config.cpp +++ /dev/null @@ -1,61 +0,0 @@ -#include "config.h" - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -void TMockedDiskConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("disk_id", &TThis::DiskId) - .Default(); - registrar.Parameter("device_path", &TThis::DevicePath) - .Default(); - registrar.Parameter("device_name", &TThis::DeviceName) - .Default(); - registrar.Parameter("disk_model", &TThis::DiskModel) - .Default(); - registrar.Parameter("partition_fs_labels", &TThis::PartitionFsLabels) - .Default(); - registrar.Parameter("state", &TThis::State) - .Default(EDiskState::Ok); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskInfoProviderConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("disk_ids", &TThis::DiskIds) - .Default(); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskManagerProxyConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("disk_manager_address", &TThis::DiskManagerAddress) - .Default("unix:/run/yandex-diskmanager/yandex-diskmanager.sock"); - registrar.Parameter("disk_manager_service_name", &TThis::DiskManagerServiceName) - .Default("diskman.DiskManager"); - - registrar.Parameter("is_mock", &TThis::IsMock) - .Default(false); - registrar.Parameter("mock_disks", &TThis::MockDisks) - .Default(); - registrar.Parameter("mock_yt_paths", &TThis::MockYtPaths) - .Default(); - - registrar.Parameter("request_timeout", &TThis::RequestTimeout) - .Default(TDuration::Seconds(10)); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskManagerProxyDynamicConfig::Register(TRegistrar registrar) -{ - registrar.Parameter("request_timeout", &TThis::RequestTimeout) - .Default(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/disk_manager/config.h b/yt/yt/library/containers/disk_manager/config.h deleted file mode 100644 index 4f01d378b9..0000000000 --- a/yt/yt/library/containers/disk_manager/config.h +++ /dev/null @@ -1,79 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/ytree/yson_struct.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -struct TMockedDiskConfig - : public NYTree::TYsonStruct -{ - TString DiskId; - TString DevicePath; - TString DeviceName; - TString DiskModel; - std::vector<TString> PartitionFsLabels; - EDiskState State; - - REGISTER_YSON_STRUCT(TMockedDiskConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TMockedDiskConfig) - -//////////////////////////////////////////////////////////////////////////////// - -struct TDiskManagerProxyConfig - : public NYTree::TYsonStruct -{ - TString DiskManagerAddress; - TString DiskManagerServiceName; - - bool IsMock; - std::vector<TMockedDiskConfigPtr> MockDisks; - std::vector<TString> MockYtPaths; - - TDuration RequestTimeout; - - REGISTER_YSON_STRUCT(TDiskManagerProxyConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TDiskManagerProxyConfig) - -//////////////////////////////////////////////////////////////////////////////// - -struct TDiskInfoProviderConfig - : public NYTree::TYsonStruct -{ - std::vector<TString> DiskIds; - - REGISTER_YSON_STRUCT(TDiskInfoProviderConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TDiskInfoProviderConfig) - -//////////////////////////////////////////////////////////////////////////////// - -struct TDiskManagerProxyDynamicConfig - : public NYTree::TYsonStruct -{ - std::optional<TDuration> RequestTimeout; - - REGISTER_YSON_STRUCT(TDiskManagerProxyDynamicConfig); - - static void Register(TRegistrar registrar); -}; - -DEFINE_REFCOUNTED_TYPE(TDiskManagerProxyDynamicConfig) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/disk_manager/disk_info_provider.cpp b/yt/yt/library/containers/disk_manager/disk_info_provider.cpp deleted file mode 100644 index 0ee3a5b6cb..0000000000 --- a/yt/yt/library/containers/disk_manager/disk_info_provider.cpp +++ /dev/null @@ -1,64 +0,0 @@ -#include "disk_info_provider.h" - -#include <yt/yt/library/containers/disk_manager/disk_manager_proxy.h> - -#include <yt/yt/core/actions/future.h> -#include <yt/yt/core/actions/invoker_util.h> - -#include <yt/yt/core/concurrency/public.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -TDiskInfoProvider::TDiskInfoProvider( - IDiskManagerProxyPtr diskManagerProxy, - TDiskInfoProviderConfigPtr config) - : DiskManagerProxy_(std::move(diskManagerProxy)) - , Config_(std::move(config)) -{ } - -const std::vector<TString>& TDiskInfoProvider::GetConfigDiskIds() const -{ - return Config_->DiskIds; -} - -TFuture<std::vector<TDiskInfo>> TDiskInfoProvider::GetYTDiskInfos() -{ - auto diskInfosFuture = DiskManagerProxy_->GetDisks(); - auto ytDiskPathsFuture = DiskManagerProxy_->GetYtDiskMountPaths(); - - // Merge two futures and filter disks placed in /yt. - return diskInfosFuture.Apply(BIND([=] (const std::vector<TDiskInfo>& diskInfos) { - return ytDiskPathsFuture.Apply(BIND([=] (const THashSet<TString>& diskPaths) { - std::vector<TDiskInfo> disks; - - for (const auto& diskInfo : diskInfos) { - for (const auto& partitionFsLabel : diskInfo.PartitionFsLabels) { - if (diskPaths.contains(partitionFsLabel)) { - disks.push_back(diskInfo); - break; - } - } - } - - return disks; - })); - })); -} - -TFuture<void> TDiskInfoProvider::RecoverDisk(const TString& diskId) -{ - return DiskManagerProxy_->RecoverDiskById(diskId, ERecoverPolicy::RecoverAuto); -} - -TFuture<void> TDiskInfoProvider::FailDisk( - const TString& diskId, - const TString& reason) -{ - return DiskManagerProxy_->FailDiskById(diskId, reason); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/disk_manager/disk_info_provider.h b/yt/yt/library/containers/disk_manager/disk_info_provider.h deleted file mode 100644 index b8d686438d..0000000000 --- a/yt/yt/library/containers/disk_manager/disk_info_provider.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/actions/future.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -class TDiskInfoProvider - : public TRefCounted -{ -public: - TDiskInfoProvider( - IDiskManagerProxyPtr diskManagerProxy, - TDiskInfoProviderConfigPtr config); - - const std::vector<TString>& GetConfigDiskIds() const; - - TFuture<std::vector<TDiskInfo>> GetYTDiskInfos(); - - TFuture<void> RecoverDisk(const TString& diskId); - - TFuture<void> FailDisk( - const TString& diskId, - const TString& reason); - -private: - const IDiskManagerProxyPtr DiskManagerProxy_; - const TDiskInfoProviderConfigPtr Config_; -}; - -DEFINE_REFCOUNTED_TYPE(TDiskInfoProvider) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/disk_manager/disk_manager_proxy.cpp b/yt/yt/library/containers/disk_manager/disk_manager_proxy.cpp deleted file mode 100644 index 961723c51f..0000000000 --- a/yt/yt/library/containers/disk_manager/disk_manager_proxy.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include "disk_manager_proxy.h" - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -struct TDiskManagerProxyMock - : public IDiskManagerProxy -{ - virtual TFuture<THashSet<TString>> GetYtDiskMountPaths() - { - THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration"); - } - - virtual TFuture<std::vector<TDiskInfo>> GetDisks() - { - THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration"); - } - - virtual TFuture<void> RecoverDiskById(const TString& /*diskId*/, ERecoverPolicy /*recoverPolicy*/) - { - THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration"); - } - - virtual TFuture<void> FailDiskById(const TString& /*diskId*/, const TString& /*reason*/) - { - THROW_ERROR_EXCEPTION("Disk manager library is not available under this build configuration"); - } - - virtual void OnDynamicConfigChanged(const TDiskManagerProxyDynamicConfigPtr& /*newConfig*/) - { - // Do nothing - } -}; - -DEFINE_REFCOUNTED_TYPE(TDiskManagerProxyMock) - -//////////////////////////////////////////////////////////////////////////////// - -Y_WEAK IDiskManagerProxyPtr CreateDiskManagerProxy(TDiskManagerProxyConfigPtr /*config*/) -{ - // This implementation is used when disk_manager_proxy_impl.cpp is not linked. - - return New<TDiskManagerProxyMock>(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/disk_manager/disk_manager_proxy.h b/yt/yt/library/containers/disk_manager/disk_manager_proxy.h deleted file mode 100644 index d2da5c1873..0000000000 --- a/yt/yt/library/containers/disk_manager/disk_manager_proxy.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/library/containers/disk_manager/config.h> - -#include <yt/yt/core/misc/atomic_object.h> - -#include <yt/yt/core/rpc/client.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -struct IDiskManagerProxy - : public virtual TRefCounted -{ - virtual TFuture<THashSet<TString>> GetYtDiskMountPaths() = 0; - - virtual TFuture<std::vector<TDiskInfo>> GetDisks() = 0; - - virtual TFuture<void> RecoverDiskById(const TString& diskId, ERecoverPolicy recoverPolicy) = 0; - - virtual TFuture<void> FailDiskById(const TString& diskId, const TString& reason) = 0; - - virtual void OnDynamicConfigChanged(const TDiskManagerProxyDynamicConfigPtr& newConfig) = 0; - -}; - -DEFINE_REFCOUNTED_TYPE(IDiskManagerProxy) - -//////////////////////////////////////////////////////////////////////////////// - -IDiskManagerProxyPtr CreateDiskManagerProxy(TDiskManagerProxyConfigPtr config); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/disk_manager/public.h b/yt/yt/library/containers/disk_manager/public.h deleted file mode 100644 index 8a812638f3..0000000000 --- a/yt/yt/library/containers/disk_manager/public.h +++ /dev/null @@ -1,48 +0,0 @@ -#pragma once - -#include <yt/yt/core/misc/public.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -DEFINE_ENUM(EDiskState, - ((Unknown) (0)) - ((Ok) (1)) - ((Failed) (2)) - ((RecoverWait) (3)) -); - -// 1. Remount all disk volumes to it's default state -// 2. Recreate disk layout, all data on disk will be lost -// 3. Replace phisical disk -DEFINE_ENUM(ERecoverPolicy, - ((RecoverAuto) (0)) - ((RecoverMount) (1)) - ((RecoverLayout) (2)) - ((RecoverDisk) (3)) -); - -struct TDiskInfo -{ - TString DiskId; - TString DevicePath; - TString DeviceName; - TString DiskModel; - THashSet<TString> PartitionFsLabels; - EDiskState State; -}; - -//////////////////////////////////////////////////////////////////////////////// - -DECLARE_REFCOUNTED_STRUCT(TMockedDiskConfig) -DECLARE_REFCOUNTED_STRUCT(TDiskManagerProxyConfig) -DECLARE_REFCOUNTED_STRUCT(TDiskManagerProxyDynamicConfig) -DECLARE_REFCOUNTED_STRUCT(TDiskInfoProviderConfig) - -DECLARE_REFCOUNTED_STRUCT(IDiskManagerProxy) -DECLARE_REFCOUNTED_CLASS(TDiskInfoProvider) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/disk_manager/ya.make b/yt/yt/library/containers/disk_manager/ya.make deleted file mode 100644 index dcb260cf38..0000000000 --- a/yt/yt/library/containers/disk_manager/ya.make +++ /dev/null @@ -1,19 +0,0 @@ -LIBRARY() - -INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) - -PEERDIR( - yt/yt/core -) - -SRCS( - config.cpp - disk_info_provider.cpp - disk_manager_proxy.cpp -) - -IF (NOT OPENSOURCE) - INCLUDE(ya_non_opensource.inc) -ENDIF() - -END() diff --git a/yt/yt/library/containers/instance.cpp b/yt/yt/library/containers/instance.cpp deleted file mode 100644 index 0a56987e1b..0000000000 --- a/yt/yt/library/containers/instance.cpp +++ /dev/null @@ -1,812 +0,0 @@ -#ifdef __linux__ - -#include "instance.h" - -#include "porto_executor.h" -#include "private.h" - -#include <yt/yt/library/containers/cgroup.h> -#include <yt/yt/library/containers/config.h> - -#include <yt/yt/core/concurrency/scheduler.h> - -#include <yt/yt/core/logging/log.h> - -#include <yt/yt/core/misc/collection_helpers.h> -#include <yt/yt/core/misc/error.h> -#include <yt/yt/core/misc/fs.h> -#include <yt/yt/core/misc/proc.h> - -#include <library/cpp/porto/libporto.hpp> - -#include <util/stream/file.h> - -#include <util/string/cast.h> -#include <util/string/split.h> - -#include <util/system/env.h> - -#include <initializer_list> -#include <string> - -namespace NYT::NContainers { - -using namespace NConcurrency; -using namespace NNet; - -//////////////////////////////////////////////////////////////////////////////// - -namespace NDetail { - -// Porto passes command string to wordexp, where quota (') symbol -// is delimiter. So we must replace it with concatenation ('"'"'). -TString EscapeForWordexp(const char* in) -{ - TString buffer; - while (*in) { - if (*in == '\'') { - buffer.append(R"('"'"')"); - } else { - buffer.append(*in); - } - in++; - } - return buffer; -} - -i64 Extract( - const TString& input, - const TString& pattern, - const TString& terminator = "\n") -{ - auto start = input.find(pattern) + pattern.length(); - auto end = input.find(terminator, start); - return std::stol(input.substr(start, (end == input.npos) ? end : end - start)); -} - -i64 ExtractSum( - const TString& input, - const TString& pattern, - const TString& delimiter, - const TString& terminator = "\n") -{ - i64 sum = 0; - TString::size_type pos = 0; - while (pos < input.length()) { - pos = input.find(pattern, pos); - if (pos == input.npos) { - break; - } - pos += pattern.length(); - - pos = input.find(delimiter, pos); - if (pos == input.npos) { - break; - } - - pos++; - auto end = input.find(terminator, pos); - sum += std::stol(input.substr(pos, (end == input.npos) ? end : end - pos)); - } - return sum; -} - -using TPortoStatRule = std::pair<TString, std::function<i64(const TString& input)>>; - -static const std::function<i64(const TString&)> LongExtractor = [] (const TString& in) { - return std::stol(in); -}; - -static const std::function<i64(const TString&)> CoreNsPerSecondExtractor = [] (const TString& in) { - int pos = in.find("c", 0); - return (std::stod(in.substr(0, pos))) * 1'000'000'000; -}; - -static const std::function<i64(const TString&)> GetIOStatExtractor(const TString& rwMode = "") -{ - return [rwMode] (const TString& in) { - return ExtractSum(in, "hw", rwMode + ":", ";"); - }; -} - -static const std::function<i64(const TString&)> GetStatByKeyExtractor(const TString& statKey) -{ - return [statKey] (const TString& in) { - return Extract(in, statKey); - }; -} - -const THashMap<EStatField, TPortoStatRule> PortoStatRules = { - {EStatField::CpuUsage, {"cpu_usage", LongExtractor}}, - {EStatField::CpuSystemUsage, {"cpu_usage_system", LongExtractor}}, - {EStatField::CpuWait, {"cpu_wait", LongExtractor}}, - {EStatField::CpuThrottled, {"cpu_throttled", LongExtractor}}, - {EStatField::ThreadCount, {"thread_count", LongExtractor}}, - {EStatField::CpuLimit, {"cpu_limit_bound", CoreNsPerSecondExtractor}}, - {EStatField::CpuGuarantee, {"cpu_guarantee_bound", CoreNsPerSecondExtractor}}, - {EStatField::Rss, {"memory.stat", GetStatByKeyExtractor("total_rss")}}, - {EStatField::MappedFile, {"memory.stat", GetStatByKeyExtractor("total_mapped_file")}}, - {EStatField::MinorPageFaults, {"minor_faults", LongExtractor}}, - {EStatField::MajorPageFaults, {"major_faults", LongExtractor}}, - {EStatField::FileCacheUsage, {"cache_usage", LongExtractor}}, - {EStatField::AnonMemoryUsage, {"anon_usage", LongExtractor}}, - {EStatField::AnonMemoryLimit, {"anon_limit_total", LongExtractor}}, - {EStatField::MemoryUsage, {"memory_usage", LongExtractor}}, - {EStatField::MemoryGuarantee, {"memory_guarantee", LongExtractor}}, - {EStatField::MemoryLimit, {"memory_limit_total", LongExtractor}}, - {EStatField::MaxMemoryUsage, {"memory.max_usage_in_bytes", LongExtractor}}, - {EStatField::OomKills, {"oom_kills", LongExtractor}}, - {EStatField::OomKillsTotal, {"oom_kills_total", LongExtractor}}, - - {EStatField::IOReadByte, {"io_read", GetIOStatExtractor()}}, - {EStatField::IOWriteByte, {"io_write", GetIOStatExtractor()}}, - {EStatField::IOBytesLimit, {"io_limit", GetIOStatExtractor()}}, - {EStatField::IOReadOps, {"io_read_ops", GetIOStatExtractor()}}, - {EStatField::IOWriteOps, {"io_write_ops", GetIOStatExtractor()}}, - {EStatField::IOOps, {"io_ops", GetIOStatExtractor()}}, - {EStatField::IOOpsLimit, {"io_ops_limit", GetIOStatExtractor()}}, - {EStatField::IOTotalTime, {"io_time", GetIOStatExtractor()}}, - {EStatField::IOWaitTime, {"io_wait", GetIOStatExtractor()}}, - - {EStatField::NetTxBytes, {"net_tx_bytes[veth]", LongExtractor}}, - {EStatField::NetTxPackets, {"net_tx_packets[veth]", LongExtractor}}, - {EStatField::NetTxDrops, {"net_tx_drops[veth]", LongExtractor}}, - {EStatField::NetTxLimit, {"net_limit[veth]", LongExtractor}}, - {EStatField::NetRxBytes, {"net_rx_bytes[veth]", LongExtractor}}, - {EStatField::NetRxPackets, {"net_rx_packets[veth]", LongExtractor}}, - {EStatField::NetRxDrops, {"net_rx_drops[veth]", LongExtractor}}, - {EStatField::NetRxLimit, {"net_rx_limit[veth]", LongExtractor}}, -}; - -std::optional<TString> GetParentName(const TString& name) -{ - if (name.empty()) { - return std::nullopt; - } - - auto slashPosition = name.rfind('/'); - if (slashPosition == TString::npos) { - return ""; - } - - return name.substr(0, slashPosition); -} - -std::optional<TString> GetRootName(const TString& name) -{ - if (name.empty()) { - return std::nullopt; - } - - if (name == "/") { - return name; - } - - auto slashPosition = name.find('/'); - if (slashPosition == TString::npos) { - return name; - } - - return name.substr(0, slashPosition); -} - -} // namespace NDetail - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoInstanceLauncher - : public IInstanceLauncher -{ -public: - TPortoInstanceLauncher(const TString& name, IPortoExecutorPtr executor) - : Executor_(std::move(executor)) - , Logger(ContainersLogger.WithTag("Container: %v", name)) - { - Spec_.Name = name; - Spec_.CGroupControllers = { - "freezer", - "cpu", - "cpuacct", - "net_cls", - "blkio", - "devices", - "pids" - }; - } - - const TString& GetName() const override - { - return Spec_.Name; - } - - bool HasRoot() const override - { - return static_cast<bool>(Spec_.RootFS); - } - - void SetStdIn(const TString& inputPath) override - { - Spec_.StdinPath = inputPath; - } - - void SetStdOut(const TString& outPath) override - { - Spec_.StdoutPath = outPath; - } - - void SetStdErr(const TString& errorPath) override - { - Spec_.StderrPath = errorPath; - } - - void SetCwd(const TString& pwd) override - { - Spec_.CurrentWorkingDirectory = pwd; - } - - void SetCoreDumpHandler(const std::optional<TString>& handler) override - { - if (handler) { - Spec_.CoreCommand = *handler; - Spec_.EnableCoreDumps = true; - } else { - Spec_.EnableCoreDumps = false; - } - } - - void SetRoot(const TRootFS& rootFS) override - { - Spec_.RootFS = rootFS; - } - - void SetThreadLimit(i64 threadLimit) override - { - Spec_.ThreadLimit = threadLimit; - } - - void SetDevices(const std::vector<TDevice>& devices) override - { - Spec_.Devices = devices; - } - - void SetEnablePorto(EEnablePorto enablePorto) override - { - Spec_.EnablePorto = enablePorto; - } - - void SetIsolate(bool isolate) override - { - Spec_.Isolate = isolate; - } - - void EnableMemoryTracking() override - { - Spec_.CGroupControllers.push_back("memory"); - } - - void SetGroup(int groupId) override - { - Spec_.GroupId = groupId; - } - - void SetUser(const TString& user) override - { - Spec_.User = user; - } - - void SetIPAddresses(const std::vector<NNet::TIP6Address>& addresses, bool enableNat64) override - { - Spec_.IPAddresses = addresses; - Spec_.EnableNat64 = enableNat64; - Spec_.DisableNetwork = false; - } - - void DisableNetwork() override - { - Spec_.DisableNetwork = true; - Spec_.IPAddresses.clear(); - Spec_.EnableNat64 = false; - } - - void SetHostName(const TString& hostName) override - { - Spec_.HostName = hostName; - } - - TFuture<IInstancePtr> Launch( - const TString& path, - const std::vector<TString>& args, - const THashMap<TString, TString>& env) override - { - TStringBuilder commandBuilder; - auto append = [&] (const auto& value) { - commandBuilder.AppendString("'"); - commandBuilder.AppendString(NDetail::EscapeForWordexp(value.c_str())); - commandBuilder.AppendString("' "); - }; - - append(path); - for (const auto& arg : args) { - append(arg); - } - - Spec_.Command = commandBuilder.Flush(); - YT_LOG_DEBUG("Executing Porto container (Name: %v, Command: %v)", - Spec_.Name, - Spec_.Command); - - Spec_.Env = env; - - auto onContainerCreated = [this, this_ = MakeStrong(this)] (const TError& error) -> IInstancePtr { - if (!error.IsOK()) { - THROW_ERROR_EXCEPTION(EErrorCode::FailedToStartContainer, "Unable to start container") - << error; - } - - return GetPortoInstance(Executor_, Spec_.Name); - }; - - return Executor_->CreateContainer(Spec_, /* start */ true) - .Apply(BIND(onContainerCreated)); - } - -private: - IPortoExecutorPtr Executor_; - TRunnableContainerSpec Spec_; - const NLogging::TLogger Logger; -}; - -IInstanceLauncherPtr CreatePortoInstanceLauncher(const TString& name, IPortoExecutorPtr executor) -{ - return New<TPortoInstanceLauncher>(name, executor); -} - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoInstance - : public IInstance -{ -public: - static IInstancePtr GetSelf(IPortoExecutorPtr executor) - { - return New<TPortoInstance>(GetSelfContainerName(executor), executor); - } - - static IInstancePtr GetInstance(IPortoExecutorPtr executor, const TString& name) - { - return New<TPortoInstance>(name, executor); - } - - void Kill(int signal) override - { - auto error = WaitFor(Executor_->KillContainer(Name_, signal)); - // Killing already finished process is not an error. - if (error.FindMatching(EPortoErrorCode::InvalidState)) { - return; - } - if (!error.IsOK()) { - THROW_ERROR_EXCEPTION("Failed to send signal to Porto instance") - << TErrorAttribute("signal", signal) - << TErrorAttribute("container", Name_) - << error; - } - } - - void Destroy() override - { - WaitFor(Executor_->DestroyContainer(Name_)) - .ThrowOnError(); - Destroyed_ = true; - } - - void Stop() override - { - WaitFor(Executor_->StopContainer(Name_)) - .ThrowOnError(); - } - - TErrorOr<ui64> CalculateCpuUserUsage( - TErrorOr<ui64>& cpuUsage, - TErrorOr<ui64>& cpuSystemUsage) const - { - if (cpuUsage.IsOK() && cpuSystemUsage.IsOK()) { - return cpuUsage.Value() > cpuSystemUsage.Value() ? cpuUsage.Value() - cpuSystemUsage.Value() : 0; - } else if (cpuUsage.IsOK()) { - return TError("Missing property %Qlv in Porto response", EStatField::CpuSystemUsage) - << TErrorAttribute("container", Name_); - } else { - return TError("Missing property %Qlv in Porto response", EStatField::CpuUsage) - << TErrorAttribute("container", Name_); - } - } - - TResourceUsage GetResourceUsage( - const std::vector<EStatField>& fields) const override - { - std::vector<TString> properties; - properties.push_back("absolute_name"); - - bool userTimeRequested = false; - bool contextSwitchesRequested = false; - for (auto field : fields) { - if (auto it = NDetail::PortoStatRules.find(field)) { - const auto& rule = it->second; - properties.push_back(rule.first); - } else if (field == EStatField::ContextSwitchesDelta || field == EStatField::ContextSwitches) { - contextSwitchesRequested = true; - } else if (field == EStatField::CpuUserUsage) { - userTimeRequested = true; - } else { - THROW_ERROR_EXCEPTION("Unknown resource field %Qlv requested", field) - << TErrorAttribute("container", Name_); - } - } - - auto propertyMap = WaitFor(Executor_->GetContainerProperties(Name_, properties)) - .ValueOrThrow(); - - TResourceUsage result; - - for (auto field : fields) { - auto ruleIt = NDetail::PortoStatRules.find(field); - if (ruleIt == NDetail::PortoStatRules.end()) { - continue; - } - - const auto& [property, callback] = ruleIt->second; - auto& record = result[field]; - if (auto responseIt = propertyMap.find(property); responseIt != propertyMap.end()) { - const auto& valueOrError = responseIt->second; - if (valueOrError.IsOK()) { - const auto& value = valueOrError.Value(); - - try { - record = callback(value); - } catch (const std::exception& ex) { - record = TError("Error parsing Porto property %Qlv", field) - << TErrorAttribute("container", Name_) - << TErrorAttribute("property_value", value) - << ex; - } - } else { - record = TError("Error getting Porto property %Qlv", field) - << TErrorAttribute("container", Name_) - << valueOrError; - } - } else { - record = TError("Missing property %Qlv in Porto response", field) - << TErrorAttribute("container", Name_); - } - } - - // We should maintain context switch information even if this field - // is not requested since metrics of individual containers can go up and down. - auto subcontainers = WaitFor(Executor_->ListSubcontainers(Name_, /*includeRoot*/ true)) - .ValueOrThrow(); - - auto metricMap = WaitFor(Executor_->GetContainerMetrics(subcontainers, "ctxsw")) - .ValueOrThrow(); - - // TODO(don-dron): remove diff calculation from GetResourceUsage, because GetResourceUsage must return only snapshot stat. - { - auto guard = Guard(ContextSwitchMapLock_); - - for (const auto& [container, newValue] : metricMap) { - auto& prevValue = ContextSwitchMap_[container]; - TotalContextSwitches_ += std::max<i64>(0LL, newValue - prevValue); - prevValue = newValue; - } - - if (contextSwitchesRequested) { - result[EStatField::ContextSwitchesDelta] = TotalContextSwitches_; - } - } - - if (contextSwitchesRequested) { - ui64 totalContextSwitches = 0; - - for (const auto& [container, newValue] : metricMap) { - totalContextSwitches += std::max<ui64>(0UL, newValue); - } - - result[EStatField::ContextSwitches] = totalContextSwitches; - } - - if (userTimeRequested) { - result[EStatField::CpuUserUsage] = CalculateCpuUserUsage( - result[EStatField::CpuUsage], - result[EStatField::CpuSystemUsage]); - } - - return result; - } - - TResourceLimits GetResourceLimits() const override - { - std::vector<TString> properties; - static TString memoryLimitProperty = "memory_limit_total"; - static TString cpuLimitProperty = "cpu_limit_bound"; - static TString cpuGuaranteeProperty = "cpu_guarantee_bound"; - properties.push_back(memoryLimitProperty); - properties.push_back(cpuLimitProperty); - properties.push_back(cpuGuaranteeProperty); - - auto responseOrError = WaitFor(Executor_->GetContainerProperties(Name_, properties)); - THROW_ERROR_EXCEPTION_IF_FAILED(responseOrError, "Failed to get Porto container resource limits"); - - const auto& response = responseOrError.Value(); - - const auto& memoryLimitRsp = response.at(memoryLimitProperty); - THROW_ERROR_EXCEPTION_IF_FAILED(memoryLimitRsp, "Failed to get memory limit from Porto"); - - i64 memoryLimit; - if (!TryFromString<i64>(memoryLimitRsp.Value(), memoryLimit)) { - THROW_ERROR_EXCEPTION("Failed to parse memory limit value from Porto") - << TErrorAttribute(memoryLimitProperty, memoryLimitRsp.Value()); - } - - const auto& cpuLimitRsp = response.at(cpuLimitProperty); - THROW_ERROR_EXCEPTION_IF_FAILED(cpuLimitRsp, "Failed to get CPU limit from Porto"); - - double cpuLimit; - YT_VERIFY(cpuLimitRsp.Value().EndsWith('c')); - auto cpuLimitValue = TStringBuf(cpuLimitRsp.Value().begin(), cpuLimitRsp.Value().size() - 1); - if (!TryFromString<double>(cpuLimitValue, cpuLimit)) { - THROW_ERROR_EXCEPTION("Failed to parse CPU limit value from Porto") - << TErrorAttribute(cpuLimitProperty, cpuLimitRsp.Value()); - } - - const auto& cpuGuaranteeRsp = response.at(cpuGuaranteeProperty); - THROW_ERROR_EXCEPTION_IF_FAILED(cpuGuaranteeRsp, "Failed to get CPU guarantee from Porto"); - - double cpuGuarantee; - if (!cpuGuaranteeRsp.Value()) { - // XXX(ignat): hack for missing response from Porto. - cpuGuarantee = 0.0; - } else { - YT_VERIFY(cpuGuaranteeRsp.Value().EndsWith('c')); - auto cpuGuaranteeValue = TStringBuf(cpuGuaranteeRsp.Value().begin(), cpuGuaranteeRsp.Value().size() - 1); - if (!TryFromString<double>(cpuGuaranteeValue, cpuGuarantee)) { - THROW_ERROR_EXCEPTION("Failed to parse CPU guarantee value from Porto") - << TErrorAttribute(cpuGuaranteeProperty, cpuGuaranteeRsp.Value()); - } - } - - return TResourceLimits{ - .CpuLimit = cpuLimit, - .CpuGuarantee = cpuGuarantee, - .Memory = memoryLimit, - }; - } - - void SetCpuGuarantee(double cores) override - { - SetProperty("cpu_guarantee", ToString(cores) + "c"); - } - - void SetCpuLimit(double cores) override - { - SetProperty("cpu_limit", ToString(cores) + "c"); - } - - void SetCpuWeight(double weight) override - { - SetProperty("cpu_weight", weight); - } - - void SetMemoryGuarantee(i64 memoryGuarantee) override - { - SetProperty("memory_guarantee", memoryGuarantee); - } - - void SetIOWeight(double weight) override - { - SetProperty("io_weight", weight); - } - - void SetIOThrottle(i64 operations) override - { - SetProperty("io_ops_limit", operations); - } - - TString GetStderr() const override - { - return *WaitFor(Executor_->GetContainerProperty(Name_, "stderr")) - .ValueOrThrow(); - } - - TString GetName() const override - { - return Name_; - } - - std::optional<TString> GetParentName() const override - { - return NDetail::GetParentName(Name_); - } - - std::optional<TString> GetRootName() const override - { - return NDetail::GetRootName(Name_); - } - - pid_t GetPid() const override - { - auto pid = *WaitFor(Executor_->GetContainerProperty(Name_, "root_pid")) - .ValueOrThrow(); - return std::stoi(pid); - } - - i64 GetMajorPageFaultCount() const override - { - auto faults = WaitFor(Executor_->GetContainerProperty(Name_, "major_faults")) - .ValueOrThrow(); - return faults - ? std::stoll(*faults) - : 0; - } - - double GetCpuGuarantee() const override - { - auto result = WaitFor(Executor_->GetContainerProperty(Name_, "cpu_guarantee")) - .ValueOrThrow(); - return result - ? std::stod(*result) - : 0; - } - - std::vector<pid_t> GetPids() const override - { - auto getPidCgroup = [&] (const TString& cgroups) { - for (TStringBuf cgroup : StringSplitter(cgroups).SplitByString("; ")) { - if (cgroup.StartsWith("pids:")) { - auto startPosition = cgroup.find('/'); - YT_VERIFY(startPosition != TString::npos); - return cgroup.substr(startPosition); - } - } - THROW_ERROR_EXCEPTION("Pids cgroup not found for container %Qv", GetName()) - << TErrorAttribute("cgroups", cgroups); - }; - - auto cgroups = *WaitFor(Executor_->GetContainerProperty(Name_, "cgroups")) - .ValueOrThrow(); - // Porto returns full cgroup name, with mount prefix, such as "/sys/fs/cgroup/pids". - auto instanceCgroup = getPidCgroup(cgroups); - - std::vector<pid_t> pids; - for (auto pid : ListPids()) { - std::map<TString, TString> cgroups; - try { - cgroups = GetProcessCGroups(pid); - } catch (const std::exception& ex) { - YT_LOG_DEBUG(ex, "Failed to get CGroups for process (Pid: %v)", pid); - continue; - } - - // Pid cgroups are returned in short form. - auto processPidCgroup = cgroups["pids"]; - if (!processPidCgroup.empty() && instanceCgroup.EndsWith(processPidCgroup)) { - pids.push_back(pid); - } - } - - return pids; - } - - TFuture<void> Wait() override - { - return Executor_->PollContainer(Name_) - .Apply(BIND([] (int status) { - StatusToError(status) - .ThrowOnError(); - })); - } - -private: - const TString Name_; - const IPortoExecutorPtr Executor_; - const NLogging::TLogger Logger; - - bool Destroyed_ = false; - - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, ContextSwitchMapLock_); - mutable i64 TotalContextSwitches_ = 0; - mutable THashMap<TString, i64> ContextSwitchMap_; - - TPortoInstance(TString name, IPortoExecutorPtr executor) - : Name_(std::move(name)) - , Executor_(std::move(executor)) - , Logger(ContainersLogger.WithTag("Container: %v", Name_)) - { } - - void SetProperty(const TString& key, const TString& value) - { - WaitFor(Executor_->SetContainerProperty(Name_, key, value)) - .ThrowOnError(); - } - - void SetProperty(const TString& key, i64 value) - { - SetProperty(key, ToString(value)); - } - - void SetProperty(const TString& key, double value) - { - SetProperty(key, ToString(value)); - } - - DECLARE_NEW_FRIEND() -}; - -//////////////////////////////////////////////////////////////////////////////// - -TString GetSelfContainerName(const IPortoExecutorPtr& executor) -{ - try { - auto properties = WaitFor(executor->GetContainerProperties( - "self", - std::vector<TString>{"absolute_name", "absolute_namespace"})) - .ValueOrThrow(); - - auto absoluteName = properties.at("absolute_name") - .ValueOrThrow(); - auto absoluteNamespace = properties.at("absolute_namespace") - .ValueOrThrow(); - - if (absoluteName == "/") { - return absoluteName; - } - - if (absoluteName.length() < absoluteNamespace.length()) { - YT_VERIFY(absoluteName + "/" == absoluteNamespace); - return ""; - } else { - YT_VERIFY(absoluteName.StartsWith(absoluteNamespace)); - return absoluteName.substr(absoluteNamespace.length()); - } - } catch (const std::exception& ex) { - THROW_ERROR_EXCEPTION("Failed to get name for container \"self\"") - << ex; - } -} - -IInstancePtr GetSelfPortoInstance(IPortoExecutorPtr executor) -{ - return TPortoInstance::GetSelf(executor); -} - -IInstancePtr GetPortoInstance(IPortoExecutorPtr executor, const TString& name) -{ - return TPortoInstance::GetInstance(executor, name); -} - -IInstancePtr GetRootPortoInstance(IPortoExecutorPtr executor) -{ - auto self = GetSelfPortoInstance(executor); - return TPortoInstance::GetInstance(executor, *self->GetRootName()); -} - -double GetSelfPortoInstanceVCpuFactor() -{ - auto config = New<TPortoExecutorDynamicConfig>(); - auto executorPtr = CreatePortoExecutor(config, ""); - auto currentContainer = GetSelfPortoInstance(executorPtr); - double cpuLimit = currentContainer->GetResourceLimits().CpuLimit; - if (cpuLimit <= 0) { - THROW_ERROR_EXCEPTION("Cpu limit must be greater than 0"); - } - - // DEPLOY_VCPU_LIMIT stores value in millicores - if (TString vcpuLimitStr = GetEnv("DEPLOY_VCPU_LIMIT"); !vcpuLimitStr.Empty()) { - double vcpuLimit = FromString<double>(vcpuLimitStr) / 1000.0; - return vcpuLimit / cpuLimit; - } - THROW_ERROR_EXCEPTION("Failed to get vcpu limit from env variable"); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers - -#endif diff --git a/yt/yt/library/containers/instance.h b/yt/yt/library/containers/instance.h deleted file mode 100644 index ff6e0b3ce1..0000000000 --- a/yt/yt/library/containers/instance.h +++ /dev/null @@ -1,168 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/actions/future.h> - -#include <yt/yt/core/net/address.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -using TResourceUsage = THashMap<EStatField, TErrorOr<ui64>>; - -const std::vector<EStatField> InstanceStatFields{ - EStatField::CpuUsage, - EStatField::CpuUserUsage, - EStatField::CpuSystemUsage, - EStatField::CpuWait, - EStatField::CpuThrottled, - EStatField::ContextSwitches, - EStatField::ContextSwitchesDelta, - EStatField::ThreadCount, - EStatField::CpuLimit, - EStatField::CpuGuarantee, - - EStatField::Rss, - EStatField::MappedFile, - EStatField::MajorPageFaults, - EStatField::MinorPageFaults, - EStatField::FileCacheUsage, - EStatField::AnonMemoryUsage, - EStatField::AnonMemoryLimit, - EStatField::MemoryUsage, - EStatField::MemoryGuarantee, - EStatField::MemoryLimit, - EStatField::MaxMemoryUsage, - EStatField::OomKills, - EStatField::OomKillsTotal, - - EStatField::IOReadByte, - EStatField::IOWriteByte, - EStatField::IOBytesLimit, - EStatField::IOReadOps, - EStatField::IOWriteOps, - EStatField::IOOps, - EStatField::IOOpsLimit, - EStatField::IOTotalTime, - EStatField::IOWaitTime, - - EStatField::NetTxBytes, - EStatField::NetTxPackets, - EStatField::NetTxDrops, - EStatField::NetTxLimit, - EStatField::NetRxBytes, - EStatField::NetRxPackets, - EStatField::NetRxDrops, - EStatField::NetRxLimit, -}; - -struct TResourceLimits -{ - double CpuLimit; - double CpuGuarantee; - i64 Memory; -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct IInstanceLauncher - : public TRefCounted -{ - virtual bool HasRoot() const = 0; - virtual const TString& GetName() const = 0; - - virtual void SetStdIn(const TString& inputPath) = 0; - virtual void SetStdOut(const TString& outPath) = 0; - virtual void SetStdErr(const TString& errorPath) = 0; - virtual void SetCwd(const TString& pwd) = 0; - - // Null core dump handler implies disabled core dumps. - virtual void SetCoreDumpHandler(const std::optional<TString>& handler) = 0; - virtual void SetRoot(const TRootFS& rootFS) = 0; - - virtual void SetThreadLimit(i64 threadLimit) = 0; - virtual void SetDevices(const std::vector<TDevice>& devices) = 0; - - virtual void SetEnablePorto(EEnablePorto enablePorto) = 0; - virtual void SetIsolate(bool isolate) = 0; - virtual void EnableMemoryTracking() = 0; - virtual void SetGroup(int groupId) = 0; - virtual void SetUser(const TString& user) = 0; - virtual void SetIPAddresses( - const std::vector<NNet::TIP6Address>& addresses, - bool enableNat64 = false) = 0; - virtual void DisableNetwork() = 0; - virtual void SetHostName(const TString& hostName) = 0; - - virtual TFuture<IInstancePtr> Launch( - const TString& path, - const std::vector<TString>& args, - const THashMap<TString, TString>& env) = 0; -}; - -DEFINE_REFCOUNTED_TYPE(IInstanceLauncher) - -#ifdef _linux_ -IInstanceLauncherPtr CreatePortoInstanceLauncher(const TString& name, IPortoExecutorPtr executor); -#endif - -//////////////////////////////////////////////////////////////////////////////// - -struct IInstance - : public TRefCounted -{ - virtual void Kill(int signal) = 0; - virtual void Stop() = 0; - virtual void Destroy() = 0; - - virtual TResourceUsage GetResourceUsage( - const std::vector<EStatField>& fields = InstanceStatFields) const = 0; - virtual TResourceLimits GetResourceLimits() const = 0; - virtual void SetCpuGuarantee(double cores) = 0; - virtual void SetCpuLimit(double cores) = 0; - virtual void SetCpuWeight(double weight) = 0; - virtual void SetIOWeight(double weight) = 0; - virtual void SetIOThrottle(i64 operations) = 0; - virtual void SetMemoryGuarantee(i64 memoryGuarantee) = 0; - - virtual TString GetStderr() const = 0; - - virtual TString GetName() const = 0; - virtual std::optional<TString> GetParentName() const = 0; - virtual std::optional<TString> GetRootName() const = 0; - - //! Returns externally visible pid of the root process inside container. - //! Throws if container is not running. - virtual pid_t GetPid() const = 0; - //! Returns the list of externally visible pids of processes running inside container. - virtual std::vector<pid_t> GetPids() const = 0; - - virtual i64 GetMajorPageFaultCount() const = 0; - virtual double GetCpuGuarantee() const = 0; - - //! Future is set when container reaches terminal state (stopped or dead). - //! Resulting error is OK iff container exited with code 0. - virtual TFuture<void> Wait() = 0; -}; - -DEFINE_REFCOUNTED_TYPE(IInstance) - -//////////////////////////////////////////////////////////////////////////////// - -#ifdef _linux_ -TString GetSelfContainerName(const IPortoExecutorPtr& executor); - -IInstancePtr GetSelfPortoInstance(IPortoExecutorPtr executor); -IInstancePtr GetRootPortoInstance(IPortoExecutorPtr executor); -IInstancePtr GetPortoInstance(IPortoExecutorPtr executor, const TString& name); - -//! Works only in Yandex.Deploy pod environment where env DEPLOY_VCPU_LIMIT is set. -//! Throws if this env is absent. -double GetSelfPortoInstanceVCpuFactor(); -#endif - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/instance_limits_tracker.cpp b/yt/yt/library/containers/instance_limits_tracker.cpp deleted file mode 100644 index 55ef7d2d67..0000000000 --- a/yt/yt/library/containers/instance_limits_tracker.cpp +++ /dev/null @@ -1,179 +0,0 @@ -#include "public.h" -#include "instance_limits_tracker.h" -#include "instance.h" -#include "porto_resource_tracker.h" -#include "private.h" - -#include <yt/yt/core/concurrency/periodic_executor.h> - -#include <yt/yt/core/ytree/fluent.h> -#include <yt/yt/core/ytree/ypath_service.h> - -namespace NYT::NContainers { - -using namespace NYTree; - -//////////////////////////////////////////////////////////////////////////////// - -static const auto& Logger = ContainersLogger; - -//////////////////////////////////////////////////////////////////////////////// - -TInstanceLimitsTracker::TInstanceLimitsTracker( - IInstancePtr instance, - IInstancePtr root, - IInvokerPtr invoker, - TDuration updatePeriod) - : Invoker_(std::move(invoker)) - , Executor_(New<NConcurrency::TPeriodicExecutor>( - Invoker_, - BIND(&TInstanceLimitsTracker::DoUpdateLimits, MakeWeak(this)), - updatePeriod)) -{ -#ifdef _linux_ - SelfTracker_ = New<TPortoResourceTracker>(std::move(instance), updatePeriod / 2); - RootTracker_ = New<TPortoResourceTracker>(std::move(root), updatePeriod / 2); -#else - Y_UNUSED(instance); - Y_UNUSED(root); -#endif -} - -void TInstanceLimitsTracker::Start() -{ - if (!Running_) { - Executor_->Start(); - Running_ = true; - YT_LOG_INFO("Instance limits tracker started"); - } -} - -void TInstanceLimitsTracker::Stop() -{ - if (Running_) { - YT_UNUSED_FUTURE(Executor_->Stop()); - Running_ = false; - YT_LOG_INFO("Instance limits tracker stopped"); - } -} - -void TInstanceLimitsTracker::DoUpdateLimits() -{ - VERIFY_INVOKER_AFFINITY(Invoker_); - -#ifdef _linux_ - YT_LOG_DEBUG("Checking for instance limits update"); - - auto setIfOk = [] (auto* destination, const auto& valueOrError, const TString& fieldName, bool alert = true) { - if (valueOrError.IsOK()) { - *destination = valueOrError.Value(); - } else { - YT_LOG_ALERT_IF(alert, valueOrError, "Failed to get container property (Field: %v)", - fieldName); - - YT_LOG_DEBUG(valueOrError, "Failed to get container property (Field: %v)", - fieldName); - } - }; - - try { - auto memoryStatistics = SelfTracker_->GetMemoryStatistics(); - auto netStatistics = RootTracker_->GetNetworkStatistics(); - auto cpuStatistics = SelfTracker_->GetCpuStatistics(); - - setIfOk(&MemoryUsage_, memoryStatistics.Rss, "MemoryRss"); - - TDuration cpuGuarantee; - TDuration cpuLimit; - - if (cpuStatistics.GuaranteeTime.IsOK()) { - setIfOk(&cpuGuarantee, cpuStatistics.GuaranteeTime, "CpuGuarantee"); - } else { - // XXX(don-dron, ignat): do nothing, see NContainers::TPortoInstance::GetResourceLimits, hack for missing response from Porto. - } - - setIfOk(&cpuLimit, cpuStatistics.LimitTime, "CpuLimit"); - - if (CpuGuarantee_ != cpuGuarantee) { - YT_LOG_INFO("Instance CPU guarantee updated (OldCpuGuarantee: %v, NewCpuGuarantee: %v)", - CpuGuarantee_, - cpuGuarantee); - CpuGuarantee_ = cpuGuarantee; - // NB: We do not fire LimitsUpdated since this value used only for diagnostics. - } - - TInstanceLimits limits; - limits.Cpu = cpuLimit.SecondsFloat(); - - if (memoryStatistics.AnonLimit.IsOK() && memoryStatistics.MemoryLimit.IsOK()) { - i64 anonLimit = memoryStatistics.AnonLimit.Value(); - i64 memoryLimit = memoryStatistics.MemoryLimit.Value(); - - if (anonLimit > 0 && memoryLimit > 0) { - limits.Memory = std::min(anonLimit, memoryLimit); - } else if (anonLimit > 0) { - limits.Memory = anonLimit; - } else { - limits.Memory = memoryLimit; - } - } else { - setIfOk(&limits.Memory, memoryStatistics.MemoryLimit, "MemoryLimit"); - } - - static constexpr bool DontFireAlertOnError = {}; - setIfOk(&limits.NetTx, netStatistics.TxLimit, "NetTxLimit", DontFireAlertOnError); - setIfOk(&limits.NetRx, netStatistics.RxLimit, "NetRxLimit", DontFireAlertOnError); - - if (InstanceLimits_ != limits) { - YT_LOG_INFO("Instance limits updated (OldLimits: %v, NewLimits: %v)", - InstanceLimits_, - limits); - InstanceLimits_ = limits; - LimitsUpdated_.Fire(limits); - } - } catch (const std::exception& ex) { - YT_LOG_WARNING(ex, "Failed to get instance limits"); - } -#endif -} - -IYPathServicePtr TInstanceLimitsTracker::GetOrchidService() -{ - return IYPathService::FromProducer(BIND(&TInstanceLimitsTracker::DoBuildOrchid, MakeStrong(this))) - ->Via(Invoker_); -} - -void TInstanceLimitsTracker::DoBuildOrchid(NYson::IYsonConsumer* consumer) const -{ - NYTree::BuildYsonFluently(consumer) - .BeginMap() - .DoIf(static_cast<bool>(InstanceLimits_), [&] (auto fluent) { - fluent.Item("cpu_limit").Value(InstanceLimits_->Cpu); - }) - .DoIf(static_cast<bool>(CpuGuarantee_), [&] (auto fluent) { - fluent.Item("cpu_guarantee").Value(*CpuGuarantee_); - }) - .DoIf(static_cast<bool>(InstanceLimits_), [&] (auto fluent) { - fluent.Item("memory_limit").Value(InstanceLimits_->Memory); - }) - .DoIf(static_cast<bool>(MemoryUsage_), [&] (auto fluent) { - fluent.Item("memory_usage").Value(*MemoryUsage_); - }) - .EndMap(); -} - -//////////////////////////////////////////////////////////////////////////////// - -void FormatValue(TStringBuilderBase* builder, const TInstanceLimits& limits, TStringBuf /*format*/) -{ - builder->AppendFormat( - "{Cpu: %v, Memory: %v, NetTx: %v, NetRx: %v}", - limits.Cpu, - limits.Memory, - limits.NetTx, - limits.NetRx); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/instance_limits_tracker.h b/yt/yt/library/containers/instance_limits_tracker.h deleted file mode 100644 index e652fff446..0000000000 --- a/yt/yt/library/containers/instance_limits_tracker.h +++ /dev/null @@ -1,59 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/core/actions/signal.h> - -#include <yt/yt/core/concurrency/public.h> - -#include <yt/yt/core/yson/public.h> - -#include <yt/yt/core/ytree/public.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -class TInstanceLimitsTracker - : public TRefCounted -{ -public: - //! Raises when container limits change. - DEFINE_SIGNAL(void(const TInstanceLimits&), LimitsUpdated); - -public: - TInstanceLimitsTracker( - IInstancePtr instance, - IInstancePtr root, - IInvokerPtr invoker, - TDuration updatePeriod); - - void Start(); - void Stop(); - - NYTree::IYPathServicePtr GetOrchidService(); - -private: - void DoUpdateLimits(); - void DoBuildOrchid(NYson::IYsonConsumer* consumer) const; - - TPortoResourceTrackerPtr SelfTracker_; - TPortoResourceTrackerPtr RootTracker_; - const IInvokerPtr Invoker_; - const NConcurrency::TPeriodicExecutorPtr Executor_; - - std::optional<TDuration> CpuGuarantee_; - std::optional<TInstanceLimits> InstanceLimits_; - std::optional<i64> MemoryUsage_; - bool Running_ = false; -}; - -DEFINE_REFCOUNTED_TYPE(TInstanceLimitsTracker) - -//////////////////////////////////////////////////////////////////////////////// - -void FormatValue(TStringBuilderBase* builder, const TInstanceLimits& limits, TStringBuf format); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/porto_executor.cpp b/yt/yt/library/containers/porto_executor.cpp deleted file mode 100644 index a6a44fd20f..0000000000 --- a/yt/yt/library/containers/porto_executor.cpp +++ /dev/null @@ -1,1079 +0,0 @@ -#include "porto_executor.h" -#include "config.h" - -#include "private.h" - -#include <yt/yt/core/concurrency/action_queue.h> -#include <yt/yt/core/concurrency/periodic_executor.h> -#include <yt/yt/core/concurrency/scheduler.h> - -#include <yt/yt/core/logging/log.h> - -#include <yt/yt/core/misc/fs.h> - -#include <yt/yt/core/profiling/timing.h> - -#include <yt/yt/core/ytree/convert.h> - -#include <library/cpp/porto/proto/rpc.pb.h> - -#include <library/cpp/yt/memory/atomic_intrusive_ptr.h> - -#include <string> - -namespace NYT::NContainers { - -using namespace NConcurrency; -using Porto::EError; - -//////////////////////////////////////////////////////////////////////////////// - -#ifdef _linux_ - -static const NLogging::TLogger& Logger = ContainersLogger; -static constexpr auto RetryInterval = TDuration::MilliSeconds(100); - -//////////////////////////////////////////////////////////////////////////////// - -TString PortoErrorCodeFormatter(int code) -{ - return TEnumTraits<EPortoErrorCode>::ToString(static_cast<EPortoErrorCode>(code)); -} - -YT_DEFINE_ERROR_CODE_RANGE(12000, 13999, "NYT::NContainers::EPortoErrorCode", PortoErrorCodeFormatter); - -//////////////////////////////////////////////////////////////////////////////// - -EPortoErrorCode ConvertPortoErrorCode(EError portoError) -{ - return static_cast<EPortoErrorCode>(PortoErrorCodeBase + portoError); -} - -bool IsRetriableErrorCode(EPortoErrorCode error, bool idempotent) -{ - return - error == EPortoErrorCode::Unknown || - // TODO(babenko): it's not obvious that we can always retry SocketError - // but this is how it has used to work for a while. - error == EPortoErrorCode::SocketError || - error == EPortoErrorCode::SocketTimeout && idempotent; -} - -THashMap<TString, TErrorOr<TString>> ParsePortoGetResponse( - const Porto::TGetResponse_TContainerGetListResponse& response) -{ - THashMap<TString, TErrorOr<TString>> result; - for (const auto& property : response.keyval()) { - if (property.error() == EError::Success) { - result[property.variable()] = property.value(); - } else { - result[property.variable()] = TError(ConvertPortoErrorCode(property.error()), property.errormsg()) - << TErrorAttribute("porto_error", ConvertPortoErrorCode(property.error())); - } - } - return result; -} - -THashMap<TString, TErrorOr<TString>> ParseSinglePortoGetResponse( - const TString& name, - const Porto::TGetResponse& getResponse) -{ - for (const auto& container : getResponse.list()) { - if (container.name() == name) { - return ParsePortoGetResponse(container); - } - } - THROW_ERROR_EXCEPTION("Unable to get properties from Porto") - << TErrorAttribute("container", name); -} - -THashMap<TString, THashMap<TString, TErrorOr<TString>>> ParseMultiplePortoGetResponse( - const Porto::TGetResponse& getResponse) -{ - THashMap<TString, THashMap<TString, TErrorOr<TString>>> result; - for (const auto& container : getResponse.list()) { - result[container.name()] = ParsePortoGetResponse(container); - } - return result; -} - -TString FormatEnablePorto(EEnablePorto value) -{ - switch (value) { - case EEnablePorto::None: return "none"; - case EEnablePorto::Isolate: return "isolate"; - case EEnablePorto::Full: return "full"; - default: YT_ABORT(); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoExecutor - : public IPortoExecutor -{ -public: - TPortoExecutor( - TPortoExecutorDynamicConfigPtr config, - const TString& threadNameSuffix, - const NProfiling::TProfiler& profiler) - : Config_(std::move(config)) - , Queue_(New<TActionQueue>(Format("Porto:%v", threadNameSuffix))) - , Profiler_(profiler) - , PollExecutor_(New<TPeriodicExecutor>( - Queue_->GetInvoker(), - BIND(&TPortoExecutor::DoPoll, MakeWeak(this)), - Config_->PollPeriod)) - { - DynamicConfig_.Store(New<TPortoExecutorDynamicConfig>()); - - Api_->SetTimeout(Config_->ApiTimeout.Seconds()); - Api_->SetDiskTimeout(Config_->ApiDiskTimeout.Seconds()); - - PollExecutor_->Start(); - } - - void SubscribeFailed(const TCallback<void (const TError&)>& callback) override - { - Failed_.Subscribe(callback); - } - - void UnsubscribeFailed(const TCallback<void (const TError&)>& callback) override - { - Failed_.Unsubscribe(callback); - } - - void OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig) override - { - DynamicConfig_.Store(newConfig); - } - -private: - template <class T, class... TArgs1, class... TArgs2> - auto ExecutePortoApiAction( - T(TPortoExecutor::*Method)(TArgs1...), - const TString& command, - TArgs2&&... args) - { - YT_LOG_DEBUG("Enqueue Porto API action (Command: %v)", command); - return BIND(Method, MakeStrong(this), std::forward<TArgs2>(args)...) - .AsyncVia(Queue_->GetInvoker()) - .Run(); - }; - -public: - TFuture<void> CreateContainer(const TString& container) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoCreateContainer, - "CreateContainer", - container); - } - - TFuture<void> CreateContainer(const TRunnableContainerSpec& containerSpec, bool start) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoCreateContainerFromSpec, - "CreateContainerFromSpec", - containerSpec, - start); - } - - TFuture<std::optional<TString>> GetContainerProperty( - const TString& container, - const TString& property) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoGetContainerProperty, - "GetContainerProperty", - container, - property); - } - - TFuture<THashMap<TString, TErrorOr<TString>>> GetContainerProperties( - const TString& container, - const std::vector<TString>& properties) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoGetContainerProperties, - "GetContainerProperty", - container, - properties); - } - - TFuture<THashMap<TString, THashMap<TString, TErrorOr<TString>>>> GetContainerProperties( - const std::vector<TString>& containers, - const std::vector<TString>& properties) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoGetContainerMultipleProperties, - "GetContainerProperty", - containers, - properties); - } - - TFuture<THashMap<TString, i64>> GetContainerMetrics( - const std::vector<TString>& containers, - const TString& metric) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoGetContainerMetrics, - "GetContainerMetrics", - containers, - metric); - } - - TFuture<void> SetContainerProperty( - const TString& container, - const TString& property, - const TString& value) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoSetContainerProperty, - "SetContainerProperty", - container, - property, - value); - } - - TFuture<void> DestroyContainer(const TString& container) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoDestroyContainer, - "DestroyContainer", - container); - } - - TFuture<void> StopContainer(const TString& container) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoStopContainer, - "StopContainer", - container); - } - - TFuture<void> StartContainer(const TString& container) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoStartContainer, - "StartContainer", - container); - } - - TFuture<TString> ConvertPath(const TString& path, const TString& container) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoConvertPath, - "ConvertPath", - path, - container); - } - - TFuture<void> KillContainer(const TString& container, int signal) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoKillContainer, - "KillContainer", - container, - signal); - } - - TFuture<std::vector<TString>> ListSubcontainers( - const TString& rootContainer, - bool includeRoot) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoListSubcontainers, - "ListSubcontainers", - rootContainer, - includeRoot); - } - - TFuture<int> PollContainer(const TString& container) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoPollContainer, - "PollContainer", - container); - } - - TFuture<int> WaitContainer(const TString& container) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoWaitContainer, - "WaitContainer", - container); - } - - // This method allocates Porto "resources", so it should be uncancellable. - TFuture<TString> CreateVolume( - const TString& path, - const THashMap<TString, TString>& properties) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoCreateVolume, - "CreateVolume", - path, - properties) - .ToUncancelable(); - } - - // This method allocates Porto "resources", so it should be uncancellable. - TFuture<void> LinkVolume( - const TString& path, - const TString& name) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoLinkVolume, - "LinkVolume", - path, - name) - .ToUncancelable(); - } - - // This method deallocates Porto "resources", so it should be uncancellable. - TFuture<void> UnlinkVolume( - const TString& path, - const TString& name) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoUnlinkVolume, - "UnlinkVolume", - path, - name) - .ToUncancelable(); - } - - TFuture<std::vector<TString>> ListVolumePaths() override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoListVolumePaths, - "ListVolumePaths"); - } - - // This method allocates Porto "resources", so it should be uncancellable. - TFuture<void> ImportLayer(const TString& archivePath, const TString& layerId, const TString& place) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoImportLayer, - "ImportLayer", - archivePath, - layerId, - place) - .ToUncancelable(); - } - - // This method deallocates Porto "resources", so it should be uncancellable. - TFuture<void> RemoveLayer(const TString& layerId, const TString& place, bool async) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoRemoveLayer, - "RemoveLayer", - layerId, - place, - async) - .ToUncancelable(); - } - - TFuture<std::vector<TString>> ListLayers(const TString& place) override - { - return ExecutePortoApiAction( - &TPortoExecutor::DoListLayers, - "ListLayers", - place); - } - - IInvokerPtr GetInvoker() const override - { - return Queue_->GetInvoker(); - } - -private: - const TPortoExecutorDynamicConfigPtr Config_; - const TActionQueuePtr Queue_; - const NProfiling::TProfiler Profiler_; - const std::unique_ptr<Porto::TPortoApi> Api_ = std::make_unique<Porto::TPortoApi>(); - const TPeriodicExecutorPtr PollExecutor_; - TAtomicIntrusivePtr<TPortoExecutorDynamicConfig> DynamicConfig_; - - std::vector<TString> Containers_; - THashMap<TString, TPromise<int>> ContainerMap_; - TSingleShotCallbackList<void(const TError&)> Failed_; - - struct TCommandEntry - { - explicit TCommandEntry(const NProfiling::TProfiler& registry) - : TimeGauge(registry.Timer("/command_time")) - , RetryCounter(registry.Counter("/command_retries")) - , SuccessCounter(registry.Counter("/command_successes")) - , FailureCounter(registry.Counter("/command_failures")) - { } - - NProfiling::TEventTimer TimeGauge; - NProfiling::TCounter RetryCounter; - NProfiling::TCounter SuccessCounter; - NProfiling::TCounter FailureCounter; - }; - - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, CommandLock_); - THashMap<TString, TCommandEntry> CommandToEntry_; - - static const std::vector<TString> ContainerRequestVars_; - - bool IsTestPortoFailureEnabled() const - { - auto config = DynamicConfig_.Acquire(); - return config->EnableTestPortoFailures; - } - - bool IsTestPortoTimeout() const - { - auto config = DynamicConfig_.Acquire(); - return config->EnableTestPortoNotResponding; - } - - EPortoErrorCode GetFailedStubError() const - { - auto config = DynamicConfig_.Acquire(); - return config->StubErrorCode; - } - - static TError CreatePortoError(EPortoErrorCode errorCode, const TString& message) - { - return TError(errorCode, "Porto API error") - << TErrorAttribute("original_porto_error_code", static_cast<int>(errorCode) - PortoErrorCodeBase) - << TErrorAttribute("porto_error_message", message); - } - - THashMap<TString, TErrorOr<TString>> DoGetContainerProperties( - const TString& container, - const std::vector<TString>& properties) - { - auto response = DoRequestContainerProperties({container}, properties); - return ParseSinglePortoGetResponse(container, response); - } - - THashMap<TString, THashMap<TString, TErrorOr<TString>>> DoGetContainerMultipleProperties( - const std::vector<TString>& containers, - const std::vector<TString>& properties) - { - auto response = DoRequestContainerProperties(containers, properties); - return ParseMultiplePortoGetResponse(response); - } - - std::optional<TString> DoGetContainerProperty( - const TString& container, - const TString& property) - { - auto response = DoRequestContainerProperties({container}, {property}); - auto parsedResponse = ParseSinglePortoGetResponse(container, response); - auto it = parsedResponse.find(property); - if (it == parsedResponse.end()) { - return std::nullopt; - } else { - return it->second.ValueOrThrow(); - } - } - - void DoCreateContainer(const TString& container) - { - ExecuteApiCall( - [&] { return Api_->Create(container); }, - "Create", - /*idempotent*/ false); - } - - void DoCreateContainerFromSpec(const TRunnableContainerSpec& spec, bool start) - { - Porto::TContainerSpec portoSpec; - - // Required properties. - portoSpec.set_name(spec.Name); - portoSpec.set_command(spec.Command); - - portoSpec.set_enable_porto(FormatEnablePorto(spec.EnablePorto)); - portoSpec.set_isolate(spec.Isolate); - - if (spec.StdinPath) { - portoSpec.set_stdin_path(*spec.StdinPath); - } - if (spec.StdoutPath) { - portoSpec.set_stdout_path(*spec.StdoutPath); - } - if (spec.StderrPath) { - portoSpec.set_stderr_path(*spec.StderrPath); - } - - if (spec.CurrentWorkingDirectory) { - portoSpec.set_cwd(*spec.CurrentWorkingDirectory); - } - - if (spec.CoreCommand) { - portoSpec.set_core_command(*spec.CoreCommand); - } - if (spec.User) { - portoSpec.set_user(*spec.User); - } - - // Useful for jobs, where we operate with numeric group ids. - if (spec.GroupId) { - portoSpec.set_group(ToString(*spec.GroupId)); - } - - if (spec.ThreadLimit) { - portoSpec.set_thread_limit(*spec.ThreadLimit); - } - - if (spec.HostName) { - // To get a reasonable and unique host name inside container. - portoSpec.set_hostname(*spec.HostName); - if (!spec.IPAddresses.empty()) { - const auto& address = spec.IPAddresses[0]; - auto etcHosts = Format("%v %v\n", address, *spec.HostName); - // To be able to resolve hostname into IP inside container. - portoSpec.set_etc_hosts(etcHosts); - } - } - - if (spec.DisableNetwork) { - auto* netConfig = portoSpec.mutable_net()->add_cfg(); - netConfig->set_opt("none"); - } else if (!spec.IPAddresses.empty() && Config_->EnableNetworkIsolation) { - // This label is intended for HBF-agent: YT-12512. - auto* label = portoSpec.mutable_labels()->add_map(); - label->set_key("HBF.ignore_address"); - label->set_val("1"); - - auto* netConfig = portoSpec.mutable_net()->add_cfg(); - netConfig->set_opt("L3"); - netConfig->add_arg("veth0"); - - for (const auto& address : spec.IPAddresses) { - auto* ipConfig = portoSpec.mutable_ip()->add_cfg(); - ipConfig->set_dev("veth0"); - ipConfig->set_ip(ToString(address)); - } - - if (spec.EnableNat64) { - // Behave like nanny does. - portoSpec.set_resolv_conf("nameserver fd64::1;nameserver 2a02:6b8:0:3400::5005;options attempts:1 timeout:1"); - } - } - - for (const auto& [key, value] : spec.Labels) { - auto* map = portoSpec.mutable_labels()->add_map(); - map->set_key(key); - map->set_val(value); - } - - for (const auto& [name, value] : spec.Env) { - auto* var = portoSpec.mutable_env()->add_var(); - var->set_name(name); - var->set_value(value); - } - - for (const auto& controller : spec.CGroupControllers) { - portoSpec.mutable_controllers()->add_controller(controller); - } - - for (const auto& device : spec.Devices) { - auto* portoDevice = portoSpec.mutable_devices()->add_device(); - portoDevice->set_device(device.DeviceName); - portoDevice->set_access(device.Enabled ? "rw" : "-"); - } - - auto addBind = [&] (const TBind& bind) { - auto* portoBind = portoSpec.mutable_bind()->add_bind(); - portoBind->set_target(bind.TargetPath); - portoBind->set_source(bind.SourcePath); - portoBind->add_flag(bind.ReadOnly ? "ro" : "rw"); - }; - - if (spec.RootFS) { - portoSpec.set_root_readonly(spec.RootFS->IsRootReadOnly); - portoSpec.set_root(spec.RootFS->RootPath); - - for (const auto& bind : spec.RootFS->Binds) { - addBind(bind); - } - } - - { - auto* ulimit = portoSpec.mutable_ulimit()->add_ulimit(); - ulimit->set_type("core"); - if (spec.EnableCoreDumps) { - ulimit->set_unlimited(true); - } else { - ulimit->set_hard(0); - ulimit->set_soft(0); - } - } - - // Set some universal defaults. - portoSpec.set_oom_is_fatal(false); - - ExecuteApiCall( - [&] { return Api_->CreateFromSpec(portoSpec, {}, start); }, - "CreateFromSpec", - /*idempotent*/ false); - } - - void DoSetContainerProperty(const TString& container, const TString& property, const TString& value) - { - ExecuteApiCall( - [&] { return Api_->SetProperty(container, property, value); }, - "SetProperty", - /*idempotent*/ true); - } - - void DoDestroyContainer(const TString& container) - { - try { - ExecuteApiCall( - [&] { return Api_->Destroy(container); }, - "Destroy", - /*idempotent*/ true); - } catch (const TErrorException& ex) { - if (!ex.Error().FindMatching(EPortoErrorCode::ContainerDoesNotExist)) { - throw; - } - } - } - - void DoStopContainer(const TString& container) - { - ExecuteApiCall( - [&] { return Api_->Stop(container); }, - "Stop", - /*idempotent*/ true); - } - - void DoStartContainer(const TString& container) - { - ExecuteApiCall( - [&] { return Api_->Start(container); }, - "Start", - /*idempotent*/ false); - } - - TString DoConvertPath(const TString& path, const TString& container) - { - TString result; - ExecuteApiCall( - [&] { return Api_->ConvertPath(path, container, "self", result); }, - "ConvertPath", - /*idempotent*/ true); - return result; - } - - void DoKillContainer(const TString& container, int signal) - { - ExecuteApiCall( - [&] { return Api_->Kill(container, signal); }, - "Kill", - /*idempotent*/ false); - } - - std::vector<TString> DoListSubcontainers(const TString& rootContainer, bool includeRoot) - { - Porto::TListContainersRequest req; - auto filter = req.add_filters(); - filter->set_name(rootContainer + "/*"); - if (includeRoot) { - auto rootFilter = req.add_filters(); - rootFilter->set_name(rootContainer); - } - auto fieldOptions = req.mutable_field_options(); - fieldOptions->add_properties("absolute_name"); - TVector<Porto::TContainer> containers; - ExecuteApiCall( - [&] { return Api_->ListContainersBy(req, containers); }, - "ListContainersBy", - /*idempotent*/ true); - - std::vector<TString> containerNames; - containerNames.reserve(containers.size()); - for (const auto& container : containers) { - const auto& absoluteName = container.status().absolute_name(); - if (!absoluteName.empty()) { - containerNames.push_back(absoluteName); - } - } - return containerNames; - } - - TFuture<int> DoWaitContainer(const TString& container) - { - auto result = NewPromise<int>(); - auto waitCallback = [=, this, this_ = MakeStrong(this)] (const Porto::TWaitResponse& rsp) { - return OnContainerTerminated(rsp, result); - }; - - ExecuteApiCall( - [&] { return Api_->AsyncWait({container}, {}, waitCallback); }, - "AsyncWait", - /*idempotent*/ false); - - return result.ToFuture().ToImmediatelyCancelable(); - } - - void OnContainerTerminated(const Porto::TWaitResponse& portoWaitResponse, TPromise<int> result) - { - const auto& container = portoWaitResponse.name(); - const auto& state = portoWaitResponse.state(); - if (state != "dead" && state != "stopped") { - result.TrySet(TError("Container finished with unexpected state") - << TErrorAttribute("container_name", container) - << TErrorAttribute("container_state", state)); - return; - } - - // TODO(max42): switch to Subscribe. - YT_UNUSED_FUTURE(GetContainerProperty(container, "exit_status").Apply(BIND( - [=] (const TErrorOr<std::optional<TString>>& errorOrExitCode) { - if (!errorOrExitCode.IsOK()) { - result.TrySet(TError("Container finished, but exit status is unknown") - << errorOrExitCode); - return; - } - - const auto& optionalExitCode = errorOrExitCode.Value(); - if (!optionalExitCode) { - result.TrySet(TError("Container finished, but exit status is unknown") - << TErrorAttribute("container_name", container) - << TErrorAttribute("container_state", state)); - return; - } - - try { - int exitStatus = FromString<int>(*optionalExitCode); - result.TrySet(exitStatus); - } catch (const std::exception& ex) { - auto error = TError("Failed to parse Porto exit status") - << TErrorAttribute("container_name", container) - << TErrorAttribute("exit_status", optionalExitCode.value()); - error.MutableInnerErrors()->push_back(TError(ex)); - result.TrySet(error); - } - }))); - } - - TFuture<int> DoPollContainer(const TString& container) - { - auto [it, inserted] = ContainerMap_.insert({container, NewPromise<int>()}); - if (!inserted) { - YT_LOG_WARNING("Container already added for polling (Container: %v)", - container); - } else { - Containers_.push_back(container); - } - return it->second.ToFuture(); - } - - Porto::TGetResponse DoRequestContainerProperties( - const std::vector<TString>& containers, - const std::vector<TString>& vars) - { - TVector<TString> containers_(containers.begin(), containers.end()); - TVector<TString> vars_(vars.begin(), vars.end()); - - const Porto::TGetResponse* getResponse; - - ExecuteApiCall( - [&] { - getResponse = Api_->Get(containers_, vars_); - return getResponse ? EError::Success : EError::Unknown; - }, - "Get", - /*idempotent*/ true); - - YT_VERIFY(getResponse); - return *getResponse; - } - - THashMap<TString, i64> DoGetContainerMetrics( - const std::vector<TString>& containers, - const TString& metric) - { - TVector<TString> containers_(containers.begin(), containers.end()); - - TMap<TString, uint64_t> result; - - ExecuteApiCall( - [&] { return Api_->GetProcMetric(containers_, metric, result); }, - "GetProcMetric", - /*idempotent*/ true); - - return {result.begin(), result.end()}; - } - - void DoPoll() - { - try { - if (Containers_.empty()) { - return; - } - - auto getResponse = DoRequestContainerProperties(Containers_, ContainerRequestVars_); - - if (getResponse.list().empty()) { - return; - } - - auto getProperty = [] ( - const Porto::TGetResponse::TContainerGetListResponse& container, - const TString& name) -> Porto::TGetResponse::TContainerGetValueResponse - { - for (const auto& property : container.keyval()) { - if (property.variable() == name) { - return property; - } - } - - return {}; - }; - - for (const auto& container : getResponse.list()) { - auto state = getProperty(container, "state"); - if (state.error() == EError::ContainerDoesNotExist) { - HandleResult(container.name(), state); - } else if (state.value() == "dead" || state.value() == "stopped") { - HandleResult(container.name(), getProperty(container, "exit_status")); - } - //TODO(dcherednik): other states - } - } catch (const std::exception& ex) { - YT_LOG_ERROR(ex, "Fatal exception occurred while polling Porto"); - Failed_.Fire(TError(ex)); - } - } - - TString DoCreateVolume( - const TString& path, - const THashMap<TString, TString>& properties) - { - auto volume = path; - TMap<TString, TString> propertyMap(properties.begin(), properties.end()); - ExecuteApiCall( - [&] { return Api_->CreateVolume(volume, propertyMap); }, - "CreateVolume", - /*idempotent*/ false); - return volume; - } - - void DoLinkVolume(const TString& path, const TString& container) - { - ExecuteApiCall( - [&] { return Api_->LinkVolume(path, container); }, - "LinkVolume", - /*idempotent*/ false); - } - - void DoUnlinkVolume(const TString& path, const TString& container) - { - ExecuteApiCall( - [&] { return Api_->UnlinkVolume(path, container); }, - "UnlinkVolume", - /*idempotent*/ false); - } - - std::vector<TString> DoListVolumePaths() - { - TVector<TString> volumes; - ExecuteApiCall( - [&] { return Api_->ListVolumes(volumes); }, - "ListVolume", - /*idempotent*/ true); - return {volumes.begin(), volumes.end()}; - } - - void DoImportLayer(const TString& archivePath, const TString& layerId, const TString& place) - { - ExecuteApiCall( - [&] { return Api_->ImportLayer(layerId, archivePath, false, place); }, - "ImportLayer", - /*idempotent*/ false); - } - - void DoRemoveLayer(const TString& layerId, const TString& place, bool async) - { - ExecuteApiCall( - [&] { return Api_->RemoveLayer(layerId, place, async); }, - "RemoveLayer", - /*idempotent*/ false); - } - - std::vector<TString> DoListLayers(const TString& place) - { - TVector<TString> layers; - ExecuteApiCall( - [&] { return Api_->ListLayers(layers, place); }, - "ListLayers", - /*idempotent*/ true); - return {layers.begin(), layers.end()}; - } - - TCommandEntry* GetCommandEntry(const TString& command) - { - auto guard = Guard(CommandLock_); - if (auto it = CommandToEntry_.find(command)) { - return &it->second; - } - return &CommandToEntry_.emplace(command, TCommandEntry(Profiler_.WithTag("command", command))).first->second; - } - - void ExecuteApiCall( - std::function<EError()> callback, - const TString& command, - bool idempotent) - { - YT_LOG_DEBUG("Porto API call started (Command: %v)", command); - - if (IsTestPortoTimeout()) { - YT_LOG_DEBUG("Testing Porto timeout (Command: %v)", command); - - auto config = DynamicConfig_.Acquire(); - TDelayedExecutor::WaitForDuration(config->ApiTimeout); - - THROW_ERROR CreatePortoError(GetFailedStubError(), "Porto timeout"); - } - - if (IsTestPortoFailureEnabled()) { - YT_LOG_DEBUG("Testing Porto failure (Command: %v)", command); - THROW_ERROR CreatePortoError(GetFailedStubError(), "Porto stub error"); - } - - auto* entry = GetCommandEntry(command); - auto startTime = NProfiling::GetInstant(); - while (true) { - EError error; - - { - NProfiling::TWallTimer timer; - error = callback(); - entry->TimeGauge.Record(timer.GetElapsedTime()); - } - - if (error == EError::Success) { - entry->SuccessCounter.Increment(); - break; - } - - entry->FailureCounter.Increment(); - HandleApiError(command, startTime, idempotent); - - YT_LOG_DEBUG("Sleeping and retrying Porto API call (Command: %v)", command); - entry->RetryCounter.Increment(); - - TDelayedExecutor::WaitForDuration(RetryInterval); - } - - YT_LOG_DEBUG("Porto API call completed (Command: %v)", command); - } - - void HandleApiError( - const TString& command, - TInstant startTime, - bool idempotent) - { - TString errorMessage; - auto error = ConvertPortoErrorCode(Api_->GetLastError(errorMessage)); - - // These errors are typical during job cleanup: we might try to kill a container that is already stopped. - bool debug = (error == EPortoErrorCode::ContainerDoesNotExist || error == EPortoErrorCode::InvalidState); - YT_LOG_EVENT( - Logger, - debug ? NLogging::ELogLevel::Debug : NLogging::ELogLevel::Error, - "Porto API call error (Error: %v, Command: %v, Message: %v)", - error, - command, - errorMessage); - - if (!IsRetriableErrorCode(error, idempotent) || NProfiling::GetInstant() - startTime > Config_->RetriesTimeout) { - THROW_ERROR CreatePortoError(error, errorMessage); - } - } - - void HandleResult(const TString& container, const Porto::TGetResponse::TContainerGetValueResponse& rsp) - { - auto portoErrorCode = ConvertPortoErrorCode(rsp.error()); - auto it = ContainerMap_.find(container); - if (it == ContainerMap_.end()) { - YT_LOG_ERROR("Got an unexpected container " - "(Container: %v, ResponseError: %v, ErrorMessage: %v, Value: %v)", - container, - portoErrorCode, - rsp.errormsg(), - rsp.value()); - return; - } else { - if (portoErrorCode != EPortoErrorCode::Success) { - YT_LOG_ERROR("Container finished with Porto API error " - "(Container: %v, ResponseError: %v, ErrorMessage: %v, Value: %v)", - container, - portoErrorCode, - rsp.errormsg(), - rsp.value()); - it->second.Set(CreatePortoError(portoErrorCode, rsp.errormsg())); - } else { - try { - int exitStatus = std::stoi(rsp.value()); - YT_LOG_DEBUG("Container finished with exit code (Container: %v, ExitCode: %v)", - container, - exitStatus); - - it->second.Set(exitStatus); - } catch (const std::exception& ex) { - it->second.Set(TError("Failed to parse Porto exit status") << ex); - } - } - } - RemoveFromPoller(container); - } - - void RemoveFromPoller(const TString& container) - { - ContainerMap_.erase(container); - - Containers_.clear(); - for (const auto& [name, pid] : ContainerMap_) { - Containers_.push_back(name); - } - } -}; - -const std::vector<TString> TPortoExecutor::ContainerRequestVars_ = { - "state", - "exit_status" -}; - -//////////////////////////////////////////////////////////////////////////////// - -IPortoExecutorPtr CreatePortoExecutor( - TPortoExecutorDynamicConfigPtr config, - const TString& threadNameSuffix, - const NProfiling::TProfiler& profiler) -{ - return New<TPortoExecutor>( - std::move(config), - threadNameSuffix, - profiler); -} - -//////////////////////////////////////////////////////////////////////////////// - -#else - -IPortoExecutorPtr CreatePortoExecutor( - TPortoExecutorDynamicConfigPtr /* config */, - const TString& /* threadNameSuffix */, - const NProfiling::TProfiler& /* profiler */) -{ - THROW_ERROR_EXCEPTION("Porto executor is not available on this platform"); -} - -#endif - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/porto_executor.h b/yt/yt/library/containers/porto_executor.h deleted file mode 100644 index d629ab6275..0000000000 --- a/yt/yt/library/containers/porto_executor.h +++ /dev/null @@ -1,142 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/library/profiling/sensor.h> - -#include <yt/yt/core/actions/future.h> -#include <yt/yt/core/actions/signal.h> - -#include <yt/yt/core/net/address.h> - -#include <library/cpp/porto/libporto.hpp> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -struct TVolumeId -{ - TString Path; -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct TRunnableContainerSpec -{ - TString Name; - TString Command; - - EEnablePorto EnablePorto = EEnablePorto::None; - bool Isolate = true; - - std::optional<TString> StdinPath; - std::optional<TString> StdoutPath; - std::optional<TString> StderrPath; - std::optional<TString> CurrentWorkingDirectory; - std::optional<TString> CoreCommand; - std::optional<TString> User; - std::optional<int> GroupId; - - bool EnableCoreDumps = true; - - std::optional<i64> ThreadLimit; - - std::optional<TString> HostName; - std::vector<NYT::NNet::TIP6Address> IPAddresses; - bool EnableNat64 = false; - bool DisableNetwork = false; - - THashMap<TString, TString> Labels; - THashMap<TString, TString> Env; - std::vector<TString> CGroupControllers; - std::vector<TDevice> Devices; - std::optional<TRootFS> RootFS; -}; - -//////////////////////////////////////////////////////////////////////////////// - -struct IPortoExecutor - : public TRefCounted -{ - virtual void OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig) = 0; - - virtual TFuture<void> CreateContainer(const TString& container) = 0; - - virtual TFuture<void> CreateContainer(const TRunnableContainerSpec& containerSpec, bool start) = 0; - - virtual TFuture<void> SetContainerProperty( - const TString& container, - const TString& property, - const TString& value) = 0; - - virtual TFuture<std::optional<TString>> GetContainerProperty( - const TString& container, - const TString& property) = 0; - - virtual TFuture<THashMap<TString, TErrorOr<TString>>> GetContainerProperties( - const TString& container, - const std::vector<TString>& properties) = 0; - virtual TFuture<THashMap<TString, THashMap<TString, TErrorOr<TString>>>> GetContainerProperties( - const std::vector<TString>& containers, - const std::vector<TString>& properties) = 0; - - virtual TFuture<THashMap<TString, i64>> GetContainerMetrics( - const std::vector<TString>& containers, - const TString& metric) = 0; - virtual TFuture<void> DestroyContainer(const TString& container) = 0; - virtual TFuture<void> StopContainer(const TString& container) = 0; - virtual TFuture<void> StartContainer(const TString& container) = 0; - virtual TFuture<void> KillContainer(const TString& container, int signal) = 0; - - virtual TFuture<TString> ConvertPath(const TString& path, const TString& container) = 0; - - // Returns absolute names of immediate children only. - virtual TFuture<std::vector<TString>> ListSubcontainers( - const TString& rootContainer, - bool includeRoot) = 0; - // Starts polling a given container, returns future with exit code of finished process. - virtual TFuture<int> PollContainer(const TString& container) = 0; - - // Returns future with exit code of finished process. - // NB: temporarily broken, see https://st.yandex-team.ru/PORTO-846 for details. - virtual TFuture<int> WaitContainer(const TString& container) = 0; - - virtual TFuture<TString> CreateVolume( - const TString& path, - const THashMap<TString, TString>& properties) = 0; - virtual TFuture<void> LinkVolume( - const TString& path, - const TString& name) = 0; - virtual TFuture<void> UnlinkVolume( - const TString& path, - const TString& name) = 0; - virtual TFuture<std::vector<TString>> ListVolumePaths() = 0; - - virtual TFuture<void> ImportLayer( - const TString& archivePath, - const TString& layerId, - const TString& place) = 0; - virtual TFuture<void> RemoveLayer( - const TString& layerId, - const TString& place, - bool async) = 0; - virtual TFuture<std::vector<TString>> ListLayers(const TString& place) = 0; - - virtual IInvokerPtr GetInvoker() const = 0; - - DECLARE_INTERFACE_SIGNAL(void(const TError&), Failed); -}; - -DEFINE_REFCOUNTED_TYPE(IPortoExecutor) - -//////////////////////////////////////////////////////////////////////////////// - -IPortoExecutorPtr CreatePortoExecutor( - TPortoExecutorDynamicConfigPtr config, - const TString& threadNameSuffix, - const NProfiling::TProfiler& profiler = {}); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/porto_health_checker.cpp b/yt/yt/library/containers/porto_health_checker.cpp deleted file mode 100644 index 5a5d358441..0000000000 --- a/yt/yt/library/containers/porto_health_checker.cpp +++ /dev/null @@ -1,69 +0,0 @@ - -#include "porto_health_checker.h" - -#include "porto_executor.h" -#include "private.h" -#include "config.h" - -#include <yt/yt/core/actions/future.h> - -#include <yt/yt/core/misc/fs.h> - -#include <util/random/random.h> - -namespace NYT::NContainers { - -using namespace NConcurrency; -using namespace NLogging; -using namespace NProfiling; - -//////////////////////////////////////////////////////////////////////////////// - -TPortoHealthChecker::TPortoHealthChecker( - TPortoExecutorDynamicConfigPtr config, - IInvokerPtr invoker, - TLogger logger) - : Config_(std::move(config)) - , Logger(std::move(logger)) - , CheckInvoker_(std::move(invoker)) - , Executor_(CreatePortoExecutor( - Config_, - "porto_check")) -{ } - -void TPortoHealthChecker::Start() -{ - YT_LOG_DEBUG("Porto health checker started"); - - PeriodicExecutor_ = New<TPeriodicExecutor>( - CheckInvoker_, - BIND(&TPortoHealthChecker::OnCheck, MakeWeak(this)), - Config_->RetriesTimeout); - PeriodicExecutor_->Start(); -} - -void TPortoHealthChecker::OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig) -{ - YT_LOG_DEBUG( - "Porto health checker dynamic config changed (EnableTestPortoFailures: %v, StubErrorCode: %v)", - Config_->EnableTestPortoFailures, - Config_->StubErrorCode); - - Executor_->OnDynamicConfigChanged(newConfig); -} - -void TPortoHealthChecker::OnCheck() -{ - YT_LOG_DEBUG("Run Porto health check"); - - auto result = WaitFor(Executor_->ListVolumePaths().AsVoid()); - if (result.IsOK()) { - Success_.Fire(); - } else { - Failed_.Fire(result); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/porto_health_checker.h b/yt/yt/library/containers/porto_health_checker.h deleted file mode 100644 index f0fb8f0908..0000000000 --- a/yt/yt/library/containers/porto_health_checker.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/library/profiling/sensor.h> - -#include <yt/yt/core/actions/signal.h> - -#include <yt/yt/core/concurrency/periodic_executor.h> - -#include <yt/yt/core/logging/log.h> - -#include <yt/yt/core/misc/error.h> - -#include <atomic> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoHealthChecker - : public TRefCounted -{ -public: - TPortoHealthChecker( - TPortoExecutorDynamicConfigPtr config, - IInvokerPtr invoker, - NLogging::TLogger logger); - - void Start(); - - void OnDynamicConfigChanged(const TPortoExecutorDynamicConfigPtr& newConfig); - - DEFINE_SIGNAL(void(), Success); - - DEFINE_SIGNAL(void(const TError&), Failed); - -private: - const TPortoExecutorDynamicConfigPtr Config_; - const NLogging::TLogger Logger; - const IInvokerPtr CheckInvoker_; - const IPortoExecutorPtr Executor_; - NConcurrency::TPeriodicExecutorPtr PeriodicExecutor_; - - void OnCheck(); -}; - -DEFINE_REFCOUNTED_TYPE(TPortoHealthChecker) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/porto_resource_tracker.cpp b/yt/yt/library/containers/porto_resource_tracker.cpp deleted file mode 100644 index c1fe48d6af..0000000000 --- a/yt/yt/library/containers/porto_resource_tracker.cpp +++ /dev/null @@ -1,711 +0,0 @@ -#include "porto_resource_tracker.h" -#include "private.h" - -#include <yt/yt/core/logging/log.h> - -#include <yt/yt/core/misc/error.h> - -#include <yt/yt/core/net/address.h> - -#include <yt/yt/core/ytree/public.h> - -#include <yt/yt/library/process/process.h> - -#include <yt/yt/library/containers/cgroup.h> -#include <yt/yt/library/containers/config.h> -#include <yt/yt/library/containers/instance.h> -#include <yt/yt/library/containers/porto_executor.h> -#include <yt/yt/library/containers/public.h> - -namespace NYT::NContainers { - -using namespace NProfiling; - -static const auto& Logger = ContainersLogger; - -#ifdef _linux_ - -//////////////////////////////////////////////////////////////////////////////// - -struct TPortoProfilers - : public TRefCounted -{ - TPortoResourceProfilerPtr DaemonProfiler; - TPortoResourceProfilerPtr ContainerProfiler; - - TPortoProfilers( - TPortoResourceProfilerPtr daemonProfiler, - TPortoResourceProfilerPtr containerProfiler) - : DaemonProfiler(std::move(daemonProfiler)) - , ContainerProfiler(std::move(containerProfiler)) - { } -}; - -DEFINE_REFCOUNTED_TYPE(TPortoProfilers) - -//////////////////////////////////////////////////////////////////////////////// - -static TErrorOr<ui64> GetFieldOrError( - const TResourceUsage& usage, - EStatField field) -{ - auto it = usage.find(field); - if (it == usage.end()) { - return TError("Resource usage is missing %Qlv field", field); - } - const auto& errorOrValue = it->second; - if (errorOrValue.FindMatching(EPortoErrorCode::NotSupported)) { - return TError("Property %Qlv not supported in Porto response", field); - } - return errorOrValue; -} - -//////////////////////////////////////////////////////////////////////////////// - -TPortoResourceTracker::TPortoResourceTracker( - IInstancePtr instance, - TDuration updatePeriod, - bool isDeltaTracker, - bool isForceUpdate) - : Instance_(std::move(instance)) - , UpdatePeriod_(updatePeriod) - , IsDeltaTracker_(isDeltaTracker) - , IsForceUpdate_(isForceUpdate) -{ - ResourceUsage_ = { - {EStatField::IOReadByte, 0}, - {EStatField::IOWriteByte, 0}, - {EStatField::IOBytesLimit, 0}, - {EStatField::IOReadOps, 0}, - {EStatField::IOWriteOps, 0}, - {EStatField::IOOps, 0}, - {EStatField::IOOpsLimit, 0}, - {EStatField::IOTotalTime, 0}, - {EStatField::IOWaitTime, 0} - }; - ResourceUsageDelta_ = ResourceUsage_; -} - -static TErrorOr<TDuration> ExtractDuration(TErrorOr<ui64> timeNs) -{ - if (timeNs.IsOK()) { - return TErrorOr<TDuration>(TDuration::MicroSeconds(timeNs.Value() / 1000)); - } else { - return TError(timeNs); - } -} - -TCpuStatistics TPortoResourceTracker::ExtractCpuStatistics(const TResourceUsage& resourceUsage) const -{ - // NB: Job proxy uses last sample of CPU statistics but we are interested in - // peak thread count value. - auto currentThreadCountPeak = GetFieldOrError(resourceUsage, EStatField::ThreadCount); - - PeakThreadCount_ = currentThreadCountPeak.IsOK() && PeakThreadCount_.IsOK() - ? std::max<ui64>( - PeakThreadCount_.Value(), - currentThreadCountPeak.Value()) - : currentThreadCountPeak.IsOK() ? currentThreadCountPeak : PeakThreadCount_; - - auto totalTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuUsage); - auto systemTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuSystemUsage); - auto userTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuUserUsage); - auto waitTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuWait); - auto throttledNs = GetFieldOrError(resourceUsage, EStatField::CpuThrottled); - auto limitTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuLimit); - auto guaranteeTimeNs = GetFieldOrError(resourceUsage, EStatField::CpuGuarantee); - - return TCpuStatistics{ - .TotalUsageTime = ExtractDuration(totalTimeNs), - .UserUsageTime = ExtractDuration(userTimeNs), - .SystemUsageTime = ExtractDuration(systemTimeNs), - .WaitTime = ExtractDuration(waitTimeNs), - .ThrottledTime = ExtractDuration(throttledNs), - .ThreadCount = GetFieldOrError(resourceUsage, EStatField::ThreadCount), - .ContextSwitches = GetFieldOrError(resourceUsage, EStatField::ContextSwitches), - .ContextSwitchesDelta = GetFieldOrError(resourceUsage, EStatField::ContextSwitchesDelta), - .PeakThreadCount = PeakThreadCount_, - .LimitTime = ExtractDuration(limitTimeNs), - .GuaranteeTime = ExtractDuration(guaranteeTimeNs), - }; -} - -TMemoryStatistics TPortoResourceTracker::ExtractMemoryStatistics(const TResourceUsage& resourceUsage) const -{ - return TMemoryStatistics{ - .Rss = GetFieldOrError(resourceUsage, EStatField::Rss), - .MappedFile = GetFieldOrError(resourceUsage, EStatField::MappedFile), - .MinorPageFaults = GetFieldOrError(resourceUsage, EStatField::MinorPageFaults), - .MajorPageFaults = GetFieldOrError(resourceUsage, EStatField::MajorPageFaults), - .FileCacheUsage = GetFieldOrError(resourceUsage, EStatField::FileCacheUsage), - .AnonUsage = GetFieldOrError(resourceUsage, EStatField::AnonMemoryUsage), - .AnonLimit = GetFieldOrError(resourceUsage, EStatField::AnonMemoryLimit), - .MemoryUsage = GetFieldOrError(resourceUsage, EStatField::MemoryUsage), - .MemoryGuarantee = GetFieldOrError(resourceUsage, EStatField::MemoryGuarantee), - .MemoryLimit = GetFieldOrError(resourceUsage, EStatField::MemoryLimit), - .MaxMemoryUsage = GetFieldOrError(resourceUsage, EStatField::MaxMemoryUsage), - .OomKills = GetFieldOrError(resourceUsage, EStatField::OomKills), - .OomKillsTotal = GetFieldOrError(resourceUsage, EStatField::OomKillsTotal) - }; -} - -TBlockIOStatistics TPortoResourceTracker::ExtractBlockIOStatistics(const TResourceUsage& resourceUsage) const -{ - auto totalTimeNs = GetFieldOrError(resourceUsage, EStatField::IOTotalTime); - auto waitTimeNs = GetFieldOrError(resourceUsage, EStatField::IOWaitTime); - - return TBlockIOStatistics{ - .IOReadByte = GetFieldOrError(resourceUsage, EStatField::IOReadByte), - .IOWriteByte = GetFieldOrError(resourceUsage, EStatField::IOWriteByte), - .IOBytesLimit = GetFieldOrError(resourceUsage, EStatField::IOBytesLimit), - .IOReadOps = GetFieldOrError(resourceUsage, EStatField::IOReadOps), - .IOWriteOps = GetFieldOrError(resourceUsage, EStatField::IOWriteOps), - .IOOps = GetFieldOrError(resourceUsage, EStatField::IOOps), - .IOOpsLimit = GetFieldOrError(resourceUsage, EStatField::IOOpsLimit), - .IOTotalTime = ExtractDuration(totalTimeNs), - .IOWaitTime = ExtractDuration(waitTimeNs) - }; -} - -TNetworkStatistics TPortoResourceTracker::ExtractNetworkStatistics(const TResourceUsage& resourceUsage) const -{ - return TNetworkStatistics{ - .TxBytes = GetFieldOrError(resourceUsage, EStatField::NetTxBytes), - .TxPackets = GetFieldOrError(resourceUsage, EStatField::NetTxPackets), - .TxDrops = GetFieldOrError(resourceUsage, EStatField::NetTxDrops), - .TxLimit = GetFieldOrError(resourceUsage, EStatField::NetTxLimit), - - .RxBytes = GetFieldOrError(resourceUsage, EStatField::NetRxBytes), - .RxPackets = GetFieldOrError(resourceUsage, EStatField::NetRxPackets), - .RxDrops = GetFieldOrError(resourceUsage, EStatField::NetRxDrops), - .RxLimit = GetFieldOrError(resourceUsage, EStatField::NetRxLimit), - }; -} - -TTotalStatistics TPortoResourceTracker::ExtractTotalStatistics(const TResourceUsage& resourceUsage) const -{ - return TTotalStatistics{ - .CpuStatistics = ExtractCpuStatistics(resourceUsage), - .MemoryStatistics = ExtractMemoryStatistics(resourceUsage), - .BlockIOStatistics = ExtractBlockIOStatistics(resourceUsage), - .NetworkStatistics = ExtractNetworkStatistics(resourceUsage), - }; -} - -TCpuStatistics TPortoResourceTracker::GetCpuStatistics() const -{ - return GetStatistics( - CachedCpuStatistics_, - "CPU", - [&] (TResourceUsage& resourceUsage) { - return ExtractCpuStatistics(resourceUsage); - }); -} - -TMemoryStatistics TPortoResourceTracker::GetMemoryStatistics() const -{ - return GetStatistics( - CachedMemoryStatistics_, - "memory", - [&] (TResourceUsage& resourceUsage) { - return ExtractMemoryStatistics(resourceUsage); - }); -} - -TBlockIOStatistics TPortoResourceTracker::GetBlockIOStatistics() const -{ - return GetStatistics( - CachedBlockIOStatistics_, - "block IO", - [&] (TResourceUsage& resourceUsage) { - return ExtractBlockIOStatistics(resourceUsage); - }); -} - -TNetworkStatistics TPortoResourceTracker::GetNetworkStatistics() const -{ - return GetStatistics( - CachedNetworkStatistics_, - "network", - [&] (TResourceUsage& resourceUsage) { - return ExtractNetworkStatistics(resourceUsage); - }); -} - -TTotalStatistics TPortoResourceTracker::GetTotalStatistics() const -{ - return GetStatistics( - CachedTotalStatistics_, - "total", - [&] (TResourceUsage& resourceUsage) { - return ExtractTotalStatistics(resourceUsage); - }); -} - -template <class T, class F> -T TPortoResourceTracker::GetStatistics( - std::optional<T>& cachedStatistics, - const TString& statisticsKind, - F extractor) const -{ - UpdateResourceUsageStatisticsIfExpired(); - - auto guard = Guard(SpinLock_); - try { - auto newStatistics = extractor(IsDeltaTracker_ ? ResourceUsageDelta_ : ResourceUsage_); - cachedStatistics = newStatistics; - return newStatistics; - } catch (const std::exception& ex) { - if (!cachedStatistics) { - THROW_ERROR_EXCEPTION("Unable to get %v statistics", statisticsKind) - << ex; - } - YT_LOG_WARNING(ex, "Unable to get %v statistics; using the last one", statisticsKind); - return *cachedStatistics; - } -} - -bool TPortoResourceTracker::AreResourceUsageStatisticsExpired() const -{ - return TInstant::Now() - LastUpdateTime_.load() > UpdatePeriod_; -} - -TInstant TPortoResourceTracker::GetLastUpdateTime() const -{ - return LastUpdateTime_.load(); -} - -void TPortoResourceTracker::UpdateResourceUsageStatisticsIfExpired() const -{ - if (IsForceUpdate_ || AreResourceUsageStatisticsExpired()) { - DoUpdateResourceUsage(); - } -} - -TErrorOr<ui64> TPortoResourceTracker::CalculateCounterDelta( - const TErrorOr<ui64>& oldValue, - const TErrorOr<ui64>& newValue) const -{ - if (oldValue.IsOK() && newValue.IsOK()) { - return newValue.Value() - oldValue.Value(); - } else if (newValue.IsOK()) { - // It is better to return an error than an incorrect value. - return oldValue; - } else { - return newValue; - } -} - -static bool IsCumulativeStatistics(EStatField statistic) -{ - return - statistic == EStatField::CpuUsage || - statistic == EStatField::CpuUserUsage || - statistic == EStatField::CpuSystemUsage || - statistic == EStatField::CpuWait || - statistic == EStatField::CpuThrottled || - - statistic == EStatField::ContextSwitches || - - statistic == EStatField::MinorPageFaults || - statistic == EStatField::MajorPageFaults || - - statistic == EStatField::IOReadByte || - statistic == EStatField::IOWriteByte || - statistic == EStatField::IOReadOps || - statistic == EStatField::IOWriteOps || - statistic == EStatField::IOOps || - statistic == EStatField::IOTotalTime || - statistic == EStatField::IOWaitTime || - - statistic == EStatField::NetTxBytes || - statistic == EStatField::NetTxPackets || - statistic == EStatField::NetTxDrops || - statistic == EStatField::NetRxBytes || - statistic == EStatField::NetRxPackets || - statistic == EStatField::NetRxDrops; -} - -void TPortoResourceTracker::ReCalculateResourceUsage(const TResourceUsage& newResourceUsage) const -{ - auto guard = Guard(SpinLock_); - - TResourceUsage resourceUsage; - TResourceUsage resourceUsageDelta; - - for (const auto& stat : InstanceStatFields) { - TErrorOr<ui64> oldValue; - TErrorOr<ui64> newValue; - - if (auto newValueIt = newResourceUsage.find(stat); newValueIt.IsEnd()) { - newValue = TError("Missing property %Qlv in Porto response", stat) - << TErrorAttribute("container", Instance_->GetName()); - } else { - newValue = newValueIt->second; - } - - if (auto oldValueIt = ResourceUsage_.find(stat); oldValueIt.IsEnd()) { - oldValue = newValue; - } else { - oldValue = oldValueIt->second; - } - - if (newValue.IsOK()) { - resourceUsage[stat] = newValue; - } else { - resourceUsage[stat] = oldValue; - } - - if (IsCumulativeStatistics(stat)) { - resourceUsageDelta[stat] = CalculateCounterDelta(oldValue, newValue); - } else { - if (newValue.IsOK()) { - resourceUsageDelta[stat] = newValue; - } else { - resourceUsageDelta[stat] = oldValue; - } - } - } - - ResourceUsage_ = resourceUsage; - ResourceUsageDelta_ = resourceUsageDelta; - LastUpdateTime_.store(TInstant::Now()); -} - -void TPortoResourceTracker::DoUpdateResourceUsage() const -{ - try { - ReCalculateResourceUsage(Instance_->GetResourceUsage()); - } catch (const std::exception& ex) { - YT_LOG_ERROR( - ex, - "Couldn't get metrics from Porto"); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -TPortoResourceProfiler::TPortoResourceProfiler( - TPortoResourceTrackerPtr tracker, - TPodSpecConfigPtr podSpec, - const TProfiler& profiler) - : ResourceTracker_(std::move(tracker)) - , PodSpec_(std::move(podSpec)) -{ - profiler.AddProducer("", MakeStrong(this)); -} - -static void WriteGaugeIfOk( - ISensorWriter* writer, - const TString& path, - TErrorOr<ui64> valueOrError) -{ - if (valueOrError.IsOK()) { - i64 value = static_cast<i64>(valueOrError.Value()); - - if (value >= 0) { - writer->AddGauge(path, value); - } - } -} - -static void WriteCumulativeGaugeIfOk( - ISensorWriter* writer, - const TString& path, - TErrorOr<ui64> valueOrError, - i64 timeDeltaUsec) -{ - if (valueOrError.IsOK()) { - i64 value = static_cast<i64>(valueOrError.Value()); - - if (value >= 0) { - writer->AddGauge(path, - 1.0 * value * ResourceUsageUpdatePeriod.MicroSeconds() / timeDeltaUsec); - } - } -} - -void TPortoResourceProfiler::WriteCpuMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec) -{ - { - if (totalStatistics.CpuStatistics.UserUsageTime.IsOK()) { - i64 userUsageTimeUs = totalStatistics.CpuStatistics.UserUsageTime.Value().MicroSeconds(); - double userUsagePercent = std::max<double>(0.0, 100. * userUsageTimeUs / timeDeltaUsec); - writer->AddGauge("/cpu/user", userUsagePercent); - } - - if (totalStatistics.CpuStatistics.SystemUsageTime.IsOK()) { - i64 systemUsageTimeUs = totalStatistics.CpuStatistics.SystemUsageTime.Value().MicroSeconds(); - double systemUsagePercent = std::max<double>(0.0, 100. * systemUsageTimeUs / timeDeltaUsec); - writer->AddGauge("/cpu/system", systemUsagePercent); - } - - if (totalStatistics.CpuStatistics.WaitTime.IsOK()) { - i64 waitTimeUs = totalStatistics.CpuStatistics.WaitTime.Value().MicroSeconds(); - double waitPercent = std::max<double>(0.0, 100. * waitTimeUs / timeDeltaUsec); - writer->AddGauge("/cpu/wait", waitPercent); - } - - if (totalStatistics.CpuStatistics.ThrottledTime.IsOK()) { - i64 throttledTimeUs = totalStatistics.CpuStatistics.ThrottledTime.Value().MicroSeconds(); - double throttledPercent = std::max<double>(0.0, 100. * throttledTimeUs / timeDeltaUsec); - writer->AddGauge("/cpu/throttled", throttledPercent); - } - - if (totalStatistics.CpuStatistics.TotalUsageTime.IsOK()) { - i64 totalUsageTimeUs = totalStatistics.CpuStatistics.TotalUsageTime.Value().MicroSeconds(); - double totalUsagePercent = std::max<double>(0.0, 100. * totalUsageTimeUs / timeDeltaUsec); - writer->AddGauge("/cpu/total", totalUsagePercent); - } - - if (totalStatistics.CpuStatistics.GuaranteeTime.IsOK()) { - i64 guaranteeTimeUs = totalStatistics.CpuStatistics.GuaranteeTime.Value().MicroSeconds(); - double guaranteePercent = std::max<double>(0.0, (100. * guaranteeTimeUs) / (1'000'000L)); - writer->AddGauge("/cpu/guarantee", guaranteePercent); - } - - if (totalStatistics.CpuStatistics.LimitTime.IsOK()) { - i64 limitTimeUs = totalStatistics.CpuStatistics.LimitTime.Value().MicroSeconds(); - double limitPercent = std::max<double>(0.0, (100. * limitTimeUs) / (1'000'000L)); - writer->AddGauge("/cpu/limit", limitPercent); - } - } - - if (PodSpec_->CpuToVCpuFactor) { - auto factor = *PodSpec_->CpuToVCpuFactor; - - writer->AddGauge("/cpu_to_vcpu_factor", factor); - - if (totalStatistics.CpuStatistics.UserUsageTime.IsOK()) { - i64 userUsageTimeUs = totalStatistics.CpuStatistics.UserUsageTime.Value().MicroSeconds(); - double userUsagePercent = std::max<double>(0.0, 100. * userUsageTimeUs * factor / timeDeltaUsec); - writer->AddGauge("/vcpu/user", userUsagePercent); - } - - if (totalStatistics.CpuStatistics.SystemUsageTime.IsOK()) { - i64 systemUsageTimeUs = totalStatistics.CpuStatistics.SystemUsageTime.Value().MicroSeconds(); - double systemUsagePercent = std::max<double>(0.0, 100. * systemUsageTimeUs * factor / timeDeltaUsec); - writer->AddGauge("/vcpu/system", systemUsagePercent); - } - - if (totalStatistics.CpuStatistics.WaitTime.IsOK()) { - i64 waitTimeUs = totalStatistics.CpuStatistics.WaitTime.Value().MicroSeconds(); - double waitPercent = std::max<double>(0.0, 100. * waitTimeUs * factor / timeDeltaUsec); - writer->AddGauge("/vcpu/wait", waitPercent); - } - - if (totalStatistics.CpuStatistics.ThrottledTime.IsOK()) { - i64 throttledTimeUs = totalStatistics.CpuStatistics.ThrottledTime.Value().MicroSeconds(); - double throttledPercent = std::max<double>(0.0, 100. * throttledTimeUs * factor / timeDeltaUsec); - writer->AddGauge("/vcpu/throttled", throttledPercent); - } - - if (totalStatistics.CpuStatistics.TotalUsageTime.IsOK()) { - i64 totalUsageTimeUs = totalStatistics.CpuStatistics.TotalUsageTime.Value().MicroSeconds(); - double totalUsagePercent = std::max<double>(0.0, 100. * totalUsageTimeUs * factor / timeDeltaUsec); - writer->AddGauge("/vcpu/total", totalUsagePercent); - } - - if (totalStatistics.CpuStatistics.GuaranteeTime.IsOK()) { - i64 guaranteeTimeUs = totalStatistics.CpuStatistics.GuaranteeTime.Value().MicroSeconds(); - double guaranteePercent = std::max<double>(0.0, 100. * guaranteeTimeUs * factor / 1'000'000L); - writer->AddGauge("/vcpu/guarantee", guaranteePercent); - } - - if (totalStatistics.CpuStatistics.LimitTime.IsOK()) { - i64 limitTimeUs = totalStatistics.CpuStatistics.LimitTime.Value().MicroSeconds(); - double limitPercent = std::max<double>(0.0, 100. * limitTimeUs * factor / 1'000'000L); - writer->AddGauge("/vcpu/limit", limitPercent); - } - } - - WriteGaugeIfOk(writer, "/cpu/thread_count", totalStatistics.CpuStatistics.ThreadCount); - WriteGaugeIfOk(writer, "/cpu/context_switches", totalStatistics.CpuStatistics.ContextSwitches); -} - -void TPortoResourceProfiler::WriteMemoryMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec) -{ - WriteCumulativeGaugeIfOk(writer, - "/memory/minor_page_faults", - totalStatistics.MemoryStatistics.MinorPageFaults, - timeDeltaUsec); - WriteCumulativeGaugeIfOk(writer, - "/memory/major_page_faults", - totalStatistics.MemoryStatistics.MajorPageFaults, - timeDeltaUsec); - - WriteGaugeIfOk(writer, "/memory/oom_kills", totalStatistics.MemoryStatistics.OomKills); - WriteGaugeIfOk(writer, "/memory/oom_kills_total", totalStatistics.MemoryStatistics.OomKillsTotal); - - WriteGaugeIfOk(writer, "/memory/file_cache_usage", totalStatistics.MemoryStatistics.FileCacheUsage); - WriteGaugeIfOk(writer, "/memory/anon_usage", totalStatistics.MemoryStatistics.AnonUsage); - WriteGaugeIfOk(writer, "/memory/anon_limit", totalStatistics.MemoryStatistics.AnonLimit); - WriteGaugeIfOk(writer, "/memory/memory_usage", totalStatistics.MemoryStatistics.MemoryUsage); - WriteGaugeIfOk(writer, "/memory/memory_guarantee", totalStatistics.MemoryStatistics.MemoryGuarantee); - WriteGaugeIfOk(writer, "/memory/memory_limit", totalStatistics.MemoryStatistics.MemoryLimit); -} - -void TPortoResourceProfiler::WriteBlockingIOMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec) -{ - WriteCumulativeGaugeIfOk(writer, - "/io/read_bytes", - totalStatistics.BlockIOStatistics.IOReadByte, - timeDeltaUsec); - WriteCumulativeGaugeIfOk(writer, - "/io/write_bytes", - totalStatistics.BlockIOStatistics.IOWriteByte, - timeDeltaUsec); - WriteCumulativeGaugeIfOk(writer, - "/io/read_ops", - totalStatistics.BlockIOStatistics.IOReadOps, - timeDeltaUsec); - WriteCumulativeGaugeIfOk(writer, - "/io/write_ops", - totalStatistics.BlockIOStatistics.IOWriteOps, - timeDeltaUsec); - WriteCumulativeGaugeIfOk(writer, - "/io/ops", - totalStatistics.BlockIOStatistics.IOOps, - timeDeltaUsec); - - WriteGaugeIfOk(writer, - "/io/bytes_limit", - totalStatistics.BlockIOStatistics.IOBytesLimit); - WriteGaugeIfOk(writer, - "/io/ops_limit", - totalStatistics.BlockIOStatistics.IOOpsLimit); - - if (totalStatistics.BlockIOStatistics.IOTotalTime.IsOK()) { - i64 totalTimeUs = totalStatistics.BlockIOStatistics.IOTotalTime.Value().MicroSeconds(); - double totalPercent = std::max<double>(0.0, 100. * totalTimeUs / timeDeltaUsec); - writer->AddGauge("/io/total", totalPercent); - } - - if (totalStatistics.BlockIOStatistics.IOWaitTime.IsOK()) { - i64 waitTimeUs = totalStatistics.BlockIOStatistics.IOWaitTime.Value().MicroSeconds(); - double waitPercent = std::max<double>(0.0, 100. * waitTimeUs / timeDeltaUsec); - writer->AddGauge("/io/wait", waitPercent); - } -} - -void TPortoResourceProfiler::WriteNetworkMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec) -{ - WriteCumulativeGaugeIfOk( - writer, - "/network/rx_bytes", - totalStatistics.NetworkStatistics.RxBytes, - timeDeltaUsec); - WriteCumulativeGaugeIfOk( - writer, - "/network/rx_drops", - totalStatistics.NetworkStatistics.RxDrops, - timeDeltaUsec); - WriteCumulativeGaugeIfOk( - writer, - "/network/rx_packets", - totalStatistics.NetworkStatistics.RxPackets, - timeDeltaUsec); - WriteGaugeIfOk( - writer, - "/network/rx_limit", - totalStatistics.NetworkStatistics.RxLimit); - - WriteCumulativeGaugeIfOk( - writer, - "/network/tx_bytes", - totalStatistics.NetworkStatistics.TxBytes, - timeDeltaUsec); - WriteCumulativeGaugeIfOk( - writer, - "/network/tx_drops", - totalStatistics.NetworkStatistics.TxDrops, - timeDeltaUsec); - WriteCumulativeGaugeIfOk( - writer, - "/network/tx_packets", - totalStatistics.NetworkStatistics.TxPackets, - timeDeltaUsec); - WriteGaugeIfOk( - writer, - "/network/tx_limit", - totalStatistics.NetworkStatistics.TxLimit); -} - -void TPortoResourceProfiler::CollectSensors(ISensorWriter* writer) -{ - i64 lastUpdate = ResourceTracker_->GetLastUpdateTime().MicroSeconds(); - - auto totalStatistics = ResourceTracker_->GetTotalStatistics(); - i64 timeDeltaUsec = TInstant::Now().MicroSeconds() - lastUpdate; - - WriteCpuMetrics(writer, totalStatistics, timeDeltaUsec); - WriteMemoryMetrics(writer, totalStatistics, timeDeltaUsec); - WriteBlockingIOMetrics(writer, totalStatistics, timeDeltaUsec); - WriteNetworkMetrics(writer, totalStatistics, timeDeltaUsec); -} - -//////////////////////////////////////////////////////////////////////////////// - -TPortoResourceProfilerPtr CreatePortoProfilerWithTags( - const IInstancePtr& instance, - const TString containerCategory, - const TPodSpecConfigPtr& podSpec) -{ - auto portoResourceTracker = New<TPortoResourceTracker>( - instance, - ResourceUsageUpdatePeriod, - true, - true); - - return New<TPortoResourceProfiler>( - portoResourceTracker, - podSpec, - TProfiler("/porto") - .WithTag("container_category", containerCategory)); -} - -//////////////////////////////////////////////////////////////////////////////// - -#endif - -#ifdef __linux__ -void EnablePortoResourceTracker(const TPodSpecConfigPtr& podSpec) -{ - BIND([=] { - auto executor = CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "porto-tracker"); - - executor->SubscribeFailed(BIND([=] (const TError& error) { - YT_LOG_ERROR(error, "Fatal error during Porto polling"); - })); - - LeakyRefCountedSingleton<TPortoProfilers>( - CreatePortoProfilerWithTags(GetSelfPortoInstance(executor), "daemon", podSpec), - CreatePortoProfilerWithTags(GetRootPortoInstance(executor), "pod", podSpec)); - }).AsyncVia(GetCurrentInvoker()) - .Run() - .Subscribe(BIND([] (const TError& error) { - YT_LOG_ERROR_IF(!error.IsOK(), error, "Failed to enable Porto profiler"); - })); -} -#else -void EnablePortoResourceTracker(const TPodSpecConfigPtr& /*podSpec*/) -{ - YT_LOG_WARNING("Porto resource tracker not supported"); -} -#endif - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/porto_resource_tracker.h b/yt/yt/library/containers/porto_resource_tracker.h deleted file mode 100644 index 8a0f781949..0000000000 --- a/yt/yt/library/containers/porto_resource_tracker.h +++ /dev/null @@ -1,158 +0,0 @@ -#pragma once - -#include <yt/yt/library/containers/instance.h> -#include <yt/yt/library/containers/public.h> - -#include <yt/yt/library/containers/cgroup.h> - -#include <yt/yt/core/misc/singleton.h> -#include <yt/yt/core/net/address.h> -#include <yt/yt/core/ytree/public.h> - -#include <yt/yt/library/process/process.h> -#include <yt/yt/library/profiling/producer.h> - -namespace NYT::NContainers { - -using namespace NProfiling; - -//////////////////////////////////////////////////////////////////////////////// - -static constexpr auto ResourceUsageUpdatePeriod = TDuration::MilliSeconds(1000); - -//////////////////////////////////////////////////////////////////////////////// - -using TCpuStatistics = TCpuAccounting::TStatistics; -using TBlockIOStatistics = TBlockIO::TStatistics; -using TMemoryStatistics = TMemory::TStatistics; -using TNetworkStatistics = TNetwork::TStatistics; - -struct TTotalStatistics -{ -public: - TCpuStatistics CpuStatistics; - TMemoryStatistics MemoryStatistics; - TBlockIOStatistics BlockIOStatistics; - TNetworkStatistics NetworkStatistics; -}; - -#ifdef _linux_ - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoResourceTracker - : public TRefCounted -{ -public: - TPortoResourceTracker( - IInstancePtr instance, - TDuration updatePeriod, - bool isDeltaTracker = false, - bool isForceUpdate = false); - - TCpuStatistics GetCpuStatistics() const; - - TBlockIOStatistics GetBlockIOStatistics() const; - - TMemoryStatistics GetMemoryStatistics() const; - - TNetworkStatistics GetNetworkStatistics() const; - - TTotalStatistics GetTotalStatistics() const; - - bool AreResourceUsageStatisticsExpired() const; - - TInstant GetLastUpdateTime() const; - -private: - const IInstancePtr Instance_; - const TDuration UpdatePeriod_; - const bool IsDeltaTracker_; - const bool IsForceUpdate_; - - mutable std::atomic<TInstant> LastUpdateTime_ = {}; - - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); - mutable TResourceUsage ResourceUsage_; - mutable TResourceUsage ResourceUsageDelta_; - - mutable std::optional<TCpuStatistics> CachedCpuStatistics_; - mutable std::optional<TMemoryStatistics> CachedMemoryStatistics_; - mutable std::optional<TBlockIOStatistics> CachedBlockIOStatistics_; - mutable std::optional<TNetworkStatistics> CachedNetworkStatistics_; - mutable std::optional<TTotalStatistics> CachedTotalStatistics_; - mutable TErrorOr<ui64> PeakThreadCount_ = 0; - - template <class T, class F> - T GetStatistics( - std::optional<T>& cachedStatistics, - const TString& statisticsKind, - F extractor) const; - - TCpuStatistics ExtractCpuStatistics(const TResourceUsage& resourceUsage) const; - TMemoryStatistics ExtractMemoryStatistics(const TResourceUsage& resourceUsage) const; - TBlockIOStatistics ExtractBlockIOStatistics(const TResourceUsage& resourceUsage) const; - TNetworkStatistics ExtractNetworkStatistics(const TResourceUsage& resourceUsage) const; - TTotalStatistics ExtractTotalStatistics(const TResourceUsage& resourceUsage) const; - - TErrorOr<ui64> CalculateCounterDelta( - const TErrorOr<ui64>& oldValue, - const TErrorOr<ui64>& newValue) const; - - void ReCalculateResourceUsage(const TResourceUsage& newResourceUsage) const; - - void UpdateResourceUsageStatisticsIfExpired() const; - - void DoUpdateResourceUsage() const; -}; - -DEFINE_REFCOUNTED_TYPE(TPortoResourceTracker) - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoResourceProfiler - : public ISensorProducer -{ -public: - TPortoResourceProfiler( - TPortoResourceTrackerPtr tracker, - TPodSpecConfigPtr podSpec, - const TProfiler& profiler = TProfiler{"/porto"}); - - void CollectSensors(ISensorWriter* writer) override; - -private: - const TPortoResourceTrackerPtr ResourceTracker_; - const TPodSpecConfigPtr PodSpec_; - - void WriteCpuMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec); - - void WriteMemoryMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec); - - void WriteBlockingIOMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec); - - void WriteNetworkMetrics( - ISensorWriter* writer, - TTotalStatistics& totalStatistics, - i64 timeDeltaUsec); -}; - -DECLARE_REFCOUNTED_TYPE(TPortoResourceProfiler) -DEFINE_REFCOUNTED_TYPE(TPortoResourceProfiler) - -//////////////////////////////////////////////////////////////////////////////// - -#endif - -void EnablePortoResourceTracker(const TPodSpecConfigPtr& podSpec); - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/private.h b/yt/yt/library/containers/private.h deleted file mode 100644 index 62682cb364..0000000000 --- a/yt/yt/library/containers/private.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include <yt/yt/core/logging/log.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -inline const NLogging::TLogger ContainersLogger("Containers"); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/process.cpp b/yt/yt/library/containers/process.cpp deleted file mode 100644 index ad1c8d35dc..0000000000 --- a/yt/yt/library/containers/process.cpp +++ /dev/null @@ -1,154 +0,0 @@ -#ifdef __linux__ - -#include "process.h" - -#include <yt/yt/library/containers/instance.h> - -#include <yt/yt/core/misc/proc.h> -#include <yt/yt/core/misc/fs.h> - -namespace NYT::NContainers { - -using namespace NPipes; -using namespace NNet; -using namespace NConcurrency; - -//////////////////////////////////////////////////////////////////////////////// - -static inline const NLogging::TLogger Logger("Process"); - -static constexpr pid_t InvalidProcessId = -1; - -//////////////////////////////////////////////////////////////////////////////// - -TPortoProcess::TPortoProcess( - const TString& path, - IInstanceLauncherPtr containerLauncher, - bool copyEnv) - : TProcessBase(path) - , ContainerLauncher_(std::move(containerLauncher)) -{ - AddArgument(NFS::GetFileName(path)); - if (copyEnv) { - for (char** envIt = environ; *envIt; ++envIt) { - Env_.push_back(Capture(*envIt)); - } - } -} - -void TPortoProcess::Kill(int signal) -{ - if (auto instance = GetInstance()) { - instance->Kill(signal); - } -} - -void TPortoProcess::DoSpawn() -{ - YT_VERIFY(ProcessId_ == InvalidProcessId && !Finished_); - YT_VERIFY(!GetInstance()); - YT_VERIFY(!Started_); - YT_VERIFY(!Args_.empty()); - - if (!WorkingDirectory_.empty()) { - ContainerLauncher_->SetCwd(WorkingDirectory_); - } - - Started_ = true; - - try { - // TPortoProcess doesn't support running processes inside rootFS. - YT_VERIFY(!ContainerLauncher_->HasRoot()); - std::vector<TString> args(Args_.begin() + 1, Args_.end()); - auto instance = WaitFor(ContainerLauncher_->Launch(ResolvedPath_, args, DecomposeEnv())) - .ValueOrThrow(); - ContainerInstance_.Store(instance); - FinishedPromise_.SetFrom(instance->Wait()); - - try { - ProcessId_ = instance->GetPid(); - } catch (const std::exception& ex) { - // This could happen if Porto container has already died or pid namespace of - // parent container is not a parent of pid namespace of child container. - // It's not a problem, since for Porto process pid is used for logging purposes only. - YT_LOG_DEBUG(ex, "Failed to get pid of root process (Container: %v)", - instance->GetName()); - } - - YT_LOG_DEBUG("Process inside Porto spawned successfully (Path: %v, ExternalPid: %v, Container: %v)", - ResolvedPath_, - ProcessId_, - instance->GetName()); - - FinishedPromise_.ToFuture().Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TError& exitStatus) { - Finished_ = true; - if (exitStatus.IsOK()) { - YT_LOG_DEBUG("Process inside Porto exited gracefully (ExternalPid: %v, Container: %v)", - ProcessId_, - instance->GetName()); - } else { - YT_LOG_DEBUG(exitStatus, "Process inside Porto exited with an error (ExternalPid: %v, Container: %v)", - ProcessId_, - instance->GetName()); - } - })); - } catch (const std::exception& ex) { - Finished_ = true; - THROW_ERROR_EXCEPTION("Failed to start child process inside Porto") - << TErrorAttribute("path", ResolvedPath_) - << TErrorAttribute("container", ContainerLauncher_->GetName()) - << ex; - } -} - -IInstancePtr TPortoProcess::GetInstance() -{ - return ContainerInstance_.Acquire(); -} - -THashMap<TString, TString> TPortoProcess::DecomposeEnv() const -{ - THashMap<TString, TString> result; - for (const auto& env : Env_) { - TStringBuf name, value; - TStringBuf(env).TrySplit('=', name, value); - result[name] = value; - } - return result; -} - -static TString CreateStdIONamedPipePath() -{ - const TString name = ToString(TGuid::Create()); - return NFS::GetRealPath(NFS::CombinePaths("/tmp", name)); -} - -IConnectionWriterPtr TPortoProcess::GetStdInWriter() -{ - auto pipe = TNamedPipe::Create(CreateStdIONamedPipePath()); - ContainerLauncher_->SetStdIn(pipe->GetPath()); - NamedPipes_.push_back(pipe); - return pipe->CreateAsyncWriter(); -} - -IConnectionReaderPtr TPortoProcess::GetStdOutReader() -{ - auto pipe = TNamedPipe::Create(CreateStdIONamedPipePath()); - ContainerLauncher_->SetStdOut(pipe->GetPath()); - NamedPipes_.push_back(pipe); - return pipe->CreateAsyncReader(); -} - -IConnectionReaderPtr TPortoProcess::GetStdErrReader() -{ - auto pipe = TNamedPipe::Create(CreateStdIONamedPipePath()); - ContainerLauncher_->SetStdErr(pipe->GetPath()); - NamedPipes_.push_back(pipe); - return pipe->CreateAsyncReader(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers - -#endif diff --git a/yt/yt/library/containers/process.h b/yt/yt/library/containers/process.h deleted file mode 100644 index 75255165d8..0000000000 --- a/yt/yt/library/containers/process.h +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/library/process/process.h> - -#include <library/cpp/yt/memory/atomic_intrusive_ptr.h> - -#include <library/cpp/porto/libporto.hpp> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -// NB(psushin): this class is deprecated and only used to run job proxy. -// ToDo(psushin): kill me. -class TPortoProcess - : public TProcessBase -{ -public: - TPortoProcess( - const TString& path, - NContainers::IInstanceLauncherPtr containerLauncher, - bool copyEnv = true); - void Kill(int signal) override; - NNet::IConnectionWriterPtr GetStdInWriter() override; - NNet::IConnectionReaderPtr GetStdOutReader() override; - NNet::IConnectionReaderPtr GetStdErrReader() override; - - NContainers::IInstancePtr GetInstance(); - -private: - const NContainers::IInstanceLauncherPtr ContainerLauncher_; - - TAtomicIntrusivePtr<NContainers::IInstance> ContainerInstance_; - std::vector<NPipes::TNamedPipePtr> NamedPipes_; - - void DoSpawn() override; - THashMap<TString, TString> DecomposeEnv() const; -}; - -DEFINE_REFCOUNTED_TYPE(TPortoProcess) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/public.h b/yt/yt/library/containers/public.h deleted file mode 100644 index d8e3cf3491..0000000000 --- a/yt/yt/library/containers/public.h +++ /dev/null @@ -1,163 +0,0 @@ -#pragma once - -#include <yt/yt/core/misc/public.h> - -#include <library/cpp/porto/proto/rpc.pb.h> -#include <library/cpp/yt/misc/enum.h> - -namespace NYT::NContainers { - -//////////////////////////////////////////////////////////////////////////////// - -const int PortoErrorCodeBase = 12000; - -DEFINE_ENUM(EPortoErrorCode, - ((Success) ((PortoErrorCodeBase + Porto::EError::Success))) - ((Unknown) ((PortoErrorCodeBase + Porto::EError::Unknown))) - ((InvalidMethod) ((PortoErrorCodeBase + Porto::EError::InvalidMethod))) - ((ContainerAlreadyExists) ((PortoErrorCodeBase + Porto::EError::ContainerAlreadyExists))) - ((ContainerDoesNotExist) ((PortoErrorCodeBase + Porto::EError::ContainerDoesNotExist))) - ((InvalidProperty) ((PortoErrorCodeBase + Porto::EError::InvalidProperty))) - ((InvalidData) ((PortoErrorCodeBase + Porto::EError::InvalidData))) - ((InvalidValue) ((PortoErrorCodeBase + Porto::EError::InvalidValue))) - ((InvalidState) ((PortoErrorCodeBase + Porto::EError::InvalidState))) - ((NotSupported) ((PortoErrorCodeBase + Porto::EError::NotSupported))) - ((ResourceNotAvailable) ((PortoErrorCodeBase + Porto::EError::ResourceNotAvailable))) - ((Permission) ((PortoErrorCodeBase + Porto::EError::Permission))) - ((VolumeAlreadyExists) ((PortoErrorCodeBase + Porto::EError::VolumeAlreadyExists))) - ((VolumeNotFound) ((PortoErrorCodeBase + Porto::EError::VolumeNotFound))) - ((NoSpace) ((PortoErrorCodeBase + Porto::EError::NoSpace))) - ((Busy) ((PortoErrorCodeBase + Porto::EError::Busy))) - ((VolumeAlreadyLinked) ((PortoErrorCodeBase + Porto::EError::VolumeAlreadyLinked))) - ((VolumeNotLinked) ((PortoErrorCodeBase + Porto::EError::VolumeNotLinked))) - ((LayerAlreadyExists) ((PortoErrorCodeBase + Porto::EError::LayerAlreadyExists))) - ((LayerNotFound) ((PortoErrorCodeBase + Porto::EError::LayerNotFound))) - ((NoValue) ((PortoErrorCodeBase + Porto::EError::NoValue))) - ((VolumeNotReady) ((PortoErrorCodeBase + Porto::EError::VolumeNotReady))) - ((InvalidCommand) ((PortoErrorCodeBase + Porto::EError::InvalidCommand))) - ((LostError) ((PortoErrorCodeBase + Porto::EError::LostError))) - ((DeviceNotFound) ((PortoErrorCodeBase + Porto::EError::DeviceNotFound))) - ((InvalidPath) ((PortoErrorCodeBase + Porto::EError::InvalidPath))) - ((InvalidNetworkAddress) ((PortoErrorCodeBase + Porto::EError::InvalidNetworkAddress))) - ((PortoFrozen) ((PortoErrorCodeBase + Porto::EError::PortoFrozen))) - ((LabelNotFound) ((PortoErrorCodeBase + Porto::EError::LabelNotFound))) - ((InvalidLabel) ((PortoErrorCodeBase + Porto::EError::InvalidLabel))) - ((NotFound) ((PortoErrorCodeBase + Porto::EError::NotFound))) - ((SocketError) ((PortoErrorCodeBase + Porto::EError::SocketError))) - ((SocketUnavailable) ((PortoErrorCodeBase + Porto::EError::SocketUnavailable))) - ((SocketTimeout) ((PortoErrorCodeBase + Porto::EError::SocketTimeout))) - ((Taint) ((PortoErrorCodeBase + Porto::EError::Taint))) - ((Queued) ((PortoErrorCodeBase + Porto::EError::Queued))) -); - -//////////////////////////////////////////////////////////////////////////////// - -YT_DEFINE_ERROR_ENUM( - ((FailedToStartContainer) (14000)) -); - -DEFINE_ENUM(EStatField, - // CPU - (CpuUsage) - (CpuUserUsage) - (CpuSystemUsage) - (CpuWait) - (CpuThrottled) - (ContextSwitches) - (ContextSwitchesDelta) - (ThreadCount) - (CpuLimit) - (CpuGuarantee) - - // Memory - (Rss) - (MappedFile) - (MajorPageFaults) - (MinorPageFaults) - (FileCacheUsage) - (AnonMemoryUsage) - (AnonMemoryLimit) - (MemoryUsage) - (MemoryGuarantee) - (MemoryLimit) - (MaxMemoryUsage) - (OomKills) - (OomKillsTotal) - - // IO - (IOReadByte) - (IOWriteByte) - (IOBytesLimit) - (IOReadOps) - (IOWriteOps) - (IOOps) - (IOOpsLimit) - (IOTotalTime) - (IOWaitTime) - - // Network - (NetTxBytes) - (NetTxPackets) - (NetTxDrops) - (NetTxLimit) - (NetRxBytes) - (NetRxPackets) - (NetRxDrops) - (NetRxLimit) -); - -DEFINE_ENUM(EEnablePorto, - (None) - (Isolate) - (Full) -); - -struct TBind -{ - TString SourcePath; - TString TargetPath; - bool ReadOnly; -}; - -struct TRootFS -{ - TString RootPath; - bool IsRootReadOnly; - std::vector<TBind> Binds; -}; - -struct TDevice -{ - TString DeviceName; - bool Enabled; -}; - -struct TInstanceLimits -{ - double Cpu = 0; - i64 Memory = 0; - std::optional<i64> NetTx; - std::optional<i64> NetRx; - - bool operator==(const TInstanceLimits&) const = default; -}; - -DECLARE_REFCOUNTED_STRUCT(IContainerManager) -DECLARE_REFCOUNTED_STRUCT(IInstanceLauncher) -DECLARE_REFCOUNTED_STRUCT(IInstance) -DECLARE_REFCOUNTED_STRUCT(IPortoExecutor) - -DECLARE_REFCOUNTED_CLASS(TPortoHealthChecker) -DECLARE_REFCOUNTED_CLASS(TInstanceLimitsTracker) -DECLARE_REFCOUNTED_CLASS(TPortoProcess) -DECLARE_REFCOUNTED_CLASS(TPortoResourceTracker) -DECLARE_REFCOUNTED_CLASS(TPortoExecutorDynamicConfig) -DECLARE_REFCOUNTED_CLASS(TPodSpecConfig) - -//////////////////////////////////////////////////////////////////////////////// - -bool IsValidCGroupType(const TString& type); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/unittests/containers_ut.cpp b/yt/yt/library/containers/unittests/containers_ut.cpp deleted file mode 100644 index 4f1c10a435..0000000000 --- a/yt/yt/library/containers/unittests/containers_ut.cpp +++ /dev/null @@ -1,133 +0,0 @@ -#include <yt/yt/core/test_framework/framework.h> - -#ifdef _linux_ - -#include <yt/yt/library/containers/config.h> -#include <yt/yt/library/containers/porto_executor.h> -#include <yt/yt/library/containers/instance.h> - -#include <util/system/platform.h> -#include <util/system/env.h> - -namespace NYT::NContainers { -namespace { - -using namespace NConcurrency; - -//////////////////////////////////////////////////////////////////////////////// - -class TContainersTest - : public ::testing::Test -{ - void SetUp() override - { - if (GetEnv("SKIP_PORTO_TESTS") != "") { - GTEST_SKIP(); - } - } -}; - -static TString GetUniqueName() -{ - return "yt_ut_" + ToString(TGuid::Create()); -} - -IPortoExecutorPtr CreatePortoExecutor() -{ - return CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "default"); -} - -TEST_F(TContainersTest, ListSubcontainers) -{ - auto executor = CreatePortoExecutor(); - auto name = GetUniqueName(); - - WaitFor(executor->CreateContainer(name)) - .ThrowOnError(); - - auto absoluteName = *WaitFor(executor->GetContainerProperty(name, "absolute_name")) - .ValueOrThrow(); - - auto nestedName = absoluteName + "/nested"; - WaitFor(executor->CreateContainer(nestedName)) - .ThrowOnError(); - - auto withRoot = WaitFor(executor->ListSubcontainers(name, true)) - .ValueOrThrow(); - EXPECT_EQ(std::vector<TString>({absoluteName, nestedName}), withRoot); - - auto withoutRoot = WaitFor(executor->ListSubcontainers(name, false)) - .ValueOrThrow(); - EXPECT_EQ(std::vector<TString>({nestedName}), withoutRoot); - - WaitFor(executor->DestroyContainer(absoluteName)) - .ThrowOnError(); -} - -// See https://st.yandex-team.ru/PORTO-846. -TEST_F(TContainersTest, DISABLED_WaitContainer) -{ - auto executor = CreatePortoExecutor(); - auto name = GetUniqueName(); - - WaitFor(executor->CreateContainer(name)) - .ThrowOnError(); - - WaitFor(executor->SetContainerProperty(name, "command", "sleep 10")) - .ThrowOnError(); - - WaitFor(executor->StartContainer(name)) - .ThrowOnError(); - - auto exitCode = WaitFor(executor->WaitContainer(name)) - .ValueOrThrow(); - - EXPECT_EQ(0, exitCode); - - WaitFor(executor->DestroyContainer(name)) - .ThrowOnError(); -} - -TEST_F(TContainersTest, CreateFromSpec) -{ - auto executor = CreatePortoExecutor(); - auto name = GetUniqueName(); - - auto spec = TRunnableContainerSpec { - .Name = name, - .Command = "sleep 2", - }; - - WaitFor(executor->CreateContainer(spec, /*start*/ true)) - .ThrowOnError(); - - auto exitCode = WaitFor(executor->PollContainer(name)) - .ValueOrThrow(); - - EXPECT_EQ(0, exitCode); - - WaitFor(executor->DestroyContainer(name)) - .ThrowOnError(); -} - -TEST_F(TContainersTest, ListPids) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - - auto instance = WaitFor(launcher->Launch("sleep", {"5"}, {})) - .ValueOrThrow(); - - auto pids = instance->GetPids(); - EXPECT_LT(0u, pids.size()); - - instance->Destroy(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT::NContainers - -#endif diff --git a/yt/yt/library/containers/unittests/porto_resource_tracker_ut.cpp b/yt/yt/library/containers/unittests/porto_resource_tracker_ut.cpp deleted file mode 100644 index 04d169ba4e..0000000000 --- a/yt/yt/library/containers/unittests/porto_resource_tracker_ut.cpp +++ /dev/null @@ -1,251 +0,0 @@ -#include <yt/yt/core/test_framework/framework.h> - -#include <yt/yt/core/ytree/convert.h> - -#include <util/system/fs.h> -#include <util/system/tempfile.h> - -#include <yt/yt/library/profiling/producer.h> -#include <yt/yt/library/containers/config.h> -#include <yt/yt/library/containers/porto_executor.h> -#include <yt/yt/library/containers/porto_resource_tracker.h> -#include <yt/yt/library/containers/instance.h> - -#include <util/system/platform.h> -#include <util/system/env.h> - -namespace NYT::NContainers { -namespace { - -using namespace NConcurrency; - -//////////////////////////////////////////////////////////////////////////////// - -static constexpr auto TestUpdatePeriod = TDuration::MilliSeconds(10); - -class TPortoTrackerTest - : public ::testing::Test -{ -public: - IPortoExecutorPtr Executor; - - void SetUp() override - { - if (GetEnv("SKIP_PORTO_TESTS") != "") { - GTEST_SKIP(); - } - - Executor = CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "default"); - } -}; - -TString GetUniqueName() -{ - return "yt_porto_ut_" + ToString(TGuid::Create()); -} - -TPortoResourceTrackerPtr CreateSumPortoTracker(IPortoExecutorPtr Executor, const TString& name) -{ - return New<TPortoResourceTracker>( - GetPortoInstance(Executor, name), - TestUpdatePeriod, - false); -} - -TPortoResourceProfilerPtr CreateDeltaPortoProfiler(IPortoExecutorPtr executor, const TString& name) -{ - auto instance = GetPortoInstance(executor, name); - auto portoResourceTracker = New<TPortoResourceTracker>( - instance, - ResourceUsageUpdatePeriod, - true, - true - ); - - // Init metrics for delta tracker. - portoResourceTracker->GetTotalStatistics(); - - return LeakyRefCountedSingleton<TPortoResourceProfiler>( - portoResourceTracker, - New<TPodSpecConfig>(), - TProfiler("/porto") - .WithTag("porto_name", instance->GetName()) - .WithTag("container_category", "yt_daemon")); -} - -void AssertGauges(const std::vector<std::tuple<TString, TTagList, double>>& gauges) { - THashSet<TString> sensors{ - "/cpu/user", - "/cpu/total", - "/cpu/system", - "/cpu/wait", - "/cpu/throttled", - "/cpu/guarantee", - "/cpu/limit", - "/cpu/thread_count", - "/cpu/context_switches", - - "/memory/minor_page_faults", - "/memory/major_page_faults", - "/memory/file_cache_usage", - "/memory/anon_usage", - "/memory/anon_limit", - "/memory/memory_usage", - "/memory/memory_guarantee", - "/memory/memory_limit", - - "/io/read_bytes", - "/io/write_bytes", - "/io/bytes_limit", - - "/io/read_ops", - "/io/write_ops", - "/io/ops", - "/io/ops_limit", - "/io/total", - - "/network/rx_bytes", - "/network/rx_drops", - "/network/rx_packets", - "/network/rx_limit", - "/network/tx_bytes", - "/network/tx_drops", - "/network/tx_packets", - "/network/tx_limit" - }; - - THashSet<TString> mayBeEmpty{ - "/cpu/wait", - "/cpu/throttled", - "/cpu/guarantee", - "/cpu/context_switches", - "/memory/major_page_faults", - "/memory/memory_guarantee", - "/io/ops_limit", - "/io/read_ops", - "/io/write_ops", - "/io/wait", - "/io/bytes_limit", - "/network/rx_bytes", - "/network/rx_drops", - "/network/rx_packets", - "/network/rx_limit", - "/network/tx_bytes", - "/network/tx_drops", - "/network/tx_packets", - "/network/tx_limit" - }; - - for (const auto& [name, tags, value] : gauges) { - EXPECT_TRUE(value >= 0 && sensors.find(name) || mayBeEmpty.find(name)); - } -} - -TEST_F(TPortoTrackerTest, ValidateSummaryPortoTracker) -{ - auto name = GetUniqueName(); - - WaitFor(Executor->CreateContainer( - TRunnableContainerSpec { - .Name = name, - .Command = "sleep .1", - }, true)) - .ThrowOnError(); - - auto tracker = CreateSumPortoTracker(Executor, name); - - auto firstStatistics = tracker->GetTotalStatistics(); - - WaitFor(Executor->StopContainer(name)) - .ThrowOnError(); - WaitFor(Executor->SetContainerProperty( - name, - "command", - "find /")) - .ThrowOnError(); - WaitFor(Executor->StartContainer(name)) - .ThrowOnError(); - Sleep(TDuration::MilliSeconds(500)); - - auto secondStatistics = tracker->GetTotalStatistics(); - - WaitFor(Executor->DestroyContainer(name)) - .ThrowOnError(); -} - -TEST_F(TPortoTrackerTest, ValidateDeltaPortoTracker) -{ - auto name = GetUniqueName(); - - auto spec = TRunnableContainerSpec { - .Name = name, - .Command = "sleep .1", - }; - - WaitFor(Executor->CreateContainer(spec, true)) - .ThrowOnError(); - - auto profiler = CreateDeltaPortoProfiler(Executor, name); - - WaitFor(Executor->StopContainer(name)) - .ThrowOnError(); - WaitFor(Executor->SetContainerProperty( - name, - "command", - "find /")) - .ThrowOnError(); - WaitFor(Executor->StartContainer(name)) - .ThrowOnError(); - - Sleep(TDuration::MilliSeconds(500)); - - auto buffer = New<TSensorBuffer>(); - profiler->CollectSensors(buffer.Get()); - AssertGauges(buffer->GetGauges()); - - WaitFor(Executor->DestroyContainer(name)) - .ThrowOnError(); -} - -TEST_F(TPortoTrackerTest, ValidateDeltaRootPortoTracker) -{ - auto name = GetUniqueName(); - - auto spec = TRunnableContainerSpec { - .Name = name, - .Command = "sleep .1", - }; - - WaitFor(Executor->CreateContainer(spec, true)) - .ThrowOnError(); - - auto profiler = CreateDeltaPortoProfiler( - Executor, - GetPortoInstance( - Executor, - *GetPortoInstance(Executor, name)->GetRootName())->GetName()); - - WaitFor(Executor->StopContainer(name)) - .ThrowOnError(); - WaitFor(Executor->SetContainerProperty( - name, - "command", - "find /")) - .ThrowOnError(); - WaitFor(Executor->StartContainer(name)) - .ThrowOnError(); - - Sleep(TDuration::MilliSeconds(500)); - - auto buffer = New<TSensorBuffer>(); - profiler->CollectSensors(buffer.Get()); - AssertGauges(buffer->GetGauges()); - - WaitFor(Executor->DestroyContainer(name)) - .ThrowOnError(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT::NContainers diff --git a/yt/yt/library/containers/unittests/process_ut.cpp b/yt/yt/library/containers/unittests/process_ut.cpp deleted file mode 100644 index b9c0d844f4..0000000000 --- a/yt/yt/library/containers/unittests/process_ut.cpp +++ /dev/null @@ -1,302 +0,0 @@ -#include <yt/yt/core/test_framework/framework.h> - -#ifdef _linux_ - -#include <yt/yt/core/actions/bind.h> - -#include <yt/yt/core/concurrency/action_queue.h> -#include <yt/yt/core/concurrency/delayed_executor.h> -#include <yt/yt/core/concurrency/scheduler.h> - -#include <yt/yt/core/misc/guid.h> -#include <yt/yt/core/misc/proc.h> - -#include <yt/yt/core/net/connection.h> - -#include <yt/yt/library/containers/process.h> - -#include <yt/yt/library/containers/config.h> -#include <yt/yt/library/containers/porto_executor.h> -#include <yt/yt/library/containers/instance.h> - -#include <util/system/platform.h> -#include <util/system/env.h> - -namespace NYT::NContainers { -namespace { - -using namespace NConcurrency; - -//////////////////////////////////////////////////////////////////////////////// - -class TPortoProcessTest - : public ::testing::Test -{ - void SetUp() override - { - if (GetEnv("SKIP_PORTO_TESTS") != "") { - GTEST_SKIP(); - } - } -}; - -static TString GetUniqueName() -{ - return "yt_ut_" + ToString(TGuid::Create()); -} - -IPortoExecutorPtr CreatePortoExecutor() -{ - return CreatePortoExecutor(New<TPortoExecutorDynamicConfig>(), "default"); -} - -TEST_F(TPortoProcessTest, Basic) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/ls", launcher, true); - TFuture<void> finished; - ASSERT_NO_THROW(finished = p->Spawn()); - ASSERT_TRUE(p->IsStarted()); - auto error = WaitFor(finished); - EXPECT_TRUE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, RunFromPathEnv) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("ls", launcher, true); - TFuture<void> finished; - ASSERT_NO_THROW(finished = p->Spawn()); - ASSERT_TRUE(p->IsStarted()); - auto error = WaitFor(finished); - EXPECT_TRUE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, MultiBasic) -{ - auto portoExecutor = CreatePortoExecutor(); - auto l1 = CreatePortoInstanceLauncher(GetUniqueName(), portoExecutor); - auto l2 = CreatePortoInstanceLauncher(GetUniqueName(), portoExecutor); - auto p1 = New<TPortoProcess>("/bin/ls", l1, true); - auto p2 = New<TPortoProcess>("/bin/ls", l2, true); - TFuture<void> f1; - TFuture<void> f2; - ASSERT_NO_THROW(f1 = p1->Spawn()); - ASSERT_NO_THROW(f2 = p2->Spawn()); - auto error = WaitFor((AllSucceeded(std::vector<TFuture<void>>{f1, f2}))); - EXPECT_TRUE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p1->IsFinished()); - EXPECT_TRUE(p2->IsFinished()); - p1->GetInstance()->Destroy(); - p2->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, InvalidPath) -{ - auto portoExecutor = CreatePortoExecutor(); - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - portoExecutor); - auto p = New<TPortoProcess>("/some/bad/path/binary", launcher, true); - TFuture<void> finished; - ASSERT_NO_THROW(finished = p->Spawn()); - ASSERT_FALSE(p->IsStarted()); - auto error = WaitFor(finished); - EXPECT_FALSE(p->IsFinished()); - EXPECT_FALSE(error.IsOK()); - WaitFor(portoExecutor->DestroyContainer(launcher->GetName())) - .ThrowOnError(); -} - -TEST_F(TPortoProcessTest, StdOut) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/date", launcher, true); - - auto outStream = p->GetStdOutReader(); - TFuture<void> finished; - ASSERT_NO_THROW(finished = p->Spawn()); - ASSERT_TRUE(p->IsStarted()); - auto error = WaitFor(finished); - EXPECT_TRUE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p->IsFinished()); - - auto buffer = TSharedMutableRef::Allocate(4_KB, {.InitializeStorage = false}); - auto future = outStream->Read(buffer); - TErrorOr<size_t> result = WaitFor(future); - size_t sz = result.ValueOrThrow(); - EXPECT_TRUE(sz > 0); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, GetCommandLine) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/bash", launcher, true); - EXPECT_EQ("/bin/bash", p->GetCommandLine()); - p->AddArgument("-c"); - EXPECT_EQ("/bin/bash -c", p->GetCommandLine()); - p->AddArgument("exit 0"); - EXPECT_EQ("/bin/bash -c \"exit 0\"", p->GetCommandLine()); -} - -TEST_F(TPortoProcessTest, ProcessReturnCode0) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/bash", launcher, true); - p->AddArgument("-c"); - p->AddArgument("exit 0"); - - TFuture<void> finished; - ASSERT_NO_THROW(finished = p->Spawn()); - ASSERT_TRUE(p->IsStarted()); - auto error = WaitFor(finished); - EXPECT_TRUE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, ProcessReturnCode123) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/bash", launcher, true); - p->AddArgument("-c"); - p->AddArgument("exit 123"); - - TFuture<void> finished; - ASSERT_NO_THROW(finished = p->Spawn()); - ASSERT_TRUE(p->IsStarted()); - auto error = WaitFor(finished); - EXPECT_EQ(EProcessErrorCode::NonZeroExitCode, error.GetCode()); - EXPECT_EQ(123, error.Attributes().Get<int>("exit_code")); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, Params1) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/bash", launcher, true); - p->AddArgument("-c"); - p->AddArgument("if test 3 -gt 1; then exit 7; fi"); - - auto error = WaitFor(p->Spawn()); - EXPECT_FALSE(error.IsOK()); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, Params2) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/bash", launcher, true); - p->AddArgument("-c"); - p->AddArgument("if test 1 -gt 3; then exit 7; fi"); - - auto error = WaitFor(p->Spawn()); - EXPECT_TRUE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, InheritEnvironment) -{ - const char* name = "SPAWN_TEST_ENV_VAR"; - const char* value = "42"; - setenv(name, value, 1); - - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/bash", launcher, true); - p->AddArgument("-c"); - p->AddArgument("if test $SPAWN_TEST_ENV_VAR = 42; then exit 7; fi"); - - auto error = WaitFor(p->Spawn()); - EXPECT_FALSE(error.IsOK()); - EXPECT_TRUE(p->IsFinished()); - - unsetenv(name); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, Kill) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/sleep", launcher, true); - p->AddArgument("5"); - - auto finished = p->Spawn(); - - NConcurrency::TDelayedExecutor::Submit( - BIND([&] () { - p->Kill(SIGKILL); - }), - TDuration::MilliSeconds(100)); - - auto error = WaitFor(finished); - EXPECT_FALSE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, KillFinished) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/bash", launcher, true); - p->AddArgument("-c"); - p->AddArgument("true"); - - auto finished = p->Spawn(); - - auto error = WaitFor(finished); - EXPECT_TRUE(error.IsOK()); - - p->Kill(SIGKILL); - p->GetInstance()->Destroy(); -} - -TEST_F(TPortoProcessTest, PollDuration) -{ - auto launcher = CreatePortoInstanceLauncher( - GetUniqueName(), - CreatePortoExecutor()); - auto p = New<TPortoProcess>("/bin/sleep", launcher, true); - p->AddArgument("1"); - - auto error = WaitFor(p->Spawn()); - EXPECT_TRUE(error.IsOK()) << ToString(error); - EXPECT_TRUE(p->IsFinished()); - p->GetInstance()->Destroy(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT::NContainers - -#endif diff --git a/yt/yt/library/containers/unittests/ya.make b/yt/yt/library/containers/unittests/ya.make deleted file mode 100644 index 42984e2dc7..0000000000 --- a/yt/yt/library/containers/unittests/ya.make +++ /dev/null @@ -1,35 +0,0 @@ -GTEST(unittester-containers) - -INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) - -ALLOCATOR(TCMALLOC) - -IF (AUTOCHECK) - ENV(SKIP_PORTO_TESTS=1) -ENDIF() - -IF (DISTBUILD) # TODO(prime@): this is always on - ENV(SKIP_PORTO_TESTS=1) -ENDIF() - -SRCS( - containers_ut.cpp - process_ut.cpp -) - -IF(OS_LINUX) - SRCS( - porto_resource_tracker_ut.cpp - ) -ENDIF() - -INCLUDE(${ARCADIA_ROOT}/yt/opensource_tests.inc) - -PEERDIR( - yt/yt/build - yt/yt/library/containers -) - -SIZE(MEDIUM) - -END() diff --git a/yt/yt/library/containers/ya.make b/yt/yt/library/containers/ya.make deleted file mode 100644 index 499b8d9da8..0000000000 --- a/yt/yt/library/containers/ya.make +++ /dev/null @@ -1,37 +0,0 @@ -LIBRARY() - -INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) - -SRCS( - cgroup.cpp - config.cpp - instance.cpp - instance_limits_tracker.cpp - process.cpp - porto_executor.cpp - porto_resource_tracker.cpp - porto_health_checker.cpp -) - -PEERDIR( - library/cpp/porto/proto - yt/yt/library/process - yt/yt/core -) - -IF(OS_LINUX) - PEERDIR( - library/cpp/porto - ) -ENDIF() - -END() - -RECURSE( - disk_manager - cri -) - -RECURSE_FOR_TESTS( - unittests -) diff --git a/yt/yt/library/process/CMakeLists.darwin-x86_64.txt b/yt/yt/library/process/CMakeLists.darwin-x86_64.txt deleted file mode 100644 index b66c679390..0000000000 --- a/yt/yt/library/process/CMakeLists.darwin-x86_64.txt +++ /dev/null @@ -1,26 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(yt-library-process) -target_compile_options(yt-library-process PRIVATE - -Wdeprecated-this-capture -) -target_link_libraries(yt-library-process PUBLIC - contrib-libs-cxxsupp - yutil - yt-yt-core - contrib-libs-re2 -) -target_sources(yt-library-process PRIVATE - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/io_dispatcher.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pipe.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/process.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pty.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/subprocess.cpp -) diff --git a/yt/yt/library/process/CMakeLists.txt b/yt/yt/library/process/CMakeLists.txt index f8b31df0c1..4d48dcdee6 100644 --- a/yt/yt/library/process/CMakeLists.txt +++ b/yt/yt/library/process/CMakeLists.txt @@ -8,10 +8,6 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) include(CMakeLists.linux-x86_64.txt) endif() diff --git a/yt/yt/library/process/CMakeLists.windows-x86_64.txt b/yt/yt/library/process/CMakeLists.windows-x86_64.txt deleted file mode 100644 index 3637ee7dae..0000000000 --- a/yt/yt/library/process/CMakeLists.windows-x86_64.txt +++ /dev/null @@ -1,23 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(yt-library-process) -target_link_libraries(yt-library-process PUBLIC - contrib-libs-cxxsupp - yutil - yt-yt-core - contrib-libs-re2 -) -target_sources(yt-library-process PRIVATE - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/io_dispatcher.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pipe.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/process.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/pty.cpp - ${CMAKE_SOURCE_DIR}/yt/yt/library/process/subprocess.cpp -) diff --git a/yt/yt/library/program/CMakeLists.darwin-x86_64.txt b/yt/yt/library/program/CMakeLists.darwin-x86_64.txt index fc3e796e79..e4ea489293 100644 --- a/yt/yt/library/program/CMakeLists.darwin-x86_64.txt +++ b/yt/yt/library/program/CMakeLists.darwin-x86_64.txt @@ -17,7 +17,6 @@ target_link_libraries(yt-library-program PUBLIC yt-yt-core core-service_discovery-yp yt-library-monitoring - yt-library-containers library-profiling-solomon library-profiling-tcmalloc library-profiling-perf diff --git a/yt/yt/library/program/CMakeLists.linux-aarch64.txt b/yt/yt/library/program/CMakeLists.linux-aarch64.txt index be31ba10cf..7c15fb8b7d 100644 --- a/yt/yt/library/program/CMakeLists.linux-aarch64.txt +++ b/yt/yt/library/program/CMakeLists.linux-aarch64.txt @@ -18,7 +18,6 @@ target_link_libraries(yt-library-program PUBLIC yt-yt-core core-service_discovery-yp yt-library-monitoring - yt-library-containers library-profiling-solomon library-profiling-tcmalloc library-profiling-perf diff --git a/yt/yt/library/program/CMakeLists.linux-x86_64.txt b/yt/yt/library/program/CMakeLists.linux-x86_64.txt index be31ba10cf..7c15fb8b7d 100644 --- a/yt/yt/library/program/CMakeLists.linux-x86_64.txt +++ b/yt/yt/library/program/CMakeLists.linux-x86_64.txt @@ -18,7 +18,6 @@ target_link_libraries(yt-library-program PUBLIC yt-yt-core core-service_discovery-yp yt-library-monitoring - yt-library-containers library-profiling-solomon library-profiling-tcmalloc library-profiling-perf diff --git a/yt/yt/library/program/CMakeLists.windows-x86_64.txt b/yt/yt/library/program/CMakeLists.windows-x86_64.txt index 1f2aea4bd0..7f37407e9d 100644 --- a/yt/yt/library/program/CMakeLists.windows-x86_64.txt +++ b/yt/yt/library/program/CMakeLists.windows-x86_64.txt @@ -14,7 +14,6 @@ target_link_libraries(yt-library-program PUBLIC yt-yt-core core-service_discovery-yp yt-library-monitoring - yt-library-containers library-profiling-solomon library-profiling-tcmalloc library-profiling-perf diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp index 0705fb48fc..ccc7bb1f2f 100644 --- a/yt/yt/library/program/config.cpp +++ b/yt/yt/library/program/config.cpp @@ -103,12 +103,8 @@ void TSingletonsConfig::Register(TRegistrar registrar) .Default(true); registrar.Parameter("enable_resource_tracker", &TThis::EnableResourceTracker) .Default(true); - registrar.Parameter("enable_porto_resource_tracker", &TThis::EnablePortoResourceTracker) - .Default(false); registrar.Parameter("resource_tracker_vcpu_factor", &TThis::ResourceTrackerVCpuFactor) .Optional(); - registrar.Parameter("pod_spec", &TThis::PodSpec) - .DefaultNew(); registrar.Parameter("heap_profiler", &TThis::HeapProfiler) .DefaultNew(); diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h index 7d92939f1c..88f649dd8e 100644 --- a/yt/yt/library/program/config.h +++ b/yt/yt/library/program/config.h @@ -22,8 +22,6 @@ #include <yt/yt/library/profiling/solomon/exporter.h> -#include <yt/yt/library/containers/config.h> - #include <yt/yt/library/tracing/jaeger/tracer.h> #include <library/cpp/yt/stockpile/stockpile.h> @@ -149,9 +147,7 @@ public: TStockpileConfigPtr Stockpile; bool EnableRefCountedTrackerProfiling; bool EnableResourceTracker; - bool EnablePortoResourceTracker; std::optional<double> ResourceTrackerVCpuFactor; - NContainers::TPodSpecConfigPtr PodSpec; THeapProfilerConfigPtr HeapProfiler; REGISTER_YSON_STRUCT(TSingletonsConfig); diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp index 5c7ff29db1..fc4a84d828 100644 --- a/yt/yt/library/program/helpers.cpp +++ b/yt/yt/library/program/helpers.cpp @@ -16,19 +16,14 @@ #include <yt/yt/library/profiling/resource_tracker/resource_tracker.h> -#include <yt/yt/library/containers/config.h> -#include <yt/yt/library/containers/porto_resource_tracker.h> - #include <yt/yt/core/logging/log_manager.h> #include <yt/yt/core/concurrency/execution_stack.h> #include <yt/yt/core/concurrency/periodic_executor.h> -#include <yt/yt/core/concurrency/private.h> #include <tcmalloc/malloc_extension.h> #include <yt/yt/core/net/address.h> -#include <yt/yt/core/net/local_address.h> #include <yt/yt/core/rpc/dispatcher.h> #include <yt/yt/core/rpc/grpc/dispatcher.h> @@ -37,8 +32,6 @@ #include <yt/yt/core/threading/spin_wait_slow_path_logger.h> -#include <library/cpp/yt/threading/spin_wait_hook.h> - #include <library/cpp/yt/memory/atomic_intrusive_ptr.h> #include <util/string/split.h> @@ -236,10 +229,6 @@ void ConfigureSingletonsImpl(const TConfig& config) NProfiling::SetVCpuFactor(config->ResourceTrackerVCpuFactor.value()); } } - - if (config->EnablePortoResourceTracker) { - NContainers::EnablePortoResourceTracker(config->PodSpec); - } } void ConfigureSingletons(const TSingletonsConfigPtr& config) diff --git a/yt/yt/library/program/ya.make b/yt/yt/library/program/ya.make index 5742ce9287..1d044b5a8d 100644 --- a/yt/yt/library/program/ya.make +++ b/yt/yt/library/program/ya.make @@ -16,7 +16,6 @@ PEERDIR( yt/yt/core yt/yt/core/service_discovery/yp yt/yt/library/monitoring - yt/yt/library/containers yt/yt/library/profiling/solomon yt/yt/library/profiling/tcmalloc yt/yt/library/profiling/perf |