aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@ydb.tech>2022-08-04 21:48:25 +0300
committerwhcrc <whcrc@ydb.tech>2022-08-04 21:48:25 +0300
commita8d863a231b00ef5c1cc13f73c2e53f286fa878c (patch)
tree955b630fa5f114e39012596be61992e8d7d6bbf9
parent4227aa8a54523d7592ff3898124003115e0b998b (diff)
downloadydb-a8d863a231b00ef5c1cc13f73c2e53f286fa878c.tar.gz
fix vanilla job's child process halts
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp64
1 files changed, 34 insertions, 30 deletions
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 72bf3bc24d0..d0da7f383bd 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -83,6 +83,7 @@ public:
Y_VERIFY(pipe(output) == 0);
Y_VERIFY(pipe(error) == 0);
+ PrepareForExec();
Pid = fork();
Y_VERIFY(Pid >= 0);
@@ -117,11 +118,14 @@ public:
Stdin = MakeHolder<TPipedOutput>(input[1]);
Stdout = MakeHolder<TPipedInput>(output[0]);
Stderr = MakeHolder<TPipedInput>(error[0]);
+ YQL_CLOG(DEBUG, ProviderDq) << "Forked child, pid: " << Pid;
#endif
}
virtual void Kill() {
#ifndef _win_
+ // todo: investigate why ain't killed sometimes
+ YQL_CLOG(DEBUG, ProviderDq) << "Kill child, pid: " << Pid;
kill(Pid, 9);
#endif
}
@@ -131,7 +135,7 @@ public:
return true;
#else
int status;
- YQL_CLOG(DEBUG, ProviderDq) << "Check Pid " << Pid;
+ YQL_CLOG(TRACE, ProviderDq) << "Check Pid " << Pid;
return waitpid(Pid, &status, WNOHANG) <= 0;
#endif
}
@@ -173,35 +177,37 @@ protected:
THolder<TPipedOutput> Stdin;
THolder<TPipedInput> Stdout;
THolder<TPipedInput> Stderr;
+ TVector<TString> EnvElems;
+ TVector<char*> ExecArgs;
+ TVector<char*> ExecEnv;
int Pid = -1;
- virtual void Exec() {
- char ** args;
- args = new char*[Args.size() + 1];
- ui32 i;
- for (i = 0; i < Args.size(); ++i) {
- args[i] = (char*) Args[i].c_str();
+ virtual void PrepareForExec() {
+ ExecArgs.resize(Args.size() + 1, nullptr);
+ for (uint i = 0; i < Args.size(); ++i) {
+ ExecArgs[i] = const_cast<char*>(Args[i].c_str());
}
- args[i] = nullptr;
-
- for (i = 3; i < 32768; ++i) {
- close(i);
+
+ ExecEnv.resize(Env.size() + 1, nullptr);
+ EnvElems.reserve(Env.size());
+ uint i = 0;
+ for (const auto& [k, v] : Env) {
+ EnvElems.push_back(k + "=" + v);
+ ExecEnv[i++] = const_cast<char*>(EnvElems.back().c_str());
}
+ }
- char** env;
- env = new char*[Env.size() + 1];
- i = 0;
- for (const auto& [k, v] : Env) {
- env[i++] = (char*)(k + "=" + v).c_str();
+ virtual void Exec() {
+ for (uint i = 3; i < 32768; ++i) {
+ close(i);
}
- env[i++] = nullptr;
if (!WorkDir.empty()) {
NFs::SetCurrentWorkingDirectory(WorkDir);
}
- if (execve(ExeName.c_str(), (char*const*)args, (char*const*)env) == -1) {
+ if (execve(ExeName.c_str(), ExecArgs.data(), ExecEnv.data()) == -1) {
ythrow TSystemError() << "Cannot execl";
}
}
@@ -365,7 +371,7 @@ private:
TChildProcess::Kill();
}
- void Exec() override {
+ void PrepareForExec() override {
auto pos = ExeName.rfind("/");
TString exeDir = ExeName.substr(0, pos);
TString exeName = ExeName.substr(pos+1);
@@ -383,12 +389,11 @@ private:
env += "TMPDIR=/tmp;";
TString bind;
- //bind += WorkDir + " " + InternalWorkDir() + " ro;";
bind += WorkDir + " " + InternalWorkDir() + " rw;"; // see YQL-11392
bind += exeDir + " " + InternalExeDir + " ro;";
bind += TmpDir + " /tmp rw;";
- TVector<TString> vargs = {
+ ArgsElems = TVector<TString>{
"portoctl",
"exec",
"-L",
@@ -403,20 +408,18 @@ private:
"cpu_policy=idle",
"anon_limit=" + ToString(MemoryLimit)
};
-
- char ** args;
- args = new char*[vargs.size() + 1];
- ui32 i;
- for (i = 0; i < vargs.size(); ++i) {
- args[i] = (char*) vargs[i].c_str();
+ ExecArgs.resize(ArgsElems.size() + 1, nullptr);
+ for (uint i = 0; i < ArgsElems.size(); ++i) {
+ ExecArgs[i] = const_cast<char*>(ArgsElems[i].c_str());
}
- args[i] = nullptr;
+ }
- for (i = 3; i < 32768; ++i) {
+ void Exec() override {
+ for (uint i = 3; i < 32768; ++i) {
close(i);
}
- if (execvp(PortoCtl.c_str(), (char*const*)args) == -1) {
+ if (execvp(PortoCtl.c_str(), ExecArgs.data()) == -1) {
ythrow TSystemError() << "Cannot execl";
}
}
@@ -429,6 +432,7 @@ private:
const TString InternalWorkDir_;
const TString InternalExeDir;
const TString TmpDir;
+ TVector<TString> ArgsElems;
};
/*______________________________________________________________________________________________*/