aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarina Pereskokova <35163152+Krisha11@users.noreply.github.com>2024-02-01 23:05:21 +0300
committerGitHub <noreply@github.com>2024-02-01 21:05:21 +0100
commita67787558ac061e4c62feb2ebfde87b51b376856 (patch)
tree6f68aa14b91157cbd48c5d964762b6e0d2c89852
parent15d1fa83ceda46e25089822d9792988defb20c60 (diff)
downloadydb-a67787558ac061e4c62feb2ebfde87b51b376856.tar.gz
Init yt dq jobs (#1460)
-rw-r--r--ydb/library/yql/providers/dq/actors/yt/resource_manager.h3
-rw-r--r--ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp2
-rw-r--r--ydb/library/yql/tools/dq/worker_job/dq_worker.cpp20
-rw-r--r--ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp44
-rw-r--r--ydb/library/yql/yt/dq_vanilla_job.lite/ya.make30
-rw-r--r--ydb/library/yql/yt/dq_vanilla_job/main.cpp65
-rw-r--r--ydb/library/yql/yt/dq_vanilla_job/ya.make26
-rw-r--r--ydb/library/yql/yt/ya.make2
8 files changed, 188 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/dq/actors/yt/resource_manager.h b/ydb/library/yql/providers/dq/actors/yt/resource_manager.h
index 950d2e35567..8a2760081ad 100644
--- a/ydb/library/yql/providers/dq/actors/yt/resource_manager.h
+++ b/ydb/library/yql/providers/dq/actors/yt/resource_manager.h
@@ -19,6 +19,7 @@ namespace NYql {
extern const TString OPERATION_SIZE;
extern const TString YT_COORDINATOR;
extern const TString YT_BACKEND;
+ extern const TString YT_FORCE_IPV4;
}
class ICoordinationHelper;
@@ -68,6 +69,8 @@ namespace NYql {
int Capabilities = 0;
int MaxRetries = -1;
+ bool ForceIPv4 = false;
+
// Pinger
TString DieOnFileAbsence; // see YQL-14099
diff --git a/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp b/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp
index 37e401aa90b..cc254f9b964 100644
--- a/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp
+++ b/ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp
@@ -33,6 +33,7 @@ namespace NYql {
const TString OPERATION_SIZE("OPERATION_SIZE");
const TString YT_COORDINATOR("YT_COORDINATOR");
const TString YT_BACKEND("YT_BACKEND");
+ const TString YT_FORCE_IPV4("YT_FORCE_IPV4");
}
using namespace NActors;
@@ -600,6 +601,7 @@ namespace NYql {
.BeginMap()
.Item(NCommonJobVars::YT_COORDINATOR).Value(coordinatorStr)
.Item(NCommonJobVars::YT_BACKEND).Value(backendStr)
+ .Item(NCommonJobVars::YT_FORCE_IPV4).Value(Options.ForceIPv4)
.DoFor(Options.YtBackend.GetVaultEnv(), [&] (NYT::TFluentMap fluent, const NYql::NProto::TDqConfig::TAttr& envVar) { // Добавляем env variables
TString tokenValue;
try {
diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
index ea3ebe2a447..b6b799c8240 100644
--- a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
+++ b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
@@ -23,6 +23,8 @@
#include <yt/yt/core/actions/invoker.h>
#include <yt/yt/core/concurrency/action_queue.h>
#include <yt/yt/core/concurrency/thread_pool.h>
+#include <yt/yt/core/net/address.h>
+#include <yt/yt/core/net/config.h>
#include <library/cpp/protobuf/util/pb_io.h>
@@ -30,6 +32,7 @@
#include <util/stream/file.h>
#include <util/system/env.h>
#include <util/system/shellcommand.h>
+#include <util/string/type.h>
using namespace NYql::NDqs;
@@ -187,11 +190,20 @@ namespace NYql::NDq::NWorker {
TRangeWalker<int> portWalker(startPort, startPort+100);
auto ports = BindInRange(portWalker);
+ auto forceIPv4 = IsTrue(GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_FORCE_IPV4, ""));
+ if (forceIPv4) {
+ auto config = NYT::New<NYT::NNet::TAddressResolverConfig>();
+ config->EnableIPv4 = true;
+ config->EnableIPv6 = false;
+ NYT::NNet::TAddressResolver::Get()->Configure(config);
+ }
+
auto [host, ip] = NYql::NDqs::GetLocalAddress(
- coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr
+ coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr,
+ forceIPv4 ? AF_INET : AF_INET6
);
- auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip);
+ auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[forceIPv4 ? 0 : 1].Addr.GetPort(), host, ip);
i64 cacheSize = backendConfig.HasCacheSize()
? backendConfig.GetCacheSize()
: 16000000000L;
@@ -280,8 +292,8 @@ namespace NYql::NDq::NWorker {
std::tie(setup, logSettings) = BuildActorSetup(
nodeId,
ip,
- ports[1].Addr.GetPort(),
- ports[1].Socket->Release(),
+ ports[forceIPv4 ? 0 : 1].Addr.GetPort(),
+ ports[forceIPv4 ? 0 : 1].Socket->Release(),
{},
dqSensors,
[](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp b/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp
new file mode 100644
index 00000000000..2e7fd6ddba8
--- /dev/null
+++ b/ydb/library/yql/yt/dq_vanilla_job.lite/main.cpp
@@ -0,0 +1,44 @@
+#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h>
+
+#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
+#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
+#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
+
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/minikql/mkql_stats_registry.h>
+
+#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
+#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
+#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
+
+#include <library/cpp/yt/mlock/mlock.h>
+
+#include <util/system/mlock.h>
+#include <util/stream/output.h>
+
+using namespace NYql;
+
+int main() {
+ NBacktrace::RegisterKikimrFatalActions();
+ if (!NYT::MlockFileMappings()) {
+ Cerr << "mlockall failed, but that's fine" << Endl;
+ }
+
+ NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry();
+
+ auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
+ GetCommonDqFactory(),
+ GetDqYtFactory(statsRegistry.Get()),
+ NKikimr::NMiniKQL::GetYqlFactory(),
+ });
+
+ auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
+ CreateCommonDqTaskTransformFactory(),
+ CreateYtDqTaskTransformFactory(),
+ });
+
+ return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true);
+}
diff --git a/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make b/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make
new file mode 100644
index 00000000000..c149282d674
--- /dev/null
+++ b/ydb/library/yql/yt/dq_vanilla_job.lite/ya.make
@@ -0,0 +1,30 @@
+PROGRAM()
+
+PEERDIR(
+ library/cpp/yt/mlock
+ yt/cpp/mapreduce/client
+ ydb/library/yql/minikql/comp_nodes/llvm14
+ ydb/library/yql/public/udf/service/terminate_policy
+ ydb/library/yql/utils/backtrace
+ ydb/library/yql/dq/comp_nodes
+ ydb/library/yql/dq/integration/transform
+ ydb/library/yql/dq/transform
+ ydb/library/yql/dq/runtime
+ ydb/library/yql/providers/common/comp_nodes
+ ydb/library/yql/providers/dq/common
+ ydb/library/yql/providers/dq/runtime
+ ydb/library/yql/providers/yt/comp_nodes/dq
+ ydb/library/yql/providers/yt/mkql_dq
+ ydb/library/yql/providers/yt/codec/codegen
+ ydb/library/yql/providers/yt/comp_nodes/llvm14
+ ydb/library/yql/sql/pg
+ ydb/library/yql/parser/pg_wrapper
+)
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ main.cpp
+)
+
+END()
diff --git a/ydb/library/yql/yt/dq_vanilla_job/main.cpp b/ydb/library/yql/yt/dq_vanilla_job/main.cpp
new file mode 100644
index 00000000000..eb5fb6951db
--- /dev/null
+++ b/ydb/library/yql/yt/dq_vanilla_job/main.cpp
@@ -0,0 +1,65 @@
+#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
+#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h>
+
+#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
+#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
+
+#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
+
+#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
+#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
+#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
+
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/tools/dq/worker_job/dq_worker.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+
+#include <library/cpp/svnversion/svnversion.h>
+
+#include <library/cpp/yt/mlock/mlock.h>
+
+using namespace NYql;
+
+int main(int argc, const char* argv[]) {
+ NBacktrace::RegisterKikimrFatalActions();
+ NBacktrace::EnableKikimrSymbolize();
+
+ if (!NYT::MlockFileMappings()) {
+ Cerr << "mlockall failed, but that's fine" << Endl;
+ }
+
+ if (argc > 1) {
+ if (!strcmp(argv[1], "-V")) {
+ Cerr << ToString(GetProgramCommitId()) << Endl;
+ return 0;
+ } else if (!strcmp(argv[1], "tasks_runner_proxy")) {
+ NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry();
+
+ auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
+ GetCommonDqFactory(),
+ GetDqYtFactory(statsRegistry.Get()),
+ NKikimr::NMiniKQL::GetYqlFactory(),
+ });
+
+ auto dqTaskTransformFactory = CreateCompositeTaskTransformFactory({
+ CreateCommonDqTaskTransformFactory(),
+ CreateYtDqTaskTransformFactory(),
+ });
+
+ return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true);
+ }
+ }
+
+ try {
+ NYT::Initialize(argc, argv);
+
+ auto job = new NDq::NWorker::TWorkerJob();
+
+ job->Do();
+ } catch (...) {
+ Cerr << CurrentExceptionMessage();
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/ydb/library/yql/yt/dq_vanilla_job/ya.make b/ydb/library/yql/yt/dq_vanilla_job/ya.make
new file mode 100644
index 00000000000..4dc08fb2e83
--- /dev/null
+++ b/ydb/library/yql/yt/dq_vanilla_job/ya.make
@@ -0,0 +1,26 @@
+PROGRAM()
+
+PEERDIR(
+ library/cpp/svnversion
+ library/cpp/yt/mlock
+ ydb/library/yql/dq/comp_nodes
+ ydb/library/yql/dq/integration/transform
+ ydb/library/yql/dq/transform
+ ydb/library/yql/providers/common/comp_nodes
+ ydb/library/yql/providers/yt/codec/codegen
+ ydb/library/yql/providers/yt/comp_nodes/llvm14
+ ydb/library/yql/utils/backtrace
+ ydb/library/yql/providers/yt/comp_nodes/dq
+ ydb/library/yql/providers/yt/mkql_dq
+ ydb/library/yql/tools/dq/worker_job
+ ydb/library/yql/sql/pg
+ ydb/library/yql/parser/pg_wrapper
+)
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ main.cpp
+)
+
+END()
diff --git a/ydb/library/yql/yt/ya.make b/ydb/library/yql/yt/ya.make
index 4aeb5b9bc60..abd7fe1a951 100644
--- a/ydb/library/yql/yt/ya.make
+++ b/ydb/library/yql/yt/ya.make
@@ -9,5 +9,7 @@ END()
RECURSE(
dynamic
native
+ dq_vanilla_job
+ dq_vanilla_job.lite
)