From 3e1899838408bbad47622007aa382bc8a2b01f87 Mon Sep 17 00:00:00 2001 From: max42 Date: Fri, 30 Jun 2023 11:13:34 +0300 Subject: Revert "YT-19324: move YT provider to ydb/library/yql" This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12. --- yt/cpp/mapreduce/client/init.cpp | 280 --------------------------------------- 1 file changed, 280 deletions(-) delete mode 100644 yt/cpp/mapreduce/client/init.cpp (limited to 'yt/cpp/mapreduce/client/init.cpp') diff --git a/yt/cpp/mapreduce/client/init.cpp b/yt/cpp/mapreduce/client/init.cpp deleted file mode 100644 index c74598ba14b..00000000000 --- a/yt/cpp/mapreduce/client/init.cpp +++ /dev/null @@ -1,280 +0,0 @@ -#include "init.h" - -#include "abortable_registry.h" -#include "job_profiler.h" - -#include - -#include -#include -#include - -#include -#include - -#include - -#include -#include - -#include - -#include - -#include - -#include -#include -#include - -#include -#include - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -namespace { - -void WriteVersionToLog() -{ - YT_LOG_INFO("Wrapper version: %v", - TProcessState::Get()->ClientVersion); -} - -static TNode SecureVaultContents; // safe - -void InitializeSecureVault() -{ - SecureVaultContents = NodeFromYsonString( - GetEnv("YT_SECURE_VAULT", "{}")); -} - -} - -//////////////////////////////////////////////////////////////////////////////// - -const TNode& GetJobSecureVault() -{ - return SecureVaultContents; -} - -//////////////////////////////////////////////////////////////////////////////// - -class TAbnormalTerminator -{ -public: - TAbnormalTerminator() = default; - - static void SetErrorTerminationHandler() - { - if (Instance().OldHandler_ != nullptr) { - return; - } - - Instance().OldHandler_ = std::set_terminate(&TerminateHandler); - - SetAsyncSignalFunction(SIGINT, SignalHandler); - SetAsyncSignalFunction(SIGTERM, SignalHandler); - } - -private: - static TAbnormalTerminator& Instance() - { - return *Singleton(); - } - - static void* Invoke(void* opaque) - { - (*reinterpret_cast*>(opaque))(); - return nullptr; - } - - static void TerminateWithTimeout( - const TDuration& timeout, - const std::function& exitFunction, - const TString& logMessage) - { - std::function threadFun = [=] { - YT_LOG_INFO("%v", - logMessage); - NDetail::TAbortableRegistry::Get()->AbortAllAndBlockForever(); - }; - TThread thread(TThread::TParams(Invoke, &threadFun).SetName("aborter")); - thread.Start(); - thread.Detach(); - - Sleep(timeout); - exitFunction(); - } - - static void SignalHandler(int signalNumber) - { - TerminateWithTimeout( - TDuration::Seconds(5), - std::bind(_exit, -signalNumber), - ::TStringBuilder() << "Signal " << signalNumber << " received, aborting transactions. Waiting 5 seconds..."); - } - - static void TerminateHandler() - { - TerminateWithTimeout( - TDuration::Seconds(5), - [&] { - if (Instance().OldHandler_) { - Instance().OldHandler_(); - } else { - abort(); - } - }, - ::TStringBuilder() << "Terminate called, aborting transactions. Waiting 5 seconds..."); - } - -private: - std::terminate_handler OldHandler_ = nullptr; -}; - -//////////////////////////////////////////////////////////////////////////////// - -namespace NDetail { - -EInitStatus& GetInitStatus() -{ - static EInitStatus initStatus = EInitStatus::NotInitialized; - return initStatus; -} - -static void ElevateInitStatus(const EInitStatus newStatus) { - NDetail::GetInitStatus() = Max(NDetail::GetInitStatus(), newStatus); -} - -void CommonInitialize(int argc, const char** argv) -{ - auto logLevelStr = to_lower(TConfig::Get()->LogLevel); - ILogger::ELevel logLevel; - - if (!TryFromString(logLevelStr, logLevel)) { - Cerr << "Invalid log level: " << TConfig::Get()->LogLevel << Endl; - exit(1); - } - - SetLogger(CreateStdErrLogger(logLevel)); - - TProcessState::Get()->SetCommandLine(argc, argv); -} - -void NonJobInitialize(const TInitializeOptions& options) -{ - if (FromString(GetEnv("YT_CLEANUP_ON_TERMINATION", "0")) || options.CleanupOnTermination_) { - TAbnormalTerminator::SetErrorTerminationHandler(); - } - if (options.WaitProxy_) { - NDetail::TWaitProxy::Get()->SetProxy(options.WaitProxy_); - } - WriteVersionToLog(); -} - -void ExecJob(int argc, const char** argv, const TInitializeOptions& options) -{ - // Now we are definitely in job. - // We take this setting from environment variable to be consistent with client code. - TConfig::Get()->UseClientProtobuf = IsTrue(GetEnv("YT_USE_CLIENT_PROTOBUF", "")); - - auto execJobImpl = [&options](TString jobName, i64 outputTableCount, bool hasState) { - auto jobProfiler = CreateJobProfiler(); - jobProfiler->Start(); - - InitializeSecureVault(); - - NDetail::OutputTableCount = static_cast(outputTableCount); - - THolder jobStateStream; - if (hasState) { - jobStateStream = MakeHolder("jobstate"); - } else { - jobStateStream = MakeHolder(0); - } - - int ret = 1; - try { - ret = TJobFactory::Get()->GetJobFunction(jobName.data())(outputTableCount, *jobStateStream); - } catch (const TSystemError& ex) { - if (ex.Status() == EPIPE) { - // 32 == EPIPE, write number here so it's easier to grep this exit code in source files - exit(32); - } - throw; - } - - jobProfiler->Stop(); - - if (options.JobOnExitFunction_) { - (*options.JobOnExitFunction_)(); - } - exit(ret); - }; - - auto jobArguments = NodeFromYsonString(GetEnv("YT_JOB_ARGUMENTS", "#")); - if (jobArguments.HasValue()) { - execJobImpl( - jobArguments["job_name"].AsString(), - jobArguments["output_table_count"].AsInt64(), - jobArguments["has_state"].AsBool()); - Y_UNREACHABLE(); - } - - TString jobType = argc >= 2 ? argv[1] : TString(); - if (argc != 5 || jobType != "--yt-map" && jobType != "--yt-reduce") { - // We are inside job but probably using old API - // (i.e. both NYT::Initialize and NMR::Initialize are called). - WriteVersionToLog(); - return; - } - - TString jobName(argv[2]); - i64 outputTableCount = FromString(argv[3]); - int hasState = FromString(argv[4]); - execJobImpl(jobName, outputTableCount, hasState); - Y_UNREACHABLE(); -} - -} // namespace NDetail - -//////////////////////////////////////////////////////////////////////////////// - -void JoblessInitialize(const TInitializeOptions& options) -{ - static const char* fakeArgv[] = {"unknown..."}; - NDetail::CommonInitialize(1, fakeArgv); - NDetail::NonJobInitialize(options); - NDetail::ElevateInitStatus(NDetail::EInitStatus::JoblessInitialization); -} - -void Initialize(int argc, const char* argv[], const TInitializeOptions& options) -{ - NDetail::CommonInitialize(argc, argv); - - NDetail::ElevateInitStatus(NDetail::EInitStatus::FullInitialization); - - const bool isInsideJob = !GetEnv("YT_JOB_ID").empty(); - if (isInsideJob) { - NDetail::ExecJob(argc, argv, options); - } else { - NDetail::NonJobInitialize(options); - } -} - -void Initialize(int argc, char* argv[], const TInitializeOptions& options) -{ - return Initialize(argc, const_cast(argv), options); -} - -void Initialize(const TInitializeOptions& options) -{ - static const char* fakeArgv[] = {"unknown..."}; - Initialize(1, fakeArgv, options); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT -- cgit v1.3