summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <[email protected]>2022-08-12 19:45:28 +0300
committerudovichenko-r <[email protected]>2022-08-12 19:45:28 +0300
commit9d8ceae328ea2efda6fe8d5480b434e22ad3ba95 (patch)
treee1586b5e55ecfeb79fa8f67a3f5de709a42b9196
parentfd43310b88d7ab79730f53246dd68a1e9e4a2f3a (diff)
[] Optional porto memory limit
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h1
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp96
2 files changed, 60 insertions, 37 deletions
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index facb9756fff..e3dbf3320fe 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -17,7 +17,6 @@ struct TDqSettings {
struct TDefault {
static constexpr ui32 MaxTasksPerStage = 20U;
static constexpr ui32 MaxTasksPerOperation = 70U;
- static constexpr ui64 PortoMemoryLimit = 3_GB;
static constexpr bool EnablePorto = false;
static constexpr ui64 DataSizePerJob = 128_MB;
static constexpr ui64 MaxDataSizePerJob = 600_MB;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 5040850ad7c..2f2928e37db 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -25,8 +25,10 @@
#include <util/stream/file.h>
#include <util/stream/pipe.h>
#include <util/generic/size_literals.h>
+#include <util/generic/maybe.h>
#include <util/string/cast.h>
#include <util/string/strip.h>
+#include <util/string/builder.h>
namespace NYql::NTaskRunnerProxy {
@@ -189,7 +191,7 @@ protected:
for (size_t i = 0; i < Args.size(); ++i) {
ExecArgs[i] = const_cast<char*>(Args[i].c_str());
}
-
+
ExecEnv.resize(Env.size() + 1, nullptr);
EnvElems.reserve(Env.size());
size_t i = 0;
@@ -289,7 +291,7 @@ struct TProcessHolder {
struct TPortoSettings {
bool Enable;
- ui64 MemoryLimit;
+ TMaybe<ui64> MemoryLimit;
TString Layer;
TString ContainerNamePrefix;
@@ -314,24 +316,24 @@ public:
, PortoCtl(portoCtl)
, PortoLayer(portoSettings.Layer)
, MemoryLimit(portoSettings.MemoryLimit)
- , ContainerName(WorkDir.substr(WorkDir.rfind("/")+1))
+ , ContainerName(WorkDir.substr(WorkDir.rfind("/") + 1))
, InternalWorkDir_("mnt/work")
, InternalExeDir("usr/local/bin")
, TmpDir("TmpDir" + ContainerName)
{
NFs::MakeDirectory(TmpDir);
auto pos = ExeName.rfind("/");
- TString name = ExeName.substr(pos+1);
+ TString name = ExeName.substr(pos + 1);
if (portoSettings.ContainerNamePrefix) {
ContainerName = portoSettings.ContainerNamePrefix + "/" + ContainerName;
}
- YQL_CLOG(DEBUG, ProviderDq) << "HardLink " << ExeName << "'" << WorkDir + "/" + name << "'";
+ YQL_CLOG(DEBUG, ProviderDq) << "HardLink " << ExeName << "'" << WorkDir << "/" << name << "'";
if (NFs::HardLink(ExeName, WorkDir + "/" + name)) {
ExeName = WorkDir + "/" + name;
} else {
- YQL_CLOG(DEBUG, ProviderDq) << "HardLink Failed " << ExeName << "'" << WorkDir + "/" + name << "'";
+ YQL_CLOG(DEBUG, ProviderDq) << "HardLink Failed " << ExeName << "'" << WorkDir << "/" << name << "'";
}
}
@@ -346,23 +348,42 @@ public:
}
private:
+
+ TString GetPortoSetting(const TString& name) const {
+ TShellCommand cmd(PortoCtl, {"get", ContainerName, name});
+ cmd.Run().Wait();
+ return Strip(cmd.GetOutput());
+ }
+
+ void SetPortoSetting(const TString& name, const TString& value) const {
+ TShellCommand cmd(PortoCtl, {"set", ContainerName, name, value});
+ cmd.Run().Wait();
+ }
+
void Kill() override {
- try {
- // see YQL-13760
- TShellCommand cmd1(PortoCtl, {"get", ContainerName, "anon_limit"});
- cmd1.Run().Wait();
- i64 anonLimit = FromString<i64>(Strip(cmd1.GetOutput()));
- TShellCommand cmd2(PortoCtl, {"get", ContainerName, "anon_usage"});
- cmd2.Run().Wait();
- i64 anonUsage = FromString<i64>(Strip(cmd2.GetOutput()));
- if (anonUsage >= anonLimit) {
- TShellCommand cmd3(PortoCtl, {"set", ContainerName, "anon_limit",
- ToString(anonUsage + (1 << 20))});
- cmd3.Run().Wait();
+ if (MemoryLimit) {
+ try {
+ // see YQL-13760
+ i64 anonLimit = -1;
+ i64 anonUsage = -1;
+ if (auto val = GetPortoSetting("anon_limit")) {
+ if (!TryFromString(val, anonLimit)) {
+ anonLimit = -1;
+ }
+ }
+ if (auto val = GetPortoSetting("anon_usage")) {
+ if (!TryFromString(val, anonUsage)) {
+ anonUsage = -1;
+ }
+ }
+ if (anonLimit != -1 && anonUsage != -1 && anonUsage >= anonLimit) {
+ SetPortoSetting("anon_limit", ToString(anonUsage + 1_MB));
+ }
+ } catch (...) {
+ YQL_CLOG(DEBUG, ProviderDq) << "Cannot set anon_limit: " << CurrentExceptionMessage();
}
- } catch (...) {
- YQL_CLOG(DEBUG, ProviderDq) << "Cannot set anon_limit: " << CurrentExceptionMessage();
}
+
try {
TShellCommand cmd(PortoCtl, {"destroy", ContainerName});
cmd.Run().Wait();
@@ -375,24 +396,25 @@ private:
void PrepareForExec() override {
auto pos = ExeName.rfind("/");
TString exeDir = ExeName.substr(0, pos);
- TString exeName = ExeName.substr(pos+1);
+ TString exeName = ExeName.substr(pos + 1);
- TString command = InternalExeDir + "/" + exeName + " ";
- for (ui64 i = 1; i < Args.size(); ++i) {
- command += Args[i] + " ";
+ TStringBuilder command;
+ command << InternalExeDir << '/' << exeName << ' ';
+ for (size_t i = 1; i < Args.size(); ++i) {
+ command << Args[i] << ' ';
}
TString caps = "CHOWN;DAC_OVERRIDE;FOWNER;KILL;SETPCAP;IPC_LOCK;SYS_CHROOT;SYS_PTRACE;MKNOD;AUDIT_WRITE;SETFCAP";
- TString env;
+ TStringBuilder env;
for (const auto& [k, v] : Env) {
- env += k + "=" + v + ";";
+ env << k << '=' << v << ';';
}
- env += "TMPDIR=/tmp;";
+ env << "TMPDIR=/tmp;";
- TString bind;
- bind += WorkDir + " " + InternalWorkDir() + " rw;"; // see YQL-11392
- bind += exeDir + " " + InternalExeDir + " ro;";
- bind += TmpDir + " /tmp rw;";
+ TStringBuilder bind;
+ bind << WorkDir << ' ' << InternalWorkDir() << " rw;"; // see YQL-11392
+ bind << exeDir << ' ' << InternalExeDir << " ro;";
+ bind << TmpDir << " /tmp rw;";
ArgsElems = TVector<TString>{
"portoctl",
@@ -406,9 +428,11 @@ private:
"bind=" + bind,
"root_readonly=true",
"weak=false",
- "cpu_policy=idle",
- "anon_limit=" + ToString(MemoryLimit)
+ "cpu_policy=idle"
};
+ if (MemoryLimit) {
+ ArgsElems.push_back("anon_limit=" + ToString(*MemoryLimit));
+ }
ExecArgs.resize(ArgsElems.size() + 1, nullptr);
for (size_t i = 0; i < ArgsElems.size(); ++i) {
ExecArgs[i] = const_cast<char*>(ArgsElems[i].c_str());
@@ -427,7 +451,7 @@ private:
const TString PortoCtl;
const TString PortoLayer;
- const ui64 MemoryLimit;
+ const TMaybe<ui64> MemoryLimit;
TString ContainerName;
const TString InternalWorkDir_;
@@ -1626,7 +1650,7 @@ public:
, EnablePorto(options.EnablePorto)
, PortoSettings({
EnablePorto,
- TDqSettings::TDefault::PortoMemoryLimit,
+ Nothing(),
options.PortoLayer,
options.ContainerName
})
@@ -1776,7 +1800,7 @@ private:
conf->Dispatch(settings);
} catch (...) { /* ignore unknown settings */ }
portoSettings.Enable = EnablePorto && conf->EnablePorto.Get().GetOrElse(TDqSettings::TDefault::EnablePorto);
- portoSettings.MemoryLimit = conf->_PortoMemoryLimit.Get().GetOrElse(TDqSettings::TDefault::PortoMemoryLimit);
+ portoSettings.MemoryLimit = conf->_PortoMemoryLimit.Get();
if (portoSettings.Enable) {
YQL_CLOG(DEBUG, ProviderDq) << "Porto enabled";
}