diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-01 15:41:40 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-01 15:55:52 +0300 |
commit | 3325f745e67f7f442790822b5c9c5e9996708be7 (patch) | |
tree | f7318d68bbe8990092715436444b05297ce35777 /yql/essentials/utils/signals | |
parent | 6dce3f1c71786f2694b73b1a5155efc58f4557dd (diff) | |
download | ydb-3325f745e67f7f442790822b5c9c5e9996708be7.tar.gz |
Moved yql/utils YQL-19206
Также была выделена жирная зависимость из yql/utils в yql/utils/network, в результате library/cpp/getopt была добавлена явно в те проекты, которые ее ранее наследовали, а не указывали явно
commit_hash:36aa4c41f807b4cbbf70a3ed7ac0a1a5079bb75d
Diffstat (limited to 'yql/essentials/utils/signals')
-rw-r--r-- | yql/essentials/utils/signals/signals.cpp | 300 | ||||
-rw-r--r-- | yql/essentials/utils/signals/signals.h | 34 | ||||
-rw-r--r-- | yql/essentials/utils/signals/utils.cpp | 122 | ||||
-rw-r--r-- | yql/essentials/utils/signals/utils.h | 21 | ||||
-rw-r--r-- | yql/essentials/utils/signals/ya.make | 20 |
5 files changed, 497 insertions, 0 deletions
diff --git a/yql/essentials/utils/signals/signals.cpp b/yql/essentials/utils/signals/signals.cpp new file mode 100644 index 0000000000..9ebe667120 --- /dev/null +++ b/yql/essentials/utils/signals/signals.cpp @@ -0,0 +1,300 @@ +#include "signals.h" +#include "utils.h" + +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/backtrace/backtrace.h> +#include <contrib/ydb/library/yql/providers/yt/lib/log/yt_logger.h> + +#include <util/stream/output.h> +#include <util/generic/yexception.h> +#include <util/datetime/base.h> +#include <util/network/socket.h> +#include <util/system/getpid.h> + +#ifdef _linux_ +# include <sys/prctl.h> +#endif + +#include <string.h> +#include <signal.h> +#include <errno.h> +#include <stdlib.h> + + +namespace NYql { + +volatile sig_atomic_t NeedTerminate = 0; +volatile sig_atomic_t NeedQuit = 0; +volatile sig_atomic_t NeedReconfigure = 0; +volatile sig_atomic_t NeedReopenLog = 0; +volatile sig_atomic_t NeedReapZombies = 0; +volatile sig_atomic_t NeedInterrupt = 0; + +volatile sig_atomic_t CatchInterrupt = 0; + +TPipe SignalPipeW; +TPipe SignalPipeR; + +namespace { + +void SignalHandler(int signo) +{ + switch (signo) { + case SIGTERM: + NeedTerminate = 1; + break; + + case SIGQUIT: + NeedQuit = 1; + break; + +#ifdef _unix_ + case SIGHUP: + NeedReconfigure = 1; + break; + + case SIGUSR1: + NeedReopenLog = 1; + break; + + case SIGCHLD: + NeedReapZombies = 1; + break; +#endif + + case SIGINT: + if (CatchInterrupt) { + NeedInterrupt = 1; + } else { + fprintf(stderr, "%s (pid=%d) captured SIGINT\n", + GetProcTitle(), getpid()); + signal(signo, SIG_DFL); + raise(signo); + } + break; + + default: + break; + } +} + +void SignalHandlerWithSelfPipe(int signo) +{ + SignalHandler(signo); + + int savedErrno = errno; + if (write(SignalPipeW.GetHandle(), "x", 1) == -1 && errno != EAGAIN) { + static TStringBuf msg("cannot write to signal pipe"); +#ifndef STDERR_FILENO +#define STDERR_FILENO 2 +#endif + write(STDERR_FILENO, msg.data(), msg.size()); + abort(); + } + errno = savedErrno; +} + + +#ifndef _unix_ +const char* strsignal(int signo) +{ + switch (signo) { + case SIGTERM: return "SIGTERM"; + case SIGINT: return "SIGINT"; + case SIGQUIT: return "SIGQUIT"; + default: + return "UNKNOWN"; + } +} +#endif + +namespace { + +class TEmergencyLogOutput: public IOutputStream { +public: + TEmergencyLogOutput() + : Current_(Buf_) + , End_(Y_ARRAY_END(Buf_)) + { + } + + ~TEmergencyLogOutput() { + } + +private: + inline size_t Avail() const noexcept { + return End_ - Current_; + } + + void DoFlush() override { + if (Current_ != Buf_) { + NYql::NLog::YqlLogger().Write(TLOG_EMERG, Buf_, Current_ - Buf_); + Current_ = Buf_; + } + } + + void DoWrite(const void* buf, size_t len) override { + len = Min(len, Avail()); + if (len) { + char* end = Current_ + len; + memcpy(Current_, buf, len); + Current_ = end; + } + } + +private: + char Buf_[1 << 20]; + char* Current_; + char* const End_; + +}; + +TEmergencyLogOutput EMERGENCY_LOG_OUT; + +} + +void LogBacktraceOnSignal(int signum) +{ + if (NYql::NLog::IsYqlLoggerInitialized()) { + EMERGENCY_LOG_OUT << strsignal(signum) << TStringBuf(" (pid=") << GetPID() << TStringBuf("): "); + NYql::NBacktrace::KikimrBackTraceFormatImpl(&EMERGENCY_LOG_OUT); + EMERGENCY_LOG_OUT.Flush(); + } + NYql::FlushYtDebugLog(); + /* Now reraise the signal. We reactivate the signal’s default handling, + which is to terminate the process. We could just call exit or abort, + but reraising the signal sets the return status from the process + correctly. */ + raise(signum); +} + + +#ifdef _unix_ +int SetSignalHandler(int signo, void (*handler)(int)) +{ + struct sigaction sa; + sa.sa_flags = SA_RESTART; + sa.sa_handler = handler; + sigemptyset(&sa.sa_mask); + + return sigaction(signo, &sa, nullptr); +} + +#else +int SetSignalHandler(int signo, void (*handler)(int)) +{ + return (signal(signo, handler) == SIG_ERR) ? -1 : 0; +} + +#endif + +struct TSignalHandlerDesc +{ + int signo; + void (*handler)(int); +}; + +void SetSignalHandlers(const TSignalHandlerDesc* handlerDescs) +{ + sigset_t interestedSignals; + SigEmptySet(&interestedSignals); + + for (int i = 0; handlerDescs[i].signo != -1; i++) { + int signo = handlerDescs[i].signo; + SigAddSet(&interestedSignals, signo); + + if (SetSignalHandler(signo, handlerDescs[i].handler) == -1) { + ythrow TSystemError() << "Cannot set handler for signal " + << strsignal(signo); + } + } + + if (SigProcMask(SIG_BLOCK, &interestedSignals, NULL) == -1) { + ythrow TSystemError() << "Cannot set sigprocmask"; + } + + NYql::NBacktrace::AddAfterFatalCallback([](int signo){ LogBacktraceOnSignal(signo); }); + NYql::NBacktrace::RegisterKikimrFatalActions(); +} + +} // namespace + + +void InitSignals() +{ + TSignalHandlerDesc handlerDescs[] = { + { SIGTERM, SignalHandler }, + { SIGINT, SignalHandler }, + { SIGQUIT, SignalHandler }, +#ifdef _unix_ + { SIGPIPE, SIG_IGN }, + { SIGHUP, SignalHandler }, + { SIGUSR1, SignalHandler }, + { SIGCHLD, SignalHandler }, +#endif + { -1, nullptr } + }; + + SetSignalHandlers(handlerDescs); +} + +void InitSignalsWithSelfPipe() +{ + TSignalHandlerDesc handlerDescs[] = { + { SIGTERM, SignalHandlerWithSelfPipe }, + { SIGINT, SignalHandlerWithSelfPipe }, + { SIGQUIT, SignalHandlerWithSelfPipe }, +#ifdef _unix_ + { SIGPIPE, SIG_IGN }, + { SIGHUP, SignalHandlerWithSelfPipe }, + { SIGUSR1, SignalHandlerWithSelfPipe }, + { SIGCHLD, SignalHandlerWithSelfPipe }, +#endif + { -1, nullptr } + }; + + TPipe::Pipe(SignalPipeR, SignalPipeW); + SetNonBlock(SignalPipeR.GetHandle()); + SetNonBlock(SignalPipeW.GetHandle()); + + SetSignalHandlers(handlerDescs); +} + +void CatchInterruptSignal(bool doCatch) { + CatchInterrupt = doCatch; +} + +void SigSuspend(const sigset_t* mask) +{ +#ifdef _unix_ + sigsuspend(mask); +#else + Y_UNUSED(mask); + Sleep(TDuration::Seconds(1)); +#endif +} + +void AllowAnySignals() +{ + sigset_t blockMask; + SigEmptySet(&blockMask); + + if (SigProcMask(SIG_SETMASK, &blockMask, NULL) == -1) { + ythrow TSystemError() << "Cannot set sigprocmask"; + } +} + +bool HasPendingQuitOrTerm() { +#ifdef _unix_ + sigset_t signals; + SigEmptySet(&signals); + if (sigpending(&signals)) { + ythrow TSystemError() << "Error in sigpending"; + } + + return (SigIsMember(&signals, SIGQUIT) == 1) || (SigIsMember(&signals, SIGTERM) == 1); +#else + return false; +#endif +} +} // namespace NYql diff --git a/yql/essentials/utils/signals/signals.h b/yql/essentials/utils/signals/signals.h new file mode 100644 index 0000000000..612f906207 --- /dev/null +++ b/yql/essentials/utils/signals/signals.h @@ -0,0 +1,34 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/system/sigset.h> +#include <util/system/pipe.h> + +#include <signal.h> + + +namespace NYql { + +#ifdef _win_ +using sig_atomic_t = int; +#endif + +extern volatile sig_atomic_t NeedTerminate; +extern volatile sig_atomic_t NeedQuit; +extern volatile sig_atomic_t NeedReconfigure; +extern volatile sig_atomic_t NeedReopenLog; +extern volatile sig_atomic_t NeedReapZombies; +extern volatile sig_atomic_t NeedInterrupt; + +extern TPipe SignalPipeW; +extern TPipe SignalPipeR; + +void InitSignals(); +void InitSignalsWithSelfPipe(); +void CatchInterruptSignal(bool doCatch); + +void SigSuspend(const sigset_t* mask); +void AllowAnySignals(); +bool HasPendingQuitOrTerm(); + +} // namespace NYql diff --git a/yql/essentials/utils/signals/utils.cpp b/yql/essentials/utils/signals/utils.cpp new file mode 100644 index 0000000000..b1de131b30 --- /dev/null +++ b/yql/essentials/utils/signals/utils.cpp @@ -0,0 +1,122 @@ +#include "utils.h" + +#include <util/generic/yexception.h> +#include <util/string/subst.h> + +#include <google/protobuf/text_format.h> + +#include <library/cpp/protobuf/json/proto2json.h> +#include <library/cpp/json/yson/json2yson.h> + +#include <string.h> + +extern char** environ; + +namespace NYql { + +static char** g_OriginalArgv = nullptr; +static char* g_OriginalArgvLast = nullptr; + +/* + * To change the process title in Linux and Darwin we have to set argv[1] + * to NULL and to copy the title to the same place where the argv[0] points to. + * However, argv[0] may be too small to hold a new title. Fortunately, Linux + * and Darwin store argv[] and environ[] one after another. So we should + * ensure that is the continuous memory and then we allocate the new memory + * for environ[] and copy it. After this we could use the memory starting + * from argv[0] for our process title. + * + * continuous memory block for process title + * ________________________________/\____________________________________ + * / \ + * +---------+---------+-----+------+------------+------------+-----+------+ + * | argv[0] | argv[1] | ... | NULL | environ[0] | environ[1] | ... | NULL | + * +---------+---------+-----+------+------------+------------+-----+------+ + * \_________________ _________________/ + * \/ + * must be relocated elsewhere + */ +void ProcTitleInit(int argc, const char* argv[]) +{ + Y_UNUSED(argc); + Y_ABORT_UNLESS(!g_OriginalArgv, "ProcTitleInit() was already called"); + + g_OriginalArgv = const_cast<char**>(argv); + + size_t size = 0; + for (int i = 0; environ[i]; i++) { + size += strlen(environ[i]) + 1; + } + + char* newEnviron = new char[size]; + g_OriginalArgvLast = g_OriginalArgv[0]; + + for (int i = 0; g_OriginalArgv[i]; i++) { + if (g_OriginalArgvLast == g_OriginalArgv[i]) { + g_OriginalArgvLast = g_OriginalArgv[i] + strlen(g_OriginalArgv[i]) + 1; + } + } + + for (int i = 0; environ[i]; i++) { + if (g_OriginalArgvLast == environ[i]) { + size_t size = strlen(environ[i]) + 1; + g_OriginalArgvLast = environ[i] + size; + + strncpy(newEnviron, environ[i], size); + environ[i] = newEnviron; + newEnviron += size; + } + } + + g_OriginalArgvLast--; +} + +void SetProcTitle(const char* title) +{ + if (!g_OriginalArgv) return; + + char* p = g_OriginalArgv[0]; + p += strlcpy(p, "yqlworker: ", g_OriginalArgvLast - p); + p += strlcpy(p, title, g_OriginalArgvLast - p); + + if (g_OriginalArgvLast - p > 0) { + memset(p, 0, g_OriginalArgvLast - p); + } + + g_OriginalArgv[1] = nullptr; +} + +void AddProcTitleSuffix(const char* suffix) +{ + if (!g_OriginalArgv) return; + + char* p = g_OriginalArgv[0]; + p += strlcat(p, " ", g_OriginalArgvLast - p); + p += strlcat(p, suffix, g_OriginalArgvLast - p); +} + +const char* GetProcTitle() +{ + return g_OriginalArgv ? g_OriginalArgv[0] : "UNKNOWN"; +} + +TString PbMessageToStr(const google::protobuf::Message& msg) +{ + TString str; + ::google::protobuf::TextFormat::Printer printer; + printer.SetSingleLineMode(true); + printer.PrintToString(msg, &str); + return str; +} + +TString Proto2Yson(const google::protobuf::Message& proto) { + NJson::TJsonValue json; + NProtobufJson::Proto2Json(proto, json); + + TString ysonResult; + TStringOutput stream(ysonResult); + NJson2Yson::SerializeJsonValueAsYson(json, &stream); + return ysonResult; +} + +} // namespace NYql diff --git a/yql/essentials/utils/signals/utils.h b/yql/essentials/utils/signals/utils.h new file mode 100644 index 0000000000..75c55244fa --- /dev/null +++ b/yql/essentials/utils/signals/utils.h @@ -0,0 +1,21 @@ +#pragma once + +#include <util/generic/fwd.h> + +namespace google { +namespace protobuf { + class Message; +} // namespace protobuf +} // namespace google + +namespace NYql { + +void ProcTitleInit(int argc, const char* argv[]); +void SetProcTitle(const char* title); +void AddProcTitleSuffix(const char* suffix); +const char* GetProcTitle(); + +TString PbMessageToStr(const google::protobuf::Message& msg); +TString Proto2Yson(const google::protobuf::Message& proto); + +} // namespace NYql diff --git a/yql/essentials/utils/signals/ya.make b/yql/essentials/utils/signals/ya.make new file mode 100644 index 0000000000..3c72783dbf --- /dev/null +++ b/yql/essentials/utils/signals/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + signals.cpp + signals.h + utils.cpp + utils.h +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/logger/global + library/cpp/protobuf/json + library/cpp/json/yson + yql/essentials/utils/log + yql/essentials/utils/backtrace + contrib/ydb/library/yql/providers/yt/lib/log +) + +END() |