diff options
author | udovichenko-r <[email protected]> | 2022-08-12 19:45:28 +0300 |
---|---|---|
committer | udovichenko-r <[email protected]> | 2022-08-12 19:45:28 +0300 |
commit | 9d8ceae328ea2efda6fe8d5480b434e22ad3ba95 (patch) | |
tree | e1586b5e55ecfeb79fa8f67a3f5de709a42b9196 | |
parent | fd43310b88d7ab79730f53246dd68a1e9e4a2f3a (diff) |
[] Optional porto memory limit
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp | 96 |
2 files changed, 60 insertions, 37 deletions
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index facb9756fff..e3dbf3320fe 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -17,7 +17,6 @@ struct TDqSettings { struct TDefault { static constexpr ui32 MaxTasksPerStage = 20U; static constexpr ui32 MaxTasksPerOperation = 70U; - static constexpr ui64 PortoMemoryLimit = 3_GB; static constexpr bool EnablePorto = false; static constexpr ui64 DataSizePerJob = 128_MB; static constexpr ui64 MaxDataSizePerJob = 600_MB; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 5040850ad7c..2f2928e37db 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -25,8 +25,10 @@ #include <util/stream/file.h> #include <util/stream/pipe.h> #include <util/generic/size_literals.h> +#include <util/generic/maybe.h> #include <util/string/cast.h> #include <util/string/strip.h> +#include <util/string/builder.h> namespace NYql::NTaskRunnerProxy { @@ -189,7 +191,7 @@ protected: for (size_t i = 0; i < Args.size(); ++i) { ExecArgs[i] = const_cast<char*>(Args[i].c_str()); } - + ExecEnv.resize(Env.size() + 1, nullptr); EnvElems.reserve(Env.size()); size_t i = 0; @@ -289,7 +291,7 @@ struct TProcessHolder { struct TPortoSettings { bool Enable; - ui64 MemoryLimit; + TMaybe<ui64> MemoryLimit; TString Layer; TString ContainerNamePrefix; @@ -314,24 +316,24 @@ public: , PortoCtl(portoCtl) , PortoLayer(portoSettings.Layer) , MemoryLimit(portoSettings.MemoryLimit) - , ContainerName(WorkDir.substr(WorkDir.rfind("/")+1)) + , ContainerName(WorkDir.substr(WorkDir.rfind("/") + 1)) , InternalWorkDir_("mnt/work") , InternalExeDir("usr/local/bin") , TmpDir("TmpDir" + ContainerName) { NFs::MakeDirectory(TmpDir); auto pos = ExeName.rfind("/"); - TString name = ExeName.substr(pos+1); + TString name = ExeName.substr(pos + 1); if (portoSettings.ContainerNamePrefix) { ContainerName = portoSettings.ContainerNamePrefix + "/" + ContainerName; } - YQL_CLOG(DEBUG, ProviderDq) << "HardLink " << ExeName << "'" << WorkDir + "/" + name << "'"; + YQL_CLOG(DEBUG, ProviderDq) << "HardLink " << ExeName << "'" << WorkDir << "/" << name << "'"; if (NFs::HardLink(ExeName, WorkDir + "/" + name)) { ExeName = WorkDir + "/" + name; } else { - YQL_CLOG(DEBUG, ProviderDq) << "HardLink Failed " << ExeName << "'" << WorkDir + "/" + name << "'"; + YQL_CLOG(DEBUG, ProviderDq) << "HardLink Failed " << ExeName << "'" << WorkDir << "/" << name << "'"; } } @@ -346,23 +348,42 @@ public: } private: + + TString GetPortoSetting(const TString& name) const { + TShellCommand cmd(PortoCtl, {"get", ContainerName, name}); + cmd.Run().Wait(); + return Strip(cmd.GetOutput()); + } + + void SetPortoSetting(const TString& name, const TString& value) const { + TShellCommand cmd(PortoCtl, {"set", ContainerName, name, value}); + cmd.Run().Wait(); + } + void Kill() override { - try { - // see YQL-13760 - TShellCommand cmd1(PortoCtl, {"get", ContainerName, "anon_limit"}); - cmd1.Run().Wait(); - i64 anonLimit = FromString<i64>(Strip(cmd1.GetOutput())); - TShellCommand cmd2(PortoCtl, {"get", ContainerName, "anon_usage"}); - cmd2.Run().Wait(); - i64 anonUsage = FromString<i64>(Strip(cmd2.GetOutput())); - if (anonUsage >= anonLimit) { - TShellCommand cmd3(PortoCtl, {"set", ContainerName, "anon_limit", - ToString(anonUsage + (1 << 20))}); - cmd3.Run().Wait(); + if (MemoryLimit) { + try { + // see YQL-13760 + i64 anonLimit = -1; + i64 anonUsage = -1; + if (auto val = GetPortoSetting("anon_limit")) { + if (!TryFromString(val, anonLimit)) { + anonLimit = -1; + } + } + if (auto val = GetPortoSetting("anon_usage")) { + if (!TryFromString(val, anonUsage)) { + anonUsage = -1; + } + } + if (anonLimit != -1 && anonUsage != -1 && anonUsage >= anonLimit) { + SetPortoSetting("anon_limit", ToString(anonUsage + 1_MB)); + } + } catch (...) { + YQL_CLOG(DEBUG, ProviderDq) << "Cannot set anon_limit: " << CurrentExceptionMessage(); } - } catch (...) { - YQL_CLOG(DEBUG, ProviderDq) << "Cannot set anon_limit: " << CurrentExceptionMessage(); } + try { TShellCommand cmd(PortoCtl, {"destroy", ContainerName}); cmd.Run().Wait(); @@ -375,24 +396,25 @@ private: void PrepareForExec() override { auto pos = ExeName.rfind("/"); TString exeDir = ExeName.substr(0, pos); - TString exeName = ExeName.substr(pos+1); + TString exeName = ExeName.substr(pos + 1); - TString command = InternalExeDir + "/" + exeName + " "; - for (ui64 i = 1; i < Args.size(); ++i) { - command += Args[i] + " "; + TStringBuilder command; + command << InternalExeDir << '/' << exeName << ' '; + for (size_t i = 1; i < Args.size(); ++i) { + command << Args[i] << ' '; } TString caps = "CHOWN;DAC_OVERRIDE;FOWNER;KILL;SETPCAP;IPC_LOCK;SYS_CHROOT;SYS_PTRACE;MKNOD;AUDIT_WRITE;SETFCAP"; - TString env; + TStringBuilder env; for (const auto& [k, v] : Env) { - env += k + "=" + v + ";"; + env << k << '=' << v << ';'; } - env += "TMPDIR=/tmp;"; + env << "TMPDIR=/tmp;"; - TString bind; - bind += WorkDir + " " + InternalWorkDir() + " rw;"; // see YQL-11392 - bind += exeDir + " " + InternalExeDir + " ro;"; - bind += TmpDir + " /tmp rw;"; + TStringBuilder bind; + bind << WorkDir << ' ' << InternalWorkDir() << " rw;"; // see YQL-11392 + bind << exeDir << ' ' << InternalExeDir << " ro;"; + bind << TmpDir << " /tmp rw;"; ArgsElems = TVector<TString>{ "portoctl", @@ -406,9 +428,11 @@ private: "bind=" + bind, "root_readonly=true", "weak=false", - "cpu_policy=idle", - "anon_limit=" + ToString(MemoryLimit) + "cpu_policy=idle" }; + if (MemoryLimit) { + ArgsElems.push_back("anon_limit=" + ToString(*MemoryLimit)); + } ExecArgs.resize(ArgsElems.size() + 1, nullptr); for (size_t i = 0; i < ArgsElems.size(); ++i) { ExecArgs[i] = const_cast<char*>(ArgsElems[i].c_str()); @@ -427,7 +451,7 @@ private: const TString PortoCtl; const TString PortoLayer; - const ui64 MemoryLimit; + const TMaybe<ui64> MemoryLimit; TString ContainerName; const TString InternalWorkDir_; @@ -1626,7 +1650,7 @@ public: , EnablePorto(options.EnablePorto) , PortoSettings({ EnablePorto, - TDqSettings::TDefault::PortoMemoryLimit, + Nothing(), options.PortoLayer, options.ContainerName }) @@ -1776,7 +1800,7 @@ private: conf->Dispatch(settings); } catch (...) { /* ignore unknown settings */ } portoSettings.Enable = EnablePorto && conf->EnablePorto.Get().GetOrElse(TDqSettings::TDefault::EnablePorto); - portoSettings.MemoryLimit = conf->_PortoMemoryLimit.Get().GetOrElse(TDqSettings::TDefault::PortoMemoryLimit); + portoSettings.MemoryLimit = conf->_PortoMemoryLimit.Get(); if (portoSettings.Enable) { YQL_CLOG(DEBUG, ProviderDq) << "Porto enabled"; } |