diff options
author | Marina Pereskokova <35163152+Krisha11@users.noreply.github.com> | 2024-02-01 23:05:21 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-01 21:05:21 +0100 |
commit | a67787558ac061e4c62feb2ebfde87b51b376856 (patch) | |
tree | 6f68aa14b91157cbd48c5d964762b6e0d2c89852 | |
parent | 15d1fa83ceda46e25089822d9792988defb20c60 (diff) | |
download | ydb-a67787558ac061e4c62feb2ebfde87b51b376856.tar.gz |
Init yt dq jobs (#1460)
-rw-r--r-- | ydb/library/yql/providers/dq/actors/yt/resource_manager.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/tools/dq/worker_job/dq_worker.cpp | 20 | ||||
-rw-r--r-- | ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp | 44 | ||||
-rw-r--r-- | ydb/library/yql/yt/dq_vanilla_job.lite/ya.make | 30 | ||||
-rw-r--r-- | ydb/library/yql/yt/dq_vanilla_job/main.cpp | 65 | ||||
-rw-r--r-- | ydb/library/yql/yt/dq_vanilla_job/ya.make | 26 | ||||
-rw-r--r-- | ydb/library/yql/yt/ya.make | 2 |
8 files changed, 188 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/dq/actors/yt/resource_manager.h b/ydb/library/yql/providers/dq/actors/yt/resource_manager.h index 950d2e35567..8a2760081ad 100644 --- a/ydb/library/yql/providers/dq/actors/yt/resource_manager.h +++ b/ydb/library/yql/providers/dq/actors/yt/resource_manager.h @@ -19,6 +19,7 @@ namespace NYql { extern const TString OPERATION_SIZE; extern const TString YT_COORDINATOR; extern const TString YT_BACKEND; + extern const TString YT_FORCE_IPV4; } class ICoordinationHelper; @@ -68,6 +69,8 @@ namespace NYql { int Capabilities = 0; int MaxRetries = -1; + bool ForceIPv4 = false; + // Pinger TString DieOnFileAbsence; // see YQL-14099 diff --git a/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp b/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp index 37e401aa90b..cc254f9b964 100644 --- a/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp +++ b/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp @@ -33,6 +33,7 @@ namespace NYql { const TString OPERATION_SIZE("OPERATION_SIZE"); const TString YT_COORDINATOR("YT_COORDINATOR"); const TString YT_BACKEND("YT_BACKEND"); + const TString YT_FORCE_IPV4("YT_FORCE_IPV4"); } using namespace NActors; @@ -600,6 +601,7 @@ namespace NYql { .BeginMap() .Item(NCommonJobVars::YT_COORDINATOR).Value(coordinatorStr) .Item(NCommonJobVars::YT_BACKEND).Value(backendStr) + .Item(NCommonJobVars::YT_FORCE_IPV4).Value(Options.ForceIPv4) .DoFor(Options.YtBackend.GetVaultEnv(), [&] (NYT::TFluentMap fluent, const NYql::NProto::TDqConfig::TAttr& envVar) { // Добавляем env variables TString tokenValue; try { diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp index ea3ebe2a447..b6b799c8240 100644 --- a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp +++ b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp @@ -23,6 +23,8 @@ #include <yt/yt/core/actions/invoker.h> #include <yt/yt/core/concurrency/action_queue.h> #include <yt/yt/core/concurrency/thread_pool.h> +#include <yt/yt/core/net/address.h> +#include <yt/yt/core/net/config.h> #include <library/cpp/protobuf/util/pb_io.h> @@ -30,6 +32,7 @@ #include <util/stream/file.h> #include <util/system/env.h> #include <util/system/shellcommand.h> +#include <util/string/type.h> using namespace NYql::NDqs; @@ -187,11 +190,20 @@ namespace NYql::NDq::NWorker { TRangeWalker<int> portWalker(startPort, startPort+100); auto ports = BindInRange(portWalker); + auto forceIPv4 = IsTrue(GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_FORCE_IPV4, "")); + if (forceIPv4) { + auto config = NYT::New<NYT::NNet::TAddressResolverConfig>(); + config->EnableIPv4 = true; + config->EnableIPv6 = false; + NYT::NNet::TAddressResolver::Get()->Configure(config); + } + auto [host, ip] = NYql::NDqs::GetLocalAddress( - coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr + coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr, + forceIPv4 ? AF_INET : AF_INET6 ); - auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip); + auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[forceIPv4 ? 0 : 1].Addr.GetPort(), host, ip); i64 cacheSize = backendConfig.HasCacheSize() ? backendConfig.GetCacheSize() : 16000000000L; @@ -280,8 +292,8 @@ namespace NYql::NDq::NWorker { std::tie(setup, logSettings) = BuildActorSetup( nodeId, ip, - ports[1].Addr.GetPort(), - ports[1].Socket->Release(), + ports[forceIPv4 ? 0 : 1].Addr.GetPort(), + ports[forceIPv4 ? 0 : 1].Socket->Release(), {}, dqSensors, [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) { diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp b/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp new file mode 100644 index 00000000000..2e7fd6ddba8 --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp @@ -0,0 +1,44 @@ +#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h> + +#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h> +#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h> +#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h> + +#include <ydb/library/yql/utils/backtrace/backtrace.h> + +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/minikql/mkql_stats_registry.h> + +#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h> +#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h> +#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h> + +#include <library/cpp/yt/mlock/mlock.h> + +#include <util/system/mlock.h> +#include <util/stream/output.h> + +using namespace NYql; + +int main() { + NBacktrace::RegisterKikimrFatalActions(); + if (!NYT::MlockFileMappings()) { + Cerr << "mlockall failed, but that's fine" << Endl; + } + + NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry(); + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + GetCommonDqFactory(), + GetDqYtFactory(statsRegistry.Get()), + NKikimr::NMiniKQL::GetYqlFactory(), + }); + + auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({ + CreateCommonDqTaskTransformFactory(), + CreateYtDqTaskTransformFactory(), + }); + + return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true); +} diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make b/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make new file mode 100644 index 00000000000..c149282d674 --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make @@ -0,0 +1,30 @@ +PROGRAM() + +PEERDIR( + library/cpp/yt/mlock + yt/cpp/mapreduce/client + ydb/library/yql/minikql/comp_nodes/llvm14 + ydb/library/yql/public/udf/service/terminate_policy + ydb/library/yql/utils/backtrace + ydb/library/yql/dq/comp_nodes + ydb/library/yql/dq/integration/transform + ydb/library/yql/dq/transform + ydb/library/yql/dq/runtime + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/dq/common + ydb/library/yql/providers/dq/runtime + ydb/library/yql/providers/yt/comp_nodes/dq + ydb/library/yql/providers/yt/mkql_dq + ydb/library/yql/providers/yt/codec/codegen + ydb/library/yql/providers/yt/comp_nodes/llvm14 + ydb/library/yql/sql/pg + ydb/library/yql/parser/pg_wrapper +) + +YQL_LAST_ABI_VERSION() + +SRCS( + main.cpp +) + +END() diff --git a/ydb/library/yql/yt/dq_vanilla_job/main.cpp b/ydb/library/yql/yt/dq_vanilla_job/main.cpp new file mode 100644 index 00000000000..eb5fb6951db --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job/main.cpp @@ -0,0 +1,65 @@ +#include <ydb/library/yql/providers/dq/actors/execution_helpers.h> +#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h> + +#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h> +#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h> + +#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h> + +#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h> +#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h> +#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h> + +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/tools/dq/worker_job/dq_worker.h> +#include <ydb/library/yql/utils/backtrace/backtrace.h> + +#include <library/cpp/svnversion/svnversion.h> + +#include <library/cpp/yt/mlock/mlock.h> + +using namespace NYql; + +int main(int argc, const char* argv[]) { + NBacktrace::RegisterKikimrFatalActions(); + NBacktrace::EnableKikimrSymbolize(); + + if (!NYT::MlockFileMappings()) { + Cerr << "mlockall failed, but that's fine" << Endl; + } + + if (argc > 1) { + if (!strcmp(argv[1], "-V")) { + Cerr << ToString(GetProgramCommitId()) << Endl; + return 0; + } else if (!strcmp(argv[1], "tasks_runner_proxy")) { + NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry(); + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + GetCommonDqFactory(), + GetDqYtFactory(statsRegistry.Get()), + NKikimr::NMiniKQL::GetYqlFactory(), + }); + + auto dqTaskTransformFactory = CreateCompositeTaskTransformFactory({ + CreateCommonDqTaskTransformFactory(), + CreateYtDqTaskTransformFactory(), + }); + + return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true); + } + } + + try { + NYT::Initialize(argc, argv); + + auto job = new NDq::NWorker::TWorkerJob(); + + job->Do(); + } catch (...) { + Cerr << CurrentExceptionMessage(); + return -1; + } + + return 0; +} diff --git a/ydb/library/yql/yt/dq_vanilla_job/ya.make b/ydb/library/yql/yt/dq_vanilla_job/ya.make new file mode 100644 index 00000000000..4dc08fb2e83 --- /dev/null +++ b/ydb/library/yql/yt/dq_vanilla_job/ya.make @@ -0,0 +1,26 @@ +PROGRAM() + +PEERDIR( + library/cpp/svnversion + library/cpp/yt/mlock + ydb/library/yql/dq/comp_nodes + ydb/library/yql/dq/integration/transform + ydb/library/yql/dq/transform + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/yt/codec/codegen + ydb/library/yql/providers/yt/comp_nodes/llvm14 + ydb/library/yql/utils/backtrace + ydb/library/yql/providers/yt/comp_nodes/dq + ydb/library/yql/providers/yt/mkql_dq + ydb/library/yql/tools/dq/worker_job + ydb/library/yql/sql/pg + ydb/library/yql/parser/pg_wrapper +) + +YQL_LAST_ABI_VERSION() + +SRCS( + main.cpp +) + +END() diff --git a/ydb/library/yql/yt/ya.make b/ydb/library/yql/yt/ya.make index 4aeb5b9bc60..abd7fe1a951 100644 --- a/ydb/library/yql/yt/ya.make +++ b/ydb/library/yql/yt/ya.make @@ -9,5 +9,7 @@ END() RECURSE( dynamic native + dq_vanilla_job + dq_vanilla_job.lite ) |