diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-10-24 23:12:06 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-10-24 23:41:41 +0300 |
commit | f497b52bb70f52b35c75ba4d18d66fd8904a16e2 (patch) | |
tree | ae38a081a0ac936ba6bfb64d7e77402e25eaf747 | |
parent | 58fc30125015d9e913d6ecc1f1a89007541ede1b (diff) | |
download | ydb-f497b52bb70f52b35c75ba4d18d66fd8904a16e2.tar.gz |
Introduce OSS versions of worker_node/service_node (dq testing utilities)
24 files changed, 1415 insertions, 6 deletions
diff --git a/.mapping.json b/.mapping.json index 5b4c5e3cb0..94d161bbc7 100644 --- a/.mapping.json +++ b/.mapping.json @@ -8366,12 +8366,28 @@ "ydb/library/yql/sql/v1/ut/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/sql/v1/ut/CMakeLists.txt":"", "ydb/library/yql/sql/v1/ut/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/tools/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/tools/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/tools/CMakeLists.txt":"", + "ydb/library/yql/tools/CMakeLists.windows-x86_64.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.txt":"", "ydb/library/yql/tools/astdiff/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/tools/dq/CMakeLists.txt":"", + "ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/tools/dq/service_node/CMakeLists.txt":"", + "ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/tools/dq/worker_node/CMakeLists.txt":"", "ydb/library/yql/tools/dqrun/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/tools/dqrun/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/tools/dqrun/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt 
b/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..61ab79236b --- /dev/null +++ b/ydb/library/yql/tools/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(astdiff) +add_subdirectory(dq) +add_subdirectory(dqrun) +add_subdirectory(mrjob) +add_subdirectory(sql2yql) +add_subdirectory(sql_formatter) +add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..61ab79236b --- /dev/null +++ b/ydb/library/yql/tools/CMakeLists.linux-aarch64.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(astdiff) +add_subdirectory(dq) +add_subdirectory(dqrun) +add_subdirectory(mrjob) +add_subdirectory(sql2yql) +add_subdirectory(sql_formatter) +add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..61ab79236b --- /dev/null +++ b/ydb/library/yql/tools/CMakeLists.linux-x86_64.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(astdiff) +add_subdirectory(dq) +add_subdirectory(dqrun) +add_subdirectory(mrjob) +add_subdirectory(sql2yql) +add_subdirectory(sql_formatter) +add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/CMakeLists.txt b/ydb/library/yql/tools/CMakeLists.txt index bd1f049b04..f8b31df0c1 100644 --- a/ydb/library/yql/tools/CMakeLists.txt +++ b/ydb/library/yql/tools/CMakeLists.txt @@ -6,9 +6,12 @@ # original buildsystem will not be accepted. -add_subdirectory(astdiff) -add_subdirectory(dqrun) -add_subdirectory(mrjob) -add_subdirectory(sql2yql) -add_subdirectory(sql_formatter) -add_subdirectory(yqlrun) +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt b/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..bd1f049b04 --- /dev/null +++ b/ydb/library/yql/tools/CMakeLists.windows-x86_64.txt @@ -0,0 +1,14 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). 
These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(astdiff) +add_subdirectory(dqrun) +add_subdirectory(mrjob) +add_subdirectory(sql2yql) +add_subdirectory(sql_formatter) +add_subdirectory(yqlrun) diff --git a/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..bb8870384d --- /dev/null +++ b/ydb/library/yql/tools/dq/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,10 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(service_node) +add_subdirectory(worker_node) diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..bb8870384d --- /dev/null +++ b/ydb/library/yql/tools/dq/CMakeLists.linux-aarch64.txt @@ -0,0 +1,10 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(service_node) +add_subdirectory(worker_node) diff --git a/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..bb8870384d --- /dev/null +++ b/ydb/library/yql/tools/dq/CMakeLists.linux-x86_64.txt @@ -0,0 +1,10 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(service_node) +add_subdirectory(worker_node) diff --git a/ydb/library/yql/tools/dq/CMakeLists.txt b/ydb/library/yql/tools/dq/CMakeLists.txt new file mode 100644 index 0000000000..606ff46b4b --- /dev/null +++ b/ydb/library/yql/tools/dq/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..a9eb732340 --- /dev/null +++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,51 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(service_node) +target_compile_options(service_node PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(service_node PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-getopt + cpp-mapreduce-client + yql-sql-pg + yql-parser-pg_wrapper + udf-service-exception_policy + yql-utils-failure_injector + yql-utils-log + utils-log-proto + providers-dq-provider + dq-worker_manager-interface + minikql-invoke_builtins-llvm + yql-utils-backtrace + providers-dq-service + providers-dq-metrics + providers-dq-stats_collector + providers-yt-dq_task_preprocessor + providers-dq-global_worker_manager + dq-actors-yt + yt-yt-client +) +target_link_options(service_node PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(service_node PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/service_node/main.cpp +) +target_allocator(service_node + system_allocator +) +vcs_info(service_node) diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..822001577e --- /dev/null +++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-aarch64.txt @@ -0,0 +1,55 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(service_node) +target_compile_options(service_node PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(service_node PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-getopt + cpp-mapreduce-client + yql-sql-pg + yql-parser-pg_wrapper + udf-service-exception_policy + yql-utils-failure_injector + yql-utils-log + utils-log-proto + providers-dq-provider + dq-worker_manager-interface + minikql-invoke_builtins-llvm + yql-utils-backtrace + providers-dq-service + providers-dq-metrics + providers-dq-stats_collector + providers-yt-dq_task_preprocessor + providers-dq-global_worker_manager + dq-actors-yt + yt-yt-client +) +target_link_options(service_node PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(service_node PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/service_node/main.cpp +) +target_allocator(service_node + cpp-malloc-jemalloc +) +vcs_info(service_node) diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..673a21b278 --- /dev/null +++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.linux-x86_64.txt @@ -0,0 +1,57 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(service_node) +target_compile_options(service_node PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(service_node PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + library-cpp-getopt + cpp-mapreduce-client + yql-sql-pg + yql-parser-pg_wrapper + udf-service-exception_policy + yql-utils-failure_injector + yql-utils-log + utils-log-proto + providers-dq-provider + dq-worker_manager-interface + minikql-invoke_builtins-llvm + yql-utils-backtrace + providers-dq-service + providers-dq-metrics + providers-dq-stats_collector + providers-yt-dq_task_preprocessor + providers-dq-global_worker_manager + dq-actors-yt + yt-yt-client +) +target_link_options(service_node PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(service_node PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/service_node/main.cpp +) +target_allocator(service_node + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(service_node) diff --git a/ydb/library/yql/tools/dq/service_node/CMakeLists.txt b/ydb/library/yql/tools/dq/service_node/CMakeLists.txt new file mode 100644 index 0000000000..606ff46b4b --- /dev/null +++ b/ydb/library/yql/tools/dq/service_node/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/dq/service_node/main.cpp b/ydb/library/yql/tools/dq/service_node/main.cpp new file mode 100644 index 0000000000..fa7d0eea30 --- /dev/null +++ b/ydb/library/yql/tools/dq/service_node/main.cpp @@ -0,0 +1,384 @@ +#include <ydb/library/yql/providers/dq/metrics/metrics_pusher.h> +#include <ydb/library/yql/providers/dq/actors/yt/resource_manager.h> +#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h> +#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h> +#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h> +#include <ydb/library/yql/providers/dq/service/service_node.h> +#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h> +#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h> +#include <ydb/library/yql/providers/yt/dq_task_preprocessor/yql_yt_dq_task_preprocessor.h> +#include <ydb/library/yql/utils/log/proto/logger_config.pb.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/log/tls_backend.h> +#include <ydb/library/yql/utils/failure_injector/failure_injector.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/utils/backtrace/backtrace.h> + +#include <yt/yt/core/misc/shutdown.h> +#include <yt/yt/client/api/client.h> + +#include <library/cpp/getopt/small/last_getopt.h> +#include <library/cpp/threading/future/future.h> +#include <library/cpp/digest/md5/md5.h> + + +#if 0 +# include <yt/yt/core/logging/config.h> +# include 
<yt/yt/core/logging/log_manager.h> +#endif + +constexpr ui32 THREAD_PER_NODE = 16; + +using namespace NYql; +using namespace NYql::NDqs; +using TFileResource = Yql::DqsProto::TFile; + +static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>(); + +static void OnTerminate(int) { + ShouldContinue.TrySetValue(); +} + +// TODO: Merge with ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp#L28 +THashMap<TString, TString> Md5Cache; + +TString GetMd5(const TString& localPath) { + if (Md5Cache.contains(localPath)) { + return Md5Cache[localPath]; + } else { + auto blob = ::TBlob::FromFile(localPath); + TString digest; + + char buf[33] = {0}; + digest = MD5::Data(blob.Data(), blob.Size(), buf); + Md5Cache[localPath] = digest; + return digest; + } +} + +TVector<TFileResource> GetFiles(TString udfsPath, TString vanillaLitePath) { + TVector<TFileResource> files; + + if (!vanillaLitePath.empty()) { + TFileResource vanillaLite; + vanillaLite.SetLocalPath(vanillaLitePath); + vanillaLite.SetName(vanillaLitePath.substr(vanillaLitePath.rfind('/') + 1)); + vanillaLite.SetObjectType(Yql::DqsProto::TFile_EFileType_EEXE_FILE); + vanillaLite.SetObjectId(GetProgramCommitId()); + files.push_back(vanillaLite); + } + + if (!udfsPath.empty()) { + TVector<TString> tmp; + NKikimr::NMiniKQL::FindUdfsInDir(udfsPath, &tmp); + for (const auto& f : tmp) { + TFileResource r; + r.SetLocalPath(f); + r.SetObjectType(Yql::DqsProto::TFile_EFileType_EUDF_FILE); + r.SetObjectId(GetMd5(f)); + files.push_back(r); + } + } + return files; +} + +/* crutch */ +void* GetAppData() { + return nullptr; +} +/* */ + +// Simple usage: ./service_node --id 1 --port 31337 --grpcport 8080 +int main(int argc, char** argv) { + using namespace NLastGetopt; + + auto loggerConfig = NYql::NProto::TLoggingConfig(); + +#if 0 + auto logManager = NYT::NLogging::TLogManager::Get(); + + TString logConfig = " \ + { \ + \"rules\" = [ \ + { \ + \"min_level\" = \"debug\"; \ + \"writers\" = [ \ + \"debug\"; \ + 
]; \ + \"exclude_categories\" = [ \ + \"Bus\"; \ + ]; \ + }; \ + ]; \ + \"writers\" = { \ + \"debug\" = { \ + \"type\" = \"stderr\"; \ + } \ + } \ + }"; + + auto ytLogConfigNode = NYT::NYTree::ConvertTo<NYT::NYTree::INodePtr>( + NYT::NYson::TYsonString( + logConfig.Data(), logConfig.Size(), NYT::NYson::EYsonType::Node)); + auto logManagerConfig = NYT::New<NYT::NLogging::TLogManagerConfig>(); + logManagerConfig->Load(ytLogConfigNode); + logManager->Configure(logManagerConfig); +#endif + TOpts opts = TOpts::Default(); + opts.AddHelpOption(); + opts.AddLongOption("id", "Entry node for service"); + opts.AddLongOption("workers", "Worker actors per worker node"); + + opts.AddLongOption("ytprefix", "Yt prefix"); + opts.AddLongOption("proxy", "Yt proxy"); + opts.AddLongOption("yttoken", "Yt token"); + opts.AddLongOption("ytuser", "Yt user"); + + opts.AddLongOption("port", "Port"); + opts.AddLongOption("grpcport", "Grpc Port"); + opts.AddLongOption("mbusport", "Yql worker mbus port"); + + opts.AddLongOption("remote_jobs", "Start YtRM with jobs"); + opts.AddLongOption("jobs_per_op", "Start YtRM with jobs"); + opts.AddLongOption("vanilla_job", "Vanilla job biary"); + + opts.AddLongOption('u', "udfs", "UdfsPath"); + opts.AddLongOption("enabled_failure_injector", "Enabled failure injections"); + opts.AddLongOption("dump_stats", "Dump Statitics"); + + opts.AddLongOption("revision", "Revision for debug"); + opts.AddLongOption("force_leader", "Disable leader election"); + opts.AddLongOption("log_level", "Log Level"); + + TOptsParseResult res(&opts, argc, argv); + + TString hostName, localAddress; + + ui16 interconnectPort = res.Get<ui16>("port"); + ui16 grpcPort = res.Get<ui16>("grpcport"); + ui16 mbusPort = res.GetOrElse<ui16>("mbusport", 0); + + auto logLevel = NYql::NProto::TLoggingConfig::INFO; + if (res.Has("log_level")) { + auto str = res.Get<TString>("log_level"); + if (str == "TRACE") { + logLevel = NYql::NProto::TLoggingConfig::TRACE; + } + } + + 
loggerConfig.SetAllComponentsLevel(logLevel); + NYql::NLog::InitLogger(loggerConfig, false); + + NProto::TDqConfig::TYtCoordinator coordinatorConfig; + + bool useYtCoordination = false; + if (res.Has("proxy")) { + useYtCoordination = true; + coordinatorConfig.SetPrefix(res.Get<TString>("ytprefix")); + coordinatorConfig.SetClusterName(res.Get<TString>("proxy")); + } + + if (res.Has("yttoken")) { + coordinatorConfig.SetToken(res.Get<TString>("yttoken")); + } + + if (res.Has("ytuser")) { + coordinatorConfig.SetUser(res.Get<TString>("ytuser")); + } + + if (res.Has("enabled_failure_injector")) { + YQL_LOG(INFO) << "Enabled failure injector"; + TFailureInjector::Activate(); + } + + if (res.Has("revision")) { + coordinatorConfig.SetRevision(res.Get<TString>("revision")); + } + + if (useYtCoordination == false || res.Has("force_leader")) { + coordinatorConfig.SetLockType("dummy"); + } + + auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "service_node", interconnectPort); + hostName = coordinator->GetHostname(); + localAddress = coordinator->GetIp(); + + Cerr << hostName + ":" + ToString(localAddress) << Endl; + + TMaybe<ui32> maybeNodeId; + if (useYtCoordination == false && !res.Has("id")) { + Cerr << "--id required!\n"; return -1; + } + if (res.Has("id")) { + maybeNodeId = res.Get<ui32>("id"); + } + auto nodeId = coordinator->GetNodeId( + maybeNodeId, + {ToString(grpcPort)}, + static_cast<ui32>(NDqs::ENodeIdLimits::MinServiceNodeId), + static_cast<ui32>(NDqs::ENodeIdLimits::MinServiceNodeId)+200, + {} + ); + + Cerr << "My nodeId: " << nodeId << Endl; + + TLocalProcessKeyState<NActors::TActorActivityTag>& key = TLocalProcessKeyState<NActors::TActorActivityTag>::GetInstance(); + Cerr << "ActorNames: " << key.GetCount() << Endl; + for (ui64 i = 0; i < key.GetCount(); i++) { + auto name = key.GetNameByIndex(i); + if (name && !name.StartsWith("Activity_")) { + Cerr << " " << name << Endl; + } + } + + NYql::NDqs::TServiceNodeConfig 
config; + config.NodeId = nodeId; + config.InterconnectAddress = localAddress; + config.GrpcHostname = hostName; + config.Port = interconnectPort; + config.GrpcPort = grpcPort; + config.MbusPort = mbusPort; + config.NameserverFactory = [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) { + return NYql::NDqs::CreateDynamicNameserver(setup); + }; + + YQL_LOG(INFO) << "Interconnect addr/port " << config.InterconnectAddress << ":" << config.Port; + YQL_LOG(INFO) << "GRPC addr/port " << config.GrpcHostname << ":" << config.GrpcPort; + YQL_LOG(INFO) << "MBus port " << config.MbusPort; + + auto metricsRegistry = CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq)); + auto serviceNode = TServiceNode(config, THREAD_PER_NODE, metricsRegistry); + + auto statsCollector = CreateStatsCollector(5, *serviceNode.GetSetup(), metricsRegistry->GetSensors()); + + auto* actorSystem = serviceNode.StartActorSystem(GetAppData()); + + // push metrics from root group + auto metricsPusherId = NActors::TActorId() ;// actorSystem->Register(CreateMetricsPusher(CreateMetricsRegistry(GetSensorsRootGroup()),mbusPort)); + + if (res.Has("dump_stats")) { + metricsPusherId = actorSystem->Register(CreateMetricsPrinter(metricsRegistry->GetSensors())); + actorSystem->Register(statsCollector); + } + + if (!maybeNodeId) { + coordinator->StartRegistrator(actorSystem); + coordinator->StartCleaner(actorSystem, {}); + } else { + // just create yt wrapper + if (useYtCoordination) { + (void)coordinator->GetWrapper(actorSystem); + } + } + + auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, TVector<TString>()); + TDqTaskPreprocessorFactoryCollection dqTaskPreprocessorFactories = { + NDq::CreateYtDqTaskPreprocessorFactory(false, funcRegistry) + }; + serviceNode.StartService(dqTaskPreprocessorFactories); + + TVector<NActors::TActorId> ids; + TVector<TResourceManagerOptions> uploadResourcesOptions; + int jobs = 
res.GetOrElse<int>("remote_jobs", 0); + int jobsPerOperation = res.GetOrElse<int>("jobs_per_op", 2); + if (useYtCoordination) { + coordinatorConfig = coordinator->GetConfig(); + TResourceManagerOptions options; + options.YtBackend.SetClusterName(coordinatorConfig.GetClusterName()); + options.YtBackend.SetUser(coordinatorConfig.GetUser()); + options.YtBackend.SetToken(coordinatorConfig.GetToken()); + options.YtBackend.SetMemoryLimit(16000000000LL); + options.YtBackend.SetPrefix(coordinatorConfig.GetPrefix()); + options.YtBackend.SetUploadPrefix(coordinatorConfig.GetPrefix()); + options.YtBackend.SetMinNodeId(static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId)); + options.YtBackend.SetMaxNodeId(static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId)+100); + + + options.YtBackend.SetMaxJobs(jobs); + if (jobsPerOperation > 0) { + options.YtBackend.SetJobsPerOperation(jobsPerOperation); + } + + if (jobs > 0) { + TResourceFile vanilla(res.Get<TString>("vanilla_job")); + vanilla.RemoteFileName = "bin/" + ToString(GetProgramCommitId()) + "/" + vanilla.LocalFileName.substr(vanilla.LocalFileName.rfind('/')+1); + options.Files.push_back(vanilla); + for (const auto& r : GetFiles(res.GetOrElse("udfs", ""), res.Get<TString>("vanilla_job") + ".lite")) { + if (r.GetObjectType() == Yql::DqsProto::TFile::EEXE_FILE) { + TResourceFile f(r.GetLocalPath()); + f.RemoteFileName = "bin/" + r.GetObjectId() + "/" + r.GetName(); + options.Files.push_back(f); + } else { + TResourceFile f(r.GetLocalPath()); + f.RemoteFileName = "udfs/" + r.GetObjectId(); + options.Files.push_back(f); + } + } + + // uploader + options.UploadPrefix = options.YtBackend.GetUploadPrefix(); + options.LockName = TString("ytuploader.") + options.YtBackend.GetClusterName(); + // don't start uploader for local-yt + if (options.YtBackend.GetClusterName().find("localhost") != 0) { + ids.push_back(actorSystem->Register(CreateResourceUploader(options, coordinator))); + } + { + // bin cleaner + options.KeepFirst = 5; + 
options.UploadPrefix = options.YtBackend.GetUploadPrefix() + "/bin"; + ids.push_back(actorSystem->Register(CreateResourceCleaner(options, coordinator))); + } + { + // udf cleaner + options.KeepFirst = 500; + options.DropBefore = TDuration::Days(7); + options.UploadPrefix = options.YtBackend.GetUploadPrefix() + "/udfs"; + ids.push_back(actorSystem->Register(CreateResourceCleaner(options, coordinator))); + } + { + // temporary locks + options.KeepFilter = options.YtBackend.GetClusterName(); // don't remove locks with `ClusterName` in LockName + options.DropBefore = TDuration::Hours(1); + options.UploadPrefix = options.YtBackend.GetPrefix() + "/locks"; + ids.push_back(actorSystem->Register(CreateResourceCleaner(options, coordinator))); + } + + // rm manager + options.Files.clear(); + vanilla.RemoteFileName = vanilla.LocalFileName.substr(vanilla.LocalFileName.rfind('/')+1); + options.Files.push_back(vanilla); + options.UploadPrefix = options.YtBackend.GetUploadPrefix() + "/bin/" + ToString(GetProgramCommitId()); + options.LockName = TString("ytrm.") + options.YtBackend.GetClusterName(); + options.Counters = metricsRegistry->GetSensors()->GetSubgroup("counters", "ytrm"); + ids.push_back(actorSystem->Register(CreateResourceManager(options, coordinator))); + } + + options.UploadPrefix = options.YtBackend.GetUploadPrefix(); + options.Files.clear(); + uploadResourcesOptions.push_back(options); + } + + coordinator->StartGlobalWorker(actorSystem, uploadResourcesOptions, metricsRegistry); + + signal(SIGINT, &OnTerminate); + signal(SIGTERM, &OnTerminate); + + auto future = ShouldContinue.GetFuture(); + future.Wait(); + + for (auto id : ids) { + actorSystem->Send(id, new NActors::TEvents::TEvPoison); + } + actorSystem->Send(NDqs::MakeWorkerManagerActorID(nodeId), new NActors::TEvents::TEvPoison); + actorSystem->Send(metricsPusherId, new NActors::TEvents::TEvPoison); + + coordinator->Stop(actorSystem); + + // TODO: remove this + Sleep(TDuration::Seconds(5)); + + 
serviceNode.Stop(); + NYT::Shutdown(); + + return 0; +} diff --git a/ydb/library/yql/tools/dq/service_node/ya.make b/ydb/library/yql/tools/dq/service_node/ya.make new file mode 100644 index 0000000000..730bc565e4 --- /dev/null +++ b/ydb/library/yql/tools/dq/service_node/ya.make @@ -0,0 +1,33 @@ +IF (NOT OS_WINDOWS) + PROGRAM() + + PEERDIR( + library/cpp/getopt + yt/cpp/mapreduce/client + ydb/library/yql/sql/pg + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/public/udf/service/exception_policy + ydb/library/yql/utils/failure_injector + ydb/library/yql/utils/log + ydb/library/yql/utils/log/proto + ydb/library/yql/providers/dq/provider + ydb/library/yql/providers/dq/worker_manager/interface + ydb/library/yql/minikql/invoke_builtins/llvm + ydb/library/yql/utils/backtrace + ydb/library/yql/providers/dq/service + ydb/library/yql/providers/dq/metrics + ydb/library/yql/providers/dq/stats_collector + ydb/library/yql/providers/yt/dq_task_preprocessor + ydb/library/yql/providers/dq/global_worker_manager + ydb/library/yql/providers/dq/actors/yt + yt/yt/client + ) + + YQL_LAST_ABI_VERSION() + + SRCS( + main.cpp + ) + + END() +ENDIF() diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..7a8db69ea2 --- /dev/null +++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,64 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(worker_node) +target_compile_options(worker_node PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(worker_node PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + client-ydb_persqueue_public-codecs + library-cpp-getopt + cpp-mapreduce-client + dq-actors-compute + yql-dq-comp_nodes + dq-integration-transform + yql-dq-transform + minikql-comp_nodes-llvm + providers-clickhouse-actors + providers-common-comp_nodes + providers-dq-runtime + providers-dq-service + providers-dq-metrics + providers-dq-stats_collector + providers-dq-task_runner + providers-pq-async_io + providers-pq-proto + providers-s3-actors + providers-ydb-actors + providers-ydb-comp_nodes + udf-service-exception_policy + library-yql-utils + yql-utils-log + utils-log-proto + yql-utils-failure_injector + yql-utils-backtrace + yt-comp_nodes-dq + providers-yt-mkql_dq + dq-actors-yt + providers-dq-global_worker_manager + yql-sql-pg + yql-parser-pg_wrapper +) +target_link_options(worker_node PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(worker_node PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/worker_node/main.cpp +) +target_allocator(worker_node + system_allocator +) +vcs_info(worker_node) diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..11e2b042f9 --- /dev/null +++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt @@ -0,0 +1,68 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. 
Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(worker_node) +target_compile_options(worker_node PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(worker_node PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + client-ydb_persqueue_public-codecs + library-cpp-getopt + cpp-mapreduce-client + dq-actors-compute + yql-dq-comp_nodes + dq-integration-transform + yql-dq-transform + minikql-comp_nodes-llvm + providers-clickhouse-actors + providers-common-comp_nodes + providers-dq-runtime + providers-dq-service + providers-dq-metrics + providers-dq-stats_collector + providers-dq-task_runner + providers-pq-async_io + providers-pq-proto + providers-s3-actors + providers-ydb-actors + providers-ydb-comp_nodes + udf-service-exception_policy + library-yql-utils + yql-utils-log + utils-log-proto + yql-utils-failure_injector + yql-utils-backtrace + yt-comp_nodes-dq + providers-yt-mkql_dq + dq-actors-yt + providers-dq-global_worker_manager + yql-sql-pg + yql-parser-pg_wrapper +) +target_link_options(worker_node PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(worker_node PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/worker_node/main.cpp +) +target_allocator(worker_node + cpp-malloc-jemalloc +) +vcs_info(worker_node) diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..17c842dbb4 --- /dev/null +++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt @@ -0,0 +1,70 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). 
These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(worker_node) +target_compile_options(worker_node PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(worker_node PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + client-ydb_persqueue_public-codecs + library-cpp-getopt + cpp-mapreduce-client + dq-actors-compute + yql-dq-comp_nodes + dq-integration-transform + yql-dq-transform + minikql-comp_nodes-llvm + providers-clickhouse-actors + providers-common-comp_nodes + providers-dq-runtime + providers-dq-service + providers-dq-metrics + providers-dq-stats_collector + providers-dq-task_runner + providers-pq-async_io + providers-pq-proto + providers-s3-actors + providers-ydb-actors + providers-ydb-comp_nodes + udf-service-exception_policy + library-yql-utils + yql-utils-log + utils-log-proto + yql-utils-failure_injector + yql-utils-backtrace + yt-comp_nodes-dq + providers-yt-mkql_dq + dq-actors-yt + providers-dq-global_worker_manager + yql-sql-pg + yql-parser-pg_wrapper +) +target_link_options(worker_node PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl + -lutil +) +target_sources(worker_node PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/tools/dq/worker_node/main.cpp +) +target_allocator(worker_node + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(worker_node) diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.txt new file mode 100644 index 0000000000..606ff46b4b --- /dev/null +++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp new file mode 100644 index 0000000000..7ba1ff412d --- /dev/null +++ b/ydb/library/yql/tools/dq/worker_node/main.cpp @@ -0,0 +1,423 @@ +#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h> +#include <ydb/library/yql/providers/dq/global_worker_manager/service_node_resolver.h> +#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h> +#include <ydb/library/yql/providers/dq/actors/yt/yt_wrapper.h> +#include <ydb/library/yql/providers/dq/actors/yt/worker_registrator.h> +#include <ydb/library/yql/providers/dq/actors/yt/nodeid_cleaner.h> +#include <ydb/library/yql/providers/dq/metrics/metrics_pusher.h> +#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h> +#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h> +#include <ydb/library/yql/providers/dq/actors/execution_helpers.h> +#include <ydb/library/yql/utils/bind_in_range.h> +#include <library/cpp/digest/md5/md5.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> + +#include 
<ydb/library/yql/providers/dq/service/interconnect_helpers.h> +#include <ydb/library/yql/providers/dq/runtime/file_cache.h> +#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h> +#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.h> +#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h> +#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h> +#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h> +#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h> +#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h> +#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h> +#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h> +#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h> +#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_factory.h> +#include <ydb/library/yql/providers/ydb/comp_nodes/yql_ydb_dq_transform.h> + +#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h> +#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h> +#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h> +#include <ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h> + +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/mkql_stats_registry.h> +#include <ydb/library/yql/utils/backtrace/backtrace.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/log/proto/logger_config.pb.h> +#include <ydb/library/yql/utils/log/tls_backend.h> +#include <ydb/library/yql/utils/failure_injector/failure_injector.h> +#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h> +#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h> + +#include <yt/yt/core/misc/shutdown.h> + +#include <library/cpp/getopt/last_getopt.h> +#include 
<library/cpp/threading/future/future.h> +#include <library/cpp/svnversion/svnversion.h> + +#include <yt/yt/core/actions/invoker.h> +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/thread_pool.h> + +#include <util/generic/scope.h> +#include <util/folder/path.h> +#include <util/system/env.h> +#include <util/system/getpid.h> +#include <util/system/fs.h> + +constexpr ui32 THREAD_PER_NODE = 8; + +using namespace NYql; +using namespace NYql::NDq; +using namespace NYql::NDqs; + +using namespace NActors; + +static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>(); + +namespace { +/* Signal handler: fulfils ShouldContinue so that the thread waiting on its future can proceed to shutdown. TrySetValue() is used instead of SetValue() so that a repeated signal is a no-op rather than a second set of an already-fulfilled promise (NOTE(review): SetValue() on a fulfilled promise is expected to throw - confirm against library/cpp/threading/future). */ +void OnTerminate(int) { + ShouldContinue.TrySetValue(); +} + +/* ITaskRunnerInvoker that forwards every callback to a serialized invoker created on top of the given YT invoker. */ +class TSerializedTaskRunnerInvoker: public ITaskRunnerInvoker { +public: + TSerializedTaskRunnerInvoker(const NYT::IInvokerPtr& invoker) + : Invoker(NYT::NConcurrency::CreateSerializedInvoker(invoker)) + { } + + void Invoke(const std::function<void(void)>& f) override { + Invoker->Invoke(BIND(f)); + } + +private: + const NYT::IInvokerPtr Invoker; +}; + +/* Invoker factory that owns one YT thread pool named "WorkerActor" (created with `capacity`); each Create() returns a new serialized invoker over that pool. */ +class TConcurrentInvokerFactory: public ITaskRunnerInvokerFactory { +public: + TConcurrentInvokerFactory(int capacity) + : ThreadPool(NYT::NConcurrency::CreateThreadPool(capacity, "WorkerActor")) + { } + + ITaskRunnerInvoker::TPtr Create() override { + return new TSerializedTaskRunnerInvoker(ThreadPool->GetInvoker()); + } + + NYT::NConcurrency::IThreadPoolPtr ThreadPool; +}; + +/* Builds the async IO factory: registers the PQ, YDB, S3 and ClickHouse read actor factories and the PQ and S3 write actor factories, then returns it. */ +NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway) { + auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); + RegisterDqPqReadActorFactory(*factory, driver, nullptr); + RegisterYdbReadActorFactory(*factory, driver, nullptr); + RegisterS3ReadActorFactory(*factory, nullptr, httpGateway); + RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway); + + RegisterDqPqWriteActorFactory(*factory, driver, nullptr); + RegisterS3WriteActorFactory(*factory, nullptr, httpGateway); + return
factory; +} + +} + +int main(int argc, char** argv) { + + const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")); + NYdb::TDriver driver(driverConfig); + + Y_DEFER { + driver.Stop(true); + }; + + NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry(); + + auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + NYql::GetCommonDqFactory(), + NYql::GetDqYtFactory(statsRegistry.Get()), + NYql::GetDqYdbFactory(driver), + NKikimr::NMiniKQL::GetYqlFactory(), + NYql::GetPgFactory() + }); + + auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({ + NYql::CreateCommonDqTaskTransformFactory(), + NYql::CreateYtDqTaskTransformFactory(), + NYql::CreateYdbDqTaskTransformFactory() + }); + + auto patternCache = std::make_shared<NKikimr::NMiniKQL::TComputationPatternLRUCache>(200_MB); + + if (argc > 1 && !strcmp(argv[1], "tasks_runner_proxy")) { + NYql::NBacktrace::RegisterKikimrFatalActions(); + //NYql::NBacktrace::EnableKikimrSymbolize(); // symbolize in gateway + + return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true); + } + + using namespace NLastGetopt; + TOpts opts = TOpts::Default(); + opts.AddHelpOption(); + opts.AddLongOption('i', "id", "Node ID"); + opts.AddLongOption('p', "port", "Port"); + opts.AddLongOption('u', "udfs", "UdfsPath"); + opts.AddLongOption("config", "Config"); + opts.AddLongOption("service_addr", "Service Addr (host/port pair)"); + + opts.AddLongOption("ytprefix", "Yt prefix"); + opts.AddLongOption("proxy", "Yt proxy"); + opts.AddLongOption("workers", "Workers"); + opts.AddLongOption("threads", "Threads"); + opts.AddLongOption("yttoken", "Yt token"); + opts.AddLongOption("ytuser", "Yt user"); + opts.AddLongOption("enabled_failure_injector", "Enabled failure injections"); + opts.AddLongOption("revision", "Revision"); + opts.AddLongOption("heartbeat", "HeartbeatPeriod"); + 
opts.AddLongOption("solomon", "Solomon Token"); + opts.AddLongOption("print_metrics", "Print Metrics"); + opts.AddLongOption("announce_cluster_name", "Send this name in pings"); + opts.AddLongOption("disable_pipe", "Disable pipe"); + opts.AddLongOption("log_level", "Log Level"); + + ui32 threads = THREAD_PER_NODE; + TString host; + TString ip; + TString solomonToken; + int capacity = 1; + int heartbeatPeriodMs = 100; + + TOptsParseResult res(&opts, argc, argv); + + auto loggerConfig = NYql::NProto::TLoggingConfig(); + auto logLevel = NYql::NProto::TLoggingConfig::INFO; + if (res.Has("log_level")) { + auto str = res.Get<TString>("log_level"); + if (str == "TRACE") { + logLevel = NYql::NProto::TLoggingConfig::TRACE; + } + } + + loggerConfig.SetAllComponentsLevel(logLevel); + + NYql::NLog::InitLogger(loggerConfig, false); + + ui16 startPort = res.Get<ui16>("port"); + if (res.Has("heartbeat")) { + heartbeatPeriodMs = res.Get<int>("heartbeat"); + } + if (res.Has("threads")) { + threads = res.Get<int>("threads"); + } + + NProto::TDqConfig::TYtCoordinator coordinatorConfig; + bool useYtCoordination = false; + if (res.Has("proxy")) { + coordinatorConfig.SetPrefix(res.Get<TString>("ytprefix")); + coordinatorConfig.SetClusterName(res.Get<TString>("proxy")); + useYtCoordination = true; + } + coordinatorConfig.SetHeartbeatPeriodMs(heartbeatPeriodMs); + + if (!useYtCoordination) { + coordinatorConfig.SetLockType("dummy"); + } + + if (res.Has("yttoken")) { + coordinatorConfig.SetToken(res.Get<TString>("yttoken")); + } + + if (res.Has("workers")) { + capacity = res.Get<int>("workers"); + } + + if (res.Has("ytuser")) { + coordinatorConfig.SetUser(res.Get<TString>("ytuser")); + } + + if (res.Has("revision")) { + coordinatorConfig.SetRevision(res.Get<TString>("revision")); + YQL_LOG(INFO) << "Set revision '" << coordinatorConfig.GetRevision() << "'"; + } + + if (res.Has("solomon")) { + solomonToken = res.Get<TString>("solomon"); + } + + if (res.Has("enabled_failure_injector")) { + 
YQL_LOG(INFO) << "Enabled failure injector"; + TFailureInjector::Activate(); + } + + TRangeWalker<int> portWalker(startPort, startPort+100); + auto ports = BindInRange(portWalker); + + auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort()); + coordinatorConfig = coordinator->GetConfig(); + + host = coordinator->GetHostname(); + ip = coordinator->GetIp(); + + NProto::TDqConfig::TYtBackend backendConfig; + backendConfig.SetUploadPrefix(coordinatorConfig.GetPrefix()); + backendConfig.SetUser(coordinatorConfig.GetUser()); + backendConfig.SetToken(coordinatorConfig.GetToken()); + backendConfig.SetClusterName(coordinatorConfig.GetClusterName()); + + TString fileCacheDir = "./file_cache123"; + IFileCache::TPtr fileCache = new TFileCache(fileCacheDir, 16000000000L); + + Cerr << host + ":" + ToString(ip) << Endl; + + TMaybe<ui32> maybeNodeId; + if (res.Has("id")) { + maybeNodeId = res.Get<ui32>("id"); + } + auto nodeId = coordinator->GetNodeId( + maybeNodeId, + {}, + static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId), + static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId), + {} + ); + + Cerr << "My nodeId: " << nodeId << Endl; + + TString udfsDir = res.GetOrElse("udfs", ""); + + try { + auto dqSensors = GetSensorsGroupFor(NSensorComponent::kDq); + THolder<NActors::TActorSystemSetup> setup; + TIntrusivePtr<NActors::NLog::TSettings> logSettings; + std::tie(setup, logSettings) = BuildActorSetup( + nodeId, + ip, + ports[1].Addr.GetPort(), + ports[1].Socket->Release(), + {threads}, + dqSensors, + [](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) { + return NYql::NDqs::CreateDynamicNameserver(setup); + }, + Nothing()); + + auto statsCollector = CreateStatsCollector(5, *setup.Get(), dqSensors); + + TVector<TString> UDFsPaths; + if (!udfsDir.empty()) { + NKikimr::NMiniKQL::FindUdfsInDir(udfsDir, &UDFsPaths); + } + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry( + 
&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, UDFsPaths)->Clone(); + + for (auto& m : functionRegistry->GetAllModuleNames()) { + auto path = *functionRegistry->FindUdfPath(m); + Cout << m << '\t' << path << Endl; + TString objectId = MD5::Calc(path); // Production env uses MD5::File as an Id. For testing purpose we use fast version. + if (!fileCache->Contains(objectId)) { + TString newPath = fileCacheDir + "/" + objectId; + NFs::Copy(path, newPath); + Cout << "Add " << newPath << " " << objectId << "\n"; + fileCache->AddFile(newPath, objectId); + } + } + + NKikimr::NMiniKQL::FillStaticModules(*functionRegistry); + + auto actorSystem = MakeHolder<NActors::TActorSystem>(setup, nullptr, logSettings); + + actorSystem->Start(); + actorSystem->Register(statsCollector); + + if (!maybeNodeId) { + coordinator->StartRegistrator(actorSystem.Get()); + } + + TVector<TString> hostPort; + if (res.Has("service_addr")) { + TString addresses = res.Get<TString>("service_addr"); + Split(addresses, ",", hostPort); + } + +/* + if (solomonToken) { + TSolomonAgentConfig config = TSolomonAgentConfig() + .WithServer("https://solomon.yandex.net") + .WithPath("/api/v2/push") + .WithPort(443) + .WithProject("yql") + .WithService("dq_vanilla") + .WithCluster("test") + .WithHost("NodeId-" + ToString(nodeId)) + .WithCalcDerivs(true) + .WithAuthorizaton("OAuth " + solomonToken) + .WithCommonLabels({{"ytcluster", "test_cluster"}}) + ; + actorSystem->Register(CreateMetricsPusher(dqSensors, config)); + } +*/ + + if (res.Has("print_metrics")) { + actorSystem->Register(CreateMetricsPrinter(dqSensors)); + } + + auto resolver = coordinator->CreateServiceNodeResolver(actorSystem.Get(), hostPort); + + backendConfig.SetWorkerCapacity(capacity); + TResourceManagerOptions rmOptions; + rmOptions.Capabilities = Yql::DqsProto::RegisterNodeRequest::ECAP_COMPUTE_ACTOR; + rmOptions.YtBackend = backendConfig; + rmOptions.FileCache = fileCache; + rmOptions.TmpDir = "./tmp"; + 
+ if (res.Has("announce_cluster_name")) { + rmOptions.AnnounceClusterName = res.Get<TString>("announce_cluster_name"); + Cerr << "Announce as '" << backendConfig.GetClusterName() << "'\n"; + } + + actorSystem->Register(coordinator->CreateServiceNodePinger(resolver, rmOptions)); + + NYql::NTaskRunnerProxy::TPipeFactoryOptions pfOptions; + pfOptions.ExecPath = TFsPath(argv[0]).RealPath().GetPath(); + pfOptions.FileCache = fileCache; + if (res.Has("revision")) { + pfOptions.Revision = coordinatorConfig.GetRevision(); + } + + NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; + bool disablePipe = res.Has("disable_pipe"); + NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry(); + + lwmOptions.Factory = disablePipe + ? NTaskRunnerProxy::CreateFactory(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, patternCache, true) + : NTaskRunnerProxy::CreatePipeFactory(pfOptions); + lwmOptions.AsyncIoFactory = CreateAsyncIoFactory(driver, IHTTPGateway::Make()); + lwmOptions.FunctionRegistry = functionRegistry.Get(); + lwmOptions.RuntimeData = coordinator->GetRuntimeData(); + lwmOptions.TaskRunnerInvokerFactory = disablePipe + ? TTaskRunnerInvokerFactory::TPtr(new NDqs::TTaskRunnerInvokerFactory()) + : TTaskRunnerInvokerFactory::TPtr(new TConcurrentInvokerFactory(2*capacity)); + lwmOptions.TaskRunnerActorFactory = disablePipe + ? 
NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory([=](const NDq::TDqTaskSettings& task, const NDq::TLogFunc& ) + { + return lwmOptions.Factory->Get(task); + }) + : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); + lwmOptions.ComputeActorOwnsCounters = true; + auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); + + auto workerManagerActorId = actorSystem->Register(resman); + actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId); + + auto endFuture = ShouldContinue.GetFuture(); + + signal(SIGINT, &OnTerminate); + signal(SIGTERM, &OnTerminate); + + + endFuture.Wait(); + + actorSystem->Stop(); + } catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + return -1; + } + + NYT::Shutdown(); + return 0; +} diff --git a/ydb/library/yql/tools/dq/worker_node/ya.make b/ydb/library/yql/tools/dq/worker_node/ya.make new file mode 100644 index 0000000000..216572850a --- /dev/null +++ b/ydb/library/yql/tools/dq/worker_node/ya.make @@ -0,0 +1,46 @@ +IF (NOT OS_WINDOWS) + PROGRAM() + + PEERDIR( + ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs + library/cpp/getopt + yt/cpp/mapreduce/client + ydb/library/yql/dq/actors/compute + ydb/library/yql/dq/comp_nodes + ydb/library/yql/dq/integration/transform + ydb/library/yql/dq/transform + ydb/library/yql/minikql/comp_nodes/llvm + ydb/library/yql/providers/clickhouse/actors + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/dq/runtime + ydb/library/yql/providers/dq/service + ydb/library/yql/providers/dq/metrics + ydb/library/yql/providers/dq/stats_collector + ydb/library/yql/providers/dq/task_runner + ydb/library/yql/providers/pq/async_io + ydb/library/yql/providers/pq/proto + ydb/library/yql/providers/s3/actors + ydb/library/yql/providers/ydb/actors + ydb/library/yql/providers/ydb/comp_nodes + ydb/library/yql/public/udf/service/exception_policy + ydb/library/yql/utils + ydb/library/yql/utils/log + 
ydb/library/yql/utils/log/proto + ydb/library/yql/utils/failure_injector + ydb/library/yql/utils/backtrace + ydb/library/yql/providers/yt/comp_nodes/dq + ydb/library/yql/providers/yt/mkql_dq + ydb/library/yql/providers/dq/actors/yt + ydb/library/yql/providers/dq/global_worker_manager + ydb/library/yql/sql/pg + ydb/library/yql/parser/pg_wrapper + ) + + YQL_LAST_ABI_VERSION() + + SRCS( + main.cpp + ) + + END() +ENDIF() diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make new file mode 100644 index 0000000000..68eba94d95 --- /dev/null +++ b/ydb/library/yql/tools/dq/ya.make @@ -0,0 +1,4 @@ +RECURSE( + service_node + worker_node +) diff --git a/ydb/library/yql/tools/ya.make b/ydb/library/yql/tools/ya.make index 3f5aa7a058..5ff8e2b273 100644 --- a/ydb/library/yql/tools/ya.make +++ b/ydb/library/yql/tools/ya.make @@ -1,6 +1,7 @@ RECURSE( astdiff dqrun + dq mrjob sql2yql sql_formatter |