diff options
author | Marina Pereskokova <35163152+Krisha11@users.noreply.github.com> | 2024-01-31 12:16:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 10:16:49 +0100 |
commit | cd27de93a8126f873c11f0b40309b5a3d7339a7b (patch) | |
tree | 13bd82ba9356f0230b6f6777a2d1481f37c342b6 | |
parent | 8c77bc140c36d19e235725554b9e5ceaf141d304 (diff) | |
download | ydb-cd27de93a8126f873c11f0b40309b5a3d7339a7b.tar.gz |
Move yql dq job core in os (#1440)
-rw-r--r-- | ydb/library/yql/tools/dq/worker_job/dq_worker.cpp | 373 | ||||
-rw-r--r-- | ydb/library/yql/tools/dq/worker_job/dq_worker.h | 44 | ||||
-rw-r--r-- | ydb/library/yql/tools/dq/worker_job/ya.make | 31 | ||||
-rw-r--r-- | ydb/library/yql/tools/dq/ya.make | 1 |
4 files changed, 449 insertions, 0 deletions
diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
new file mode 100644
index 0000000000..a47739ac43
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
@@ -0,0 +1,373 @@
+#include "dq_worker.h"
+
+#include <ydb/library/yql/utils/signals/signals.h>
+#include <ydb/library/yql/utils/bind_in_range.h>
+
+#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h>
+#include <ydb/library/yql/providers/dq/actors/yt/nodeid_assigner.h>
+#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h>
+#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
+#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h>
+
+#include <ydb/library/yql/providers/dq/runtime/file_cache.h>
+#include <ydb/library/yql/providers/dq/runtime/runtime_data.h>
+#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
+
+#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
+
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/log/tls_backend.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/utils/range_walker.h>
+
+#include <yt/yt/core/actions/invoker.h>
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/thread_pool.h>
+
+#include <library/cpp/protobuf/util/pb_io.h>
+
+#include <util/system/fs.h>
+#include <util/stream/file.h>
+#include <util/system/env.h>
+#include <util/system/shellcommand.h>
+
+using namespace NYql::NDqs;
+
+namespace {
+    // Reads the whole of `cfgFile` and parses it as a text-format protobuf
+    // message of type TMessage.
+    //
+    // Always returns a non-null holder. The caller in this file dereferences
+    // the result unconditionally, so on a parse failure the error is logged
+    // and a default-constructed message is returned instead of an empty
+    // holder. Failures while opening/reading the file are not handled here.
+    template <typename TMessage>
+    THolder<TMessage> ParseProtoConfig(const TString& cfgFile) {
+        using ::google::protobuf::TextFormat;
+
+        const TString configData = TFileInput(cfgFile).ReadAll();
+
+        auto config = MakeHolder<TMessage>();
+        if (!TextFormat::ParseFromString(configData, config.Get())) {
+            YQL_LOG(ERROR) << "Bad format of dq_vanilla_job configuration";
+            // A failed parse may leave the message partially filled; hand
+            // back a clean default message rather than that or a null holder.
+            return MakeHolder<TMessage>();
+        }
+
+        return config;
+    }
+
+    // Fulfilled once by the signal handler; TWorkerJob::Do() blocks on its
+    // future until the process is asked to terminate.
+    static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>();
+
+    // SIGINT/SIGTERM handler: wakes up TWorkerJob::Do().
+    // TrySetValue() is used so that a repeated signal is a no-op instead of
+    // an attempt to fulfil an already fulfilled promise.
+    // NOTE(review): fulfilling a promise from a signal handler is presumably
+    // not async-signal-safe -- confirm whether this needs a different mechanism.
+    static void OnTerminate(int) {
+        ShouldContinue.TrySetValue();
+    }
+
+    // Task runner invoker that forwards every callback to a serialized
+    // invoker created on top of the given underlying invoker.
+    class TSerializedTaskRunnerInvoker: public ITaskRunnerInvoker {
+    public:
+        explicit TSerializedTaskRunnerInvoker(const NYT::IInvokerPtr& invoker)
+            : Invoker(NYT::NConcurrency::CreateSerializedInvoker(invoker))
+        { }
+
+        void Invoke(const std::function<void(void)>& f) override {
+            Invoker->Invoke(BIND(f));
+        }
+
+    private:
+        const NYT::IInvokerPtr Invoker;
+    };
+
+    // Owns a thread pool named "WorkerActor" with `capacity` threads and
+    // creates one serialized invoker per Create() call on top of it.
+    class TConcurrentInvokerFactory: public ITaskRunnerInvokerFactory {
+    public:
+        explicit TConcurrentInvokerFactory(int capacity)
+            : ThreadPool(NYT::NConcurrency::CreateThreadPool(capacity, "WorkerActor"))
+        { }
+
+        ITaskRunnerInvoker::TPtr Create() override {
+            return new TSerializedTaskRunnerInvoker(ThreadPool->GetInvoker());
+        }
+
+        NYT::NConcurrency::IThreadPoolPtr ThreadPool;
+    };
+
+    // Creates, configures, starts and waits for the porto container "Outer"
+    // by running the `portoCtl` binary. Built-in settings are applied first,
+    // then the ones listed in the backend config. Each command is run
+    // synchronously; its outcome is not inspected.
+    void ConfigurePorto(const NYql::NProto::TDqConfig::TYtBackend& config, const TString& portoCtl) {
+        const TString builtinSettings[][2] = {
+            {"enable_porto", "isolate"},
+            {"respawn", "true"}
+        };
+        {
+            TShellCommand cmd(portoCtl, {"create", "Outer"});
+            cmd.Run().Wait();
+        }
+        for (const auto& setting : builtinSettings) {
+            TShellCommand cmd(portoCtl, {"set", "Outer", setting[0], setting[1]});
+            cmd.Run().Wait();
+        }
+        for (const auto& attr : config.GetPortoSettings().GetSetting()) {
+            TShellCommand cmd(portoCtl, {"set", "Outer", attr.GetName(), attr.GetValue()});
+            cmd.Run().Wait();
+        }
+        {
+            TShellCommand cmd(portoCtl, {"start", "Outer"});
+            cmd.Run().Wait();
+        }
+        {
+            TShellCommand cmd(portoCtl, {"wait", "Outer"});
+            cmd.Run().Wait();
+        }
+    }
+}
+
+namespace NYql::NDq::NWorker {
+
+    // Default configurator: no metrics are configured.
+    void TDefaultWorkerConfigurator::ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& /*loggerConfig*/, const THolder<NActors::TActorSystem>& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const {
+    }
+
+    // Default configurator: a plain TDqAsyncIoFactory.
+    NDq::IDqAsyncIoFactory::TPtr TDefaultWorkerConfigurator::CreateAsyncIoFactory() const {
+        return MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
+    }
+
+    // Default configurator: nothing to do on shutdown.
+    void TDefaultWorkerConfigurator::OnWorkerFinish() {
+    }
+
+    // Starts with the default configurator; it can be replaced through
+    // SetWorkerConfigurator().
+    TWorkerJob::TWorkerJob()
+        : WorkerConfigurator(MakeHolder<TDefaultWorkerConfigurator>())
+    { }
+
+    void TWorkerJob::SetConfigFile(const TString& configFile) {
+        ConfigFile = configFile;
+    }
+
+    void TWorkerJob::SetWorkerConfigurator(THolder<IWorkerConfigurator> workerConfigurator) {
+        WorkerConfigurator = std::move(workerConfigurator);
+    }
+
+    // Job entry point. Reads its settings from environment variables and the
+    // optional logging config file, prepares the file cache and (optionally)
+    // the porto layer, obtains a node id from the coordinator, starts the
+    // actor system with the worker manager (and spilling service when
+    // configured), then blocks until SIGINT/SIGTERM and shuts down.
+    void TWorkerJob::Do() {
+
+        auto loggerConfig = MakeHolder<NYql::NProto::TLoggingConfig>();
+
+        ui16 startPort = 0;
+
+        auto deterministicMode = !!GetEnv("YQL_DETERMINISTIC_MODE");
+
+        YQL_ENSURE(TryFromString<ui16>(GetEnv(NCommonJobVars::ACTOR_PORT), startPort),
+            "Invalid service config port env var empty");
+
+        ui32 tryNodeId = 0;
+        YQL_ENSURE(TryFromString<ui32>(GetEnv(NCommonJobVars::ACTOR_NODE_ID, "0"), tryNodeId),
+            "Invalid nodeId env var");
+
+        if (!ConfigFile.empty()) {
+            // Keep the default-constructed config when parsing yields an empty
+            // holder, so a malformed file cannot cause a null dereference below.
+            if (auto parsedConfig = ParseProtoConfig<NYql::NProto::TLoggingConfig>(ConfigFile)) {
+                loggerConfig = std::move(parsedConfig);
+            }
+
+            // Make file log targets unique per node: "<target>.<node id from env>".
+            for (auto& logDest : *loggerConfig->MutableLogDest()) {
+                if (logDest.GetType() == NYql::NProto::TLoggingConfig::FILE) {
+                    TString logFile = logDest.GetTarget() + "." + ToString(tryNodeId);
+                    logDest.SetTarget(logFile);
+                }
+            }
+
+            loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::TRACE);
+        } else {
+            loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::DEBUG);
+        }
+        NYql::NLog::InitLogger(*loggerConfig, false);
+        InitSignals();
+
+        TString fileCacheDir = GetEnv(NCommonJobVars::UDFS_PATH);
+        TString ytCoordinatorStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_COORDINATOR);
+
+        TString ytBackendStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_BACKEND);
+
+        TString operationId = GetEnv("YT_OPERATION_ID");
+        TString jobId = GetEnv("YT_JOB_ID");
+
+        TString operationSize = GetEnv(NCommonJobVars::OPERATION_SIZE);
+
+        // Coordinator and backend settings arrive as text-format protobufs in
+        // environment variables; unknown fields are tolerated.
+        NProto::TDqConfig::TYtCoordinator coordinatorConfig;
+        TStringInput inputStream1(ytCoordinatorStr);
+        ParseFromTextFormat(inputStream1, coordinatorConfig, EParseFromTextFormatOption::AllowUnknownField);
+
+        NProto::TDqConfig::TYtBackend backendConfig;
+        TStringInput inputStream2(ytBackendStr);
+        ParseFromTextFormat(inputStream2, backendConfig, EParseFromTextFormatOption::AllowUnknownField);
+
+        TRangeWalker<int> portWalker(startPort, startPort+100);
+        auto ports = BindInRange(portWalker);
+
+        auto [host, ip] = NYql::NDqs::GetLocalAddress(
+            coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr
+        );
+
+        auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip);
+        i64 cacheSize = backendConfig.HasCacheSize()
+            ? backendConfig.GetCacheSize()
+            : 16000000000L;
+        TIntrusivePtr<IFileCache> fileCache = new TFileCache(fileCacheDir + "/cache", cacheSize);
+        NFs::SymLink(fileCacheDir, "file_cache"); // COMPAT
+        TString layerDir = fileCacheDir + "/layer";
+        if (backendConfig.GetPortoLayer().size() > 0) {
+            NFs::MakeDirectoryRecursive(layerDir + "/mnt/work");
+            for (const auto& layerPath : backendConfig.GetPortoLayer()) {
+                // Only the file name of the layer path is used: the archive is
+                // looked up relative to the current directory and unpacked
+                // into layerDir.
+                auto pos = layerPath.rfind('/');
+                auto archive = layerPath.substr(pos+1);
+                TShellCommand cmd("tar", {"xf", archive, "-C", layerDir});
+                cmd.Run().Wait();
+            }
+        } else {
+            NFs::MakeDirectoryRecursive("mnt/work");
+            NFs::MakeDirectoryRecursive("usr/local/bin");
+        }
+
+        // A zero worker capacity in the config means a single worker.
+        int capacity = backendConfig.GetWorkerCapacity()
+            ? backendConfig.GetWorkerCapacity()
+            : 1;
+
+        NYql::NTaskRunnerProxy::TPipeFactoryOptions pfOptions;
+        pfOptions.ExecPath = GetExecPath();
+        pfOptions.FileCache = fileCache;
+        if (deterministicMode) {
+            YQL_LOG(DEBUG) << "deterministicMode On";
+            pfOptions.Env["YQL_DETERMINISTIC_MODE"] = "1";
+        }
+        if (backendConfig.GetEnforceJobUtc()) {
+            pfOptions.Env["TZ"] = "UTC0";
+        }
+        pfOptions.EnablePorto = backendConfig.GetEnablePorto() == "isolate";
+        pfOptions.PortoLayer = backendConfig.GetPortoLayer().size() == 0 ? "" : layerDir;
+        pfOptions.MaxProcesses = capacity*1.5;
+        pfOptions.ContainerName = "Outer";
+
+        TResourceManagerOptions rmOptions;
+        rmOptions.YtBackend = backendConfig;
+        rmOptions.FileCache = fileCache;
+        rmOptions.TmpDir = fileCacheDir + "/tmp";
+
+        if (NFs::Exists(layerDir + "/usr/bin/portoctl")) {
+            TString dst = fileCache->GetDir() + "/portoctl";
+            NFs::Copy(layerDir + "/usr/bin/portoctl", dst);
+            NFs::HardLink(layerDir + "/usr/bin/portoctl", layerDir + "/usr/sbin/portoctl"); // workaround PORTO-997
+            chmod(dst.c_str(), 0755);
+            pfOptions.PortoCtlPath = dst;
+            rmOptions.DieOnFileAbsence = dst; // die on file absence
+        }
+
+        Cerr << host + ":" + ip << Endl;
+
+        THashMap<TString, TString> attributes;
+        attributes[NCommonAttrs::OPERATIONID_ATTR] = operationId;
+        attributes[NCommonAttrs::OPERATIONSIZE_ATTR] = operationSize;
+        attributes[NCommonAttrs::JOBID_ATTR] = jobId;
+        attributes[NCommonAttrs::CLUSTERNAME_ATTR] = backendConfig.GetClusterName();
+
+        // A node id of 0 in the environment means "no preference".
+        auto nodeIdOpt = (tryNodeId == 0)
+            ? TMaybe<ui32>()
+            : TMaybe<ui32>(tryNodeId);
+        auto nodeId = coordinator->GetNodeId(
+            nodeIdOpt,
+            {},
+            static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId),
+            static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId),
+            attributes);
+
+        Y_ABORT_UNLESS(
+            static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId) <= nodeId &&
+            nodeId < static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId));
+
+        Cerr << "My nodeId: " << nodeId << Endl;
+
+        Cerr << "Configure porto" << Endl;
+        if (backendConfig.GetEnablePorto() == "isolate") {
+            ConfigurePorto(backendConfig, pfOptions.PortoCtlPath);
+        }
+        Cerr << "Configure porto done" << Endl;
+
+        auto dqSensors = GetSensorsGroupFor(NSensorComponent::kDq);
+        THolder<NActors::TActorSystemSetup> setup;
+        TIntrusivePtr<NActors::NLog::TSettings> logSettings;
+        std::tie(setup, logSettings) = BuildActorSetup(
+            nodeId,
+            ip,
+            ports[1].Addr.GetPort(),
+            ports[1].Socket->Release(),
+            {},
+            dqSensors,
+            [](const TIntrusivePtr<NActors::TTableNameserverSetup>& nameserverSetup) {
+                return NYql::NDqs::CreateDynamicNameserver(nameserverSetup);
+            },
+            Nothing(),
+            backendConfig.GetICSettings());
+
+        auto statsCollector = CreateStatsCollector(5, *setup.Get(), dqSensors);
+
+        auto actorSystem = MakeHolder<NActors::TActorSystem>(setup, nullptr, logSettings);
+
+        actorSystem->Start();
+
+        actorSystem->Register(statsCollector);
+
+        TVector<TString> hostPortPairs;
+        for (const auto& hostPortPair : coordinatorConfig.GetServiceNodeHostPort()) {
+            hostPortPairs.emplace_back(hostPortPair);
+            // tests
+            if (hostPortPair.StartsWith("localhost")) {
+                rmOptions.ExitOnPingFail = true;
+            }
+        }
+
+        WorkerConfigurator->ConfigureMetrics(loggerConfig, actorSystem, backendConfig, rmOptions, nodeId);
+
+        // rmOptions.MetricsRegistry = CreateMetricsRegistry(dqSensors); // send metrics to gwm, unsupported
+        auto resolver = coordinator->CreateServiceNodeResolver(actorSystem.Get(), hostPortPairs);
+        actorSystem->Register(coordinator->CreateServiceNodePinger(resolver, rmOptions, attributes));
+
+        NLog::YqlLogger().UpdateProcInfo(jobId + "/" + GetGuidAsString(coordinator->GetRuntimeData()->WorkerId));
+
+        // For testing only
+        THashMap<TString, TString> clusterMapping;
+        clusterMapping["plato"] = backendConfig.GetClusterName();
+
+        auto proxyFactory = NTaskRunnerProxy::CreatePipeFactory(pfOptions);
+        ITaskRunnerInvokerFactory::TPtr invokerFactory = new TConcurrentInvokerFactory(2*capacity);
+        auto taskRunnerActorFactory = NTaskRunnerActor::CreateTaskRunnerActorFactory(proxyFactory, invokerFactory, nullptr, coordinator->GetRuntimeData());
+
+        TLocalWorkerManagerOptions lwmOptions;
+        lwmOptions.Factory = proxyFactory;
+        lwmOptions.TaskRunnerActorFactory = taskRunnerActorFactory;
+        lwmOptions.AsyncIoFactory = WorkerConfigurator->CreateAsyncIoFactory();
+        lwmOptions.RuntimeData = coordinator->GetRuntimeData();
+        lwmOptions.TaskRunnerInvokerFactory = invokerFactory;
+        lwmOptions.ClusterNamesMapping = clusterMapping;
+        lwmOptions.ComputeActorOwnsCounters = true;
+
+        auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
+
+        auto workerManagerActorId = actorSystem->Register(resman);
+        actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId);
+
+        if (backendConfig.HasSpillingSettings()) {
+            auto spilling = NDq::CreateDqLocalFileSpillingService(
+                NDq::TFileSpillingServiceConfig {
+                    .Root = backendConfig.GetSpillingSettings().GetRoot(),
+                    .MaxTotalSize = backendConfig.GetSpillingSettings().GetMaxTotalSize(),
+                    .MaxFileSize = backendConfig.GetSpillingSettings().GetMaxFileSize(),
+                    .MaxFilePartSize = backendConfig.GetSpillingSettings().GetMaxFilePartSize(),
+                    .IoThreadPoolWorkersCount = backendConfig.GetSpillingSettings().GetIoThreadPoolWorkersCount(),
+                    .IoThreadPoolQueueSize = backendConfig.GetSpillingSettings().GetIoThreadPoolQueueSize(),
+                    .CleanupOnShutdown = backendConfig.GetSpillingSettings().GetCleanupOnShutdown()
+                },
+                MakeIntrusive<NDq::TSpillingCounters>(dqSensors)
+            );
+            auto spillingActor = actorSystem->Register(spilling);
+            actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor);
+        }
+
+        auto endFuture = ShouldContinue.GetFuture();
+
+        signal(SIGINT, &OnTerminate);
+        signal(SIGTERM, &OnTerminate);
+        signal(SIGPIPE, SIG_IGN);
+
+        // run forever
+
+        endFuture.Wait();
+        WorkerConfigurator->OnWorkerFinish();
+        actorSystem->Stop();
+        dqSensors->OutputHtml(Cerr);
+    }
+
+    REGISTER_VANILLA_JOB(TWorkerJob);
+
+} // namespace NYql::NDq::NWorker
diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.h b/ydb/library/yql/tools/dq/worker_job/dq_worker.h
new file mode 100644
index 0000000000..f37895c6dc
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_job/dq_worker.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <ydb/library/yql/providers/dq/actors/yt/resource_manager.h>
+#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+
+#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
+
+#include <yt/cpp/mapreduce/interface/client.h>
+
+namespace NYql::NDq::NWorker {
+    // Customisation points used by TWorkerJob::Do(): metrics setup, the
+    // async IO factory handed to the local worker manager, and a hook that
+    // is called after the termination signal, before the actor system stops.
+    struct IWorkerConfigurator
+    {
+        virtual ~IWorkerConfigurator() = default;
+
+        virtual void ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& loggerConfig, const THolder<NActors::TActorSystem>& actorSystem, const NProto::TDqConfig::TYtBackend& backendConfig, const TResourceManagerOptions& rmOptions, ui32 nodeId) const = 0;
+        virtual IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const = 0;
+        virtual void OnWorkerFinish() = 0;
+    };
+
+    // Configurator installed by the TWorkerJob constructor.
+    struct TDefaultWorkerConfigurator
+        : public IWorkerConfigurator
+    {
+        void ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& /*loggerConfig*/, const THolder<NActors::TActorSystem>& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const override;
+        IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const override;
+        void OnWorkerFinish() override;
+    };
+
+    // Vanilla job that runs a DQ worker node until it is signalled to stop.
+    class TWorkerJob: public NYT::IVanillaJob<void> {
+    public:
+        TWorkerJob();
+
+        // Path of the text-format logging config; empty means built-in defaults.
+        void SetConfigFile(const TString& configFile);
+        // Replaces the configurator; takes ownership.
+        void SetWorkerConfigurator(THolder<IWorkerConfigurator> workerConfigurator);
+
+        void Do() override;
+
+    private:
+        TString ConfigFile;
+        THolder<IWorkerConfigurator> WorkerConfigurator;
+    };
+
+} // namespace NYql::NDq::NWorker
diff --git a/ydb/library/yql/tools/dq/worker_job/ya.make b/ydb/library/yql/tools/dq/worker_job/ya.make
new file mode 100644
index 0000000000..14971a568b
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_job/ya.make
@@ -0,0 +1,31 @@
+LIBRARY()
+
+SRCS(
+    dq_worker.cpp
+)
+
+PEERDIR(
+    contrib/libs/protobuf
+    library/cpp/protobuf/util
+    yt/cpp/mapreduce/client
+    yt/cpp/mapreduce/interface
+    yt/yt/core
+    ydb/library/yql/dq/actors/spilling
+    ydb/library/yql/minikql/comp_nodes/llvm14
+    ydb/library/yql/providers/common/metrics
+    ydb/library/yql/providers/dq/runtime
+    ydb/library/yql/providers/dq/service
+    ydb/library/yql/providers/dq/stats_collector
+    ydb/library/yql/providers/dq/task_runner
+    ydb/library/yql/public/udf/service/terminate_policy
+    ydb/library/yql/utils
+    ydb/library/yql/utils/log
+    ydb/library/yql/utils/log/proto
+    ydb/library/yql/providers/dq/actors/yt
+    ydb/library/yql/providers/dq/global_worker_manager
+    ydb/library/yql/utils/signals
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make
index 1043bcb327..3bdf652c40 100644
--- a/ydb/library/yql/tools/dq/ya.make
+++ b/ydb/library/yql/tools/dq/ya.make
@@ -2,4 +2,5 @@ RECURSE(
     dq_cli
     service_node
     worker_node
+    worker_job
 ) |