aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/utils/signals
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.com>2024-11-01 15:41:40 +0300
committervvvv <vvvv@yandex-team.com>2024-11-01 15:55:52 +0300
commit3325f745e67f7f442790822b5c9c5e9996708be7 (patch)
treef7318d68bbe8990092715436444b05297ce35777 /yql/essentials/utils/signals
parent6dce3f1c71786f2694b73b1a5155efc58f4557dd (diff)
downloadydb-3325f745e67f7f442790822b5c9c5e9996708be7.tar.gz
Moved yql/utils YQL-19206
Также была выделена жирная зависимость из yql/utils в yql/utils/network, в результате library/cpp/getopt была добавлена явно в те проекты, которые ее ранее наследовали, а не указывали явно commit_hash:36aa4c41f807b4cbbf70a3ed7ac0a1a5079bb75d
Diffstat (limited to 'yql/essentials/utils/signals')
-rw-r--r--yql/essentials/utils/signals/signals.cpp300
-rw-r--r--yql/essentials/utils/signals/signals.h34
-rw-r--r--yql/essentials/utils/signals/utils.cpp122
-rw-r--r--yql/essentials/utils/signals/utils.h21
-rw-r--r--yql/essentials/utils/signals/ya.make20
5 files changed, 497 insertions, 0 deletions
diff --git a/yql/essentials/utils/signals/signals.cpp b/yql/essentials/utils/signals/signals.cpp
new file mode 100644
index 0000000000..9ebe667120
--- /dev/null
+++ b/yql/essentials/utils/signals/signals.cpp
@@ -0,0 +1,300 @@
+#include "signals.h"
+#include "utils.h"
+
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/backtrace/backtrace.h>
+#include <contrib/ydb/library/yql/providers/yt/lib/log/yt_logger.h>
+
+#include <util/stream/output.h>
+#include <util/generic/yexception.h>
+#include <util/datetime/base.h>
+#include <util/network/socket.h>
+#include <util/system/getpid.h>
+
+#ifdef _linux_
+# include <sys/prctl.h>
+#endif
+
+#include <string.h>
+#include <signal.h>
+#include <errno.h>
+#include <stdlib.h>
+
+
+namespace NYql {
+
+volatile sig_atomic_t NeedTerminate = 0;
+volatile sig_atomic_t NeedQuit = 0;
+volatile sig_atomic_t NeedReconfigure = 0;
+volatile sig_atomic_t NeedReopenLog = 0;
+volatile sig_atomic_t NeedReapZombies = 0;
+volatile sig_atomic_t NeedInterrupt = 0;
+
+volatile sig_atomic_t CatchInterrupt = 0;
+
+TPipe SignalPipeW;
+TPipe SignalPipeR;
+
+namespace {
+
+void SignalHandler(int signo)
+{
+ switch (signo) {
+ case SIGTERM:
+ NeedTerminate = 1;
+ break;
+
+ case SIGQUIT:
+ NeedQuit = 1;
+ break;
+
+#ifdef _unix_
+ case SIGHUP:
+ NeedReconfigure = 1;
+ break;
+
+ case SIGUSR1:
+ NeedReopenLog = 1;
+ break;
+
+ case SIGCHLD:
+ NeedReapZombies = 1;
+ break;
+#endif
+
+ case SIGINT:
+ if (CatchInterrupt) {
+ NeedInterrupt = 1;
+ } else {
+ fprintf(stderr, "%s (pid=%d) captured SIGINT\n",
+ GetProcTitle(), getpid());
+ signal(signo, SIG_DFL);
+ raise(signo);
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+void SignalHandlerWithSelfPipe(int signo)
+{
+ SignalHandler(signo);
+
+ int savedErrno = errno;
+ if (write(SignalPipeW.GetHandle(), "x", 1) == -1 && errno != EAGAIN) {
+ static TStringBuf msg("cannot write to signal pipe");
+#ifndef STDERR_FILENO
+#define STDERR_FILENO 2
+#endif
+ write(STDERR_FILENO, msg.data(), msg.size());
+ abort();
+ }
+ errno = savedErrno;
+}
+
+
+#ifndef _unix_
+const char* strsignal(int signo)
+{
+ switch (signo) {
+ case SIGTERM: return "SIGTERM";
+ case SIGINT: return "SIGINT";
+ case SIGQUIT: return "SIGQUIT";
+ default:
+ return "UNKNOWN";
+ }
+}
+#endif
+
+namespace {
+
+class TEmergencyLogOutput: public IOutputStream {
+public:
+ TEmergencyLogOutput()
+ : Current_(Buf_)
+ , End_(Y_ARRAY_END(Buf_))
+ {
+ }
+
+ ~TEmergencyLogOutput() {
+ }
+
+private:
+ inline size_t Avail() const noexcept {
+ return End_ - Current_;
+ }
+
+ void DoFlush() override {
+ if (Current_ != Buf_) {
+ NYql::NLog::YqlLogger().Write(TLOG_EMERG, Buf_, Current_ - Buf_);
+ Current_ = Buf_;
+ }
+ }
+
+ void DoWrite(const void* buf, size_t len) override {
+ len = Min(len, Avail());
+ if (len) {
+ char* end = Current_ + len;
+ memcpy(Current_, buf, len);
+ Current_ = end;
+ }
+ }
+
+private:
+ char Buf_[1 << 20];
+ char* Current_;
+ char* const End_;
+
+};
+
+TEmergencyLogOutput EMERGENCY_LOG_OUT;
+
+}
+
+void LogBacktraceOnSignal(int signum)
+{
+ if (NYql::NLog::IsYqlLoggerInitialized()) {
+ EMERGENCY_LOG_OUT << strsignal(signum) << TStringBuf(" (pid=") << GetPID() << TStringBuf("): ");
+ NYql::NBacktrace::KikimrBackTraceFormatImpl(&EMERGENCY_LOG_OUT);
+ EMERGENCY_LOG_OUT.Flush();
+ }
+ NYql::FlushYtDebugLog();
+ /* Now reraise the signal. We reactivate the signal’s default handling,
+ which is to terminate the process. We could just call exit or abort,
+ but reraising the signal sets the return status from the process
+ correctly. */
+ raise(signum);
+}
+
+
+#ifdef _unix_
+int SetSignalHandler(int signo, void (*handler)(int))
+{
+ struct sigaction sa;
+ sa.sa_flags = SA_RESTART;
+ sa.sa_handler = handler;
+ sigemptyset(&sa.sa_mask);
+
+ return sigaction(signo, &sa, nullptr);
+}
+
+#else
+int SetSignalHandler(int signo, void (*handler)(int))
+{
+ return (signal(signo, handler) == SIG_ERR) ? -1 : 0;
+}
+
+#endif
+
+struct TSignalHandlerDesc
+{
+ int signo;
+ void (*handler)(int);
+};
+
+void SetSignalHandlers(const TSignalHandlerDesc* handlerDescs)
+{
+ sigset_t interestedSignals;
+ SigEmptySet(&interestedSignals);
+
+ for (int i = 0; handlerDescs[i].signo != -1; i++) {
+ int signo = handlerDescs[i].signo;
+ SigAddSet(&interestedSignals, signo);
+
+ if (SetSignalHandler(signo, handlerDescs[i].handler) == -1) {
+ ythrow TSystemError() << "Cannot set handler for signal "
+ << strsignal(signo);
+ }
+ }
+
+ if (SigProcMask(SIG_BLOCK, &interestedSignals, NULL) == -1) {
+ ythrow TSystemError() << "Cannot set sigprocmask";
+ }
+
+ NYql::NBacktrace::AddAfterFatalCallback([](int signo){ LogBacktraceOnSignal(signo); });
+ NYql::NBacktrace::RegisterKikimrFatalActions();
+}
+
+} // namespace
+
+
+void InitSignals()
+{
+ TSignalHandlerDesc handlerDescs[] = {
+ { SIGTERM, SignalHandler },
+ { SIGINT, SignalHandler },
+ { SIGQUIT, SignalHandler },
+#ifdef _unix_
+ { SIGPIPE, SIG_IGN },
+ { SIGHUP, SignalHandler },
+ { SIGUSR1, SignalHandler },
+ { SIGCHLD, SignalHandler },
+#endif
+ { -1, nullptr }
+ };
+
+ SetSignalHandlers(handlerDescs);
+}
+
+void InitSignalsWithSelfPipe()
+{
+ TSignalHandlerDesc handlerDescs[] = {
+ { SIGTERM, SignalHandlerWithSelfPipe },
+ { SIGINT, SignalHandlerWithSelfPipe },
+ { SIGQUIT, SignalHandlerWithSelfPipe },
+#ifdef _unix_
+ { SIGPIPE, SIG_IGN },
+ { SIGHUP, SignalHandlerWithSelfPipe },
+ { SIGUSR1, SignalHandlerWithSelfPipe },
+ { SIGCHLD, SignalHandlerWithSelfPipe },
+#endif
+ { -1, nullptr }
+ };
+
+ TPipe::Pipe(SignalPipeR, SignalPipeW);
+ SetNonBlock(SignalPipeR.GetHandle());
+ SetNonBlock(SignalPipeW.GetHandle());
+
+ SetSignalHandlers(handlerDescs);
+}
+
+void CatchInterruptSignal(bool doCatch) {
+ CatchInterrupt = doCatch;
+}
+
+void SigSuspend(const sigset_t* mask)
+{
+#ifdef _unix_
+ sigsuspend(mask);
+#else
+ Y_UNUSED(mask);
+ Sleep(TDuration::Seconds(1));
+#endif
+}
+
+void AllowAnySignals()
+{
+ sigset_t blockMask;
+ SigEmptySet(&blockMask);
+
+ if (SigProcMask(SIG_SETMASK, &blockMask, NULL) == -1) {
+ ythrow TSystemError() << "Cannot set sigprocmask";
+ }
+}
+
+bool HasPendingQuitOrTerm() {
+#ifdef _unix_
+ sigset_t signals;
+ SigEmptySet(&signals);
+ if (sigpending(&signals)) {
+ ythrow TSystemError() << "Error in sigpending";
+ }
+
+ return (SigIsMember(&signals, SIGQUIT) == 1) || (SigIsMember(&signals, SIGTERM) == 1);
+#else
+ return false;
+#endif
+}
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/signals.h b/yql/essentials/utils/signals/signals.h
new file mode 100644
index 0000000000..612f906207
--- /dev/null
+++ b/yql/essentials/utils/signals/signals.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/system/sigset.h>
+#include <util/system/pipe.h>
+
+#include <signal.h>
+
+
+namespace NYql {
+
+#ifdef _win_
+using sig_atomic_t = int;
+#endif
+
+extern volatile sig_atomic_t NeedTerminate;
+extern volatile sig_atomic_t NeedQuit;
+extern volatile sig_atomic_t NeedReconfigure;
+extern volatile sig_atomic_t NeedReopenLog;
+extern volatile sig_atomic_t NeedReapZombies;
+extern volatile sig_atomic_t NeedInterrupt;
+
+extern TPipe SignalPipeW;
+extern TPipe SignalPipeR;
+
+void InitSignals();
+void InitSignalsWithSelfPipe();
+void CatchInterruptSignal(bool doCatch);
+
+void SigSuspend(const sigset_t* mask);
+void AllowAnySignals();
+bool HasPendingQuitOrTerm();
+
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/utils.cpp b/yql/essentials/utils/signals/utils.cpp
new file mode 100644
index 0000000000..b1de131b30
--- /dev/null
+++ b/yql/essentials/utils/signals/utils.cpp
@@ -0,0 +1,122 @@
+#include "utils.h"
+
+#include <util/generic/yexception.h>
+#include <util/string/subst.h>
+
+#include <google/protobuf/text_format.h>
+
+#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/json/yson/json2yson.h>
+
+#include <string.h>
+
+extern char** environ;
+
+namespace NYql {
+
+static char** g_OriginalArgv = nullptr;
+static char* g_OriginalArgvLast = nullptr;
+
+/*
+ * To change the process title in Linux and Darwin we have to set argv[1]
+ * to NULL and to copy the title to the same place where the argv[0] points to.
+ * However, argv[0] may be too small to hold a new title. Fortunately, Linux
+ * and Darwin store argv[] and environ[] one after another. So we should
+ * ensure that is the continuous memory and then we allocate the new memory
+ * for environ[] and copy it. After this we could use the memory starting
+ * from argv[0] for our process title.
+ *
+ * continuous memory block for process title
+ * ________________________________/\____________________________________
+ * / \
+ * +---------+---------+-----+------+------------+------------+-----+------+
+ * | argv[0] | argv[1] | ... | NULL | environ[0] | environ[1] | ... | NULL |
+ * +---------+---------+-----+------+------------+------------+-----+------+
+ * \_________________ _________________/
+ * \/
+ * must be relocated elsewhere
+ */
+void ProcTitleInit(int argc, const char* argv[])
+{
+ Y_UNUSED(argc);
+ Y_ABORT_UNLESS(!g_OriginalArgv, "ProcTitleInit() was already called");
+
+ g_OriginalArgv = const_cast<char**>(argv);
+
+ size_t size = 0;
+ for (int i = 0; environ[i]; i++) {
+ size += strlen(environ[i]) + 1;
+ }
+
+ char* newEnviron = new char[size];
+ g_OriginalArgvLast = g_OriginalArgv[0];
+
+ for (int i = 0; g_OriginalArgv[i]; i++) {
+ if (g_OriginalArgvLast == g_OriginalArgv[i]) {
+ g_OriginalArgvLast = g_OriginalArgv[i] + strlen(g_OriginalArgv[i]) + 1;
+ }
+ }
+
+ for (int i = 0; environ[i]; i++) {
+ if (g_OriginalArgvLast == environ[i]) {
+ size_t size = strlen(environ[i]) + 1;
+ g_OriginalArgvLast = environ[i] + size;
+
+ strncpy(newEnviron, environ[i], size);
+ environ[i] = newEnviron;
+ newEnviron += size;
+ }
+ }
+
+ g_OriginalArgvLast--;
+}
+
+void SetProcTitle(const char* title)
+{
+ if (!g_OriginalArgv) return;
+
+ char* p = g_OriginalArgv[0];
+ p += strlcpy(p, "yqlworker: ", g_OriginalArgvLast - p);
+ p += strlcpy(p, title, g_OriginalArgvLast - p);
+
+ if (g_OriginalArgvLast - p > 0) {
+ memset(p, 0, g_OriginalArgvLast - p);
+ }
+
+ g_OriginalArgv[1] = nullptr;
+}
+
+void AddProcTitleSuffix(const char* suffix)
+{
+ if (!g_OriginalArgv) return;
+
+ char* p = g_OriginalArgv[0];
+ p += strlcat(p, " ", g_OriginalArgvLast - p);
+ p += strlcat(p, suffix, g_OriginalArgvLast - p);
+}
+
+const char* GetProcTitle()
+{
+ return g_OriginalArgv ? g_OriginalArgv[0] : "UNKNOWN";
+}
+
+TString PbMessageToStr(const google::protobuf::Message& msg)
+{
+ TString str;
+ ::google::protobuf::TextFormat::Printer printer;
+ printer.SetSingleLineMode(true);
+ printer.PrintToString(msg, &str);
+ return str;
+}
+
+TString Proto2Yson(const google::protobuf::Message& proto) {
+ NJson::TJsonValue json;
+ NProtobufJson::Proto2Json(proto, json);
+
+ TString ysonResult;
+ TStringOutput stream(ysonResult);
+ NJson2Yson::SerializeJsonValueAsYson(json, &stream);
+ return ysonResult;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/utils.h b/yql/essentials/utils/signals/utils.h
new file mode 100644
index 0000000000..75c55244fa
--- /dev/null
+++ b/yql/essentials/utils/signals/utils.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+
+namespace google {
+namespace protobuf {
+ class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace NYql {
+
+void ProcTitleInit(int argc, const char* argv[]);
+void SetProcTitle(const char* title);
+void AddProcTitleSuffix(const char* suffix);
+const char* GetProcTitle();
+
+TString PbMessageToStr(const google::protobuf::Message& msg);
+TString Proto2Yson(const google::protobuf::Message& proto);
+
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/ya.make b/yql/essentials/utils/signals/ya.make
new file mode 100644
index 0000000000..3c72783dbf
--- /dev/null
+++ b/yql/essentials/utils/signals/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SRCS(
+ signals.cpp
+ signals.h
+ utils.cpp
+ utils.h
+)
+
+PEERDIR(
+ contrib/libs/protobuf
+ library/cpp/logger/global
+ library/cpp/protobuf/json
+ library/cpp/json/yson
+ yql/essentials/utils/log
+ yql/essentials/utils/backtrace
+ contrib/ydb/library/yql/providers/yt/lib/log
+)
+
+END()