diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-01-31 13:39:30 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 11:39:30 +0100 |
commit | d834612c3a01e331dab514c3edcd849433484102 (patch) | |
tree | 134b15310c01d459b1fc257c1c603f4f071854cf | |
parent | cf6042901fdb1a3921b8b5bc7760e03ea8661113 (diff) | |
download | ydb-d834612c3a01e331dab514c3edcd849433484102.tar.gz |
Revert "Move yql dq job core in os (#1440)" (#1461)
This reverts commit cd27de93a8126f873c11f0b40309b5a3d7339a7b.
-rw-r--r-- | ydb/library/yql/tools/dq/worker_job/dq_worker.cpp | 373 | ||||
-rw-r--r-- | ydb/library/yql/tools/dq/worker_job/dq_worker.h | 44 | ||||
-rw-r--r-- | ydb/library/yql/tools/dq/worker_job/ya.make | 31 | ||||
-rw-r--r-- | ydb/library/yql/tools/dq/ya.make | 1 |
4 files changed, 0 insertions, 449 deletions
diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp deleted file mode 100644 index a47739ac43..0000000000 --- a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp +++ /dev/null @@ -1,373 +0,0 @@ -#include "dq_worker.h" - -#include <ydb/library/yql/utils/signals/signals.h> -#include <ydb/library/yql/utils/bind_in_range.h> - -#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h> -#include <ydb/library/yql/providers/dq/actors/yt/nodeid_assigner.h> -#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h> -#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h> -#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h> - -#include <ydb/library/yql/providers/dq/runtime/file_cache.h> -#include <ydb/library/yql/providers/dq/runtime/runtime_data.h> -#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h> - -#include <ydb/library/yql/dq/actors/spilling/spilling_file.h> - -#include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/utils/log/tls_backend.h> -#include <ydb/library/yql/utils/yql_panic.h> -#include <ydb/library/yql/utils/range_walker.h> - -#include <yt/yt/core/actions/invoker.h> -#include <yt/yt/core/concurrency/action_queue.h> -#include <yt/yt/core/concurrency/thread_pool.h> - -#include <library/cpp/protobuf/util/pb_io.h> - -#include <util/system/fs.h> -#include <util/stream/file.h> -#include <util/system/env.h> -#include <util/system/shellcommand.h> - -using namespace NYql::NDqs; - -namespace { - template <typename TMessage> - THolder<TMessage> ParseProtoConfig(const TString& cfgFile) { - auto config = MakeHolder<TMessage>(); - TString configData = TFileInput(cfgFile).ReadAll();; - - using ::google::protobuf::TextFormat; - if (!TextFormat::ParseFromString(configData, config.Get())) { - YQL_LOG(ERROR) << "Bad format of dq_vanilla_job configuration"; - return {}; - } - - return config; - } - - static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>(); - - static void OnTerminate(int) { - ShouldContinue.SetValue(); - } - - class TSerializedTaskRunnerInvoker: public ITaskRunnerInvoker { - public: - TSerializedTaskRunnerInvoker(const NYT::IInvokerPtr& invoker) - : Invoker(NYT::NConcurrency::CreateSerializedInvoker(invoker)) - { } - - void Invoke(const std::function<void(void)>& f) override { - Invoker->Invoke(BIND(f)); - } - - private: - const NYT::IInvokerPtr Invoker; - }; - - class TConcurrentInvokerFactory: public ITaskRunnerInvokerFactory { - public: - TConcurrentInvokerFactory(int capacity) - : ThreadPool(NYT::NConcurrency::CreateThreadPool(capacity, "WorkerActor")) - { } - - ITaskRunnerInvoker::TPtr Create() override { - return new TSerializedTaskRunnerInvoker(ThreadPool->GetInvoker()); - } - - NYT::NConcurrency::IThreadPoolPtr ThreadPool; - }; - - void ConfigurePorto(const NYql::NProto::TDqConfig::TYtBackend& config, const TString portoCtl) { - TString settings[][2] = { - {"enable_porto", "isolate"}, - {"respawn", "true"} - }; - int nSettings = 2; - { - TShellCommand cmd(portoCtl, {"create", "Outer"}); - cmd.Run().Wait(); - } - for (int i = 0; i < nSettings; i++) { - TShellCommand cmd(portoCtl, {"set", "Outer", settings[i][0], settings[i][1]}); - cmd.Run().Wait(); - } - for (const auto& attr : config.GetPortoSettings().GetSetting()) { - TShellCommand cmd(portoCtl, {"set", "Outer", attr.GetName(), attr.GetValue()}); - cmd.Run().Wait(); - } - { - TShellCommand cmd(portoCtl, {"start", "Outer"}); - cmd.Run().Wait(); - } - { - TShellCommand cmd(portoCtl, {"wait", "Outer"}); - cmd.Run().Wait(); - } - } -} - -namespace NYql::NDq::NWorker { - - void TDefaultWorkerConfigurator::ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& /*loggerConfig*/, const THolder<NActors::TActorSystem>& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const { - } - - NDq::IDqAsyncIoFactory::TPtr TDefaultWorkerConfigurator::CreateAsyncIoFactory() const { - return MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); - } - - void TDefaultWorkerConfigurator::OnWorkerFinish() { - } - - TWorkerJob::TWorkerJob() - : WorkerConfigurator(MakeHolder<TDefaultWorkerConfigurator>(TDefaultWorkerConfigurator())) - { } - - void TWorkerJob::SetConfigFile(const TString& configFile) { - ConfigFile = configFile; - } - - void TWorkerJob::SetWorkerConfigurator(THolder<IWorkerConfigurator> workerConfigurator) { - WorkerConfigurator = std::move(workerConfigurator); - } - - void TWorkerJob::Do() { - - auto loggerConfig = MakeHolder<NYql::NProto::TLoggingConfig>(); - - ui16 startPort = 0; - - auto deterministicMode = !!GetEnv("YQL_DETERMINISTIC_MODE"); - - YQL_ENSURE(TryFromString<ui16>(GetEnv(NCommonJobVars::ACTOR_PORT), startPort), - "Invalid service config port env var empty"); - - ui32 tryNodeId; - YQL_ENSURE(TryFromString<ui32>(GetEnv(NCommonJobVars::ACTOR_NODE_ID, "0"), tryNodeId), - "Invalid nodeId env var"); - - if (!ConfigFile.empty()) { - loggerConfig = ParseProtoConfig<NYql::NProto::TLoggingConfig>(ConfigFile); - - for (auto& logDest : *loggerConfig->MutableLogDest()) { - if (logDest.GetType() == NYql::NProto::TLoggingConfig::FILE) { - TString logFile = logDest.GetTarget() + "." + ToString(tryNodeId); - logDest.SetTarget(logFile); - } - } - - loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::TRACE); - } else { - loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::DEBUG); - } - NYql::NLog::InitLogger(*loggerConfig, false); - InitSignals(); - - TString fileCacheDir = GetEnv(NCommonJobVars::UDFS_PATH); - TString ytCoordinatorStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_COORDINATOR); - - TString ytBackendStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_BACKEND); - - TString operationId = GetEnv("YT_OPERATION_ID"); - TString jobId = GetEnv("YT_JOB_ID"); - - TString operationSize = GetEnv(NCommonJobVars::OPERATION_SIZE); - - NProto::TDqConfig::TYtCoordinator coordinatorConfig; - TStringInput inputStream1(ytCoordinatorStr); - ParseFromTextFormat(inputStream1, coordinatorConfig, EParseFromTextFormatOption::AllowUnknownField); - - NProto::TDqConfig::TYtBackend backendConfig; - TStringInput inputStream2(ytBackendStr); - ParseFromTextFormat(inputStream2, backendConfig, EParseFromTextFormatOption::AllowUnknownField); - - TRangeWalker<int> portWalker(startPort, startPort+100); - auto ports = BindInRange(portWalker); - - auto [host, ip] = NYql::NDqs::GetLocalAddress( - coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr - ); - - auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip); - i64 cacheSize = backendConfig.HasCacheSize() - ? backendConfig.GetCacheSize() - : 16000000000L; - TIntrusivePtr<IFileCache> fileCache = new TFileCache(fileCacheDir + "/cache", cacheSize); - NFs::SymLink(fileCacheDir, "file_cache"); // COMPAT - TString layerDir = fileCacheDir + "/layer"; - if (backendConfig.GetPortoLayer().size() > 0) { - NFs::MakeDirectoryRecursive(layerDir + "/mnt/work"); - for (const auto& layerPath : backendConfig.GetPortoLayer()) { - auto pos = layerPath.rfind('/'); - auto archive = layerPath.substr(pos+1); - TShellCommand cmd("tar", {"xf", archive, "-C", layerDir}); - cmd.Run().Wait(); - } - } else { - NFs::MakeDirectoryRecursive("mnt/work"); - NFs::MakeDirectoryRecursive("usr/local/bin"); - } - - int capacity = backendConfig.GetWorkerCapacity() - ? backendConfig.GetWorkerCapacity() - : 1; - - NYql::NTaskRunnerProxy::TPipeFactoryOptions pfOptions; - pfOptions.ExecPath = GetExecPath(); - pfOptions.FileCache = fileCache; - if (deterministicMode) { - YQL_LOG(DEBUG) << "deterministicMode On"; - pfOptions.Env["YQL_DETERMINISTIC_MODE"] = "1"; - } - if (backendConfig.GetEnforceJobUtc()) { - pfOptions.Env["TZ"] = "UTC0"; - } - pfOptions.EnablePorto = backendConfig.GetEnablePorto() == "isolate"; - pfOptions.PortoLayer = backendConfig.GetPortoLayer().size() == 0 ? "" : layerDir; - pfOptions.MaxProcesses = capacity*1.5; - pfOptions.ContainerName = "Outer"; - - TResourceManagerOptions rmOptions; - rmOptions.YtBackend = backendConfig; - rmOptions.FileCache = fileCache; - rmOptions.TmpDir = fileCacheDir + "/tmp"; - - if (NFs::Exists(layerDir + "/usr/bin/portoctl")) { - TString dst = fileCache->GetDir() + "/portoctl"; - NFs::Copy(layerDir + "/usr/bin/portoctl", dst); - NFs::HardLink(layerDir + "/usr/bin/portoctl", layerDir + "/usr/sbin/portoctl"); // workaround PORTO-997 - chmod(dst.c_str(), 0755); - pfOptions.PortoCtlPath = dst; - rmOptions.DieOnFileAbsence = dst; // die on file absence - } - - Cerr << host + ":" + ip << Endl; - - THashMap<TString, TString> attributes; - attributes[NCommonAttrs::OPERATIONID_ATTR] = operationId; - attributes[NCommonAttrs::OPERATIONSIZE_ATTR] = operationSize; - attributes[NCommonAttrs::JOBID_ATTR] = jobId; - attributes[NCommonAttrs::CLUSTERNAME_ATTR] = backendConfig.GetClusterName(); - - auto nodeIdOpt = (tryNodeId == 0) - ? TMaybe<ui32>() - : TMaybe<ui32>(tryNodeId); - auto nodeId = coordinator->GetNodeId( - nodeIdOpt, - {}, - static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId), - static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId), - attributes); - - Y_ABORT_UNLESS( - static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId) <= nodeId && - nodeId < static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId)); - - Cerr << "My nodeId: " << nodeId << Endl; - - Cerr << "Configure porto" << Endl; - if (backendConfig.GetEnablePorto() == "isolate") { - ConfigurePorto(backendConfig, pfOptions.PortoCtlPath); - } - Cerr << "Configure porto done" << Endl; - - auto dqSensors = GetSensorsGroupFor(NSensorComponent::kDq); - THolder<NActors::TActorSystemSetup> setup; - TIntrusivePtr<NActors::NLog::TSettings> logSettings; - std::tie(setup, logSettings) = BuildActorSetup( - nodeId, - ip, - ports[1].Addr.GetPort(), - ports[1].Socket->Release(), - {}, - dqSensors, - [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) { - return NYql::NDqs::CreateDynamicNameserver(setup); - }, - Nothing(), - backendConfig.GetICSettings()); - - auto statsCollector = CreateStatsCollector(5, *setup.Get(), dqSensors); - - auto actorSystem = MakeHolder<NActors::TActorSystem>(setup, nullptr, logSettings); - - actorSystem->Start(); - - actorSystem->Register(statsCollector); - - TVector<TString> hostPortPairs; - for (auto hostPortPair : coordinatorConfig.GetServiceNodeHostPort()) { - hostPortPairs.emplace_back(hostPortPair); - // tests - if (hostPortPair.StartsWith("localhost")) { - rmOptions.ExitOnPingFail = true; - } - } - - WorkerConfigurator->ConfigureMetrics(loggerConfig, actorSystem, backendConfig, rmOptions, nodeId); - - // rmOptions.MetricsRegistry = CreateMetricsRegistry(dqSensors); // send metrics to gwm, unsupported - auto resolver = coordinator->CreateServiceNodeResolver(actorSystem.Get(), hostPortPairs); - actorSystem->Register(coordinator->CreateServiceNodePinger(resolver, rmOptions, attributes)); - - NLog::YqlLogger().UpdateProcInfo(jobId + "/" + GetGuidAsString(coordinator->GetRuntimeData()->WorkerId)); - - // For testing only - THashMap<TString, TString> clusterMapping; - clusterMapping["plato"] = backendConfig.GetClusterName(); - - auto proxyFactory = NTaskRunnerProxy::CreatePipeFactory(pfOptions); - ITaskRunnerInvokerFactory::TPtr invokerFactory = new TConcurrentInvokerFactory(2*capacity); - auto taskRunnerActorFactory = NTaskRunnerActor::CreateTaskRunnerActorFactory(proxyFactory, invokerFactory, nullptr, coordinator->GetRuntimeData()); - - TLocalWorkerManagerOptions lwmOptions; - lwmOptions.Factory = proxyFactory; - lwmOptions.TaskRunnerActorFactory = taskRunnerActorFactory; - lwmOptions.AsyncIoFactory = WorkerConfigurator->CreateAsyncIoFactory(); - lwmOptions.RuntimeData = coordinator->GetRuntimeData(); - lwmOptions.TaskRunnerInvokerFactory = invokerFactory; - lwmOptions.ClusterNamesMapping = clusterMapping; - lwmOptions.ComputeActorOwnsCounters = true; - - auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); - - auto workerManagerActorId = actorSystem->Register(resman); - actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId); - - if (backendConfig.HasSpillingSettings()) { - auto spilling = NDq::CreateDqLocalFileSpillingService( - NDq::TFileSpillingServiceConfig { - .Root = backendConfig.GetSpillingSettings().GetRoot(), - .MaxTotalSize = backendConfig.GetSpillingSettings().GetMaxTotalSize(), - .MaxFileSize = backendConfig.GetSpillingSettings().GetMaxFileSize(), - .MaxFilePartSize = backendConfig.GetSpillingSettings().GetMaxFilePartSize(), - .IoThreadPoolWorkersCount = backendConfig.GetSpillingSettings().GetIoThreadPoolWorkersCount(), - .IoThreadPoolQueueSize = backendConfig.GetSpillingSettings().GetIoThreadPoolQueueSize(), - .CleanupOnShutdown = backendConfig.GetSpillingSettings().GetCleanupOnShutdown() - }, - MakeIntrusive<NDq::TSpillingCounters>(dqSensors) - ); - auto spillingActor = actorSystem->Register(spilling); - actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor); - } - - auto endFuture = ShouldContinue.GetFuture(); - - signal(SIGINT, &OnTerminate); - signal(SIGTERM, &OnTerminate); - signal(SIGPIPE, SIG_IGN); - - // run forever - - endFuture.Wait(); - WorkerConfigurator->OnWorkerFinish(); - actorSystem->Stop(); - dqSensors->OutputHtml(Cerr); - } - - REGISTER_VANILLA_JOB(TWorkerJob); - -} // namespace NYql::NDq::NWorker diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.h b/ydb/library/yql/tools/dq/worker_job/dq_worker.h deleted file mode 100644 index f37895c6dc..0000000000 --- a/ydb/library/yql/tools/dq/worker_job/dq_worker.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include <ydb/library/yql/providers/dq/actors/yt/resource_manager.h> -#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.h> - -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> - -#include <ydb/library/yql/utils/log/proto/logger_config.pb.h> - -#include <yt/cpp/mapreduce/interface/client.h> - -namespace NYql::NDq::NWorker { - struct IWorkerConfigurator - { - virtual ~IWorkerConfigurator() = default; - - virtual void ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& loggerConfig, const THolder<NActors::TActorSystem>& actorSystem, const NProto::TDqConfig::TYtBackend& backendConfig, const TResourceManagerOptions& rmOptions, ui32 nodeId) const = 0; - virtual IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const = 0; - virtual void OnWorkerFinish() = 0; - }; - - struct TDefaultWorkerConfigurator - : public IWorkerConfigurator - { - void ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& /*loggerConfig*/, const THolder<NActors::TActorSystem>& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const override; - IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const override; - void OnWorkerFinish() override; - }; - - class TWorkerJob: public NYT::IVanillaJob<void> { - public: - TWorkerJob(); - - void SetConfigFile(const TString& configFile); - void SetWorkerConfigurator(THolder<IWorkerConfigurator> workerConfigurator); - - void Do() override; - - private: - TString ConfigFile; - THolder<IWorkerConfigurator> WorkerConfigurator; - }; - -} // namespace NYql::NDq::NWorker diff --git a/ydb/library/yql/tools/dq/worker_job/ya.make b/ydb/library/yql/tools/dq/worker_job/ya.make deleted file mode 100644 index 14971a568b..0000000000 --- a/ydb/library/yql/tools/dq/worker_job/ya.make +++ /dev/null @@ -1,31 +0,0 @@ -LIBRARY() - -SRCS( - dq_worker.cpp -) - -PEERDIR( - contrib/libs/protobuf - library/cpp/protobuf/util - yt/cpp/mapreduce/client - yt/cpp/mapreduce/interface - yt/yt/core - ydb/library/yql/dq/actors/spilling - ydb/library/yql/minikql/comp_nodes/llvm14 - ydb/library/yql/providers/common/metrics - ydb/library/yql/providers/dq/runtime - ydb/library/yql/providers/dq/service - ydb/library/yql/providers/dq/stats_collector - ydb/library/yql/providers/dq/task_runner - ydb/library/yql/public/udf/service/terminate_policy - ydb/library/yql/utils - ydb/library/yql/utils/log - ydb/library/yql/utils/log/proto - ydb/library/yql/providers/dq/actors/yt - ydb/library/yql/providers/dq/global_worker_manager - ydb/library/yql/utils/signals -) - -YQL_LAST_ABI_VERSION() - -END() diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make index 3bdf652c40..1043bcb327 100644 --- a/ydb/library/yql/tools/dq/ya.make +++ b/ydb/library/yql/tools/dq/ya.make @@ -2,5 +2,4 @@ RECURSE( dq_cli service_node worker_node - worker_job ) |