summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/init.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 11:13:34 +0300
committermax42 <[email protected]>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/client/init.cpp
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/client/init.cpp')
-rw-r--r--yt/cpp/mapreduce/client/init.cpp280
1 files changed, 0 insertions, 280 deletions
diff --git a/yt/cpp/mapreduce/client/init.cpp b/yt/cpp/mapreduce/client/init.cpp
deleted file mode 100644
index c74598ba14b..00000000000
--- a/yt/cpp/mapreduce/client/init.cpp
+++ /dev/null
@@ -1,280 +0,0 @@
-#include "init.h"
-
-#include "abortable_registry.h"
-#include "job_profiler.h"
-
-#include <yt/cpp/mapreduce/http/requests.h>
-
-#include <yt/cpp/mapreduce/interface/config.h>
-#include <yt/cpp/mapreduce/interface/init.h>
-#include <yt/cpp/mapreduce/interface/operation.h>
-
-#include <yt/cpp/mapreduce/interface/logging/logger.h>
-#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
-
-#include <yt/cpp/mapreduce/io/job_reader.h>
-
-#include <yt/cpp/mapreduce/common/helpers.h>
-#include <yt/cpp/mapreduce/common/wait_proxy.h>
-
-#include <library/cpp/sighandler/async_signals_handler.h>
-
-#include <util/folder/dirut.h>
-
-#include <util/generic/singleton.h>
-
-#include <util/string/builder.h>
-#include <util/string/cast.h>
-#include <util/string/type.h>
-
-#include <util/system/env.h>
-#include <util/system/thread.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-namespace {
-
-void WriteVersionToLog()
-{
- YT_LOG_INFO("Wrapper version: %v",
- TProcessState::Get()->ClientVersion);
-}
-
-static TNode SecureVaultContents; // safe
-
-void InitializeSecureVault()
-{
- SecureVaultContents = NodeFromYsonString(
- GetEnv("YT_SECURE_VAULT", "{}"));
-}
-
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-const TNode& GetJobSecureVault()
-{
- return SecureVaultContents;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TAbnormalTerminator
-{
-public:
- TAbnormalTerminator() = default;
-
- static void SetErrorTerminationHandler()
- {
- if (Instance().OldHandler_ != nullptr) {
- return;
- }
-
- Instance().OldHandler_ = std::set_terminate(&TerminateHandler);
-
- SetAsyncSignalFunction(SIGINT, SignalHandler);
- SetAsyncSignalFunction(SIGTERM, SignalHandler);
- }
-
-private:
- static TAbnormalTerminator& Instance()
- {
- return *Singleton<TAbnormalTerminator>();
- }
-
- static void* Invoke(void* opaque)
- {
- (*reinterpret_cast<std::function<void()>*>(opaque))();
- return nullptr;
- }
-
- static void TerminateWithTimeout(
- const TDuration& timeout,
- const std::function<void(void)>& exitFunction,
- const TString& logMessage)
- {
- std::function<void()> threadFun = [=] {
- YT_LOG_INFO("%v",
- logMessage);
- NDetail::TAbortableRegistry::Get()->AbortAllAndBlockForever();
- };
- TThread thread(TThread::TParams(Invoke, &threadFun).SetName("aborter"));
- thread.Start();
- thread.Detach();
-
- Sleep(timeout);
- exitFunction();
- }
-
- static void SignalHandler(int signalNumber)
- {
- TerminateWithTimeout(
- TDuration::Seconds(5),
- std::bind(_exit, -signalNumber),
- ::TStringBuilder() << "Signal " << signalNumber << " received, aborting transactions. Waiting 5 seconds...");
- }
-
- static void TerminateHandler()
- {
- TerminateWithTimeout(
- TDuration::Seconds(5),
- [&] {
- if (Instance().OldHandler_) {
- Instance().OldHandler_();
- } else {
- abort();
- }
- },
- ::TStringBuilder() << "Terminate called, aborting transactions. Waiting 5 seconds...");
- }
-
-private:
- std::terminate_handler OldHandler_ = nullptr;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-namespace NDetail {
-
-EInitStatus& GetInitStatus()
-{
- static EInitStatus initStatus = EInitStatus::NotInitialized;
- return initStatus;
-}
-
-static void ElevateInitStatus(const EInitStatus newStatus) {
- NDetail::GetInitStatus() = Max(NDetail::GetInitStatus(), newStatus);
-}
-
-void CommonInitialize(int argc, const char** argv)
-{
- auto logLevelStr = to_lower(TConfig::Get()->LogLevel);
- ILogger::ELevel logLevel;
-
- if (!TryFromString(logLevelStr, logLevel)) {
- Cerr << "Invalid log level: " << TConfig::Get()->LogLevel << Endl;
- exit(1);
- }
-
- SetLogger(CreateStdErrLogger(logLevel));
-
- TProcessState::Get()->SetCommandLine(argc, argv);
-}
-
-void NonJobInitialize(const TInitializeOptions& options)
-{
- if (FromString<bool>(GetEnv("YT_CLEANUP_ON_TERMINATION", "0")) || options.CleanupOnTermination_) {
- TAbnormalTerminator::SetErrorTerminationHandler();
- }
- if (options.WaitProxy_) {
- NDetail::TWaitProxy::Get()->SetProxy(options.WaitProxy_);
- }
- WriteVersionToLog();
-}
-
-void ExecJob(int argc, const char** argv, const TInitializeOptions& options)
-{
- // Now we are definitely in job.
- // We take this setting from environment variable to be consistent with client code.
- TConfig::Get()->UseClientProtobuf = IsTrue(GetEnv("YT_USE_CLIENT_PROTOBUF", ""));
-
- auto execJobImpl = [&options](TString jobName, i64 outputTableCount, bool hasState) {
- auto jobProfiler = CreateJobProfiler();
- jobProfiler->Start();
-
- InitializeSecureVault();
-
- NDetail::OutputTableCount = static_cast<i64>(outputTableCount);
-
- THolder<IInputStream> jobStateStream;
- if (hasState) {
- jobStateStream = MakeHolder<TIFStream>("jobstate");
- } else {
- jobStateStream = MakeHolder<TBufferStream>(0);
- }
-
- int ret = 1;
- try {
- ret = TJobFactory::Get()->GetJobFunction(jobName.data())(outputTableCount, *jobStateStream);
- } catch (const TSystemError& ex) {
- if (ex.Status() == EPIPE) {
- // 32 == EPIPE, write number here so it's easier to grep this exit code in source files
- exit(32);
- }
- throw;
- }
-
- jobProfiler->Stop();
-
- if (options.JobOnExitFunction_) {
- (*options.JobOnExitFunction_)();
- }
- exit(ret);
- };
-
- auto jobArguments = NodeFromYsonString(GetEnv("YT_JOB_ARGUMENTS", "#"));
- if (jobArguments.HasValue()) {
- execJobImpl(
- jobArguments["job_name"].AsString(),
- jobArguments["output_table_count"].AsInt64(),
- jobArguments["has_state"].AsBool());
- Y_UNREACHABLE();
- }
-
- TString jobType = argc >= 2 ? argv[1] : TString();
- if (argc != 5 || jobType != "--yt-map" && jobType != "--yt-reduce") {
- // We are inside job but probably using old API
- // (i.e. both NYT::Initialize and NMR::Initialize are called).
- WriteVersionToLog();
- return;
- }
-
- TString jobName(argv[2]);
- i64 outputTableCount = FromString<i64>(argv[3]);
- int hasState = FromString<int>(argv[4]);
- execJobImpl(jobName, outputTableCount, hasState);
- Y_UNREACHABLE();
-}
-
-} // namespace NDetail
-
-////////////////////////////////////////////////////////////////////////////////
-
-void JoblessInitialize(const TInitializeOptions& options)
-{
- static const char* fakeArgv[] = {"unknown..."};
- NDetail::CommonInitialize(1, fakeArgv);
- NDetail::NonJobInitialize(options);
- NDetail::ElevateInitStatus(NDetail::EInitStatus::JoblessInitialization);
-}
-
-void Initialize(int argc, const char* argv[], const TInitializeOptions& options)
-{
- NDetail::CommonInitialize(argc, argv);
-
- NDetail::ElevateInitStatus(NDetail::EInitStatus::FullInitialization);
-
- const bool isInsideJob = !GetEnv("YT_JOB_ID").empty();
- if (isInsideJob) {
- NDetail::ExecJob(argc, argv, options);
- } else {
- NDetail::NonJobInitialize(options);
- }
-}
-
-void Initialize(int argc, char* argv[], const TInitializeOptions& options)
-{
- return Initialize(argc, const_cast<const char**>(argv), options);
-}
-
-void Initialize(const TInitializeOptions& options)
-{
- static const char* fakeArgv[] = {"unknown..."};
- Initialize(1, fakeArgv, options);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT