aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-01-31 13:39:30 +0300
committerGitHub <noreply@github.com>2024-01-31 11:39:30 +0100
commitd834612c3a01e331dab514c3edcd849433484102 (patch)
tree134b15310c01d459b1fc257c1c603f4f071854cf
parentcf6042901fdb1a3921b8b5bc7760e03ea8661113 (diff)
downloadydb-d834612c3a01e331dab514c3edcd849433484102.tar.gz
Revert "Move yql dq job core in os (#1440)" (#1461)
This reverts commit cd27de93a8126f873c11f0b40309b5a3d7339a7b.
-rw-r--r--ydb/library/yql/tools/dq/worker_job/dq_worker.cpp373
-rw-r--r--ydb/library/yql/tools/dq/worker_job/dq_worker.h44
-rw-r--r--ydb/library/yql/tools/dq/worker_job/ya.make31
-rw-r--r--ydb/library/yql/tools/dq/ya.make1
4 files changed, 0 insertions, 449 deletions
diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
deleted file mode 100644
index a47739ac43..0000000000
--- a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
+++ /dev/null
@@ -1,373 +0,0 @@
-#include "dq_worker.h"
-
-#include <ydb/library/yql/utils/signals/signals.h>
-#include <ydb/library/yql/utils/bind_in_range.h>
-
-#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h>
-#include <ydb/library/yql/providers/dq/actors/yt/nodeid_assigner.h>
-#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h>
-#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
-#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h>
-
-#include <ydb/library/yql/providers/dq/runtime/file_cache.h>
-#include <ydb/library/yql/providers/dq/runtime/runtime_data.h>
-#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
-
-#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
-
-#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/utils/log/tls_backend.h>
-#include <ydb/library/yql/utils/yql_panic.h>
-#include <ydb/library/yql/utils/range_walker.h>
-
-#include <yt/yt/core/actions/invoker.h>
-#include <yt/yt/core/concurrency/action_queue.h>
-#include <yt/yt/core/concurrency/thread_pool.h>
-
-#include <library/cpp/protobuf/util/pb_io.h>
-
-#include <util/system/fs.h>
-#include <util/stream/file.h>
-#include <util/system/env.h>
-#include <util/system/shellcommand.h>
-
-using namespace NYql::NDqs;
-
-namespace {
- template <typename TMessage>
- THolder<TMessage> ParseProtoConfig(const TString& cfgFile) {
- auto config = MakeHolder<TMessage>();
- TString configData = TFileInput(cfgFile).ReadAll();;
-
- using ::google::protobuf::TextFormat;
- if (!TextFormat::ParseFromString(configData, config.Get())) {
- YQL_LOG(ERROR) << "Bad format of dq_vanilla_job configuration";
- return {};
- }
-
- return config;
- }
-
- static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>();
-
- static void OnTerminate(int) {
- ShouldContinue.SetValue();
- }
-
- class TSerializedTaskRunnerInvoker: public ITaskRunnerInvoker {
- public:
- TSerializedTaskRunnerInvoker(const NYT::IInvokerPtr& invoker)
- : Invoker(NYT::NConcurrency::CreateSerializedInvoker(invoker))
- { }
-
- void Invoke(const std::function<void(void)>& f) override {
- Invoker->Invoke(BIND(f));
- }
-
- private:
- const NYT::IInvokerPtr Invoker;
- };
-
- class TConcurrentInvokerFactory: public ITaskRunnerInvokerFactory {
- public:
- TConcurrentInvokerFactory(int capacity)
- : ThreadPool(NYT::NConcurrency::CreateThreadPool(capacity, "WorkerActor"))
- { }
-
- ITaskRunnerInvoker::TPtr Create() override {
- return new TSerializedTaskRunnerInvoker(ThreadPool->GetInvoker());
- }
-
- NYT::NConcurrency::IThreadPoolPtr ThreadPool;
- };
-
- void ConfigurePorto(const NYql::NProto::TDqConfig::TYtBackend& config, const TString portoCtl) {
- TString settings[][2] = {
- {"enable_porto", "isolate"},
- {"respawn", "true"}
- };
- int nSettings = 2;
- {
- TShellCommand cmd(portoCtl, {"create", "Outer"});
- cmd.Run().Wait();
- }
- for (int i = 0; i < nSettings; i++) {
- TShellCommand cmd(portoCtl, {"set", "Outer", settings[i][0], settings[i][1]});
- cmd.Run().Wait();
- }
- for (const auto& attr : config.GetPortoSettings().GetSetting()) {
- TShellCommand cmd(portoCtl, {"set", "Outer", attr.GetName(), attr.GetValue()});
- cmd.Run().Wait();
- }
- {
- TShellCommand cmd(portoCtl, {"start", "Outer"});
- cmd.Run().Wait();
- }
- {
- TShellCommand cmd(portoCtl, {"wait", "Outer"});
- cmd.Run().Wait();
- }
- }
-}
-
-namespace NYql::NDq::NWorker {
-
- void TDefaultWorkerConfigurator::ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& /*loggerConfig*/, const THolder<NActors::TActorSystem>& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const {
- }
-
- NDq::IDqAsyncIoFactory::TPtr TDefaultWorkerConfigurator::CreateAsyncIoFactory() const {
- return MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
- }
-
- void TDefaultWorkerConfigurator::OnWorkerFinish() {
- }
-
- TWorkerJob::TWorkerJob()
- : WorkerConfigurator(MakeHolder<TDefaultWorkerConfigurator>(TDefaultWorkerConfigurator()))
- { }
-
- void TWorkerJob::SetConfigFile(const TString& configFile) {
- ConfigFile = configFile;
- }
-
- void TWorkerJob::SetWorkerConfigurator(THolder<IWorkerConfigurator> workerConfigurator) {
- WorkerConfigurator = std::move(workerConfigurator);
- }
-
- void TWorkerJob::Do() {
-
- auto loggerConfig = MakeHolder<NYql::NProto::TLoggingConfig>();
-
- ui16 startPort = 0;
-
- auto deterministicMode = !!GetEnv("YQL_DETERMINISTIC_MODE");
-
- YQL_ENSURE(TryFromString<ui16>(GetEnv(NCommonJobVars::ACTOR_PORT), startPort),
- "Invalid service config port env var empty");
-
- ui32 tryNodeId;
- YQL_ENSURE(TryFromString<ui32>(GetEnv(NCommonJobVars::ACTOR_NODE_ID, "0"), tryNodeId),
- "Invalid nodeId env var");
-
- if (!ConfigFile.empty()) {
- loggerConfig = ParseProtoConfig<NYql::NProto::TLoggingConfig>(ConfigFile);
-
- for (auto& logDest : *loggerConfig->MutableLogDest()) {
- if (logDest.GetType() == NYql::NProto::TLoggingConfig::FILE) {
- TString logFile = logDest.GetTarget() + "." + ToString(tryNodeId);
- logDest.SetTarget(logFile);
- }
- }
-
- loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::TRACE);
- } else {
- loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::DEBUG);
- }
- NYql::NLog::InitLogger(*loggerConfig, false);
- InitSignals();
-
- TString fileCacheDir = GetEnv(NCommonJobVars::UDFS_PATH);
- TString ytCoordinatorStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_COORDINATOR);
-
- TString ytBackendStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_BACKEND);
-
- TString operationId = GetEnv("YT_OPERATION_ID");
- TString jobId = GetEnv("YT_JOB_ID");
-
- TString operationSize = GetEnv(NCommonJobVars::OPERATION_SIZE);
-
- NProto::TDqConfig::TYtCoordinator coordinatorConfig;
- TStringInput inputStream1(ytCoordinatorStr);
- ParseFromTextFormat(inputStream1, coordinatorConfig, EParseFromTextFormatOption::AllowUnknownField);
-
- NProto::TDqConfig::TYtBackend backendConfig;
- TStringInput inputStream2(ytBackendStr);
- ParseFromTextFormat(inputStream2, backendConfig, EParseFromTextFormatOption::AllowUnknownField);
-
- TRangeWalker<int> portWalker(startPort, startPort+100);
- auto ports = BindInRange(portWalker);
-
- auto [host, ip] = NYql::NDqs::GetLocalAddress(
- coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr
- );
-
- auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip);
- i64 cacheSize = backendConfig.HasCacheSize()
- ? backendConfig.GetCacheSize()
- : 16000000000L;
- TIntrusivePtr<IFileCache> fileCache = new TFileCache(fileCacheDir + "/cache", cacheSize);
- NFs::SymLink(fileCacheDir, "file_cache"); // COMPAT
- TString layerDir = fileCacheDir + "/layer";
- if (backendConfig.GetPortoLayer().size() > 0) {
- NFs::MakeDirectoryRecursive(layerDir + "/mnt/work");
- for (const auto& layerPath : backendConfig.GetPortoLayer()) {
- auto pos = layerPath.rfind('/');
- auto archive = layerPath.substr(pos+1);
- TShellCommand cmd("tar", {"xf", archive, "-C", layerDir});
- cmd.Run().Wait();
- }
- } else {
- NFs::MakeDirectoryRecursive("mnt/work");
- NFs::MakeDirectoryRecursive("usr/local/bin");
- }
-
- int capacity = backendConfig.GetWorkerCapacity()
- ? backendConfig.GetWorkerCapacity()
- : 1;
-
- NYql::NTaskRunnerProxy::TPipeFactoryOptions pfOptions;
- pfOptions.ExecPath = GetExecPath();
- pfOptions.FileCache = fileCache;
- if (deterministicMode) {
- YQL_LOG(DEBUG) << "deterministicMode On";
- pfOptions.Env["YQL_DETERMINISTIC_MODE"] = "1";
- }
- if (backendConfig.GetEnforceJobUtc()) {
- pfOptions.Env["TZ"] = "UTC0";
- }
- pfOptions.EnablePorto = backendConfig.GetEnablePorto() == "isolate";
- pfOptions.PortoLayer = backendConfig.GetPortoLayer().size() == 0 ? "" : layerDir;
- pfOptions.MaxProcesses = capacity*1.5;
- pfOptions.ContainerName = "Outer";
-
- TResourceManagerOptions rmOptions;
- rmOptions.YtBackend = backendConfig;
- rmOptions.FileCache = fileCache;
- rmOptions.TmpDir = fileCacheDir + "/tmp";
-
- if (NFs::Exists(layerDir + "/usr/bin/portoctl")) {
- TString dst = fileCache->GetDir() + "/portoctl";
- NFs::Copy(layerDir + "/usr/bin/portoctl", dst);
- NFs::HardLink(layerDir + "/usr/bin/portoctl", layerDir + "/usr/sbin/portoctl"); // workaround PORTO-997
- chmod(dst.c_str(), 0755);
- pfOptions.PortoCtlPath = dst;
- rmOptions.DieOnFileAbsence = dst; // die on file absence
- }
-
- Cerr << host + ":" + ip << Endl;
-
- THashMap<TString, TString> attributes;
- attributes[NCommonAttrs::OPERATIONID_ATTR] = operationId;
- attributes[NCommonAttrs::OPERATIONSIZE_ATTR] = operationSize;
- attributes[NCommonAttrs::JOBID_ATTR] = jobId;
- attributes[NCommonAttrs::CLUSTERNAME_ATTR] = backendConfig.GetClusterName();
-
- auto nodeIdOpt = (tryNodeId == 0)
- ? TMaybe<ui32>()
- : TMaybe<ui32>(tryNodeId);
- auto nodeId = coordinator->GetNodeId(
- nodeIdOpt,
- {},
- static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId),
- static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId),
- attributes);
-
- Y_ABORT_UNLESS(
- static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId) <= nodeId &&
- nodeId < static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId));
-
- Cerr << "My nodeId: " << nodeId << Endl;
-
- Cerr << "Configure porto" << Endl;
- if (backendConfig.GetEnablePorto() == "isolate") {
- ConfigurePorto(backendConfig, pfOptions.PortoCtlPath);
- }
- Cerr << "Configure porto done" << Endl;
-
- auto dqSensors = GetSensorsGroupFor(NSensorComponent::kDq);
- THolder<NActors::TActorSystemSetup> setup;
- TIntrusivePtr<NActors::NLog::TSettings> logSettings;
- std::tie(setup, logSettings) = BuildActorSetup(
- nodeId,
- ip,
- ports[1].Addr.GetPort(),
- ports[1].Socket->Release(),
- {},
- dqSensors,
- [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
- return NYql::NDqs::CreateDynamicNameserver(setup);
- },
- Nothing(),
- backendConfig.GetICSettings());
-
- auto statsCollector = CreateStatsCollector(5, *setup.Get(), dqSensors);
-
- auto actorSystem = MakeHolder<NActors::TActorSystem>(setup, nullptr, logSettings);
-
- actorSystem->Start();
-
- actorSystem->Register(statsCollector);
-
- TVector<TString> hostPortPairs;
- for (auto hostPortPair : coordinatorConfig.GetServiceNodeHostPort()) {
- hostPortPairs.emplace_back(hostPortPair);
- // tests
- if (hostPortPair.StartsWith("localhost")) {
- rmOptions.ExitOnPingFail = true;
- }
- }
-
- WorkerConfigurator->ConfigureMetrics(loggerConfig, actorSystem, backendConfig, rmOptions, nodeId);
-
- // rmOptions.MetricsRegistry = CreateMetricsRegistry(dqSensors); // send metrics to gwm, unsupported
- auto resolver = coordinator->CreateServiceNodeResolver(actorSystem.Get(), hostPortPairs);
- actorSystem->Register(coordinator->CreateServiceNodePinger(resolver, rmOptions, attributes));
-
- NLog::YqlLogger().UpdateProcInfo(jobId + "/" + GetGuidAsString(coordinator->GetRuntimeData()->WorkerId));
-
- // For testing only
- THashMap<TString, TString> clusterMapping;
- clusterMapping["plato"] = backendConfig.GetClusterName();
-
- auto proxyFactory = NTaskRunnerProxy::CreatePipeFactory(pfOptions);
- ITaskRunnerInvokerFactory::TPtr invokerFactory = new TConcurrentInvokerFactory(2*capacity);
- auto taskRunnerActorFactory = NTaskRunnerActor::CreateTaskRunnerActorFactory(proxyFactory, invokerFactory, nullptr, coordinator->GetRuntimeData());
-
- TLocalWorkerManagerOptions lwmOptions;
- lwmOptions.Factory = proxyFactory;
- lwmOptions.TaskRunnerActorFactory = taskRunnerActorFactory;
- lwmOptions.AsyncIoFactory = WorkerConfigurator->CreateAsyncIoFactory();
- lwmOptions.RuntimeData = coordinator->GetRuntimeData();
- lwmOptions.TaskRunnerInvokerFactory = invokerFactory;
- lwmOptions.ClusterNamesMapping = clusterMapping;
- lwmOptions.ComputeActorOwnsCounters = true;
-
- auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
-
- auto workerManagerActorId = actorSystem->Register(resman);
- actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId);
-
- if (backendConfig.HasSpillingSettings()) {
- auto spilling = NDq::CreateDqLocalFileSpillingService(
- NDq::TFileSpillingServiceConfig {
- .Root = backendConfig.GetSpillingSettings().GetRoot(),
- .MaxTotalSize = backendConfig.GetSpillingSettings().GetMaxTotalSize(),
- .MaxFileSize = backendConfig.GetSpillingSettings().GetMaxFileSize(),
- .MaxFilePartSize = backendConfig.GetSpillingSettings().GetMaxFilePartSize(),
- .IoThreadPoolWorkersCount = backendConfig.GetSpillingSettings().GetIoThreadPoolWorkersCount(),
- .IoThreadPoolQueueSize = backendConfig.GetSpillingSettings().GetIoThreadPoolQueueSize(),
- .CleanupOnShutdown = backendConfig.GetSpillingSettings().GetCleanupOnShutdown()
- },
- MakeIntrusive<NDq::TSpillingCounters>(dqSensors)
- );
- auto spillingActor = actorSystem->Register(spilling);
- actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor);
- }
-
- auto endFuture = ShouldContinue.GetFuture();
-
- signal(SIGINT, &OnTerminate);
- signal(SIGTERM, &OnTerminate);
- signal(SIGPIPE, SIG_IGN);
-
- // run forever
-
- endFuture.Wait();
- WorkerConfigurator->OnWorkerFinish();
- actorSystem->Stop();
- dqSensors->OutputHtml(Cerr);
- }
-
- REGISTER_VANILLA_JOB(TWorkerJob);
-
-} // namespace NYql::NDq::NWorker
diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.h b/ydb/library/yql/tools/dq/worker_job/dq_worker.h
deleted file mode 100644
index f37895c6dc..0000000000
--- a/ydb/library/yql/tools/dq/worker_job/dq_worker.h
+++ /dev/null
@@ -1,44 +0,0 @@
-#pragma once
-
-#include <ydb/library/yql/providers/dq/actors/yt/resource_manager.h>
-#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.h>
-
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-
-#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
-
-#include <yt/cpp/mapreduce/interface/client.h>
-
-namespace NYql::NDq::NWorker {
- struct IWorkerConfigurator
- {
- virtual ~IWorkerConfigurator() = default;
-
- virtual void ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& loggerConfig, const THolder<NActors::TActorSystem>& actorSystem, const NProto::TDqConfig::TYtBackend& backendConfig, const TResourceManagerOptions& rmOptions, ui32 nodeId) const = 0;
- virtual IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const = 0;
- virtual void OnWorkerFinish() = 0;
- };
-
- struct TDefaultWorkerConfigurator
- : public IWorkerConfigurator
- {
- void ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& /*loggerConfig*/, const THolder<NActors::TActorSystem>& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const override;
- IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const override;
- void OnWorkerFinish() override;
- };
-
- class TWorkerJob: public NYT::IVanillaJob<void> {
- public:
- TWorkerJob();
-
- void SetConfigFile(const TString& configFile);
- void SetWorkerConfigurator(THolder<IWorkerConfigurator> workerConfigurator);
-
- void Do() override;
-
- private:
- TString ConfigFile;
- THolder<IWorkerConfigurator> WorkerConfigurator;
- };
-
-} // namespace NYql::NDq::NWorker
diff --git a/ydb/library/yql/tools/dq/worker_job/ya.make b/ydb/library/yql/tools/dq/worker_job/ya.make
deleted file mode 100644
index 14971a568b..0000000000
--- a/ydb/library/yql/tools/dq/worker_job/ya.make
+++ /dev/null
@@ -1,31 +0,0 @@
-LIBRARY()
-
-SRCS(
- dq_worker.cpp
-)
-
-PEERDIR(
- contrib/libs/protobuf
- library/cpp/protobuf/util
- yt/cpp/mapreduce/client
- yt/cpp/mapreduce/interface
- yt/yt/core
- ydb/library/yql/dq/actors/spilling
- ydb/library/yql/minikql/comp_nodes/llvm14
- ydb/library/yql/providers/common/metrics
- ydb/library/yql/providers/dq/runtime
- ydb/library/yql/providers/dq/service
- ydb/library/yql/providers/dq/stats_collector
- ydb/library/yql/providers/dq/task_runner
- ydb/library/yql/public/udf/service/terminate_policy
- ydb/library/yql/utils
- ydb/library/yql/utils/log
- ydb/library/yql/utils/log/proto
- ydb/library/yql/providers/dq/actors/yt
- ydb/library/yql/providers/dq/global_worker_manager
- ydb/library/yql/utils/signals
-)
-
-YQL_LAST_ABI_VERSION()
-
-END()
diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make
index 3bdf652c40..1043bcb327 100644
--- a/ydb/library/yql/tools/dq/ya.make
+++ b/ydb/library/yql/tools/dq/ya.make
@@ -2,5 +2,4 @@ RECURSE(
dq_cli
service_node
worker_node
- worker_job
)