diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/init.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/init.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/init.cpp | 280 |
1 files changed, 280 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/init.cpp b/yt/cpp/mapreduce/client/init.cpp new file mode 100644 index 0000000000..c74598ba14 --- /dev/null +++ b/yt/cpp/mapreduce/client/init.cpp @@ -0,0 +1,280 @@ +#include "init.h" + +#include "abortable_registry.h" +#include "job_profiler.h" + +#include <yt/cpp/mapreduce/http/requests.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/init.h> +#include <yt/cpp/mapreduce/interface/operation.h> + +#include <yt/cpp/mapreduce/interface/logging/logger.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/io/job_reader.h> + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <library/cpp/sighandler/async_signals_handler.h> + +#include <util/folder/dirut.h> + +#include <util/generic/singleton.h> + +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/string/type.h> + +#include <util/system/env.h> +#include <util/system/thread.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +void WriteVersionToLog() +{ + YT_LOG_INFO("Wrapper version: %v", + TProcessState::Get()->ClientVersion); +} + +static TNode SecureVaultContents; // safe + +void InitializeSecureVault() +{ + SecureVaultContents = NodeFromYsonString( + GetEnv("YT_SECURE_VAULT", "{}")); +} + +} + +//////////////////////////////////////////////////////////////////////////////// + +const TNode& GetJobSecureVault() +{ + return SecureVaultContents; +} + +//////////////////////////////////////////////////////////////////////////////// + +class TAbnormalTerminator +{ +public: + TAbnormalTerminator() = default; + + static void SetErrorTerminationHandler() + { + if (Instance().OldHandler_ != nullptr) { + return; + } + + Instance().OldHandler_ = std::set_terminate(&TerminateHandler); + + SetAsyncSignalFunction(SIGINT, SignalHandler); + 
SetAsyncSignalFunction(SIGTERM, SignalHandler); + } + +private: + static TAbnormalTerminator& Instance() + { + return *Singleton<TAbnormalTerminator>(); + } + + static void* Invoke(void* opaque) + { + (*reinterpret_cast<std::function<void()>*>(opaque))(); + return nullptr; + } + + static void TerminateWithTimeout( + const TDuration& timeout, + const std::function<void(void)>& exitFunction, + const TString& logMessage) + { + std::function<void()> threadFun = [=] { + YT_LOG_INFO("%v", + logMessage); + NDetail::TAbortableRegistry::Get()->AbortAllAndBlockForever(); + }; + TThread thread(TThread::TParams(Invoke, &threadFun).SetName("aborter")); + thread.Start(); + thread.Detach(); + + Sleep(timeout); + exitFunction(); + } + + static void SignalHandler(int signalNumber) + { + TerminateWithTimeout( + TDuration::Seconds(5), + std::bind(_exit, -signalNumber), + ::TStringBuilder() << "Signal " << signalNumber << " received, aborting transactions. Waiting 5 seconds..."); + } + + static void TerminateHandler() + { + TerminateWithTimeout( + TDuration::Seconds(5), + [&] { + if (Instance().OldHandler_) { + Instance().OldHandler_(); + } else { + abort(); + } + }, + ::TStringBuilder() << "Terminate called, aborting transactions. 
Waiting 5 seconds..."); + } + +private: + std::terminate_handler OldHandler_ = nullptr; +}; + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +EInitStatus& GetInitStatus() +{ + static EInitStatus initStatus = EInitStatus::NotInitialized; + return initStatus; +} + +static void ElevateInitStatus(const EInitStatus newStatus) { + NDetail::GetInitStatus() = Max(NDetail::GetInitStatus(), newStatus); +} + +void CommonInitialize(int argc, const char** argv) +{ + auto logLevelStr = to_lower(TConfig::Get()->LogLevel); + ILogger::ELevel logLevel; + + if (!TryFromString(logLevelStr, logLevel)) { + Cerr << "Invalid log level: " << TConfig::Get()->LogLevel << Endl; + exit(1); + } + + SetLogger(CreateStdErrLogger(logLevel)); + + TProcessState::Get()->SetCommandLine(argc, argv); +} + +void NonJobInitialize(const TInitializeOptions& options) +{ + if (FromString<bool>(GetEnv("YT_CLEANUP_ON_TERMINATION", "0")) || options.CleanupOnTermination_) { + TAbnormalTerminator::SetErrorTerminationHandler(); + } + if (options.WaitProxy_) { + NDetail::TWaitProxy::Get()->SetProxy(options.WaitProxy_); + } + WriteVersionToLog(); +} + +void ExecJob(int argc, const char** argv, const TInitializeOptions& options) +{ + // Now we are definitely in job. + // We take this setting from environment variable to be consistent with client code. 
+ TConfig::Get()->UseClientProtobuf = IsTrue(GetEnv("YT_USE_CLIENT_PROTOBUF", "")); + + auto execJobImpl = [&options](TString jobName, i64 outputTableCount, bool hasState) { + auto jobProfiler = CreateJobProfiler(); + jobProfiler->Start(); + + InitializeSecureVault(); + + NDetail::OutputTableCount = static_cast<i64>(outputTableCount); + + THolder<IInputStream> jobStateStream; + if (hasState) { + jobStateStream = MakeHolder<TIFStream>("jobstate"); + } else { + jobStateStream = MakeHolder<TBufferStream>(0); + } + + int ret = 1; + try { + ret = TJobFactory::Get()->GetJobFunction(jobName.data())(outputTableCount, *jobStateStream); + } catch (const TSystemError& ex) { + if (ex.Status() == EPIPE) { + // 32 == EPIPE, write number here so it's easier to grep this exit code in source files + exit(32); + } + throw; + } + + jobProfiler->Stop(); + + if (options.JobOnExitFunction_) { + (*options.JobOnExitFunction_)(); + } + exit(ret); + }; + + auto jobArguments = NodeFromYsonString(GetEnv("YT_JOB_ARGUMENTS", "#")); + if (jobArguments.HasValue()) { + execJobImpl( + jobArguments["job_name"].AsString(), + jobArguments["output_table_count"].AsInt64(), + jobArguments["has_state"].AsBool()); + Y_UNREACHABLE(); + } + + TString jobType = argc >= 2 ? argv[1] : TString(); + if (argc != 5 || jobType != "--yt-map" && jobType != "--yt-reduce") { + // We are inside job but probably using old API + // (i.e. both NYT::Initialize and NMR::Initialize are called). 
+ WriteVersionToLog(); + return; + } + + TString jobName(argv[2]); + i64 outputTableCount = FromString<i64>(argv[3]); + int hasState = FromString<int>(argv[4]); + execJobImpl(jobName, outputTableCount, hasState); + Y_UNREACHABLE(); +} + +} // namespace NDetail + +//////////////////////////////////////////////////////////////////////////////// + +void JoblessInitialize(const TInitializeOptions& options) +{ + static const char* fakeArgv[] = {"unknown..."}; + NDetail::CommonInitialize(1, fakeArgv); + NDetail::NonJobInitialize(options); + NDetail::ElevateInitStatus(NDetail::EInitStatus::JoblessInitialization); +} + +void Initialize(int argc, const char* argv[], const TInitializeOptions& options) +{ + NDetail::CommonInitialize(argc, argv); + + NDetail::ElevateInitStatus(NDetail::EInitStatus::FullInitialization); + + const bool isInsideJob = !GetEnv("YT_JOB_ID").empty(); + if (isInsideJob) { + NDetail::ExecJob(argc, argv, options); + } else { + NDetail::NonJobInitialize(options); + } +} + +void Initialize(int argc, char* argv[], const TInitializeOptions& options) +{ + return Initialize(argc, const_cast<const char**>(argv), options); +} + +void Initialize(const TInitializeOptions& options) +{ + static const char* fakeArgv[] = {"unknown..."}; + Initialize(1, fakeArgv, options); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |