aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-10-24 23:12:06 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-10-24 23:41:41 +0300
commitf497b52bb70f52b35c75ba4d18d66fd8904a16e2 (patch)
treeae38a081a0ac936ba6bfb64d7e77402e25eaf747
parent58fc30125015d9e913d6ecc1f1a89007541ede1b (diff)
downloadydb-f497b52bb70f52b35c75ba4d18d66fd8904a16e2.tar.gz
Introduce OSS versions of worker_node/service_node (dq testing utilities)
-rw-r--r--.mapping.json16
-rw-r--r--ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt15
-rw-r--r--ydb/library/yql/tools/CMakeLists.linux-aarch64.txt15
-rw-r--r--ydb/library/yql/tools/CMakeLists.linux-x86_64.txt15
-rw-r--r--ydb/library/yql/tools/CMakeLists.txt15
-rw-r--r--ydb/library/yql/tools/CMakeLists.windows-x86_64.txt14
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt10
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt10
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt10
-rw-r--r--ydb/library/yql/tools/dq/CMakeLists.txt15
-rw-r--r--ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt51
-rw-r--r--ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt55
-rw-r--r--ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt57
-rw-r--r--ydb/library/yql/tools/dq/service_node/CMakeLists.txt15
-rw-r--r--ydb/library/yql/tools/dq/service_node/main.cpp384
-rw-r--r--ydb/library/yql/tools/dq/service_node/ya.make33
-rw-r--r--ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt64
-rw-r--r--ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt68
-rw-r--r--ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt70
-rw-r--r--ydb/library/yql/tools/dq/worker_node/CMakeLists.txt15
-rw-r--r--ydb/library/yql/tools/dq/worker_node/main.cpp423
-rw-r--r--ydb/library/yql/tools/dq/worker_node/ya.make46
-rw-r--r--ydb/library/yql/tools/dq/ya.make4
-rw-r--r--ydb/library/yql/tools/ya.make1
24 files changed, 1415 insertions, 6 deletions
diff --git a/.mapping.json b/.mapping.json
index 5b4c5e3cb0..94d161bbc7 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -8366,12 +8366,28 @@
"ydb/library/yql/sql/v1/ut/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/sql/v1/ut/CMakeLists.txt":"",
"ydb/library/yql/sql/v1/ut/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/tools/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/tools/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/tools/CMakeLists.txt":"",
+ "ydb/library/yql/tools/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.txt":"",
"ydb/library/yql/tools/astdiff/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/CMakeLists.txt":"",
+ "ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/service_node/CMakeLists.txt":"",
+ "ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/tools/dq/worker_node/CMakeLists.txt":"",
"ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..61ab79236b
--- /dev/null
+++ b/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(astdiff)
+add_subdirectory(dq)
+add_subdirectory(dqrun)
+add_subdirectory(mrjob)
+add_subdirectory(sql2yql)
+add_subdirectory(sql_formatter)
+add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..61ab79236b
--- /dev/null
+++ b/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(astdiff)
+add_subdirectory(dq)
+add_subdirectory(dqrun)
+add_subdirectory(mrjob)
+add_subdirectory(sql2yql)
+add_subdirectory(sql_formatter)
+add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..61ab79236b
--- /dev/null
+++ b/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(astdiff)
+add_subdirectory(dq)
+add_subdirectory(dqrun)
+add_subdirectory(mrjob)
+add_subdirectory(sql2yql)
+add_subdirectory(sql_formatter)
+add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/CMakeLists.txt b/ydb/library/yql/tools/CMakeLists.txt
index bd1f049b04..f8b31df0c1 100644
--- a/ydb/library/yql/tools/CMakeLists.txt
+++ b/ydb/library/yql/tools/CMakeLists.txt
@@ -6,9 +6,12 @@
# original buildsystem will not be accepted.
-add_subdirectory(astdiff)
-add_subdirectory(dqrun)
-add_subdirectory(mrjob)
-add_subdirectory(sql2yql)
-add_subdirectory(sql_formatter)
-add_subdirectory(yqlrun)
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..bd1f049b04
--- /dev/null
+++ b/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(astdiff)
+add_subdirectory(dqrun)
+add_subdirectory(mrjob)
+add_subdirectory(sql2yql)
+add_subdirectory(sql_formatter)
+add_subdirectory(yqlrun)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..bb8870384d
--- /dev/null
+++ b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,10 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(service_node)
+add_subdirectory(worker_node)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..bb8870384d
--- /dev/null
+++ b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,10 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(service_node)
+add_subdirectory(worker_node)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..bb8870384d
--- /dev/null
+++ b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,10 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(service_node)
+add_subdirectory(worker_node)
diff --git a/ydb/library/yql/tools/dq/CMakeLists.txt b/ydb/library/yql/tools/dq/CMakeLists.txt
new file mode 100644
index 0000000000..606ff46b4b
--- /dev/null
+++ b/ydb/library/yql/tools/dq/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..a9eb732340
--- /dev/null
+++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,51 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(service_node)
+target_compile_options(service_node PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(service_node PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-getopt
+ cpp-mapreduce-client
+ yql-sql-pg
+ yql-parser-pg_wrapper
+ udf-service-exception_policy
+ yql-utils-failure_injector
+ yql-utils-log
+ utils-log-proto
+ providers-dq-provider
+ dq-worker_manager-interface
+ minikql-invoke_builtins-llvm
+ yql-utils-backtrace
+ providers-dq-service
+ providers-dq-metrics
+ providers-dq-stats_collector
+ providers-yt-dq_task_preprocessor
+ providers-dq-global_worker_manager
+ dq-actors-yt
+ yt-yt-client
+)
+target_link_options(service_node PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(service_node PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/service_node/main.cpp
+)
+target_allocator(service_node
+ system_allocator
+)
+vcs_info(service_node)
diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..822001577e
--- /dev/null
+++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,55 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(service_node)
+target_compile_options(service_node PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(service_node PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-getopt
+ cpp-mapreduce-client
+ yql-sql-pg
+ yql-parser-pg_wrapper
+ udf-service-exception_policy
+ yql-utils-failure_injector
+ yql-utils-log
+ utils-log-proto
+ providers-dq-provider
+ dq-worker_manager-interface
+ minikql-invoke_builtins-llvm
+ yql-utils-backtrace
+ providers-dq-service
+ providers-dq-metrics
+ providers-dq-stats_collector
+ providers-yt-dq_task_preprocessor
+ providers-dq-global_worker_manager
+ dq-actors-yt
+ yt-yt-client
+)
+target_link_options(service_node PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+ -lutil
+)
+target_sources(service_node PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/service_node/main.cpp
+)
+target_allocator(service_node
+ cpp-malloc-jemalloc
+)
+vcs_info(service_node)
diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..673a21b278
--- /dev/null
+++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,57 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(service_node)
+target_compile_options(service_node PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(service_node PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ library-cpp-getopt
+ cpp-mapreduce-client
+ yql-sql-pg
+ yql-parser-pg_wrapper
+ udf-service-exception_policy
+ yql-utils-failure_injector
+ yql-utils-log
+ utils-log-proto
+ providers-dq-provider
+ dq-worker_manager-interface
+ minikql-invoke_builtins-llvm
+ yql-utils-backtrace
+ providers-dq-service
+ providers-dq-metrics
+ providers-dq-stats_collector
+ providers-yt-dq_task_preprocessor
+ providers-dq-global_worker_manager
+ dq-actors-yt
+ yt-yt-client
+)
+target_link_options(service_node PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+ -lutil
+)
+target_sources(service_node PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/service_node/main.cpp
+)
+target_allocator(service_node
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(service_node)
diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.txt
new file mode 100644
index 0000000000..606ff46b4b
--- /dev/null
+++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/tools/dq/service_node/main.cpp b/ydb/library/yql/tools/dq/service_node/main.cpp
new file mode 100644
index 0000000000..fa7d0eea30
--- /dev/null
+++ b/ydb/library/yql/tools/dq/service_node/main.cpp
@@ -0,0 +1,384 @@
+#include <ydb/library/yql/providers/dq/metrics/metrics_pusher.h>
+#include <ydb/library/yql/providers/dq/actors/yt/resource_manager.h>
+#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h>
+#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h>
+#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
+#include <ydb/library/yql/providers/dq/service/service_node.h>
+#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h>
+#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h>
+#include <ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h>
+#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/log/tls_backend.h>
+#include <ydb/library/yql/utils/failure_injector/failure_injector.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+
+#include <yt/yt/core/misc/shutdown.h>
+#include <yt/yt/client/api/client.h>
+
+#include <library/cpp/getopt/small/last_getopt.h>
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/digest/md5/md5.h>
+
+
+#if 0
+# include <yt/yt/core/logging/config.h>
+# include <yt/yt/core/logging/log_manager.h>
+#endif
+
+constexpr ui32 THREAD_PER_NODE = 16;
+
+using namespace NYql;
+using namespace NYql::NDqs;
+using TFileResource = Yql::DqsProto::TFile;
+
+static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>();
+
+static void OnTerminate(int) {
+ ShouldContinue.TrySetValue();
+}
+
+// TODO: Merge with ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp#L28
+THashMap<TString, TString> Md5Cache;
+
+TString GetMd5(const TString& localPath) {
+ if (Md5Cache.contains(localPath)) {
+ return Md5Cache[localPath];
+ } else {
+ auto blob = ::TBlob::FromFile(localPath);
+ TString digest;
+
+ char buf[33] = {0};
+ digest = MD5::Data(blob.Data(), blob.Size(), buf);
+ Md5Cache[localPath] = digest;
+ return digest;
+ }
+}
+
+TVector<TFileResource> GetFiles(TString udfsPath, TString vanillaLitePath) {
+ TVector<TFileResource> files;
+
+ if (!vanillaLitePath.empty()) {
+ TFileResource vanillaLite;
+ vanillaLite.SetLocalPath(vanillaLitePath);
+ vanillaLite.SetName(vanillaLitePath.substr(vanillaLitePath.rfind('/') + 1));
+ vanillaLite.SetObjectType(Yql::DqsProto::TFile_EFileType_EEXE_FILE);
+ vanillaLite.SetObjectId(GetProgramCommitId());
+ files.push_back(vanillaLite);
+ }
+
+ if (!udfsPath.empty()) {
+ TVector<TString> tmp;
+ NKikimr::NMiniKQL::FindUdfsInDir(udfsPath, &tmp);
+ for (const auto& f : tmp) {
+ TFileResource r;
+ r.SetLocalPath(f);
+ r.SetObjectType(Yql::DqsProto::TFile_EFileType_EUDF_FILE);
+ r.SetObjectId(GetMd5(f));
+ files.push_back(r);
+ }
+ }
+ return files;
+}
+
+/* crutch */
+void* GetAppData() {
+ return nullptr;
+}
+/* */
+
+// Simple usage: ./service_node --id 1 --port 31337 --grpcport 8080
+int main(int argc, char** argv) {
+ using namespace NLastGetopt;
+
+ auto loggerConfig = NYql::NProto::TLoggingConfig();
+
+#if 0
+ auto logManager = NYT::NLogging::TLogManager::Get();
+
+ TString logConfig = " \
+ { \
+ \"rules\" = [ \
+ { \
+ \"min_level\" = \"debug\"; \
+ \"writers\" = [ \
+ \"debug\"; \
+ ]; \
+ \"exclude_categories\" = [ \
+ \"Bus\"; \
+ ]; \
+ }; \
+ ]; \
+ \"writers\" = { \
+ \"debug\" = { \
+ \"type\" = \"stderr\"; \
+ } \
+ } \
+ }";
+
+ auto ytLogConfigNode = NYT::NYTree::ConvertTo<NYT::NYTree::INodePtr>(
+ NYT::NYson::TYsonString(
+ logConfig.Data(), logConfig.Size(), NYT::NYson::EYsonType::Node));
+ auto logManagerConfig = NYT::New<NYT::NLogging::TLogManagerConfig>();
+ logManagerConfig->Load(ytLogConfigNode);
+ logManager->Configure(logManagerConfig);
+#endif
+ TOpts opts = TOpts::Default();
+ opts.AddHelpOption();
+ opts.AddLongOption("id", "Entry node for service");
+ opts.AddLongOption("workers", "Worker actors per worker node");
+
+ opts.AddLongOption("ytprefix", "Yt prefix");
+ opts.AddLongOption("proxy", "Yt proxy");
+ opts.AddLongOption("yttoken", "Yt token");
+ opts.AddLongOption("ytuser", "Yt user");
+
+ opts.AddLongOption("port", "Port");
+ opts.AddLongOption("grpcport", "Grpc Port");
+ opts.AddLongOption("mbusport", "Yql worker mbus port");
+
+ opts.AddLongOption("remote_jobs", "Start YtRM with jobs");
+ opts.AddLongOption("jobs_per_op", "Start YtRM with jobs");
+ opts.AddLongOption("vanilla_job", "Vanilla job biary");
+
+ opts.AddLongOption('u', "udfs", "UdfsPath");
+ opts.AddLongOption("enabled_failure_injector", "Enabled failure injections");
+ opts.AddLongOption("dump_stats", "Dump Statitics");
+
+ opts.AddLongOption("revision", "Revision for debug");
+ opts.AddLongOption("force_leader", "Disable leader election");
+ opts.AddLongOption("log_level", "Log Level");
+
+ TOptsParseResult res(&opts, argc, argv);
+
+ TString hostName, localAddress;
+
+ ui16 interconnectPort = res.Get<ui16>("port");
+ ui16 grpcPort = res.Get<ui16>("grpcport");
+ ui16 mbusPort = res.GetOrElse<ui16>("mbusport", 0);
+
+ auto logLevel = NYql::NProto::TLoggingConfig::INFO;
+ if (res.Has("log_level")) {
+ auto str = res.Get<TString>("log_level");
+ if (str == "TRACE") {
+ logLevel = NYql::NProto::TLoggingConfig::TRACE;
+ }
+ }
+
+ loggerConfig.SetAllComponentsLevel(logLevel);
+ NYql::NLog::InitLogger(loggerConfig, false);
+
+ NProto::TDqConfig::TYtCoordinator coordinatorConfig;
+
+ bool useYtCoordination = false;
+ if (res.Has("proxy")) {
+ useYtCoordination = true;
+ coordinatorConfig.SetPrefix(res.Get<TString>("ytprefix"));
+ coordinatorConfig.SetClusterName(res.Get<TString>("proxy"));
+ }
+
+ if (res.Has("yttoken")) {
+ coordinatorConfig.SetToken(res.Get<TString>("yttoken"));
+ }
+
+ if (res.Has("ytuser")) {
+ coordinatorConfig.SetUser(res.Get<TString>("ytuser"));
+ }
+
+ if (res.Has("enabled_failure_injector")) {
+ YQL_LOG(INFO) << "Enabled failure injector";
+ TFailureInjector::Activate();
+ }
+
+ if (res.Has("revision")) {
+ coordinatorConfig.SetRevision(res.Get<TString>("revision"));
+ }
+
+ if (useYtCoordination == false || res.Has("force_leader")) {
+ coordinatorConfig.SetLockType("dummy");
+ }
+
+ auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "service_node", interconnectPort);
+ hostName = coordinator->GetHostname();
+ localAddress = coordinator->GetIp();
+
+ Cerr << hostName + ":" + ToString(localAddress) << Endl;
+
+ TMaybe<ui32> maybeNodeId;
+ if (useYtCoordination == false && !res.Has("id")) {
+ Cerr << "--id required!\n"; return -1;
+ }
+ if (res.Has("id")) {
+ maybeNodeId = res.Get<ui32>("id");
+ }
+ auto nodeId = coordinator->GetNodeId(
+ maybeNodeId,
+ {ToString(grpcPort)},
+ static_cast<ui32>(NDqs::ENodeIdLimits::MinServiceNodeId),
+ static_cast<ui32>(NDqs::ENodeIdLimits::MinServiceNodeId)+200,
+ {}
+ );
+
+ Cerr << "My nodeId: " << nodeId << Endl;
+
+ TLocalProcessKeyState<NActors::TActorActivityTag>& key = TLocalProcessKeyState<NActors::TActorActivityTag>::GetInstance();
+ Cerr << "ActorNames: " << key.GetCount() << Endl;
+ for (ui64 i = 0; i < key.GetCount(); i++) {
+ auto name = key.GetNameByIndex(i);
+ if (name && !name.StartsWith("Activity_")) {
+ Cerr << " " << name << Endl;
+ }
+ }
+
+ NYql::NDqs::TServiceNodeConfig config;
+ config.NodeId = nodeId;
+ config.InterconnectAddress = localAddress;
+ config.GrpcHostname = hostName;
+ config.Port = interconnectPort;
+ config.GrpcPort = grpcPort;
+ config.MbusPort = mbusPort;
+ config.NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
+ return NYql::NDqs::CreateDynamicNameserver(setup);
+ };
+
+ YQL_LOG(INFO) << "Interconnect addr/port " << config.InterconnectAddress << ":" << config.Port;
+ YQL_LOG(INFO) << "GRPC addr/port " << config.GrpcHostname << ":" << config.GrpcPort;
+ YQL_LOG(INFO) << "MBus port " << config.MbusPort;
+
+ auto metricsRegistry = CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq));
+ auto serviceNode = TServiceNode(config, THREAD_PER_NODE, metricsRegistry);
+
+ auto statsCollector = CreateStatsCollector(5, *serviceNode.GetSetup(), metricsRegistry->GetSensors());
+
+ auto* actorSystem = serviceNode.StartActorSystem(GetAppData());
+
+ // push metrics from root group
+ auto metricsPusherId = NActors::TActorId() ;// actorSystem->Register(CreateMetricsPusher(CreateMetricsRegistry(GetSensorsRootGroup()),mbusPort));
+
+ if (res.Has("dump_stats")) {
+ metricsPusherId = actorSystem->Register(CreateMetricsPrinter(metricsRegistry->GetSensors()));
+ actorSystem->Register(statsCollector);
+ }
+
+ if (!maybeNodeId) {
+ coordinator->StartRegistrator(actorSystem);
+ coordinator->StartCleaner(actorSystem, {});
+ } else {
+ // just create yt wrapper
+ if (useYtCoordination) {
+ (void)coordinator->GetWrapper(actorSystem);
+ }
+ }
+
+ auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, TVector<TString>());
+ TDqTaskPreprocessorFactoryCollection dqTaskPreprocessorFactories = {
+ NDq::CreateYtDqTaskPreprocessorFactory(false, funcRegistry)
+ };
+ serviceNode.StartService(dqTaskPreprocessorFactories);
+
+ TVector<NActors::TActorId> ids;
+ TVector<TResourceManagerOptions> uploadResourcesOptions;
+ int jobs = res.GetOrElse<int>("remote_jobs", 0);
+ int jobsPerOperation = res.GetOrElse<int>("jobs_per_op", 2);
+ if (useYtCoordination) {
+ coordinatorConfig = coordinator->GetConfig();
+ TResourceManagerOptions options;
+ options.YtBackend.SetClusterName(coordinatorConfig.GetClusterName());
+ options.YtBackend.SetUser(coordinatorConfig.GetUser());
+ options.YtBackend.SetToken(coordinatorConfig.GetToken());
+ options.YtBackend.SetMemoryLimit(16000000000LL);
+ options.YtBackend.SetPrefix(coordinatorConfig.GetPrefix());
+ options.YtBackend.SetUploadPrefix(coordinatorConfig.GetPrefix());
+ options.YtBackend.SetMinNodeId(static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId));
+ options.YtBackend.SetMaxNodeId(static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId)+100);
+
+
+ options.YtBackend.SetMaxJobs(jobs);
+ if (jobsPerOperation > 0) {
+ options.YtBackend.SetJobsPerOperation(jobsPerOperation);
+ }
+
+ if (jobs > 0) {
+ TResourceFile vanilla(res.Get<TString>("vanilla_job"));
+ vanilla.RemoteFileName = "bin/" + ToString(GetProgramCommitId()) + "/" + vanilla.LocalFileName.substr(vanilla.LocalFileName.rfind('/')+1);
+ options.Files.push_back(vanilla);
+ for (const auto& r : GetFiles(res.GetOrElse("udfs", ""), res.Get<TString>("vanilla_job") + ".lite")) {
+ if (r.GetObjectType() == Yql::DqsProto::TFile::EEXE_FILE) {
+ TResourceFile f(r.GetLocalPath());
+ f.RemoteFileName = "bin/" + r.GetObjectId() + "/" + r.GetName();
+ options.Files.push_back(f);
+ } else {
+ TResourceFile f(r.GetLocalPath());
+ f.RemoteFileName = "udfs/" + r.GetObjectId();
+ options.Files.push_back(f);
+ }
+ }
+
+ // uploader
+ options.UploadPrefix = options.YtBackend.GetUploadPrefix();
+ options.LockName = TString("ytuploader.") + options.YtBackend.GetClusterName();
+ // don't start uploader for local-yt
+ if (options.YtBackend.GetClusterName().find("localhost") != 0) {
+ ids.push_back(actorSystem->Register(CreateResourceUploader(options, coordinator)));
+ }
+ {
+ // bin cleaner
+ options.KeepFirst = 5;
+ options.UploadPrefix = options.YtBackend.GetUploadPrefix() + "/bin";
+ ids.push_back(actorSystem->Register(CreateResourceCleaner(options, coordinator)));
+ }
+ {
+ // udf cleaner
+ options.KeepFirst = 500;
+ options.DropBefore = TDuration::Days(7);
+ options.UploadPrefix = options.YtBackend.GetUploadPrefix() + "/udfs";
+ ids.push_back(actorSystem->Register(CreateResourceCleaner(options, coordinator)));
+ }
+ {
+ // temporary locks
+ options.KeepFilter = options.YtBackend.GetClusterName(); // don't remove locks with `ClusterName` in LockName
+ options.DropBefore = TDuration::Hours(1);
+ options.UploadPrefix = options.YtBackend.GetPrefix() + "/locks";
+ ids.push_back(actorSystem->Register(CreateResourceCleaner(options, coordinator)));
+ }
+
+ // rm manager
+ options.Files.clear();
+ vanilla.RemoteFileName = vanilla.LocalFileName.substr(vanilla.LocalFileName.rfind('/')+1);
+ options.Files.push_back(vanilla);
+ options.UploadPrefix = options.YtBackend.GetUploadPrefix() + "/bin/" + ToString(GetProgramCommitId());
+ options.LockName = TString("ytrm.") + options.YtBackend.GetClusterName();
+ options.Counters = metricsRegistry->GetSensors()->GetSubgroup("counters", "ytrm");
+ ids.push_back(actorSystem->Register(CreateResourceManager(options, coordinator)));
+ }
+
+ options.UploadPrefix = options.YtBackend.GetUploadPrefix();
+ options.Files.clear();
+ uploadResourcesOptions.push_back(options);
+ }
+
+ coordinator->StartGlobalWorker(actorSystem, uploadResourcesOptions, metricsRegistry);
+
+ signal(SIGINT, &OnTerminate);
+ signal(SIGTERM, &OnTerminate);
+
+ auto future = ShouldContinue.GetFuture();
+ future.Wait();
+
+ for (auto id : ids) {
+ actorSystem->Send(id, new NActors::TEvents::TEvPoison);
+ }
+ actorSystem->Send(NDqs::MakeWorkerManagerActorID(nodeId), new NActors::TEvents::TEvPoison);
+ actorSystem->Send(metricsPusherId, new NActors::TEvents::TEvPoison);
+
+ coordinator->Stop(actorSystem);
+
+ // TODO: remove this
+ Sleep(TDuration::Seconds(5));
+
+ serviceNode.Stop();
+ NYT::Shutdown();
+
+ return 0;
+}
diff --git a/ydb/library/yql/tools/dq/service_node/ya.make b/ydb/library/yql/tools/dq/service_node/ya.make
new file mode 100644
index 0000000000..730bc565e4
--- /dev/null
+++ b/ydb/library/yql/tools/dq/service_node/ya.make
@@ -0,0 +1,33 @@
+IF (NOT OS_WINDOWS)
+ PROGRAM()
+
+ PEERDIR(
+ library/cpp/getopt
+ yt/cpp/mapreduce/client
+ ydb/library/yql/sql/pg
+ ydb/library/yql/parser/pg_wrapper
+ ydb/library/yql/public/udf/service/exception_policy
+ ydb/library/yql/utils/failure_injector
+ ydb/library/yql/utils/log
+ ydb/library/yql/utils/log/proto
+ ydb/library/yql/providers/dq/provider
+ ydb/library/yql/providers/dq/worker_manager/interface
+ ydb/library/yql/minikql/invoke_builtins/llvm
+ ydb/library/yql/utils/backtrace
+ ydb/library/yql/providers/dq/service
+ ydb/library/yql/providers/dq/metrics
+ ydb/library/yql/providers/dq/stats_collector
+ ydb/library/yql/providers/yt/dq_task_preprocessor
+ ydb/library/yql/providers/dq/global_worker_manager
+ ydb/library/yql/providers/dq/actors/yt
+ yt/yt/client
+ )
+
+ YQL_LAST_ABI_VERSION()
+
+ SRCS(
+ main.cpp
+ )
+
+ END()
+ENDIF()
diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..7a8db69ea2
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,64 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(worker_node)
+target_compile_options(worker_node PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(worker_node PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ client-ydb_persqueue_public-codecs
+ library-cpp-getopt
+ cpp-mapreduce-client
+ dq-actors-compute
+ yql-dq-comp_nodes
+ dq-integration-transform
+ yql-dq-transform
+ minikql-comp_nodes-llvm
+ providers-clickhouse-actors
+ providers-common-comp_nodes
+ providers-dq-runtime
+ providers-dq-service
+ providers-dq-metrics
+ providers-dq-stats_collector
+ providers-dq-task_runner
+ providers-pq-async_io
+ providers-pq-proto
+ providers-s3-actors
+ providers-ydb-actors
+ providers-ydb-comp_nodes
+ udf-service-exception_policy
+ library-yql-utils
+ yql-utils-log
+ utils-log-proto
+ yql-utils-failure_injector
+ yql-utils-backtrace
+ yt-comp_nodes-dq
+ providers-yt-mkql_dq
+ dq-actors-yt
+ providers-dq-global_worker_manager
+ yql-sql-pg
+ yql-parser-pg_wrapper
+)
+target_link_options(worker_node PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(worker_node PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/worker_node/main.cpp
+)
+target_allocator(worker_node
+ system_allocator
+)
+vcs_info(worker_node)
diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..11e2b042f9
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,68 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(worker_node)
+target_compile_options(worker_node PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(worker_node PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ client-ydb_persqueue_public-codecs
+ library-cpp-getopt
+ cpp-mapreduce-client
+ dq-actors-compute
+ yql-dq-comp_nodes
+ dq-integration-transform
+ yql-dq-transform
+ minikql-comp_nodes-llvm
+ providers-clickhouse-actors
+ providers-common-comp_nodes
+ providers-dq-runtime
+ providers-dq-service
+ providers-dq-metrics
+ providers-dq-stats_collector
+ providers-dq-task_runner
+ providers-pq-async_io
+ providers-pq-proto
+ providers-s3-actors
+ providers-ydb-actors
+ providers-ydb-comp_nodes
+ udf-service-exception_policy
+ library-yql-utils
+ yql-utils-log
+ utils-log-proto
+ yql-utils-failure_injector
+ yql-utils-backtrace
+ yt-comp_nodes-dq
+ providers-yt-mkql_dq
+ dq-actors-yt
+ providers-dq-global_worker_manager
+ yql-sql-pg
+ yql-parser-pg_wrapper
+)
+target_link_options(worker_node PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+ -lutil
+)
+target_sources(worker_node PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/worker_node/main.cpp
+)
+target_allocator(worker_node
+ cpp-malloc-jemalloc
+)
+vcs_info(worker_node)
diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..17c842dbb4
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,70 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(worker_node)
+target_compile_options(worker_node PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(worker_node PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ client-ydb_persqueue_public-codecs
+ library-cpp-getopt
+ cpp-mapreduce-client
+ dq-actors-compute
+ yql-dq-comp_nodes
+ dq-integration-transform
+ yql-dq-transform
+ minikql-comp_nodes-llvm
+ providers-clickhouse-actors
+ providers-common-comp_nodes
+ providers-dq-runtime
+ providers-dq-service
+ providers-dq-metrics
+ providers-dq-stats_collector
+ providers-dq-task_runner
+ providers-pq-async_io
+ providers-pq-proto
+ providers-s3-actors
+ providers-ydb-actors
+ providers-ydb-comp_nodes
+ udf-service-exception_policy
+ library-yql-utils
+ yql-utils-log
+ utils-log-proto
+ yql-utils-failure_injector
+ yql-utils-backtrace
+ yt-comp_nodes-dq
+ providers-yt-mkql_dq
+ dq-actors-yt
+ providers-dq-global_worker_manager
+ yql-sql-pg
+ yql-parser-pg_wrapper
+)
+target_link_options(worker_node PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+ -lutil
+)
+target_sources(worker_node PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/worker_node/main.cpp
+)
+target_allocator(worker_node
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(worker_node)
diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.txt
new file mode 100644
index 0000000000..606ff46b4b
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp
new file mode 100644
index 0000000000..7ba1ff412d
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_node/main.cpp
@@ -0,0 +1,423 @@
+#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h>
+#include <ydb/library/yql/providers/dq/global_worker_manager/service_node_resolver.h>
+#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h>
+#include <ydb/library/yql/providers/dq/actors/yt/yt_wrapper.h>
+#include <ydb/library/yql/providers/dq/actors/yt/worker_registrator.h>
+#include <ydb/library/yql/providers/dq/actors/yt/nodeid_cleaner.h>
+#include <ydb/library/yql/providers/dq/metrics/metrics_pusher.h>
+#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h>
+#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h>
+#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
+#include <ydb/library/yql/utils/bind_in_range.h>
+#include <library/cpp/digest/md5/md5.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+
+#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
+#include <ydb/library/yql/providers/dq/runtime/file_cache.h>
+#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h>
+#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.h>
+#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h>
+#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
+#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h>
+#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
+#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
+#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
+#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_factory.h>
+#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_dq_transform.h>
+
+#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
+#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h>
+#include <ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h>
+
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/mkql_stats_registry.h>
+#include <ydb/library/yql/utils/backtrace/backtrace.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
+#include <ydb/library/yql/utils/log/tls_backend.h>
+#include <ydb/library/yql/utils/failure_injector/failure_injector.h>
+#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
+#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
+
+#include <yt/yt/core/misc/shutdown.h>
+
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/svnversion/svnversion.h>
+
+#include <yt/yt/core/actions/invoker.h>
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/thread_pool.h>
+
+#include <util/generic/scope.h>
+#include <util/folder/path.h>
+#include <util/system/env.h>
+#include <util/system/getpid.h>
+#include <util/system/fs.h>
+
+constexpr ui32 THREAD_PER_NODE = 8;
+
+using namespace NYql;
+using namespace NYql::NDq;
+using namespace NYql::NDqs;
+
+using namespace NActors;
+
+static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>();
+
+namespace {
+void OnTerminate(int) {
+ ShouldContinue.SetValue();
+}
+
+class TSerializedTaskRunnerInvoker: public ITaskRunnerInvoker {
+public:
+ TSerializedTaskRunnerInvoker(const NYT::IInvokerPtr& invoker)
+ : Invoker(NYT::NConcurrency::CreateSerializedInvoker(invoker))
+ { }
+
+ void Invoke(const std::function<void(void)>& f) override {
+ Invoker->Invoke(BIND(f));
+ }
+
+private:
+ const NYT::IInvokerPtr Invoker;
+};
+
+class TConcurrentInvokerFactory: public ITaskRunnerInvokerFactory {
+public:
+ TConcurrentInvokerFactory(int capacity)
+ : ThreadPool(NYT::NConcurrency::CreateThreadPool(capacity, "WorkerActor"))
+ { }
+
+ ITaskRunnerInvoker::TPtr Create() override {
+ return new TSerializedTaskRunnerInvoker(ThreadPool->GetInvoker());
+ }
+
+ NYT::NConcurrency::IThreadPoolPtr ThreadPool;
+};
+
+NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway) {
+ auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
+ RegisterDqPqReadActorFactory(*factory, driver, nullptr);
+ RegisterYdbReadActorFactory(*factory, driver, nullptr);
+ RegisterS3ReadActorFactory(*factory, nullptr, httpGateway);
+ RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
+
+ RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
+ RegisterS3WriteActorFactory(*factory, nullptr, httpGateway);
+ return factory;
+}
+
+}
+
+int main(int argc, char** argv) {
+
+ const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"));
+ NYdb::TDriver driver(driverConfig);
+
+ Y_DEFER {
+ driver.Stop(true);
+ };
+
+ NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry();
+
+ auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
+ NYql::GetCommonDqFactory(),
+ NYql::GetDqYtFactory(statsRegistry.Get()),
+ NYql::GetDqYdbFactory(driver),
+ NKikimr::NMiniKQL::GetYqlFactory(),
+ NYql::GetPgFactory()
+ });
+
+ auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
+ NYql::CreateCommonDqTaskTransformFactory(),
+ NYql::CreateYtDqTaskTransformFactory(),
+ NYql::CreateYdbDqTaskTransformFactory()
+ });
+
+ auto patternCache = std::make_shared<NKikimr::NMiniKQL::TComputationPatternLRUCache>(200_MB);
+
+ if (argc > 1 && !strcmp(argv[1], "tasks_runner_proxy")) {
+ NYql::NBacktrace::RegisterKikimrFatalActions();
+ //NYql::NBacktrace::EnableKikimrSymbolize(); // symbolize in gateway
+
+ return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true);
+ }
+
+ using namespace NLastGetopt;
+ TOpts opts = TOpts::Default();
+ opts.AddHelpOption();
+ opts.AddLongOption('i', "id", "Node ID");
+ opts.AddLongOption('p', "port", "Port");
+ opts.AddLongOption('u', "udfs", "UdfsPath");
+ opts.AddLongOption("config", "Config");
+ opts.AddLongOption("service_addr", "Service Addr (host/port pair)");
+
+ opts.AddLongOption("ytprefix", "Yt prefix");
+ opts.AddLongOption("proxy", "Yt proxy");
+ opts.AddLongOption("workers", "Workers");
+ opts.AddLongOption("threads", "Threads");
+ opts.AddLongOption("yttoken", "Yt token");
+ opts.AddLongOption("ytuser", "Yt user");
+ opts.AddLongOption("enabled_failure_injector", "Enabled failure injections");
+ opts.AddLongOption("revision", "Revision");
+ opts.AddLongOption("heartbeat", "HeartbeatPeriod");
+ opts.AddLongOption("solomon", "Solomon Token");
+ opts.AddLongOption("print_metrics", "Print Metrics");
+ opts.AddLongOption("announce_cluster_name", "Send this name in pings");
+ opts.AddLongOption("disable_pipe", "Disable pipe");
+ opts.AddLongOption("log_level", "Log Level");
+
+ ui32 threads = THREAD_PER_NODE;
+ TString host;
+ TString ip;
+ TString solomonToken;
+ int capacity = 1;
+ int heartbeatPeriodMs = 100;
+
+ TOptsParseResult res(&opts, argc, argv);
+
+ auto loggerConfig = NYql::NProto::TLoggingConfig();
+ auto logLevel = NYql::NProto::TLoggingConfig::INFO;
+ if (res.Has("log_level")) {
+ auto str = res.Get<TString>("log_level");
+ if (str == "TRACE") {
+ logLevel = NYql::NProto::TLoggingConfig::TRACE;
+ }
+ }
+
+ loggerConfig.SetAllComponentsLevel(logLevel);
+
+ NYql::NLog::InitLogger(loggerConfig, false);
+
+ ui16 startPort = res.Get<ui16>("port");
+ if (res.Has("heartbeat")) {
+ heartbeatPeriodMs = res.Get<int>("heartbeat");
+ }
+ if (res.Has("threads")) {
+ threads = res.Get<int>("threads");
+ }
+
+ NProto::TDqConfig::TYtCoordinator coordinatorConfig;
+ bool useYtCoordination = false;
+ if (res.Has("proxy")) {
+ coordinatorConfig.SetPrefix(res.Get<TString>("ytprefix"));
+ coordinatorConfig.SetClusterName(res.Get<TString>("proxy"));
+ useYtCoordination = true;
+ }
+ coordinatorConfig.SetHeartbeatPeriodMs(heartbeatPeriodMs);
+
+ if (!useYtCoordination) {
+ coordinatorConfig.SetLockType("dummy");
+ }
+
+ if (res.Has("yttoken")) {
+ coordinatorConfig.SetToken(res.Get<TString>("yttoken"));
+ }
+
+ if (res.Has("workers")) {
+ capacity = res.Get<int>("workers");
+ }
+
+ if (res.Has("ytuser")) {
+ coordinatorConfig.SetUser(res.Get<TString>("ytuser"));
+ }
+
+ if (res.Has("revision")) {
+ coordinatorConfig.SetRevision(res.Get<TString>("revision"));
+ YQL_LOG(INFO) << "Set revision '" << coordinatorConfig.GetRevision() << "'";
+ }
+
+ if (res.Has("solomon")) {
+ solomonToken = res.Get<TString>("solomon");
+ }
+
+ if (res.Has("enabled_failure_injector")) {
+ YQL_LOG(INFO) << "Enabled failure injector";
+ TFailureInjector::Activate();
+ }
+
+ TRangeWalker<int> portWalker(startPort, startPort+100);
+ auto ports = BindInRange(portWalker);
+
+ auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort());
+ coordinatorConfig = coordinator->GetConfig();
+
+ host = coordinator->GetHostname();
+ ip = coordinator->GetIp();
+
+ NProto::TDqConfig::TYtBackend backendConfig;
+ backendConfig.SetUploadPrefix(coordinatorConfig.GetPrefix());
+ backendConfig.SetUser(coordinatorConfig.GetUser());
+ backendConfig.SetToken(coordinatorConfig.GetToken());
+ backendConfig.SetClusterName(coordinatorConfig.GetClusterName());
+
+ TString fileCacheDir = "./file_cache123";
+ IFileCache::TPtr fileCache = new TFileCache(fileCacheDir, 16000000000L);
+
+ Cerr << host + ":" + ToString(ip) << Endl;
+
+ TMaybe<ui32> maybeNodeId;
+ if (res.Has("id")) {
+ maybeNodeId = res.Get<ui32>("id");
+ }
+ auto nodeId = coordinator->GetNodeId(
+ maybeNodeId,
+ {},
+ static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId),
+ static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId),
+ {}
+ );
+
+ Cerr << "My nodeId: " << nodeId << Endl;
+
+ TString udfsDir = res.GetOrElse("udfs", "");
+
+ try {
+ auto dqSensors = GetSensorsGroupFor(NSensorComponent::kDq);
+ THolder<NActors::TActorSystemSetup> setup;
+ TIntrusivePtr<NActors::NLog::TSettings> logSettings;
+ std::tie(setup, logSettings) = BuildActorSetup(
+ nodeId,
+ ip,
+ ports[1].Addr.GetPort(),
+ ports[1].Socket->Release(),
+ {threads},
+ dqSensors,
+ [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
+ return NYql::NDqs::CreateDynamicNameserver(setup);
+ },
+ Nothing());
+
+ auto statsCollector = CreateStatsCollector(5, *setup.Get(), dqSensors);
+
+ TVector<TString> UDFsPaths;
+ if (!udfsDir.empty()) {
+ NKikimr::NMiniKQL::FindUdfsInDir(udfsDir, &UDFsPaths);
+ }
+ auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(
+ &NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, UDFsPaths)->Clone();
+
+ for (auto& m : functionRegistry->GetAllModuleNames()) {
+ auto path = *functionRegistry->FindUdfPath(m);
+ Cout << m << '\t' << path << Endl;
+ TString objectId = MD5::Calc(path); // Production env uses MD5::File as an Id. For testing purpose we use fast version.
+ if (!fileCache->Contains(objectId)) {
+ TString newPath = fileCacheDir + "/" + objectId;
+ NFs::Copy(path, newPath);
+ Cout << "Add " << newPath << " " << objectId << "\n";
+ fileCache->AddFile(newPath, objectId);
+ }
+ }
+
+ NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);
+
+ auto actorSystem = MakeHolder<NActors::TActorSystem>(setup, nullptr, logSettings);
+
+ actorSystem->Start();
+ actorSystem->Register(statsCollector);
+
+ if (!maybeNodeId) {
+ coordinator->StartRegistrator(actorSystem.Get());
+ }
+
+ TVector<TString> hostPort;
+ if (res.Has("service_addr")) {
+ TString addresses = res.Get<TString>("service_addr");
+ Split(addresses, ",", hostPort);
+ }
+
+/*
+ if (solomonToken) {
+ TSolomonAgentConfig config = TSolomonAgentConfig()
+ .WithServer("https://solomon.yandex.net")
+ .WithPath("/api/v2/push")
+ .WithPort(443)
+ .WithProject("yql")
+ .WithService("dq_vanilla")
+ .WithCluster("test")
+ .WithHost("NodeId-" + ToString(nodeId))
+ .WithCalcDerivs(true)
+ .WithAuthorizaton("OAuth " + solomonToken)
+ .WithCommonLabels({{"ytcluster", "test_cluster"}})
+ ;
+ actorSystem->Register(CreateMetricsPusher(dqSensors, config));
+ }
+*/
+
+ if (res.Has("print_metrics")) {
+ actorSystem->Register(CreateMetricsPrinter(dqSensors));
+ }
+
+ auto resolver = coordinator->CreateServiceNodeResolver(actorSystem.Get(), hostPort);
+
+ backendConfig.SetWorkerCapacity(capacity);
+ TResourceManagerOptions rmOptions;
+ rmOptions.Capabilities = Yql::DqsProto::RegisterNodeRequest::ECAP_COMPUTE_ACTOR;
+ rmOptions.YtBackend = backendConfig;
+ rmOptions.FileCache = fileCache;
+ rmOptions.TmpDir = "./tmp";
+
+ if (res.Has("announce_cluster_name")) {
+ rmOptions.AnnounceClusterName = res.Get<TString>("announce_cluster_name");
+ Cerr << "Announce as '" << backendConfig.GetClusterName() << "'\n";
+ }
+
+ actorSystem->Register(coordinator->CreateServiceNodePinger(resolver, rmOptions));
+
+ NYql::NTaskRunnerProxy::TPipeFactoryOptions pfOptions;
+ pfOptions.ExecPath = TFsPath(argv[0]).RealPath().GetPath();
+ pfOptions.FileCache = fileCache;
+ if (res.Has("revision")) {
+ pfOptions.Revision = coordinatorConfig.GetRevision();
+ }
+
+ NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
+ bool disablePipe = res.Has("disable_pipe");
+ NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry();
+
+ lwmOptions.Factory = disablePipe
+ ? NTaskRunnerProxy::CreateFactory(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, patternCache, true)
+ : NTaskRunnerProxy::CreatePipeFactory(pfOptions);
+ lwmOptions.AsyncIoFactory = CreateAsyncIoFactory(driver, IHTTPGateway::Make());
+ lwmOptions.FunctionRegistry = functionRegistry.Get();
+ lwmOptions.RuntimeData = coordinator->GetRuntimeData();
+ lwmOptions.TaskRunnerInvokerFactory = disablePipe
+ ? TTaskRunnerInvokerFactory::TPtr(new NDqs::TTaskRunnerInvokerFactory())
+ : TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity));
+ lwmOptions.TaskRunnerActorFactory = disablePipe
+ ? NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](const NDq::TDqTaskSettings& task, const NDq::TLogFunc& )
+ {
+ return lwmOptions.Factory->Get(task);
+ })
+ : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
+ lwmOptions.ComputeActorOwnsCounters = true;
+ auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
+
+ auto workerManagerActorId = actorSystem->Register(resman);
+ actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId);
+
+ auto endFuture = ShouldContinue.GetFuture();
+
+ signal(SIGINT, &OnTerminate);
+ signal(SIGTERM, &OnTerminate);
+
+
+ endFuture.Wait();
+
+ actorSystem->Stop();
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ return -1;
+ }
+
+ NYT::Shutdown();
+ return 0;
+}
diff --git a/ydb/library/yql/tools/dq/worker_node/ya.make b/ydb/library/yql/tools/dq/worker_node/ya.make
new file mode 100644
index 0000000000..216572850a
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_node/ya.make
@@ -0,0 +1,46 @@
+IF (NOT OS_WINDOWS)
+ PROGRAM()
+
+ PEERDIR(
+ ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs
+ library/cpp/getopt
+ yt/cpp/mapreduce/client
+ ydb/library/yql/dq/actors/compute
+ ydb/library/yql/dq/comp_nodes
+ ydb/library/yql/dq/integration/transform
+ ydb/library/yql/dq/transform
+ ydb/library/yql/minikql/comp_nodes/llvm
+ ydb/library/yql/providers/clickhouse/actors
+ ydb/library/yql/providers/common/comp_nodes
+ ydb/library/yql/providers/dq/runtime
+ ydb/library/yql/providers/dq/service
+ ydb/library/yql/providers/dq/metrics
+ ydb/library/yql/providers/dq/stats_collector
+ ydb/library/yql/providers/dq/task_runner
+ ydb/library/yql/providers/pq/async_io
+ ydb/library/yql/providers/pq/proto
+ ydb/library/yql/providers/s3/actors
+ ydb/library/yql/providers/ydb/actors
+ ydb/library/yql/providers/ydb/comp_nodes
+ ydb/library/yql/public/udf/service/exception_policy
+ ydb/library/yql/utils
+ ydb/library/yql/utils/log
+ ydb/library/yql/utils/log/proto
+ ydb/library/yql/utils/failure_injector
+ ydb/library/yql/utils/backtrace
+ ydb/library/yql/providers/yt/comp_nodes/dq
+ ydb/library/yql/providers/yt/mkql_dq
+ ydb/library/yql/providers/dq/actors/yt
+ ydb/library/yql/providers/dq/global_worker_manager
+ ydb/library/yql/sql/pg
+ ydb/library/yql/parser/pg_wrapper
+ )
+
+ YQL_LAST_ABI_VERSION()
+
+ SRCS(
+ main.cpp
+ )
+
+ END()
+ENDIF()
diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make
new file mode 100644
index 0000000000..68eba94d95
--- /dev/null
+++ b/ydb/library/yql/tools/dq/ya.make
@@ -0,0 +1,4 @@
+RECURSE(
+ service_node
+ worker_node
+)
diff --git a/ydb/library/yql/tools/ya.make b/ydb/library/yql/tools/ya.make
index 3f5aa7a058..5ff8e2b273 100644
--- a/ydb/library/yql/tools/ya.make
+++ b/ydb/library/yql/tools/ya.make
@@ -1,6 +1,7 @@
RECURSE(
astdiff
dqrun
+ dq
mrjob
sql2yql
sql_formatter